ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [07/43] ignite git commit: ignite-5075 Implement logical 'cache groups' sharing the same physical caches
Date Thu, 08 Jun 2017 12:33:00 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2cd5629..0c5cdd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscovery
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.ClusterState;
@@ -193,8 +194,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** */
     private CacheAffinityChangeMessage affChangeMsg;
 
-    /** Cache validation results. */
-    private volatile Map<Integer, CacheValidation> cacheValidRes;
+    /** Cache groups validation results. */
+    private volatile Map<Integer, CacheValidation> grpValidRes;
 
     /** Skip preload flag. */
     private boolean skipPreload;
@@ -379,8 +380,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @param cacheId Cache ID to check.
-     * @param rcvdFrom Topology version.
+     * @param cacheId Cache ID.
+     * @param rcvdFrom Node ID cache was received from.
      * @return {@code True} if cache was added during this exchange.
      */
     public boolean cacheAddedOnExchange(int cacheId, UUID rcvdFrom) {
@@ -388,14 +389,32 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
+     * @param grpId Cache group ID.
+     * @param rcvdFrom Node ID cache group was received from.
+     * @return {@code True} if cache group was added during this exchange.
+     */
+    public boolean cacheGroupAddedOnExchange(int grpId, UUID rcvdFrom) {
+        return dynamicCacheGroupStarted(grpId) ||
+            (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom));
+    }
+
+    /**
      * @param cacheId Cache ID.
      * @return {@code True} if non-client cache was added during this exchange.
      */
-    public boolean dynamicCacheStarted(int cacheId) {
+    private boolean dynamicCacheStarted(int cacheId) {
         return exchActions != null && exchActions.cacheStarted(cacheId);
     }
 
     /**
+     * @param grpId Cache group ID.
+     * @return {@code True} if non-client cache group was added during this exchange.
+     */
+    public boolean dynamicCacheGroupStarted(int grpId) {
+        return exchActions != null && exchActions.cacheGroupStarting(grpId);
+    }
+
+    /**
      * @return {@code True}
      */
     public boolean onAdded() {
@@ -598,11 +617,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         try {
             if (crd != null) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (cacheCtx.isLocal())
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    if (grp.isLocal())
                         continue;
 
-                    cacheCtx.topology().beforeExchange(this, !centralizedAff);
+                    grp.topology().beforeExchange(this, !centralizedAff);
                 }
             }
         }
@@ -616,28 +635,28 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void updateTopologies(boolean crd) throws IgniteCheckedException {
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal())
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(cacheCtx.cacheId());
+            GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId());
 
             long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence();
 
-            GridDhtPartitionTopology top = cacheCtx.topology();
+            GridDhtPartitionTopology top = grp.topology();
 
             if (crd) {
-                boolean updateTop = exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+                boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion());
 
                 if (updateTop && clientTop != null)
                     top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
             }
 
-            top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
+            top.updateTopologyVersion(exchId, this, updSeq, cacheGroupStopping(grp.groupId()));
         }
 
         for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
-            top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId()));
+            top.updateTopologyVersion(exchId, this, -1, cacheGroupStopping(top.groupId()));
     }
 
     /**
@@ -731,16 +750,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         if (crd != null) {
             if (crd.isLocal()) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    boolean updateTop = !cacheCtx.isLocal() &&
-                        exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    boolean updateTop = !grp.isLocal() &&
+                        exchId.topologyVersion().equals(grp.localStartVersion());
 
                     if (updateTop) {
                         for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
-                            if (top.cacheId() == cacheCtx.cacheId()) {
-                                cacheCtx.topology().update(exchId,
-                                    top.partitionMap(true),
-                                    top.updateCounters(false));
+                            if (top.groupId() == grp.groupId()) {
+                                GridDhtPartitionFullMap fullMap = top.partitionMap(true);
+
+                                assert fullMap != null;
+
+                                grp.topology().update(exchId, fullMap, top.updateCounters(false));
 
                                 break;
                             }
@@ -759,8 +780,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
         else {
             if (centralizedAff) { // Last server node failed.
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    GridAffinityAssignmentCache aff = grp.affinity();
 
                     aff.initialize(topologyVersion(), aff.idealAssignment());
                 }
@@ -778,11 +799,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         assert !cctx.kernalContext().clientNode();
 
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal())
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            cacheCtx.preloader().onTopologyChanged(this);
+            grp.preloader().onTopologyChanged(this);
         }
 
         waitPartitionRelease();
@@ -790,25 +811,30 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
+            if (cacheCtx.isLocal() || cacheStopping(cacheCtx.cacheId()))
                 continue;
 
             if (topChanged) {
                 // Partition release future is done so we can flush the write-behind store.
                 cacheCtx.store().forceFlush();
             }
+        }
+
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+                continue;
 
-            cacheCtx.topology().beforeExchange(this, !centralizedAff);
+            grp.topology().beforeExchange(this, !centralizedAff);
         }
 
         cctx.database().beforeExchange(this);
 
         // If a backup request, synchronously wait for backup start.
         if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
-            DiscoveryCustomMessage customMessage = ((DiscoveryCustomEvent)discoEvt).customMessage();
+            DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)discoEvt).customMessage();
 
-            if (customMessage instanceof StartFullSnapshotAckDiscoveryMessage) {
-                StartFullSnapshotAckDiscoveryMessage backupMsg = (StartFullSnapshotAckDiscoveryMessage)customMessage;
+            if (customMsg instanceof StartFullSnapshotAckDiscoveryMessage) {
+                StartFullSnapshotAckDiscoveryMessage backupMsg = (StartFullSnapshotAckDiscoveryMessage)customMsg;
 
                 if (!cctx.localNode().isClient() && !cctx.localNode().isDaemon()) {
                     ClusterNode node = cctx.discovery().node(backupMsg.initiatorNodeId());
@@ -917,11 +943,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      *
      */
     private void onLeft() {
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal())
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            cacheCtx.preloader().unwindUndeploys();
+            grp.preloader().unwindUndeploys();
         }
 
         cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
