ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/4] ignite git commit: ignite-5075
Date Fri, 05 May 2017 14:47:42 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 6634e98..e942b5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -549,7 +549,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 cntrMap.clear();
 
                 // If this is the oldest node.
-                if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
+                if (oldest != null && (loc.equals(oldest) || grp.localStartVersion().equals(exchId.topologyVersion()))) {
                     if (node2part == null) {
                         node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -598,6 +598,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         waitForRent();
     }
 
+    private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) {
+        return grp.affinity().nodes(p, topVer).contains(ctx.localNode());
+    }
+
     /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
         treatAllPartAsLoc = false;
@@ -631,7 +635,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             for (int p = 0; p < num; p++) {
                 GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
 
-                if (grp.affinity().partitionLocalNode(p, topVer)) {
+                if (partitionLocalNode(p, topVer)) {
                     // This partition will be created during next topology event,
                     // which obviously has not happened at this point.
                     if (locPart == null) {
@@ -724,11 +728,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         GridDhtLocalPartition loc = locParts.get(p);
 
         if (loc == null || loc.state() == EVICTED) {
-            locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory));
+            locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, entryFactory));
 
-            if (cctx.shared().pageStore() != null) {
+            if (ctx.pageStore() != null) {
                 try {
-                    cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p);
+                    // TODO IGNITE-5075.
+                    ctx.pageStore().onPartitionCreated(grp.groupId(), p);
                 }
                 catch (IgniteCheckedException e) {
                     // TODO ignite-db
@@ -758,7 +763,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         GridDhtPartitionState state = loc != null ? loc.state() : null;
 
-        if (loc != null && state != EVICTED && (state != RENTING || !cctx.allowFastEviction()))
+        if (loc != null && state != EVICTED && (state != RENTING || !grp.allowFastEviction()))
             return loc;
 
         if (!create)
@@ -773,7 +778,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             state = loc != null ? loc.state() : null;
 
-            boolean belongs = cctx.affinity().partitionLocalNode(p, topVer);
+            boolean belongs = partitionLocalNode(p, topVer);
 
             if (loc != null && state == EVICTED) {
                 locParts.set(p, loc = null);
@@ -783,7 +788,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         "(often may be caused by inconsistent 'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
             }
-            else if (loc != null && state == RENTING && cctx.allowFastEviction())
+            else if (loc != null && state == RENTING && grp.allowFastEviction())
                 throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted.");
 
             if (loc == null) {
@@ -792,7 +797,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
 
-                locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory));
+                locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, entryFactory));
 
                 if (updateSeq)
                     this.updateSeq.incrementAndGet();
@@ -807,9 +812,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             lock.writeLock().unlock();
         }
 
-        if (created && cctx.shared().pageStore() != null) {
+        if (created && ctx.pageStore() != null) {
             try {
-                cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p);
+                // TODO IGNITE-5075.
+                ctx.pageStore().onPartitionCreated(grp.groupId(), p);
             }
             catch (IgniteCheckedException e) {
                 // TODO ignite-db
@@ -834,8 +840,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
-        return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create);
+    @Override public GridDhtLocalPartition localPartition(int part) {
+        return locParts.get(part);
     }
 
     /** {@inheritDoc} */
@@ -891,7 +897,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 map.put(i, part.state());
             }
 
-            return new GridDhtPartitionMap(cctx.nodeId(),
+            return new GridDhtPartitionMap(ctx.localNodeId(),
                 updateSeq.get(),
                 topVer,
                 Collections.unmodifiableMap(map),
@@ -931,7 +937,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
-        AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
+        AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer);
 
         List<ClusterNode> affNodes = affAssignment.get(p);
 
@@ -954,8 +960,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer +
                 ", topVer2=" + this.topVer +
-                ", node=" + cctx.igniteInstanceName() +
-                ", cache=" + cctx.name() +
+                ", node=" + ctx.igniteInstanceName() +
+                ", grp=" + grp.name() +
                 ", node2part=" + node2part + ']';
 
             List<ClusterNode> nodes = null;
@@ -967,7 +973,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     HashSet<UUID> affIds = affAssignment.getIds(p);
 
                     if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) {
-                        ClusterNode n = cctx.discovery().node(nodeId);
+                        ClusterNode n = ctx.discovery().node(nodeId);
 
                         if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
                             if (nodes == null) {
@@ -1001,7 +1007,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         GridDhtPartitionState state,
         GridDhtPartitionState... states) {
         Collection<UUID> allIds = topVer.topologyVersion() > 0 ?
-            F.nodeIds(discoCache.cacheGroupAffinityNodes(cctx.group().groupId())) :
+            F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId())) :
             null;
 
         lock.readLock().lock();
@@ -1010,7 +1016,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
                 ", allIds=" + allIds +
                 ", node2part=" + node2part +
-                ", cache=" + cctx.name() + ']';
+                ", grp=" + grp.name() + ']';
 
             Collection<UUID> nodeIds = part2node.get(p);
 