@@ -933,17 +959,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     private void warnNoAffinityNodes() {
         List<String> cachesWithoutNodes = null;
 
-        for (String name : cctx.cache().cacheNames()) {
-            if (discoCache.cacheAffinityNodes(name).isEmpty()) {
+        for (DynamicCacheDescriptor cacheDesc : cctx.cache().cacheDescriptors().values()) {
+            if (discoCache.cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) {
                 if (cachesWithoutNodes == null)
                     cachesWithoutNodes = new ArrayList<>();
 
-                cachesWithoutNodes.add(name);
+                cachesWithoutNodes.add(cacheDesc.cacheName());
 
                 // Fire event even if there is no client cache started.
                 if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
                     Event evt = new CacheEvent(
-                        name,
+                        cacheDesc.cacheName(),
                         cctx.localNode(),
                         cctx.localNode(),
                         "All server nodes have left the cluster.",
@@ -1005,10 +1031,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
+     * @param grpId Cache group ID to check.
+     * @return {@code True} if cache group is stopping by this exchange.
+     */
+    private boolean cacheGroupStopping(int grpId) {
+        return exchActions != null && exchActions.cacheGroupStopping(grpId);
+    }
+
+    /**
      * @param cacheId Cache ID to check.
      * @return {@code True} if cache is stopping by this exchange.
      */
-    public boolean stopping(int cacheId) {
+    private boolean cacheStopping(int cacheId) {
         return exchActions != null && exchActions.cacheStopped(cacheId);
     }
 
@@ -1124,18 +1158,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
        }
 
         if (err == null && realExchange) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
                     continue;
 
                 try {
                     if (centralizedAff)
-                        cacheCtx.topology().initPartitions(this);
+                        grp.topology().initPartitions(this);
                 }
                 catch (IgniteInterruptedCheckedException e) {
                     U.error(log, "Failed to initialize partitions.", e);
                 }
+            }
 
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 GridCacheContext drCacheCtx = cacheCtx.isNear() ? cacheCtx.near().dht().context() : cacheCtx;
 
                 if (drCacheCtx.isDrEnabled()) {
@@ -1153,21 +1189,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 discoEvt.type() == EVT_NODE_JOINED)
                 detectLostPartitions();
 
-            Map<Integer, CacheValidation> m = new HashMap<>(cctx.cacheContexts().size());
+            Map<Integer, CacheValidation> m = new HashMap<>(cctx.cache().cacheGroups().size());
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                Collection<Integer> lostParts = cacheCtx.isLocal() ?
-                    Collections.<Integer>emptyList() : cacheCtx.topology().lostPartitions();
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                Collection<Integer> lostParts = grp.isLocal() ?
+                    Collections.<Integer>emptyList() : grp.topology().lostPartitions();
 
                 boolean valid = true;
 
-                if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name()))
-                    valid = cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes());
+                if (grp.topologyValidator() != null && !grp.systemCache())
+                    valid = grp.topologyValidator().validate(discoEvt.topologyNodes());
 
-                m.put(cacheCtx.cacheId(), new CacheValidation(valid, lostParts));
+                m.put(grp.groupId(), new CacheValidation(valid, lostParts));
             }
 
-            cacheValidRes = m;
+            grpValidRes = m;
         }
 
         cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err, false);
@@ -1188,8 +1224,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             initFut.onDone(err == null);
 
             if (exchId.isLeft()) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts())
-                    cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
+                for (CacheGroupContext grp : cctx.cache().cacheGroups())
+                    grp.affinityFunction().removeNode(exchId.nodeId());
             }
 
             exchActions = null;
@@ -1224,16 +1260,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             return new CacheInvalidStateException(
                 "Failed to perform cache operation (cluster is not activated): " + cctx.name());
 
-        PartitionLossPolicy partLossPlc = cctx.config().getPartitionLossPolicy();
+        CacheGroupContext grp = cctx.group();
+
+        PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy();
 
-        if (cctx.needsRecovery() && !recovery) {
+        if (grp.needsRecovery() && !recovery) {
             if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL))
                 return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " +
                     cctx.name());
         }
 