@@ -1027,7 +1033,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     continue;
 
                 if (hasState(p, id, state, states)) {
-                    ClusterNode n = cctx.discovery().node(id);
+                    ClusterNode n = ctx.discovery().node(id);
 
                     if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
                         nodes.add(n);
@@ -1043,7 +1049,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
-        if (!cctx.rebalanceEnabled())
+        if (!grp.rebalanceEnabled())
             return ownersAndMoving(p, topVer);
 
         return nodes(p, topVer, OWNING);
@@ -1056,7 +1062,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> moving(int p) {
-        if (!cctx.rebalanceEnabled())
+        if (!grp.rebalanceEnabled())
             return ownersAndMoving(p, AffinityTopologyVersion.NONE);
 
         return nodes(p, AffinityTopologyVersion.NONE, MOVING);
@@ -1082,11 +1088,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", cache=" + cctx.name() +
-                ", started=" + cctx.started() +
+                ", grp=" + grp.name() +
                 ", stopping=" + stopping +
-                ", locNodeId=" + cctx.localNode().id() +
-                ", locName=" + cctx.igniteInstanceName() + ']';
+                ", locNodeId=" + ctx.localNode().id() +
+                ", locName=" + ctx.igniteInstanceName() + ']';
 
             GridDhtPartitionFullMap m = node2part;
 
@@ -1168,7 +1173,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     // then we keep the newer value.
                     if (newPart != null &&
                         (newPart.updateSequence() < part.updateSequence() ||
-                        (cctx.cacheStartTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+                        (grp.groupStartVersion().compareTo(newPart.topologyVersion()) > 0))
                         ) {
                         if (log.isDebugEnabled())
                             log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
@@ -1182,7 +1187,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
                     UUID nodeId = it.next();
 
-                    if (!cctx.discovery().alive(nodeId)) {
+                    if (!ctx.discovery().alive(nodeId)) {
                         if (log.isDebugEnabled())
                             log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" +
                                 partMap + ']');
@@ -1194,7 +1199,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             node2part = partMap;
 
-            Map<Integer, Set<UUID>> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f);
+            Map<Integer, Set<UUID>> p2n = new HashMap<>(grp.affinity().partitions(), 1.0f);
 
             for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
                 for (Integer p : e.getValue().keySet()) {
@@ -1213,11 +1218,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             boolean changed = false;
 
-            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion affVer = grp.affinity().lastVersion();
 
-            GridDhtPartitionMap nodeMap = partMap.get(cctx.localNodeId());
+            GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId());
 
-            if (nodeMap != null && cctx.shared().database().persistenceEnabled()) {
+            if (nodeMap != null && ctx.database().persistenceEnabled()) {
                 for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) {
                     int p = e.getKey();
                     GridDhtPartitionState state = e.getValue();
@@ -1244,7 +1249,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             }
 
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-                List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+                List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
 
                 changed |= checkEvictions(updateSeq, aff);
 
@@ -1257,7 +1262,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 log.debug("Partition map after full update: " + fullMapString());
 
             if (changed)
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
 
             return changed ? localPartitionMap() : null;
         }
@@ -1276,7 +1281,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 
-        if (!cctx.discovery().alive(parts.nodeId())) {
+        if (!ctx.discovery().alive(parts.nodeId())) {
             if (log.isDebugEnabled())
                 log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
                     ", parts=" + parts + ']');
@@ -1371,10 +1376,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion affVer = grp.affinity().lastVersion();
 
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-                List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+                List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
 
                 changed |= checkEvictions(updateSeq, aff);
 
@@ -1387,7 +1392,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 log.debug("Partition map after single update: " + fullMapString());
 
             if (changed)
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
 
             return changed ? localPartitionMap() : null;
         }
@@ -1401,7 +1406,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         lock.writeLock().lock();
 
         try {
-            int parts = cctx.affinity().partitions();
+            int parts = grp.affinity().partitions();
 
             Collection<Integer> lost = null;
 
@@ -1435,7 +1440,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             boolean changed = false;
 
             if (lost != null) {
-                PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
+                PartitionLossPolicy plc = grp.config().getPartitionLossPolicy();
 
                 assert plc != null;
 
@@ -1467,13 +1472,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         }
                     }
 
-                    if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
-                        cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST,
-                            discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+// TODO: IGNITE-5075.
+//                    if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
+//                        cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST,
+//                            discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
                 }
 
                 if (plc != PartitionLossPolicy.IGNORE)
-                    cctx.needsRecovery(true);
+                    grp.needsRecovery(true);
             }
 
             return changed;
@@ -1488,7 +1494,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         lock.writeLock().lock();
 
         try {
-            int parts = cctx.affinity().partitions();
+            int parts = grp.affinity().partitions();
             long updSeq = updateSeq.incrementAndGet();
 
             for (int part = 0; part < parts; part++) {
@@ -1527,9 +1533,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            checkEvictions(updSeq, cctx.affinity().assignments(topVer));
+            checkEvictions(updSeq, grp.affinity().assignments(topVer));
 
-            cctx.needsRecovery(false);
+            grp.needsRecovery(false);
         }
         finally {
             lock.writeLock().unlock();
@@ -1543,7 +1549,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             Collection<Integer> res = null;
 
-            int parts = cctx.affinity().partitions();
+            int parts = grp.affinity().partitions();
 
             for (int part = 0; part < parts; part++) {
                 Set<UUID> nodeIds = part2node.get(part);
@@ -1580,7 +1586,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             GridDhtLocalPartition locPart = locParts.get(p);
 
             if (locPart != null) {
-                if (locPart.state() == OWNING && !owners.contains(cctx.localNodeId()))
+                if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId()))
                     locPart.moving();
             }
 
@@ -1605,12 +1611,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @return {@code True} if state changed.
      */
     private boolean checkEvictions(long updateSeq) {
-        AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion affVer = grp.affinity().lastVersion();
 
         boolean changed = false;
 
         if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-            List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+            List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
 
             changed = checkEvictions(updateSeq, aff);
 
@@ -1642,12 +1648,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @return Checks if any of the local partitions need to be evicted.
      */
     private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
-        if (!cctx.kernalContext().state().active())
+        if (!ctx.kernalContext().state().active())
             return false;
 
         boolean changed = false;
 
-        UUID locId = cctx.nodeId();
+        UUID locId = ctx.localNodeId();
 
         for (int p = 0; p < locParts.length(); p++) {
             GridDhtLocalPartition part = locParts.get(p);
@@ -1660,7 +1666,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (state.active()) {
                 List<ClusterNode> affNodes = aff.get(p);
 
-                if (!affNodes.contains(cctx.localNode())) {
+                if (!affNodes.contains(ctx.localNode())) {
                     List<ClusterNode> nodes = nodes(p, topVer, OWNING);
                     Collection<UUID> nodeIds = F.nodeIds(nodes);
 
@@ -1723,10 +1729,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
         ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
-        assert oldest != null || cctx.kernalContext().clientNode();
+        assert oldest != null || ctx.kernalContext().clientNode();
 
         // If this node became the oldest node.
-        if (cctx.localNode().equals(oldest)) {
+        if (ctx.localNode().equals(oldest)) {
             long seq = node2part.updateSequence();
 
             if (seq != updateSeq) {
@@ -1753,7 +1759,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         }
 
         if (node2part != null) {
-            UUID locNodeId = cctx.localNodeId();
+            UUID locNodeId = ctx.localNodeId();
 
         GridDhtPartitionMap map = node2part.get(locNodeId);
 
@@ -1790,9 +1796,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         ClusterNode oldest = discoCache.oldestAliveServerNode();
 
-        assert oldest != null || cctx.kernalContext().clientNode();
+        assert oldest != null || ctx.kernalContext().clientNode();
 
-        ClusterNode loc = cctx.localNode();
+        ClusterNode loc = ctx.localNode();
 
         if (node2part != null) {
             if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) {
@@ -1937,11 +1943,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", cache=" + cctx.name() +
-                ", started=" + cctx.started() +
+                ", grp=" + grp.name() +
                 ", stopping=" + stopping +
-                ", locNodeId=" + cctx.localNode().id() +
-                ", locName=" + cctx.igniteInstanceName() + ']';
+                ", locNodeId=" + ctx.localNodeId() +
+                ", locName=" + ctx.igniteInstanceName() + ']';
 
             for (GridDhtPartitionMap map : node2part.values()) {
                 if (map.hasMovingPartitions())
@@ -1957,8 +1962,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
-        X.println(">>>  Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
-            ", cache=" + cctx.name() + ']');
+        X.println(">>>  Cache partition topology stats [igniteInstanceName=" + ctx.igniteInstanceName() +
+            ", grp=" + grp.name() + ']');
 
         lock.readLock().lock();
 
@@ -1986,7 +1991,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @return {@code True} if given partition belongs to local node.
      */
     private boolean localNode(int part, List<List<ClusterNode>> aff) {
-        return aff.get(part).contains(cctx.localNode());
+        return aff.get(part).contains(ctx.localNode());
     }
 
     /**
@@ -1997,7 +2002,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (node2part == null || !node2part.valid())
                 return;
 
-            for (int i = 0; i < cctx.affinity().partitions(); i++) {
+            for (int i = 0; i < grp.affinity().partitions(); i++) {
                 List<ClusterNode> affNodes = aff.get(i);
 
                 // Topology doesn't contain server nodes (just clients).
@@ -2013,7 +2018,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             rebalancedTopVer = topVer;
 
             if (log.isDebugEnabled())
-                log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
+                log.debug("Updated rebalanced version [cache=" + grp.name() + ", ver=" + rebalancedTopVer + ']');
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index c91eb7a..9b61f14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -118,10 +118,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        preldr = new GridDhtPreloader(ctx);
-
-        preldr.start();
-
         ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() {
             @Override public void apply(UUID nodeId, GridNearGetRequest req) {
                 processNearGetRequest(nodeId, req);
@@ -382,7 +378,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         }
 
         IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
-            ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion());
+            ctx.group().preloader().request(ctx, req.keys(), req.topologyVersion());
 
         if (keyFut == null || keyFut.isDone()) {
             if (keyFut != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 2292cb2..10ed584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -899,7 +899,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     ) {
         assert keys != null;
 
-        IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
+        IgniteInternalFuture<Object> keyFut = ctx.group().preloader().request(cacheCtx, keys, topVer);
 
         // Prevent embedded future creation if possible.
         if (keyFut == null || keyFut.isDone()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 75cbd00..505df91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -38,13 +38,16 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -78,7 +81,10 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
 @SuppressWarnings("NonConstantFieldWithUpperCaseName")
 public class GridDhtPartitionDemander {
     /** */
-    private final GridCacheContext<?, ?> cctx;
+    private final GridCacheSharedContext<?, ?> ctx;
+
+    /** */
+    private final CacheGroupInfrastructure grp;
 
     /** */
     private final IgniteLogger log;
@@ -116,16 +122,18 @@ public class GridDhtPartitionDemander {
     private final AtomicBoolean stoppedEvtSent = new AtomicBoolean();
 
     /**
-     * @param cctx Cctx.
+     * @param grp Ccahe group.
      */
-    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) {
-        assert cctx != null;
+    public GridDhtPartitionDemander(CacheGroupInfrastructure grp) {
+        assert grp != null;
+
+        this.grp = grp;
 
-        this.cctx = cctx;
+        ctx = grp.shared();
 
-        log = cctx.logger(getClass());
+        log = ctx.logger(getClass());
 
-        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
+        boolean enabled = grp.rebalanceEnabled() && !ctx.kernalContext().clientNode();
 
         rebalanceFut = new RebalanceFuture();//Dummy.
 
@@ -137,7 +145,7 @@ public class GridDhtPartitionDemander {
 
         Map<Integer, Object> tops = new HashMap<>();
 
-        for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++)
+        for (int idx = 0; idx < grp.shared().kernalContext().config().getRebalanceThreadPoolSize(); idx++)
             tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
         rebalanceTopics = tops;
@@ -196,7 +204,7 @@ public class GridDhtPartitionDemander {
         GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
 
         if (obj != null)
-            cctx.time().removeTimeoutObject(obj);
+            ctx.time().removeTimeoutObject(obj);
 
         final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
 
@@ -208,7 +216,7 @@ public class GridDhtPartitionDemander {
 
             exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                 @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                    IgniteInternalFuture<Boolean> fut0 = cctx.shared().exchange().forceRebalance(exchFut);
+                    IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut);
 
                     fut0.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
                         @Override public void apply(IgniteInternalFuture<Boolean> future) {
@@ -237,7 +245,7 @@ public class GridDhtPartitionDemander {
      */
     private boolean topologyChanged(RebalanceFuture fut) {
         return
-            !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed.
+            !grp.affinity().lastVersion().equals(fut.topologyVersion()) || // Topology already changed.
                 fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions.
     }
 
@@ -249,7 +257,8 @@ public class GridDhtPartitionDemander {
     private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
         assert discoEvt != null;
 
-        cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+        // TODO IGNITE-5075.
+        // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
     }
 
     /**
@@ -279,12 +288,12 @@ public class GridDhtPartitionDemander {
 
         assert force == (forcedRebFut != null);
 
-        long delay = cctx.config().getRebalanceDelay();
+        long delay = grp.config().getRebalanceDelay();
 
         if (delay == 0 || force) {
             final RebalanceFuture oldFut = rebalanceFut;
 
-            final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt);
+            final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, startedEvtSent, stoppedEvtSent, cnt);
 
             if (!oldFut.isInitial())
                 oldFut.cancel();
@@ -330,7 +339,7 @@ public class GridDhtPartitionDemander {
 
                 fut.onDone(true);
 
-                ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+                ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
 
                 fut.sendRebalanceFinishedEvent();
 
@@ -381,7 +390,7 @@ public class GridDhtPartitionDemander {
             GridTimeoutObject obj = lastTimeoutObj.get();
 
             if (obj != null)
-                cctx.time().removeTimeoutObject(obj);
+                ctx.time().removeTimeoutObject(obj);
 
             final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
 
@@ -391,7 +400,7 @@ public class GridDhtPartitionDemander {
                 @Override public void onTimeout() {
                     exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
-                            cctx.shared().exchange().forceRebalance(exchFut);
+                            ctx.exchange().forceRebalance(exchFut);
                         }
                     });
                 }
@@ -399,7 +408,7 @@ public class GridDhtPartitionDemander {
 
             lastTimeoutObj.set(obj);
 
-            cctx.time().addTimeoutObject(obj);
+            ctx.time().addTimeoutObject(obj);
         }
 
         return null;
@@ -433,17 +442,19 @@ public class GridDhtPartitionDemander {
 
                 Collection<Integer> parts= e.getValue().partitions();
 
-                assert parts != null : "Partitions are null [cache=" + cctx.name() + ", fromNode=" + nodeId + "]";
+                assert parts != null : "Partitions are null [grp=" + grp.name() + ", fromNode=" + nodeId + "]";
 
                 fut.remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
             }
         }
 
+        final CacheConfiguration cfg = grp.config();
+
+        int lsnrCnt = ctx.gridConfig().getRebalanceThreadPoolSize();
+
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
             final ClusterNode node = e.getKey();
 
-            final CacheConfiguration cfg = cctx.config();
-
             final Collection<Integer> parts = fut.remaining.get(node.id()).get2();
 
             GridDhtPartitionDemandMessage d = e.getValue();
@@ -452,8 +463,6 @@ public class GridDhtPartitionDemander {
                 ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
                 ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
 
-            int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
-
             List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 
             for (int cnt = 0; cnt < lsnrCnt; cnt++)
@@ -473,15 +482,15 @@ public class GridDhtPartitionDemander {
 
                     initD.topic(rebalanceTopics.get(cnt));
                     initD.updateSequence(fut.updateSeq);
-                    initD.timeout(cctx.config().getRebalanceTimeout());
+                    initD.timeout(grp.config().getRebalanceTimeout());
 
                     synchronized (fut) {
                         if (fut.isDone())
                             return;// Future can be already cancelled at this moment and all failovers happened.
 
                         // New requests will not be covered by failovers.
-                        cctx.io().sendOrderedMessage(node,
-                            rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
+                        ctx.io().sendOrderedMessage(node,
+                            rebalanceTopics.get(cnt), initD, grp.ioPolicy(), initD.timeout());
                     }
 
 
@@ -505,11 +514,11 @@ public class GridDhtPartitionDemander {
 
         for (Integer part : parts) {
             try {
-                if (cctx.shared().database().persistenceEnabled()) {
+                if (ctx.database().persistenceEnabled()) {
                     if (partCntrs == null)
                         partCntrs = new HashMap<>(parts.size(), 1.0f);
 
-                    GridDhtLocalPartition p = cctx.topology().localPartition(part, old.topologyVersion(), false);
+                    GridDhtLocalPartition p = grp.topology().localPartition(part, old.topologyVersion(), false);
 
                     partCntrs.put(part, p.initialUpdateCounter());
                 }
@@ -585,7 +594,7 @@ public class GridDhtPartitionDemander {
 
         final RebalanceFuture fut = rebalanceFut;
 
-        ClusterNode node = cctx.node(id);
+        ClusterNode node = ctx.node(id);
 
         if (node == null)
             return;
@@ -609,14 +618,16 @@ public class GridDhtPartitionDemander {
             return;
         }
 
-        final GridDhtPartitionTopology top = cctx.dht().topology();
+        final GridDhtPartitionTopology top = grp.topology();
 
         try {
+            AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+
             // Preload.
             for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
                 int p = e.getKey();
 
-                if (cctx.affinity().partitionLocalNode(p, topVer)) {
+                if (aff.get(p).contains(ctx.localNode())) {
                     GridDhtLocalPartition part = top.localPartition(p, topVer, true);
 
                     assert part != null;
@@ -627,7 +638,7 @@ public class GridDhtPartitionDemander {
                         boolean reserved = part.reserve();
 
                         assert reserved : "Failed to reserve partition [igniteInstanceName=" +
-                            cctx.igniteInstanceName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+                            ctx.igniteInstanceName() + ", grp=" + grp.name() + ", part=" + part + ']';
 
                         part.lock();
 
@@ -686,7 +697,7 @@ public class GridDhtPartitionDemander {
 
             // Only request partitions based on latest topology version.
             for (Integer miss : supply.missed()) {
-                if (cctx.affinity().partitionLocalNode(miss, topVer))
+                if (aff.get(miss).contains(ctx.localNode()))
                     fut.partitionMissed(id, miss);
             }
 
@@ -696,14 +707,14 @@ public class GridDhtPartitionDemander {
             GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
                 supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
 
-            d.timeout(cctx.config().getRebalanceTimeout());
+            d.timeout(grp.config().getRebalanceTimeout());
 
             d.topic(rebalanceTopics.get(idx));
 
             if (!topologyChanged(fut) && !fut.isDone()) {
                 // Send demand message.
-                cctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
-                    d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                ctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
+                    d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
             }
         }
         catch (IgniteCheckedException e) {
@@ -732,12 +743,15 @@ public class GridDhtPartitionDemander {
         GridCacheEntryInfo entry,
         AffinityTopologyVersion topVer
     ) throws IgniteCheckedException {
-        cctx.shared().database().checkpointReadLock();
+        ctx.database().checkpointReadLock();
 
         try {
             GridCacheEntryEx cached = null;
 
             try {
+                // TODO IGNITE-5075.
+                GridCacheContext cctx = grp.cacheContext();
+
                 cached = cctx.dht().entryEx(entry.key());
 
                 if (log.isDebugEnabled())
@@ -789,10 +803,10 @@ public class GridDhtPartitionDemander {
         }
         catch (IgniteCheckedException e) {
             throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
-                cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+                ctx.localNode() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
         }
         finally {
-            cctx.shared().database().checkpointReadUnlock();
+            ctx.database().checkpointReadUnlock();
         }
 
         return true;
@@ -810,6 +824,12 @@ public class GridDhtPartitionDemander {
         /** */
         private static final long serialVersionUID = 1L;
 
+        /** */
+        private final GridCacheSharedContext<?, ?> ctx;
+
+        /** */
+        private final CacheGroupInfrastructure grp;
+
         /** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */
         private final AtomicBoolean startedEvtSent;
 
@@ -817,9 +837,6 @@ public class GridDhtPartitionDemander {
         private final AtomicBoolean stoppedEvtSent;
 
         /** */
-        private final GridCacheContext<?, ?> cctx;
-
-        /** */
         private final IgniteLogger log;
 
         /** Remaining. T2: startTime, partitions */
@@ -840,14 +857,15 @@ public class GridDhtPartitionDemander {
 
         /**
          * @param assigns Assigns.
-         * @param cctx Context.
+         * @param grp Cache group.
          * @param log Logger.
          * @param startedEvtSent Start event sent flag.
          * @param stoppedEvtSent Stop event sent flag.
          * @param updateSeq Update sequence.
          */
-        RebalanceFuture(GridDhtPreloaderAssignments assigns,
-            GridCacheContext<?, ?> cctx,
+        RebalanceFuture(
+            CacheGroupInfrastructure grp,
+            GridDhtPreloaderAssignments assigns,
             IgniteLogger log,
             AtomicBoolean startedEvtSent,
             AtomicBoolean stoppedEvtSent,
@@ -856,20 +874,23 @@ public class GridDhtPartitionDemander {
 
             this.exchFut = assigns.exchangeFuture();
             this.topVer = assigns.topologyVersion();
-            this.cctx = cctx;
+            this.grp = grp;
             this.log = log;
             this.startedEvtSent = startedEvtSent;
             this.stoppedEvtSent = stoppedEvtSent;
             this.updateSeq = updateSeq;
+
+            ctx= grp.shared();
         }
 
         /**
          * Dummy future. Will be done by real one.
          */
-        public RebalanceFuture() {
+        RebalanceFuture() {
             this.exchFut = null;
             this.topVer = null;
-            this.cctx = null;
+            this.ctx = null;
+            this.grp = null;
             this.log = null;
             this.startedEvtSent = null;
             this.stoppedEvtSent = null;
@@ -910,7 +931,7 @@ public class GridDhtPartitionDemander {
 
                 U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']');
 
-                if (!cctx.kernalContext().isStopping()) {
+                if (!ctx.kernalContext().isStopping()) {
                     for (UUID nodeId : remaining.keySet())
                         cleanupRemoteContexts(nodeId);
                 }
@@ -931,7 +952,7 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return;
 
-                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+                U.log(log, ("Cancelled rebalancing [grp=" + grp.name() +
                     ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
                     ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
 
@@ -966,7 +987,7 @@ public class GridDhtPartitionDemander {
          * @param nodeId Node id.
          */
         private void cleanupRemoteContexts(UUID nodeId) {
-            ClusterNode node = cctx.discovery().node(nodeId);
+            ClusterNode node = ctx.discovery().node(nodeId);
 
             if (node == null)
                 return;
@@ -974,14 +995,14 @@ public class GridDhtPartitionDemander {
             GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
                 -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
 
-            d.timeout(cctx.config().getRebalanceTimeout());
+            d.timeout(grp.config().getRebalanceTimeout());
 
             try {
-                for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
+                for (int idx = 0; idx < ctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
                     d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
-                    cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
-                        d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                    ctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+                        d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
                 }
             }
             catch (IgniteCheckedException ignored) {
@@ -999,20 +1020,21 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return;
 
-                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-                        exchFut.discoveryEvent());
+                // TODO IGNITE-5075.
+//                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+//                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+//                        exchFut.discoveryEvent());
 
                 T2<Long, Collection<Integer>> t = remaining.get(nodeId);
 
-                assert t != null : "Remaining not found [cache=" + cctx.name() + ", fromNode=" + nodeId +
+                assert t != null : "Remaining not found [grp=" + grp.name() + ", fromNode=" + nodeId +
                     ", part=" + p + "]";
 
                 Collection<Integer> parts = t.get2();
 
                 boolean rmvd = parts.remove(p);
 
-                assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId +
+                assert rmvd : "Partition already done [grp=" + grp.name() + ", fromNode=" + nodeId +
                     ", part=" + p + ", left=" + parts + "]";
 
                 if (parts.isEmpty()) {
@@ -1035,7 +1057,8 @@ public class GridDhtPartitionDemander {
         private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
             assert discoEvt != null;
 
-            cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+            // TODO IGNITE-5075.
+            // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
         }
 
         /**
@@ -1063,7 +1086,7 @@ public class GridDhtPartitionDemander {
                 if (log.isDebugEnabled())
                     log.debug("Completed rebalance future: " + this);
 
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
 
                 Collection<Integer> m = new HashSet<>();
 
@@ -1077,13 +1100,13 @@ public class GridDhtPartitionDemander {
 
                     onDone(false); //Finished but has missed partitions, will force dummy exchange
 
-                    cctx.shared().exchange().forceDummyExchange(true, exchFut);
+                    ctx.exchange().forceDummyExchange(true, exchFut);
 
                     return;
                 }
 
-                if (!cancelled && !cctx.preloader().syncFuture().isDone())
-                    ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+                if (!cancelled && !grp.preloader().syncFuture().isDone())
+                    ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
 
                 onDone(!cancelled);
             }
@@ -1093,24 +1116,26 @@ public class GridDhtPartitionDemander {
          *
          */
         private void sendRebalanceStartedEvent() {
-            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
-                (!cctx.isReplicated() || !startedEvtSent.get())) {
-                preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
-
-                startedEvtSent.set(true);
-            }
+            // TODO IGNITE-5075.
+//            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
+//                (!cctx.isReplicated() || !startedEvtSent.get())) {
+//                preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
+//
+//                startedEvtSent.set(true);
+//            }
         }
 
         /**
          *
          */
         private void sendRebalanceFinishedEvent() {
-            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
-                (!cctx.isReplicated() || !stoppedEvtSent.get())) {
-                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
-                stoppedEvtSent.set(true);
-            }
+            // TODO IGNITE-5075.
+//            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
+//                (!cctx.isReplicated() || !stoppedEvtSent.get())) {
+//                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+//
+//                stoppedEvtSent.set(true);
+//            }
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index f7f0aff..84c3d23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -26,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
@@ -47,7 +49,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  */
 class GridDhtPartitionSupplier {
     /** */
-    private final GridCacheContext<?, ?> cctx;
+    private final CacheGroupInfrastructure grp;
 
     /** */
     private final IgniteLogger log;
@@ -65,18 +67,18 @@ class GridDhtPartitionSupplier {
     private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>();
 
     /**
-     * @param cctx Cache context.
+     * @param grp Cache group.
      */
-    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
-        assert cctx != null;
+    GridDhtPartitionSupplier(CacheGroupInfrastructure grp) {
+        assert grp != null;
 
-        this.cctx = cctx;
+        this.grp = grp;
 
-        log = cctx.logger(getClass());
+        log = grp.shared().logger(getClass());
 
-        top = cctx.dht().topology();
+        top = grp.topology();
 
-        depEnabled = cctx.gridDeploy().enabled();
+        depEnabled = grp.shared().gridDeploy().enabled();
     }
 
     /**
@@ -171,7 +173,7 @@ class GridDhtPartitionSupplier {
         assert d != null;
         assert id != null;
 
-        AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion cutTop = grp.affinity().lastVersion();
         AffinityTopologyVersion demTop = d.topologyVersion();
 
         T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop);
@@ -199,7 +201,7 @@ class GridDhtPartitionSupplier {
         GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(
             d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
 
-        ClusterNode node = cctx.discovery().node(id);
+        ClusterNode node = grp.shared().discovery().node(id);
 
         if (node == null)
             return; // Context will be cleaned at topology change.
@@ -225,7 +227,7 @@ class GridDhtPartitionSupplier {
 
             boolean newReq = true;
 
-            long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount();
+            long maxBatchesCnt = grp.config().getRebalanceBatchesPrefetchCount();
 
             if (sctx != null) {
                 phase = sctx.phase;
@@ -234,7 +236,7 @@ class GridDhtPartitionSupplier {
             }
             else {
                 if (log.isDebugEnabled())
-                    log.debug("Starting supplying rebalancing [cache=" + cctx.name() +
+                    log.debug("Starting supplying rebalancing [grp=" + grp.name() +
                         ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
                         ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
                         ", idx=" + idx + "]");
@@ -280,7 +282,7 @@ class GridDhtPartitionSupplier {
                         IgniteRebalanceIterator iter;
 
                         if (sctx == null || sctx.entryIt == null) {
-                            iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
+                            iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
 
                             if (!iter.historical())
                                 s.clean(part);
@@ -289,7 +291,9 @@ class GridDhtPartitionSupplier {
                             iter = (IgniteRebalanceIterator)sctx.entryIt;
 
                         while (iter.hasNext()) {
-                            if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
+                            List<ClusterNode> nodes = grp.affinity().cachedAffinity(d.topologyVersion()).get(part);
+
+                            if (!nodes.contains(node)) {
                                 // Demander no longer needs this partition,
                                 // so we send '-1' partition and move on.
                                 s.missed(part);
@@ -313,7 +317,7 @@ class GridDhtPartitionSupplier {
                                 break;
                             }
 
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                            if (s.messageSize() >= grp.config().getRebalanceBatchSize()) {
                                 if (++bCnt >= maxBatchesCnt) {
                                     saveSupplyContext(scId,
                                         phase,
@@ -400,7 +404,7 @@ class GridDhtPartitionSupplier {
             reply(node, d, s, scId);
 
             if (log.isDebugEnabled())
-                log.debug("Finished supplying rebalancing [cache=" + cctx.name() +
+                log.debug("Finished supplying rebalancing [grp=" + grp.name() +
                     ", fromNode=" + node.id() +
                     ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
                     ", idx=" + idx + "]");
@@ -427,16 +431,15 @@ class GridDhtPartitionSupplier {
         GridDhtPartitionSupplyMessage s,
         T3<UUID, Integer, AffinityTopologyVersion> scId)
         throws IgniteCheckedException {
-
         try {
             if (log.isDebugEnabled())
                 log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
 
-            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+            grp.shared().io().sendOrderedMessage(n, d.topic(), s, grp.ioPolicy(), d.timeout());
 
             // Throttle preloading.
-            if (cctx.config().getRebalanceThrottle() > 0)
-                U.sleep(cctx.config().getRebalanceThrottle());
+            if (grp.config().getRebalanceThrottle() > 0)
+                U.sleep(grp.config().getRebalanceThrottle());
 
             return true;
         }
@@ -469,7 +472,7 @@ class GridDhtPartitionSupplier {
         AffinityTopologyVersion topVer,
         long updateSeq) {
         synchronized (scMap) {
-            if (cctx.affinity().affinityTopologyVersion().equals(topVer)) {
+            if (grp.affinity().lastVersion().equals(topVer)) {
                 assert scMap.get(t) == null;
 
                 scMap.put(t,

http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index ee461ab..016ec6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -206,32 +206,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
      * @param ctx Cache context.
      * @throws IgniteCheckedException If failed.
      */
-    void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
-        assert info != null;
-
-        marshalInfo(info, ctx);
-
-        msgSize += info.marshalledSize(ctx);
-
-        CacheEntryInfoCollection infoCol = infos().get(p);
-
-        if (infoCol == null) {
-            msgSize += 4;
-
-            infos().put(p, infoCol = new CacheEntryInfoCollection());
-
-            infoCol.init();
-        }
-
-        infoCol.add(info);
-    }
-
-    /**
-     * @param p Partition.
-     * @param info Entry to add.
-     * @param ctx Cache context.
-     * @throws IgniteCheckedException If failed.
-     */
     void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
         assert info != null;
         assert (info.key() != null || info.keyBytes() != null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 74bbcb0..3f74f75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -87,10 +87,10 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return Parition update counters.
      */
-    public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId);
+    public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId);
 
     /**
      * @return Last used version among all nodes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 499537d..1e656b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscovery
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
@@ -644,7 +645,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
 
         for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
-            top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId()));
+            top.updateTopologyVersion(exchId, this, -1, cacheGroupStopping(top.groupId()));
     }
 
     /**
@@ -738,14 +739,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         if (crd != null) {
             if (crd.isLocal()) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    boolean updateTop = !cacheCtx.isLocal() &&
-                        exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+                for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+                    boolean updateTop = !grp.isLocal() &&
+                        exchId.topologyVersion().equals(grp.localStartVersion());
 
                     if (updateTop) {
                         for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
-                            if (top.cacheId() == cacheCtx.cacheId()) {
-                                cacheCtx.topology().update(exchId,
+                            if (top.groupId() == grp.groupId()) {
+                                grp.topology().update(exchId,
                                     top.partitionMap(true),
                                     top.updateCounters(false));
 
@@ -766,8 +767,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
         else {
             if (centralizedAff) { // Last server node failed.
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
+                for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+                    GridAffinityAssignmentCache aff = grp.affinity();
 
                     aff.initialize(topologyVersion(), aff.idealAssignment());
                 }
@@ -1009,6 +1010,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             U.dumpThreads(log);
     }
 
+    public boolean cacheGroupStopping(int grpId) {
+        return exchActions != null && exchActions.cacheGroupStopping(grpId);
+    }
+
     /**
      * @param cacheId Cache ID to check.
      * @return {@code True} if cache is stopping by this exchange.
@@ -1456,9 +1461,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
 
         for (Map.Entry<UUID, GridDhtPartitionsAbstractMessage> e : msgs.entrySet()) {
-            assert e.getValue().partitionUpdateCounters(top.cacheId()) != null;
+            assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
 
-            for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.cacheId()).entrySet()) {
+            for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
                 int p = e0.getKey();
 
                 UUID uuid = e.getKey();
@@ -1755,19 +1760,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         cctx.versions().onExchange(msg.lastVersion().order());
 
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
-            Integer cacheId = entry.getKey();
+            Integer grpId = entry.getKey();
 
-            Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(cacheId);
+            Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId);
 
-            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+            CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
 
-            if (cacheCtx != null)
-                cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
+            if (grp != null)
+                grp.topology().update(exchId, entry.getValue(), cntrMap);
             else {
                 ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal())
-                    cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
+                    cctx.exchange().clientTopology(grpId, this).update(exchId, entry.getValue(), cntrMap);
             }
         }
     }
@@ -1781,13 +1786,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         msgs.put(node.id(), msg);
 
         for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
-            Integer cacheId = entry.getKey();
-            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+            Integer grpId = entry.getKey();
+            CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
 
-            GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
-                cctx.exchange().clientTopology(cacheId, this);
+            GridDhtPartitionTopology top = grp != null ? grp.topology() :
+                cctx.exchange().clientTopology(grpId, this);
 
-            top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
+            top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(grpId));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 73a3481..b327ad1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -122,24 +122,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return {@code True} if message contains full map for given cache.
      */
-    public boolean containsCache(int cacheId) {
-        return parts != null && parts.containsKey(cacheId);
+    public boolean containsGroup(int grpId) {
+        return parts != null && parts.containsKey(grpId);
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param fullMap Full partitions map.
      * @param dupDataCache Optional ID of cache with the same partition state map.
      */
-    public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
+    public void addFullPartitionsMap(int grpId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
         if (parts == null)
             parts = new HashMap<>();
 
-        if (!parts.containsKey(cacheId)) {
-            parts.put(cacheId, fullMap);
+        if (!parts.containsKey(grpId)) {
+            parts.put(grpId, fullMap);
 
             if (dupDataCache != null) {
                 assert compress;
@@ -148,30 +148,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 if (dupPartsData == null)
                     dupPartsData = new HashMap<>();
 
-                dupPartsData.put(cacheId, dupDataCache);
+                dupPartsData.put(grpId, dupDataCache);
             }
         }
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void addPartitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
+    public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
         if (partCntrs == null)
             partCntrs = new HashMap<>();
 
-        if (!partCntrs.containsKey(cacheId))
-            partCntrs.put(cacheId, cntrMap);
+        if (!partCntrs.containsKey(grpId))
+            partCntrs.put(grpId, cntrMap);
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return Partition update counters.
      */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) {
+    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
         if (partCntrs != null) {
-            Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId);
+            Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
 
             return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
         }
@@ -427,7 +427,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         return 46;
     }
 
-    //todo
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
         return 11;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index e197864..f2c9158 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -127,7 +128,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             DiscoveryEvent e = (DiscoveryEvent)evt;
 
             try {
-                ClusterNode loc = cctx.localNode();
+                ClusterNode loc = ctx.localNode();
 
                 assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED;
 
@@ -148,12 +149,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     };
 
     /**
-     * @param cctx Cache context.
+     * @param grp Cache group.
      */
-    public GridDhtPreloader(GridCacheContext<?, ?> cctx) {
-        super(cctx);
+    public GridDhtPreloader(CacheGroupInfrastructure grp) {
+        super(grp);
 
-        top = cctx.dht().topology();
+        top = grp.topology();
 
         startFut = new GridFutureAdapter<>();
     }
@@ -163,26 +164,26 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         if (log.isDebugEnabled())
             log.debug("Starting DHT rebalancer...");
 
-        cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysRequest.class,
+        ctx.io().addHandler(grp.groupId(), GridDhtForceKeysRequest.class,
             new MessageHandler<GridDhtForceKeysRequest>() {
                 @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
                     processForceKeysRequest(node, msg);
                 }
             });
 
-        cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysResponse.class,
+        ctx.io().addHandler(grp.groupId(), GridDhtForceKeysResponse.class,
             new MessageHandler<GridDhtForceKeysResponse>() {
                 @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
                     processForceKeyResponse(node, msg);
                 }
             });
 
-        supplier = new GridDhtPartitionSupplier(cctx);
+        supplier = new GridDhtPartitionSupplier(grp);
         demander = new GridDhtPartitionDemander(cctx);
 
         demander.start();
 
-        cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+        ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
 
     /** {@inheritDoc} */
@@ -203,7 +204,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         stopping = true;
 
-        cctx.events().removeListener(discoLsnr);
+        ctx.gridEvents().removeLocalEventListener(discoLsnr);
 
         // Acquire write busy lock.
         busyLock.writeLock().lock();
@@ -253,25 +254,27 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         // No assignments for disabled preloader.
-        GridDhtPartitionTopology top = cctx.dht().topology();
+        GridDhtPartitionTopology top = grp.topology();
 
-        if (!cctx.rebalanceEnabled() || !cctx.shared().kernalContext().state().active())
+        if (!grp.rebalanceEnabled() || !grp.shared().kernalContext().state().active())
             return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
-        int partCnt = cctx.affinity().partitions();
+        int partCnt = grp.affinity().partitions();
 
         assert exchFut.forcePreload() || exchFut.dummyReassign() ||
             exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
             "Topology version mismatch [exchId=" + exchFut.exchangeId() +
-            ", cache=" + cctx.name() +
+            ", grp=" + grp.name() +
             ", topVer=" + top.topologyVersion() + ']';
 
         GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
         AffinityTopologyVersion topVer = assigns.topologyVersion();
 
+        AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+
         for (int p = 0; p < partCnt; p++) {
-            if (cctx.shared().exchange().hasPendingExchange()) {
+            if (ctx.exchange().hasPendingExchange()) {
                 if (log.isDebugEnabled())
                     log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
                         exchFut.exchangeId());
@@ -282,7 +285,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             }
 
             // If partition belongs to local node.
-            if (cctx.affinity().partitionLocalNode(p, topVer)) {
+            if (aff.get(p).contains(ctx.localNode())) {
                 GridDhtLocalPartition part = top.localPartition(p, topVer, true);
 
                 assert part != null;
@@ -300,13 +303,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 if (picked.isEmpty()) {
                     top.own(part);
 
-                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
-                        cctx.events().addPreloadEvent(p,
-                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
-                            discoEvt.type(), discoEvt.timestamp());
-                    }
+// TODO IGNITE-5075.
+//                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+//                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+//
+//                        cctx.events().addPreloadEvent(p,
+//                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+//                            discoEvt.type(), discoEvt.timestamp());
+//                    }
 
                     if (log.isDebugEnabled())
                         log.debug("Owning partition as there are no other owners: " + part);
@@ -342,7 +346,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      * @return Picked owners.
      */
     private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> affNodes = cctx.affinity().nodesByPartition(p, topVer);
+        Collection<ClusterNode> affNodes = grp.affinity().cachedAffinity(topVer).get(p);
 
         int affCnt = affNodes.size();
 
@@ -368,7 +372,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      * @return Nodes owning this partition.
      */
     private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
-        return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
+        return F.view(grp.topology().owners(p, topVer), F.remoteNodes(ctx.localNodeId()));
     }
 
     /** {@inheritDoc} */
@@ -423,12 +427,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
+        return ctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
-        return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
+        return ctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
     }
 
     /**
@@ -459,7 +463,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      * @param msg Force keys message.
      */
     private void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) {
-        IgniteInternalFuture<?> fut = cctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());
+        IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());
 
         if (fut.isDone())
             processForceKeysRequest0(node, msg);
@@ -480,6 +484,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             return;
 
         try {
+            GridCacheContext cctx = ctx.cacheContext(msg.cacheId());
+
             ClusterNode loc = cctx.localNode();
 
             GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
@@ -591,11 +597,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         try {
             top.onEvicted(part, updateSeq);
 
-            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
-                cctx.events().addUnloadEvent(part.id());
+// TODO IGNITE-5075.
+//            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
+//                cctx.events().addUnloadEvent(part.id());
 
             if (updateSeq)
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
         }
         finally {
             leaveBusy();
@@ -604,7 +611,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public boolean needForceKeys() {
-        if (cctx.rebalanceEnabled()) {
+        if (grp.rebalanceEnabled()) {
             IgniteInternalFuture<Boolean> rebalanceFut = rebalanceFuture();
 
             if (rebalanceFut.isDone() && Boolean.TRUE.equals(rebalanceFut.result()))
@@ -615,12 +622,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req,
+    @Override public IgniteInternalFuture<Object> request(GridCacheContext cctx,
+        GridNearAtomicAbstractUpdateRequest req,
         AffinityTopologyVersion topVer) {
         if (!needForceKeys())
             return null;
 
-        return request0(req.keys(), topVer);
+        return request0(cctx, req.keys(), topVer);
     }
 
     /**
@@ -628,11 +636,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      * @return Future for request.
      */
     @SuppressWarnings({"unchecked", "RedundantCast"})
-    @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+    @Override public GridDhtFuture<Object> request(GridCacheContext cctx,
+        Collection<KeyCacheObject> keys,
+        AffinityTopologyVersion topVer) {
         if (!needForceKeys())
             return null;
 
-        return request0(keys, topVer);
+        return request0(cctx, keys, topVer);
     }
 
     /**
@@ -641,7 +651,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      * @return Future for request.
      */
     @SuppressWarnings({"unchecked", "RedundantCast"})
-    private GridDhtFuture<Object> request0(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+    private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
         final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
 
         IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
@@ -652,7 +662,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             if (topReadyFut == null)
                 startFut.listen(new CI1<IgniteInternalFuture<?>>() {
                     @Override public void apply(IgniteInternalFuture<?> syncFut) {
-                        cctx.kernalContext().closure().runLocalSafe(
+                        ctx.kernalContext().closure().runLocalSafe(
                             new GridPlainRunnable() {
                                 @Override public void run() {
                                     fut.init();
@@ -689,7 +699,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         demandLock.writeLock().lock();
 
         try {
-            cctx.deploy().unwind(cctx);
+            // TODO IGNITE-5075.
+            // cctx.deploy().unwind(cctx);
         }
         finally {
             demandLock.writeLock().unlock();
@@ -728,7 +739,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         partsToEvict.putIfAbsent(part.id(), part);
 
         if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 1)) {
-            cctx.closures().callLocalSafe(new GPC<Boolean>() {
+            ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
                 @Override public Boolean call() {
                     boolean locked = true;
 
@@ -749,7 +760,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                                         partsToEvict.put(part.id(), part);
                                 }
                                 catch (Throwable ex) {
-                                    if (cctx.kernalContext().isStopping()) {
+                                    if (ctx.kernalContext().isStopping()) {
                                         LT.warn(log, ex, "Partition eviction failed (current node is stopping).",
                                             false,
                                             true);
@@ -785,7 +796,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** {@inheritDoc} */
     @Override public void dumpDebugInfo() {
         if (!forceKeyFuts.isEmpty()) {
-            U.warn(log, "Pending force key futures [cache=" + cctx.name() + "]:");
+            U.warn(log, "Pending force key futures [grp=" + grp.name() + "]:");
 
             for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
                 U.warn(log, ">>> " + fut);
@@ -803,7 +814,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         /** {@inheritDoc} */
         @Override public void apply(UUID nodeId, M msg) {
-            ClusterNode node = cctx.node(nodeId);
+            ClusterNode node = ctx.node(nodeId);
 
             if (node == null) {
                 if (log.isDebugEnabled())


Mime
View raw message