-        if (cctx.needsRecovery() || cctx.config().getTopologyValidator() != null) {
-            CacheValidation validation = cacheValidRes.get(cctx.cacheId());
+        if (grp.needsRecovery() || grp.topologyValidator() != null) {
+            CacheValidation validation = grpValidRes.get(grp.groupId());
 
             if (validation == null)
                 return null;
@@ -1242,7 +1280,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 return new IgniteCheckedException("Failed to perform cache operation " +
                     "(cache topology is not valid): " + cctx.name());
 
-            if (recovery || !cctx.needsRecovery())
+            if (recovery || !grp.needsRecovery())
                 return null;
 
             if (key != null) {
@@ -1472,9 +1510,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
 
         for (Map.Entry<UUID, GridDhtPartitionsAbstractMessage> e : msgs.entrySet()) {
-            assert e.getValue().partitionUpdateCounters(top.cacheId()) != null;
+            assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
 
-            for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.cacheId()).entrySet()) {
+            for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
                 int p = e0.getKey();
 
                 UUID uuid = e.getKey();
@@ -1536,24 +1574,32 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (Thread.currentThread().isInterrupted())
                 return;
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (!cacheCtx.isLocal())
-                    cacheCtx.topology().detectLostPartitions(discoEvt);
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (!grp.isLocal())
+                    grp.topology().detectLostPartitions(discoEvt);
             }
         }
     }
 
     /**
-     *
+     * @param cacheNames Cache names.
      */
     private void resetLostPartitions(Collection<String> cacheNames) {
         synchronized (cctx.exchange().interruptLock()) {
             if (Thread.currentThread().isInterrupted())
                 return;
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (!cacheCtx.isLocal() && cacheNames.contains(cacheCtx.name()))
-                    cacheCtx.topology().resetLostPartitions();
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
+                    continue;
+
+                for (String cacheName : cacheNames) {
+                    if (grp.hasCache(cacheName)) {
+                        grp.topology().resetLostPartitions();
+
+                        break;
+                    }
+                }
             }
         }
     }
@@ -1566,9 +1612,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             assert crd.isLocal();
 
             if (!crd.equals(discoCache.serverNodes().get(0))) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (!cacheCtx.isLocal())
-                        cacheCtx.topology().beforeExchange(this, !centralizedAff);
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    if (!grp.isLocal())
+                        grp.topology().beforeExchange(this, !centralizedAff);
                 }
             }
 
@@ -1577,13 +1623,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg;
 
                     for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) {
-                        Integer cacheId = entry.getKey();
-                        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+                        Integer grpId = entry.getKey();
+                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                        GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
-                            cctx.exchange().clientTopology(cacheId, this);
+                        GridDhtPartitionTopology top = grp != null ? grp.topology() :
+                            cctx.exchange().clientTopology(grpId, this);
 
-                        Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(cacheId);
+                        Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(grpId);
 
                         if (cntrs != null)
                             top.applyUpdateCounters(cntrs);
@@ -1661,11 +1707,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      */
     private void assignPartitionsStates() {
         if (cctx.database().persistenceEnabled()) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
                     continue;
 
-                assignPartitionStates(cacheCtx.topology());
+                assignPartitionStates(grp.topology());
             }
         }
     }
@@ -1790,19 +1836,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         cctx.versions().onExchange(msg.lastVersion().order());
 
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
-            Integer cacheId = entry.getKey();
+            Integer grpId = entry.getKey();
 
-            Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(cacheId);
+            Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId);
 
-            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-            if (cacheCtx != null)
-                cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
+            if (grp != null)
+                grp.topology().update(exchId, entry.getValue(), cntrMap);
             else {
                 ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal())
-                    cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
+                    cctx.exchange().clientTopology(grpId, this).update(exchId, entry.getValue(), cntrMap);
             }
         }
     }
@@ -1816,11 +1862,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         msgs.put(node.id(), msg);
 
         for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
-            Integer cacheId = entry.getKey();
-            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+            Integer grpId = entry.getKey();
+            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-            GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
-                cctx.exchange().clientTopology(cacheId, this);
+            GridDhtPartitionTopology top = grp != null ? grp.topology() :
+                cctx.exchange().clientTopology(grpId, this);
 
             top.update(exchId, entry.getValue());
         }
@@ -1972,13 +2018,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                             List<ClusterNode> empty = Collections.emptyList();
 
-                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                List<List<ClusterNode>> affAssignment = new ArrayList<>(cacheCtx.affinity().partitions());
+                            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                                List<List<ClusterNode>> affAssignment = new ArrayList<>(grp.affinity().partitions());
 
-                                for (int i = 0; i < cacheCtx.affinity().partitions(); i++)
+                                for (int i = 0; i < grp.affinity().partitions(); i++)
                                     affAssignment.add(empty);
 
-                                cacheCtx.affinity().affinityCache().initialize(topologyVersion(), affAssignment);
+                                grp.affinity().initialize(topologyVersion(), affAssignment);
                             }
 
                             onDone(topologyVersion());

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 73a3481..f9bc5df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -104,6 +104,11 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         this.topVer = topVer;
     }
 
+    /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
     /**
      * @param compress {@code True} if it is possible to use compression for message.
      */
@@ -122,24 +127,26 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return {@code True} if message contains full map for given cache.
      */
-    public boolean containsCache(int cacheId) {
-        return parts != null && parts.containsKey(cacheId);
+    public boolean containsGroup(int grpId) {
+        return parts != null && parts.containsKey(grpId);
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param fullMap Full partitions map.
      * @param dupDataCache Optional ID of cache with the same partition state map.
      */
-    public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
+    public void addFullPartitionsMap(int grpId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
+        assert fullMap != null;
+
         if (parts == null)
             parts = new HashMap<>();
 
-        if (!parts.containsKey(cacheId)) {
-            parts.put(cacheId, fullMap);
+        if (!parts.containsKey(grpId)) {
+            parts.put(grpId, fullMap);
 
             if (dupDataCache != null) {
                 assert compress;
@@ -148,30 +155,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 if (dupPartsData == null)
                     dupPartsData = new HashMap<>();
 
-                dupPartsData.put(cacheId, dupDataCache);
+                dupPartsData.put(grpId, dupDataCache);
             }
         }
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void addPartitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
+    public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
         if (partCntrs == null)
             partCntrs = new HashMap<>();
 
-        if (!partCntrs.containsKey(cacheId))
-            partCntrs.put(cacheId, cntrMap);
+        if (!partCntrs.containsKey(grpId))
+            partCntrs.put(grpId, cntrMap);
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return Partition update counters.
      */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) {
+    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
         if (partCntrs != null) {
-            Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId);
+            Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
 
             return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
         }
@@ -331,31 +338,31 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         }
 
         switch (writer.state()) {
-            case 6:
+            case 5:
                 if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 6:
                 if (!writer.writeByteArray("exsBytes", exsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 10:
+            case 9:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -377,7 +384,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             return false;
 
         switch (reader.state()) {
-            case 6:
+            case 5:
                 dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
@@ -385,7 +392,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 7:
+            case 6:
                 exsBytes = reader.readByteArray("exsBytes");
 
                 if (!reader.isLastRead())
@@ -393,7 +400,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
@@ -401,7 +408,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -409,7 +416,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 10:
+            case 9:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -427,10 +434,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         return 46;
     }
 
-    //todo
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 9003f3a..416b127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -103,6 +103,11 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         this.compress = compress;
     }
 
+    /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
     /**
      * @return {@code True} if sent from client node.
      */
@@ -134,23 +139,23 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void partitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
+    public void partitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
         if (partCntrs == null)
             partCntrs = new HashMap<>();
 
-        partCntrs.put(cacheId, cntrMap);
+        partCntrs.put(grpId, cntrMap);
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return Partition update counters.
      */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) {
+    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
         if (partCntrs != null) {
-            Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId);
+            Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
 
             return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
         }
@@ -291,31 +296,31 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         }
 
         switch (writer.state()) {
-            case 6:
+            case 5:
                 if (!writer.writeBoolean("client", client))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 6:
                 if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeByteArray("exBytes", exBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 10:
+            case 9:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -337,7 +342,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             return false;
 
         switch (reader.state()) {
-            case 6:
+            case 5:
                 client = reader.readBoolean("client");
 
                 if (!reader.isLastRead())
@@ -345,7 +350,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 7:
+            case 6:
                 dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
@@ -353,7 +358,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 exBytes = reader.readByteArray("exBytes");
 
                 if (!reader.isLastRead())
@@ -361,7 +366,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
@@ -369,7 +374,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 10:
+            case 9:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -387,10 +392,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         return 47;
     }
 
-    //todo add ex
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index b4ce939..4b80ee0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -48,6 +48,11 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
     }
 
     /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
     @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) {
         return Collections.emptyMap();
     }
@@ -89,7 +94,7 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 5;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 57616ed..a931ef4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -23,30 +23,20 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.NodeStoppingException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -60,23 +50,14 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
-import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
 
 /**
  * DHT cache preloader.
@@ -88,9 +69,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** */
     private GridDhtPartitionTopology top;
 
-    /** Force key futures. */
-    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
-
     /** Partition suppliers. */
     private GridDhtPartitionSupplier supplier;
 
@@ -113,47 +91,15 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     private final AtomicInteger partsEvictOwning = new AtomicInteger();
 
     /** */
-    private volatile boolean stopping;
-
-    /** */
     private boolean stopped;
 
-    /** Discovery listener. */
-    private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
-        @Override public void onEvent(Event evt) {
-            if (!enterBusy())
-                return;
-
-            DiscoveryEvent e = (DiscoveryEvent)evt;
-
-            try {
-                ClusterNode loc = cctx.localNode();
-
-                assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED;
-
-                final ClusterNode n = e.eventNode();
-
-                assert !loc.id().equals(n.id());
-
-                for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values())
-                    f.onDiscoveryEvent(e);
-
-                assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " +
-                    "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
-            }
-            finally {
-                leaveBusy();
-            }
-        }
-    };
-
     /**
-     * @param cctx Cache context.
+     * @param grp Cache group.
      */
-    public GridDhtPreloader(GridCacheContext<?, ?> cctx) {
-        super(cctx);
+    public GridDhtPreloader(CacheGroupContext grp) {
+        super(grp);
 
-        top = cctx.dht().topology();
+        top = grp.topology();
 
         startFut = new GridFutureAdapter<>();
     }
@@ -163,37 +109,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         if (log.isDebugEnabled())
             log.debug("Starting DHT rebalancer...");
 
-        cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysRequest.class,
-            new MessageHandler<GridDhtForceKeysRequest>() {
-                @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
-                    processForceKeysRequest(node, msg);
-                }
-            });
-
-        cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysResponse.class,
-            new MessageHandler<GridDhtForceKeysResponse>() {
-                @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
-                    processForceKeyResponse(node, msg);
-                }
-            });
-
-        if (!cctx.kernalContext().clientNode()) {
-            cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentRequest.class,
-                new MessageHandler<GridDhtAffinityAssignmentRequest>() {
-                    @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest msg) {
-                        processAffinityAssignmentRequest(node, msg);
-                    }
-                });
-        }
-
-        cctx.shared().affinity().onCacheCreated(cctx);
-
-        supplier = new GridDhtPartitionSupplier(cctx);
-        demander = new GridDhtPartitionDemander(cctx);
+        supplier = new GridDhtPartitionSupplier(grp);
+        demander = new GridDhtPartitionDemander(grp);
 
         demander.start();
-
-        cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
 
     /** {@inheritDoc} */
@@ -212,10 +131,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         if (log.isDebugEnabled())
             log.debug("DHT rebalancer onKernalStop callback.");
 
-        stopping = true;
-
-        cctx.events().removeListener(discoLsnr);
-
         // Acquire write busy lock.
         busyLock.writeLock().lock();
 
@@ -226,11 +141,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             if (demander != null)
                 demander.stop();
 
-            IgniteCheckedException err = stopError();
-
-            for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
-                fut.onDone(err);
-
             top = null;
 
             stopped = true;
@@ -239,12 +149,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             busyLock.writeLock().unlock();
         }
     }
-    /**
-     * @return Node stop exception.
-     */
-    private IgniteCheckedException stopError() {
-        return new NodeStoppingException("Operation has been cancelled (cache or node is stopping).");
-    }
 
     /** {@inheritDoc} */
     @Override public void onInitialExchangeComplete(@Nullable Throwable err) {
@@ -264,25 +168,27 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         // No assignments for disabled preloader.
-        GridDhtPartitionTopology top = cctx.dht().topology();
+        GridDhtPartitionTopology top = grp.topology();
 
-        if (!cctx.rebalanceEnabled() || !cctx.shared().kernalContext().state().active())
+        if (!grp.rebalanceEnabled() || !grp.shared().kernalContext().state().active())
             return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
-        int partCnt = cctx.affinity().partitions();
+        int partCnt = grp.affinity().partitions();
 
         assert exchFut.forcePreload() || exchFut.dummyReassign() ||
             exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
             "Topology version mismatch [exchId=" + exchFut.exchangeId() +
-            ", cache=" + cctx.name() +
+            ", grp=" + grp.name() +
             ", topVer=" + top.topologyVersion() + ']';
 
         GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
         AffinityTopologyVersion topVer = assigns.topologyVersion();
 
+        AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+
         for (int p = 0; p < partCnt; p++) {
-            if (cctx.shared().exchange().hasPendingExchange()) {
+            if (ctx.exchange().hasPendingExchange()) {
                 if (log.isDebugEnabled())
                     log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
                         exchFut.exchangeId());
@@ -293,7 +199,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             }
 
             // If partition belongs to local node.
-            if (cctx.affinity().partitionLocalNode(p, topVer)) {
+            if (aff.get(p).contains(ctx.localNode())) {
                 GridDhtLocalPartition part = top.localPartition(p, topVer, true);
 
                 assert part != null;
@@ -311,12 +217,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 if (picked.isEmpty()) {
                     top.own(part);
 
-                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                    if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
                         DiscoveryEvent discoEvt = exchFut.discoveryEvent();
 
-                        cctx.events().addPreloadEvent(p,
-                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
-                            discoEvt.type(), discoEvt.timestamp());
+                        grp.addRebalanceEvent(p,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                            discoEvt.eventNode(),
+                            discoEvt.type(),
+                            discoEvt.timestamp());
                     }
 
                     if (log.isDebugEnabled())
@@ -331,7 +239,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                         assigns.put(n, msg = new GridDhtPartitionDemandMessage(
                             top.updateSequence(),
                             exchFut.exchangeId().topologyVersion(),
-                            cctx.cacheId()));
+                            grp.groupId()));
                     }
 
                     msg.addPartition(p);
@@ -353,7 +261,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      * @return Picked owners.
      */
     private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> affNodes = cctx.affinity().nodesByPartition(p, topVer);
+        Collection<ClusterNode> affNodes = grp.affinity().cachedAffinity(topVer).get(p);
 
         int affCnt = affNodes.size();
 
@@ -379,7 +287,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      * @return Nodes owning this partition.
      */
     private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
-        return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
+        return F.view(grp.topology().owners(p, topVer), F.remoteNodes(ctx.localNodeId()));
     }
 
     /** {@inheritDoc} */
@@ -418,7 +326,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** {@inheritDoc} */
     @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
         boolean forceRebalance,
-        Collection<String> caches,
         int cnt,
         Runnable next,
         @Nullable GridFutureAdapter<Boolean> forcedRebFut) {
@@ -434,12 +341,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
+        return ctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
-        return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
+        return ctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
     }
 
     /**
@@ -466,171 +373,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /**
-     * @param node Node originated request.
-     * @param msg Force keys message.
-     */
-    private void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) {
-        IgniteInternalFuture<?> fut = cctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());
-
-        if (fut.isDone())
-            processForceKeysRequest0(node, msg);
-        else
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> t) {
-                    processForceKeysRequest0(node, msg);
-                }
-            });
-    }
-
-    /**
-     * @param node Node originated request.
-     * @param msg Force keys message.
-     */
-    private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) {
-        if (!enterBusy())
-            return;
-
-        try {
-            ClusterNode loc = cctx.localNode();
-
-            GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
-                cctx.cacheId(),
-                msg.futureId(),
-                msg.miniId(),
-                cctx.deploymentEnabled());
-
-            for (KeyCacheObject k : msg.keys()) {
-                int p = cctx.affinity().partition(k);
-
-                GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);
-
-                // If this node is no longer an owner.
-                if (locPart == null && !top.owners(p).contains(loc)) {
-                    res.addMissed(k);
-
-                    continue;
-                }
-
-                GridCacheEntryEx entry = null;
-
-                while (true) {
-                    try {
-                        entry = cctx.dht().entryEx(k);
-
-                        entry.unswap();
-
-                        GridCacheEntryInfo info = entry.info();
-
-                        if (info == null) {
-                            assert entry.obsolete() : entry;
-
-                            continue;
-                        }
-
-                        if (!info.isNew())
-                            res.addInfo(info);
-
-                        cctx.evicts().touch(entry, msg.topologyVersion());
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got removed entry: " + k);
-                    }
-                    catch (GridDhtInvalidPartitionException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Local node is no longer an owner: " + p);
-
-                        res.addMissed(k);
-
-                        break;
-                    }
-                }
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']');
-
-            cctx.io().send(node, res, cctx.ioPolicy());
-        }
-        catch (ClusterTopologyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("Received force key request form failed node (will ignore) [nodeId=" + node.id() +
-                    ", req=" + msg + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to reply to force key request [nodeId=" + node.id() + ", req=" + msg + ']', e);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * @param node Node.
-     * @param msg Message.
-     */
-    private void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) {
-        if (!enterBusy())
-            return;
-
-        try {
-            GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId());
-
-            if (f != null)
-                f.onResult(msg);
-            else if (log.isDebugEnabled())
-                log.debug("Receive force key response for unknown future (is it duplicate?) [nodeId=" + node.id() +
-                    ", res=" + msg + ']');
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * @param node Node.
-     * @param req Request.
-     */
-    private void processAffinityAssignmentRequest(final ClusterNode node,
-        final GridDhtAffinityAssignmentRequest req) {
-        final AffinityTopologyVersion topVer = req.topologyVersion();
-
-        if (log.isDebugEnabled())
-            log.debug("Processing affinity assignment request [node=" + node + ", req=" + req + ']');
-
-        cctx.affinity().affinityReadyFuture(req.topologyVersion()).listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                if (log.isDebugEnabled())
-                    log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
-                        ", node=" + node + ']');
-
-                AffinityAssignment assignment = cctx.affinity().assignment(topVer);
-
-                GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
-                    req.futureId(),
-                    cctx.cacheId(),
-                    topVer,
-                    assignment.assignment());
-
-                if (cctx.affinity().affinityCache().centralizedAffinityFunction()) {
-                    assert assignment.idealAssignment() != null;
-
-                    res.idealAffinityAssignment(assignment.idealAssignment());
-                }
-
-                try {
-                    cctx.io().send(node, res, AFFINITY_POOL);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send affinity assignment response to remote node [node=" + node + ']', e);
-                }
-            }
-        });
-    }
-
-    /**
      * Resends partitions on partition evict within configured timeout.
      *
      * @param part Evicted partition.
@@ -643,11 +385,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         try {
             top.onEvicted(part, updateSeq);
 
-            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
-                cctx.events().addUnloadEvent(part.id());
+            if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
+                grp.addUnloadEvent(part.id());
 
             if (updateSeq)
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
         }
         finally {
             leaveBusy();
@@ -656,7 +398,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public boolean needForceKeys() {
-        if (cctx.rebalanceEnabled()) {
+        if (grp.rebalanceEnabled()) {
             IgniteInternalFuture<Boolean> rebalanceFut = rebalanceFuture();
 
             if (rebalanceFut.isDone() && Boolean.TRUE.equals(rebalanceFut.result()))
@@ -667,12 +409,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req,
+    @Override public GridDhtFuture<Object> request(GridCacheContext cctx,
+        GridNearAtomicAbstractUpdateRequest req,
         AffinityTopologyVersion topVer) {
         if (!needForceKeys())
             return null;
 
-        return request0(req.keys(), topVer);
+        return request0(cctx, req.keys(), topVer);
     }
 
     /**
@@ -680,21 +423,27 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      * @return Future for request.
      */
     @SuppressWarnings({"unchecked", "RedundantCast"})
-    @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+    @Override public GridDhtFuture<Object> request(GridCacheContext cctx,
+        Collection<KeyCacheObject> keys,
+        AffinityTopologyVersion topVer) {
         if (!needForceKeys())
             return null;
 
-        return request0(keys, topVer);
+        return request0(cctx, keys, topVer);
     }
 
     /**
+     * @param cctx Cache context.
      * @param keys Keys to request.
      * @param topVer Topology version.
      * @return Future for request.
      */
     @SuppressWarnings({"unchecked", "RedundantCast"})
-    private GridDhtFuture<Object> request0(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
-        final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
+    private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+        if (cctx.isNear())
+            cctx = cctx.near().dht().context();
+
+        final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys);
 
         IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
 
@@ -704,7 +453,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             if (topReadyFut == null)
                 startFut.listen(new CI1<IgniteInternalFuture<?>>() {
                     @Override public void apply(IgniteInternalFuture<?> syncFut) {
-                        cctx.kernalContext().closure().runLocalSafe(
+                        ctx.kernalContext().closure().runLocalSafe(
                             new GridPlainRunnable() {
                                 @Override public void run() {
                                     fut.init();
@@ -741,46 +490,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         demandLock.writeLock().lock();
 
         try {
-            cctx.deploy().unwind(cctx);
+            grp.unwindUndeploys();
         }
         finally {
             demandLock.writeLock().unlock();
         }
     }
 
-    /**
-     * Adds future to future map.
-     *
-     * @param fut Future to add.
-     * @return {@code False} if node cache is stopping and future was completed with error.
-     */
-    boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) {
-        forceKeyFuts.put(fut.futureId(), fut);
-
-        if (stopping) {
-            fut.onDone(stopError());
-
-            return false;
-        }
-
-        return true;
-    }
-
-    /**
-     * Removes future from future map.
-     *
-     * @param fut Future to remove.
-     */
-    void remoteFuture(GridDhtForceKeysFuture<?, ?> fut) {
-        forceKeyFuts.remove(fut.futureId(), fut);
-    }
-
     /** {@inheritDoc} */
     @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
         partsToEvict.putIfAbsent(part.id(), part);
 
         if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 1)) {
-            cctx.closures().callLocalSafe(new GPC<Boolean>() {
+            ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
                 @Override public Boolean call() {
                     boolean locked = true;
 
@@ -801,7 +523,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                                         partsToEvict.put(part.id(), part);
                                 }
                                 catch (Throwable ex) {
-                                    if (cctx.kernalContext().isStopping()) {
+                                    if (ctx.kernalContext().isStopping()) {
                                         LT.warn(log, ex, "Partition eviction failed (current node is stopping).",
                                             false,
                                             true);
@@ -836,44 +558,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void dumpDebugInfo() {
-        if (!forceKeyFuts.isEmpty()) {
-            U.warn(log, "Pending force key futures [cache=" + cctx.name() + "]:");
-
-            for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
-                U.warn(log, ">>> " + fut);
-        }
-
         supplier.dumpDebugInfo();
     }
-
-    /**
-     *
-     */
-    private abstract class MessageHandler<M> implements IgniteBiInClosure<UUID, M> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void apply(UUID nodeId, M msg) {
-            ClusterNode node = cctx.node(nodeId);
-
-            if (node == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Received message from failed node [node=" + nodeId + ", msg=" + msg + ']');
-
-                return;
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Received message from node [node=" + nodeId + ", msg=" + msg + ']');
-
-            onMessage(node, msg);
-        }
-
-        /**
-         * @param node Node.
-         * @param msg Message.
-         */
-        protected abstract void onMessage(ClusterNode node, M msg);
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 2d5c8a5..e6c5c10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -103,7 +103,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearGetResponse res) {
                 processGetResponse(nodeId, res);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index f8240f3..5b53935 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -86,11 +86,23 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
      * @param ctx Context.
      */
     protected GridNearCacheAdapter(GridCacheContext<K, V> ctx) {
-        super(ctx, ctx.config().getNearConfiguration().getNearStartSize());
+        super(ctx);
     }
 
     /** {@inheritDoc} */
-    @Override protected GridCacheMapEntryFactory entryFactory() {
+    @Override public void start() throws IgniteCheckedException {
+        if (map == null) {
+            map = new GridCacheLocalConcurrentMap(
+                ctx,
+                entryFactory(),
+                ctx.config().getNearConfiguration().getNearStartSize());
+        }
+    }
+
+    /**
+     * @return Entry factory.
+     */
+    private GridCacheMapEntryFactory entryFactory() {
         return new GridCacheMapEntryFactory() {
             @Override public GridCacheMapEntry create(
                 GridCacheContext ctx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 757bfbd..6092511 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -50,7 +50,7 @@ import org.jetbrains.annotations.NotNull;
 /**
  * Get request.
  */
-public class GridNearGetRequest extends GridCacheMessage implements GridCacheDeployable,
+public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheDeployable,
     GridCacheVersionable {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 50af754..b4e4424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -28,7 +28,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
@@ -45,7 +45,7 @@ import org.jetbrains.annotations.NotNull;
 /**
  * Get response.
  */
-public class GridNearGetResponse extends GridCacheMessage implements GridCacheDeployable,
+public class GridNearGetResponse extends GridCacheIdMessage implements GridCacheDeployable,
     GridCacheVersionable {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index f4ce1ac..edddf7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.Nullable;
@@ -159,12 +160,20 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
             if (e instanceof IgniteTxRollbackCheckedException) {
                 if (marked) {
-                    try {
-                        tx.rollback();
-                    }
-                    catch (IgniteCheckedException ex) {
-                        U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
-                    }
+                    tx.rollbackAsync().listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() {
+                        @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                            try {
+                                fut.get();
+                            }
+                            catch (IgniteCheckedException e) {
+                                U.error(log, "Failed to automatically rollback transaction: " + tx, e);
+                            }
+
+                            onComplete();
+                        }
+                    });
+
+                    return;
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index 0faa8ec..00ff4bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -23,7 +23,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -35,7 +35,7 @@ import org.jetbrains.annotations.NotNull;
 /**
  *
  */
-public class GridNearSingleGetRequest extends GridCacheMessage implements GridCacheDeployable {
+public class GridNearSingleGetRequest extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
index 554fca1..2cb75c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -37,7 +37,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  *
  */
-public class GridNearSingleGetResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridNearSingleGetResponse extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -115,7 +115,7 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC
      * @return Topology version.
      */
     @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
+        return topVer != null ? topVer : super.topologyVersion();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index cc90be0..a691cbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -87,13 +87,13 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearGetResponse res) {
                 processGetResponse(nodeId, res);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() {
             @Override public void apply(UUID nodeId, GridNearLockResponse res) {
                 processLockResponse(nodeId, res);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index e3dcbf8..a1a2b57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -133,19 +133,19 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
         }
 
         switch (writer.state()) {
-            case 7:
+            case 6:
                 if (!writer.writeByteArray("errBytes", errBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeLong("nearThreadId", nearThreadId))
                     return false;
 
@@ -167,7 +167,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
             return false;
 
         switch (reader.state()) {
-            case 7:
+            case 6:
                 errBytes = reader.readByteArray("errBytes");
 
                 if (!reader.isLastRead())
@@ -175,7 +175,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
@@ -183,7 +183,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 nearThreadId = reader.readLong("nearThreadId");
 
                 if (!reader.isLastRead())
@@ -203,7 +203,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 9;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 8b538ee..69d1260 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheLocalConcurrentMap;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
@@ -65,9 +66,15 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
      * @param ctx Cache registry.
      */
     public GridLocalCache(GridCacheContext<K, V> ctx) {
-        super(ctx, DFLT_START_CACHE_SIZE);
+        super(ctx);
 
-        preldr = new GridCachePreloaderAdapter(ctx);
+        preldr = new GridCachePreloaderAdapter(ctx.group());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (map == null)
+            map = new GridCacheLocalConcurrentMap(ctx, entryFactory(), DFLT_START_CACHE_SIZE);
     }
 
     /** {@inheritDoc} */
@@ -80,8 +87,10 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
         return preldr;
     }
 
-    /** {@inheritDoc} */
-    @Override protected GridCacheMapEntryFactory entryFactory() {
+    /**
+     * @return Entry factory.
+     */
+    private GridCacheMapEntryFactory entryFactory() {
         return new GridCacheMapEntryFactory() {
             @Override public GridCacheMapEntry create(
                 GridCacheContext ctx,
@@ -97,7 +106,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
      * @param key Key of entry.
      * @return Cache entry.
      */
-    @Nullable GridLocalCacheEntry peekExx(KeyCacheObject key) {
+    @Nullable private GridLocalCacheEntry peekExx(KeyCacheObject key) {
         return (GridLocalCacheEntry)peekEx(key);
     }
 
@@ -215,7 +224,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
         modes.backup = true;
 
         if (modes.offheap)
-            return ctx.offheap().entriesCount();
+            return ctx.offheap().cacheEntriesCount(ctx.cacheId());
         else if (modes.heap)
             return size();
         else

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 56041ee..384cd41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -104,7 +104,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
     public GridLocalAtomicCache(GridCacheContext<K, V> ctx) {
         super(ctx);
 
-        preldr = new GridCachePreloaderAdapter(ctx);
+        preldr = new GridCachePreloaderAdapter(ctx.group());
     }
 
     /** {@inheritDoc} */
@@ -405,7 +405,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
             boolean skipEntry = readNoEntry;
 
             if (readNoEntry) {
-                CacheDataRow row = ctx.offheap().read(cacheKey);
+                CacheDataRow row = ctx.offheap().read(ctx, cacheKey);
 
                 if (row != null) {
                     long expireTime = row.expireTime();


Mime
View raw message