ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [15/22] ignite git commit: ignite-5075 Implement logical 'cache groups' sharing the same physical caches
Date Tue, 06 Jun 2017 15:06:04 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 9063a12..dc4a91f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -72,7 +72,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private boolean lateAffAssign;
 
     /** Affinity information for all started caches (initialized on coordinator). */
-    private ConcurrentMap<Integer, CacheHolder> caches = new ConcurrentHashMap<>();
+    private ConcurrentMap<Integer, CacheGroupHolder> grpHolders = new ConcurrentHashMap<>();
 
     /** Last topology version when affinity was calculated (updated from exchange thread). */
     private AffinityTopologyVersion affCalcVer;
@@ -81,7 +81,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private AffinityTopologyVersion lastAffVer;
 
     /** Registered caches (updated from exchange thread). */
-    private final Map<Integer, DynamicCacheDescriptor> registeredCaches = new HashMap<>();
+    private final Map<Integer, CacheGroupDescriptor> registeredGrps = new HashMap<>();
 
     /** */
     private WaitRebalanceInfo waitInfo;
@@ -126,14 +126,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
         if (type == EVT_NODE_JOINED && node.isLocal()) {
             // Clean-up in case of client reconnect.
-            registeredCaches.clear();
+            registeredGrps.clear();
 
             affCalcVer = null;
 
             lastAffVer = null;
 
-            for (DynamicCacheDescriptor desc : cctx.cache().cacheDescriptors())
-                registeredCaches.put(desc.cacheId(), desc);
+            for (CacheGroupDescriptor desc : cctx.cache().cacheGroupDescriptors().values())
+                registeredGrps.put(desc.groupId(), desc);
         }
 
         if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) {
@@ -154,7 +154,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         if (msg.exchangeId() != null) {
             if (log.isDebugEnabled()) {
-                log.debug("Need process affinity change message [lastAffVer=" + lastAffVer +
+                log.debug("Ignore affinity change message [lastAffVer=" + lastAffVer +
                     ", msgExchId=" + msg.exchangeId() +
                     ", msgVer=" + msg.topologyVersion() + ']');
             }
@@ -188,14 +188,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * @param topVer Expected topology version.
      */
-    private void onCacheStopped(AffinityTopologyVersion topVer) {
+    private void onCacheGroupStopped(AffinityTopologyVersion topVer) {
         CacheAffinityChangeMessage msg = null;
 
         synchronized (mux) {
             if (waitInfo == null || !waitInfo.topVer.equals(topVer))
                 return;
 
-            if (waitInfo.waitCaches.isEmpty()) {
+            if (waitInfo.waitGrps.isEmpty()) {
                 msg = affinityChangeMessage(waitInfo);
 
                 waitInfo = null;
@@ -213,9 +213,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
     /**
      * @param top Topology.
-     * @param checkCacheId Cache ID.
+     * @param checkGrpId Group ID.
      */
-    void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) {
+    void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) {
         if (!lateAffAssign)
             return;
 
@@ -229,14 +229,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             assert affCalcVer.equals(waitInfo.topVer) : "Invalid affinity version [calcVer=" + affCalcVer +
                 ", waitVer=" + waitInfo.topVer + ']';
 
-            Map<Integer, UUID> partWait = waitInfo.waitCaches.get(checkCacheId);
+            Map<Integer, UUID> partWait = waitInfo.waitGrps.get(checkGrpId);
 
             boolean rebalanced = true;
 
             if (partWait != null) {
-                CacheHolder cache = caches.get(checkCacheId);
+                CacheGroupHolder grpHolder = grpHolders.get(checkGrpId);
 
-                if (cache != null) {
+                if (grpHolder != null) {
                     for (Iterator<Map.Entry<Integer, UUID>> it = partWait.entrySet().iterator(); it.hasNext(); ) {
                         Map.Entry<Integer, UUID> e = it.next();
 
@@ -256,9 +256,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 }
 
                 if (rebalanced) {
-                    waitInfo.waitCaches.remove(checkCacheId);
+                    waitInfo.waitGrps.remove(checkGrpId);
 
-                    if (waitInfo.waitCaches.isEmpty()) {
+                    if (waitInfo.waitGrps.isEmpty()) {
                         msg = affinityChangeMessage(waitInfo);
 
                         waitInfo = null;
@@ -287,7 +287,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         Map<Integer, Map<Integer, List<UUID>>> assignmentsChange = U.newHashMap(waitInfo.assignments.size());
 
         for (Map.Entry<Integer, Map<Integer, List<ClusterNode>>> e : waitInfo.assignments.entrySet()) {
-            Integer cacheId = e.getKey();
+            Integer grpId = e.getKey();
 
             Map<Integer, List<ClusterNode>> assignment = e.getValue();
 
@@ -296,23 +296,23 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             for (Map.Entry<Integer, List<ClusterNode>> e0 : assignment.entrySet())
                 assignment0.put(e0.getKey(), toIds0(e0.getValue()));
 
-            assignmentsChange.put(cacheId, assignment0);
+            assignmentsChange.put(grpId, assignment0);
         }
 
         return new CacheAffinityChangeMessage(waitInfo.topVer, assignmentsChange, waitInfo.deploymentIds);
     }
 
     /**
-     * @param cctx Cache context.
+     * @param grp Cache group.
      */
-    public void onCacheCreated(GridCacheContext cctx) {
-        final Integer cacheId = cctx.cacheId();
+    void onCacheGroupCreated(CacheGroupContext grp) {
+        final Integer grpId = grp.groupId();
 
-        if (!caches.containsKey(cctx.cacheId())) {
-            cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class,
+        if (!grpHolders.containsKey(grp.groupId())) {
+            cctx.io().addCacheGroupHandler(grpId, GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                        processAffinityAssignmentResponse(nodeId, res);
+                        processAffinityAssignmentResponse(grpId, nodeId, res);
                     }
                 });
         }
@@ -322,28 +322,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param exchActions Cache change requests to execute on exchange.
      */
     private void updateCachesInfo(ExchangeActions exchActions) {
-        for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
-            DynamicCacheDescriptor desc = registeredCaches.remove(action.descriptor().cacheId());
+        for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) {
+            CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId());
 
-            assert desc != null : action.request().cacheName();
+            assert rmvd != null : stopDesc.cacheOrGroupName();
         }
 
-        for (ExchangeActions.ActionData action : exchActions.cacheStartRequests()) {
-            DynamicCacheChangeRequest req = action.request();
-
-            Integer cacheId = action.descriptor().cacheId();
-
-            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
-                req.startCacheConfiguration(),
-                req.cacheType(),
-                false,
-                action.descriptor().receivedFrom(),
-                action.descriptor().staticallyConfigured(),
-                action.descriptor().sql(),
-                req.deploymentId(),
-                req.schema());
-
-            DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
+        for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) {
+            CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc);
 
             assert old == null : old;
         }
@@ -360,7 +346,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
         boolean crd,
-        ExchangeActions exchActions)
+        final ExchangeActions exchActions)
         throws IgniteCheckedException
     {
         assert exchActions != null && !exchActions.empty() : exchActions;
@@ -368,9 +354,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         updateCachesInfo(exchActions);
 
         // Affinity did not change for existing caches.
-        forAllCaches(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+        forAllCacheGroups(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
-                if (fut.stopping(aff.cacheId()))
+                if (exchActions.cacheGroupStopping(aff.groupId()))
                     return;
 
                 aff.clientEventTopologyChange(fut.discoveryEvent(), fut.topologyVersion());
@@ -392,8 +378,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 nearCfg = req.nearCacheConfiguration();
             }
             else {
-                startCache = cctx.cacheContext(action.descriptor().cacheId()) == null &&
-                    CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter());
+                startCache = cctx.cacheContext(cacheDesc.cacheId()) == null &&
+                    CU.affinityNode(cctx.localNode(), cacheDesc.groupDescriptor().config().getNodeFilter());
             }
 
             try {
@@ -401,102 +387,122 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion());
 
                     if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
-                        if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
+                        if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
                             U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
                     }
                 }
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. " +
+                    "[cacheName=" + req.cacheName() + ']', e);
 
-                if (!crd || !lateAffAssign) {
-                    GridCacheContext cacheCtx = cctx.cacheContext(cacheDesc.cacheId());
-
-                    if (cacheCtx != null && !cacheCtx.isLocal()) {
-                        boolean clientCacheStarted =
-                            req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
+                cctx.cache().forceCloseCache(fut.topologyVersion(), action, e);
+            }
+        }
 
-                        if (clientCacheStarted)
-                            initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
-                        else if (!req.clientStartOnly()) {
-                            assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
+            Set<Integer> gprs = new HashSet<>();
 
-                            GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
+                for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
+                    Integer grpId = action.descriptor().groupId();
 
-                            assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
+                    if (gprs.add(grpId)) {
+                        if (crd && lateAffAssign)
+                    initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor());else  {
+                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
-                                fut.discoveryEvent(), fut.discoCache());
+                        if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) {
+                        assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion();
 
-                            aff.initialize(fut.topologyVersion(), assignment);
-                        }
+                        initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut);
                     }
                 }
-                else
-                    initStartedCacheOnCoordinator(fut, cacheDesc.cacheId());
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. " +
-                    "[cacheName=" + req.cacheName() + ']', e);
-
-                cctx.cache().forceCloseCache(fut.topologyVersion(), action, e);
             }
         }
 
-        for (DynamicCacheChangeRequest req : exchActions.closeRequests(cctx.localNodeId())) {
-            Integer cacheId = CU.cacheId(req.cacheName());
+        List<ExchangeActions.ActionData> closeReqs = exchActions.closeRequests(cctx.localNodeId());
 
-            cctx.cache().blockGateway(req);
+        for (ExchangeActions.ActionData req : closeReqs) {
+            cctx.cache().blockGateway(req.request());
 
             if (crd) {
-                GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+                CacheGroupContext grp = cctx.cache().cacheGroup(req.descriptor().groupId());
+
+                assert grp != null;
+
+                if (grp.affinityNode())
+                    continue;
+
+                boolean grpClosed = false;
 
-                // Client cache was stopped, need create 'client' CacheHolder.
-                if (cacheCtx != null && !cacheCtx.affinityNode()) {
-                    CacheHolder cache = caches.remove(cacheId);
+                if (grp.sharedGroup()) {
+                    boolean cacheRemaining = false;
 
-                    assert !cache.client() : cache;
+                    for (GridCacheContext ctx : cctx.cacheContexts()) {
+                        if (ctx.group() == grp && !cacheClosed(ctx.cacheId(), closeReqs)) {
+                            cacheRemaining = true;
 
-                    cache = CacheHolder2.create(cctx,
-                        cctx.cache().cacheDescriptor(cacheId),
-                        fut,
-                        cache.affinity());
+                            break;
+                        }
+                    }
 
-                    caches.put(cacheId, cache);
+                    if (!cacheRemaining)
+                        grpClosed = true;
                 }
-            }
-        }
+                else
+                    grpClosed = true;
+
+                // All client cache groups were stopped, need create 'client' CacheGroupHolder.
+                if (grpClosed) {
+                    CacheGroupHolder grpHolder = grpHolders.remove(grp.groupId());
+
+                    if (grpHolder != null) {
+                        assert !grpHolder.client() : grpHolder;
 
-        Set<Integer> stoppedCaches = null;
+                        grpHolder = CacheGroupHolder2.create(cctx,
+                            registeredGrps.get(grp.groupId()),
+                            fut,
+                            grp.affinity());
 
-        for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
-            DynamicCacheDescriptor desc = action.descriptor();
+                        grpHolders.put(grp.groupId(), grpHolder);
+                    }
+                }
+            }
+        }
 
+        for (ExchangeActions.ActionData action : exchActions.cacheStopRequests())
             cctx.cache().blockGateway(action.request());
 
-            if (crd && lateAffAssign && desc.cacheConfiguration().getCacheMode() != LOCAL) {
-                CacheHolder cache = caches.remove(desc.cacheId());
+        Set<Integer> stoppedGrps = null;
 
-                assert cache != null : action.request();
+        if (crd && lateAffAssign) {
+            for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) {
+                if (grpDesc.config().getCacheMode() != LOCAL) {
+                    CacheGroupHolder cacheGrp = grpHolders.remove(grpDesc.groupId());
 
-                if (stoppedCaches == null)
-                    stoppedCaches = new HashSet<>();
+                    assert cacheGrp != null : grpDesc;
 
-                stoppedCaches.add(cache.cacheId());
+                    if (stoppedGrps == null)
+                        stoppedGrps = new HashSet<>();
 
-                cctx.io().removeHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class);
+                    stoppedGrps.add(cacheGrp.groupId());
+
+                    cctx.io().removeHandler(true, cacheGrp.groupId(), GridDhtAffinityAssignmentResponse.class);
+                }
             }
         }
 
-        if (stoppedCaches != null) {
+        if (stoppedGrps != null) {
             boolean notify = false;
 
             synchronized (mux) {
                 if (waitInfo != null) {
-                    for (Integer cacheId : stoppedCaches) {
-                        boolean rmv = waitInfo.waitCaches.remove(cacheId) != null;
+                    for (Integer grpId : stoppedGrps) {
+                        boolean rmv = waitInfo.waitGrps.remove(grpId) != null;
 
                         if (rmv) {
                             notify = true;
 
-                            waitInfo.assignments.remove(cacheId);
+                            waitInfo.assignments.remove(grpId);
                         }
                     }
                 }
@@ -507,7 +513,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                     @Override public void run() {
-                        onCacheStopped(topVer);
+                        onCacheGroupStopped(topVer);
                     }
                 });
             }
@@ -517,12 +523,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param cacheId Cache ID.
+     * @param closeReqs Close requests.
+     * @return {@code True} if requests contain request for given cache ID.
+     */
+    private boolean cacheClosed(int cacheId, List<ExchangeActions.ActionData> closeReqs) {
+        for (ExchangeActions.ActionData req : closeReqs) {
+            if (req.descriptor().cacheId() == cacheId)
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      *
      */
     public void removeAllCacheInfo(){
-        caches.clear();
+        grpHolders.clear();
 
-        registeredCaches.clear();
+        registeredGrps.clear();
     }
 
     /**
@@ -550,13 +570,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
-        forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+        forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                 List<List<ClusterNode>> idealAssignment = aff.idealAssignment();
 
                 assert idealAssignment != null;
 
-                Map<Integer, List<UUID>> cacheAssignment = assignment.get(aff.cacheId());
+                Map<Integer, List<UUID>> cacheAssignment = assignment.get(aff.groupId());
 
                 List<List<ClusterNode>> newAssignment;
 
@@ -606,25 +626,25 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
-        forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+        forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                 AffinityTopologyVersion affTopVer = aff.lastVersion();
 
                 assert affTopVer.topologyVersion() > 0 : affTopVer;
 
-                DynamicCacheDescriptor desc = registeredCaches.get(aff.cacheId());
+                CacheGroupDescriptor desc = registeredGrps.get(aff.groupId());
 
-                assert desc != null : aff.cacheName();
+                assert desc != null : aff.cacheOrGroupName();
 
                 IgniteUuid deploymentId = desc.deploymentId();
 
-                if (!deploymentId.equals(deploymentIds.get(aff.cacheId()))) {
+                if (!deploymentId.equals(deploymentIds.get(aff.groupId()))) {
                     aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
 
                     return;
                 }
 
-                Map<Integer, List<UUID>> change = affChange.get(aff.cacheId());
+                Map<Integer, List<UUID>> change = affChange.get(aff.groupId());
 
                 if (change != null) {
                     assert !change.isEmpty() : msg;
@@ -639,7 +659,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         List<ClusterNode> nodes = toNodes(topVer, e.getValue());
 
                         assert !nodes.equals(assignment.get(part)) : "Assignment did not change " +
-                            "[cache=" + aff.cacheName() +
+                            "[cacheGrp=" + aff.cacheOrGroupName() +
                             ", part=" + part +
                             ", cur=" + F.nodeIds(assignment.get(part)) +
                             ", new=" + F.nodeIds(nodes) +
@@ -675,7 +695,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         if (lateAffAssign) {
             if (!locJoin) {
-                forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+                forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                     @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                         AffinityTopologyVersion topVer = fut.topologyVersion();
 
@@ -688,7 +708,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
         else {
             if (!locJoin) {
-                forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+                forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                     @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                         AffinityTopologyVersion topVer = fut.topologyVersion();
 
@@ -721,10 +741,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param grpId Cache group ID.
      * @param nodeId Node ID.
      * @param res Response.
      */
-    private void processAffinityAssignmentResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
+    private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId, GridDhtAffinityAssignmentResponse res) {
         if (log.isDebugEnabled())
             log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
 
@@ -738,11 +759,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param c Cache closure.
      * @throws IgniteCheckedException If failed
      */
-    private void forAllRegisteredCaches(IgniteInClosureX<DynamicCacheDescriptor> c) throws IgniteCheckedException {
+    private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException {
         assert lateAffAssign;
 
-        for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
-            if (cacheDesc.cacheConfiguration().getCacheMode() == LOCAL)
+        for (CacheGroupDescriptor cacheDesc : registeredGrps.values()) {
+            if (cacheDesc.config().getCacheMode() == LOCAL)
                 continue;
 
             c.applyx(cacheDesc);
@@ -753,56 +774,60 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param crd Coordinator flag.
      * @param c Closure.
      */
-    private void forAllCaches(boolean crd, IgniteInClosureX<GridAffinityAssignmentCache> c) {
+    private void forAllCacheGroups(boolean crd, IgniteInClosureX<GridAffinityAssignmentCache> c) {
         if (crd) {
-            for (CacheHolder cache : caches.values())
-                c.apply(cache.affinity());
+            for (CacheGroupHolder grp : grpHolders.values())
+                c.apply(grp.affinity());
         }
         else {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+            for (CacheGroupContext grp : cctx.kernalContext().cache().cacheGroups()) {
+                if (grp.isLocal())
                     continue;
 
-                c.apply(cacheCtx.affinity().affinityCache());
+                c.apply(grp.affinity());
             }
         }
     }
 
     /**
      * @param fut Exchange future.
-     * @param cacheId Cache ID.
+     * @param grpDesc Cache group descriptor.
      * @throws IgniteCheckedException If failed.
      */
-    private void initStartedCacheOnCoordinator(GridDhtPartitionsExchangeFuture fut, final Integer cacheId)
+    private void initStartedGroupOnCoordinator(GridDhtPartitionsExchangeFuture fut, final CacheGroupDescriptor grpDesc)
         throws IgniteCheckedException {
-        CacheHolder cache = caches.get(cacheId);
+        assert grpDesc != null && grpDesc.groupId() != 0 : grpDesc;
 
-        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+        if (grpDesc.config().getCacheMode() == LOCAL)
+            return;
 
-        if (cache == null) {
-            DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId);
+        Integer grpId = grpDesc.groupId();
 
-            assert desc != null : cacheId;
+        CacheGroupHolder grpHolder = grpHolders.get(grpId);
 
-            if (desc.cacheConfiguration().getCacheMode() == LOCAL)
-                return;
+        CacheGroupContext grp = cctx.kernalContext().cache().cacheGroup(grpId);
 
-            cache = cacheCtx != null ? new CacheHolder1(cacheCtx, null) : CacheHolder2.create(cctx, desc, fut, null);
+        if (grpHolder == null) {
+            grpHolder = grp != null ?
+                new CacheGroupHolder1(grp, null) :
+                CacheGroupHolder2.create(cctx, grpDesc, fut, null);
 
-            CacheHolder old = caches.put(cacheId, cache);
+            CacheGroupHolder old = grpHolders.put(grpId, grpHolder);
 
             assert old == null : old;
 
-            List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+            List<List<ClusterNode>> newAff = grpHolder.affinity().calculate(fut.topologyVersion(),
+                fut.discoveryEvent(),
+                fut.discoCache());
 
-            cache.affinity().initialize(fut.topologyVersion(), newAff);
+            grpHolder.affinity().initialize(fut.topologyVersion(), newAff);
         }
-        else if (cache.client() && cacheCtx != null) {
-            assert cache.affinity().idealAssignment() != null;
+        else if (grpHolder.client() && grp != null) {
+            assert grpHolder.affinity().idealAssignment() != null;
 
-            cache = new CacheHolder1(cacheCtx, cache.affinity());
+            grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity());
 
-            caches.put(cacheId, cache);
+            grpHolders.put(grpId, grpHolder);
         }
     }
 
@@ -818,14 +843,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         final GridDhtPartitionsExchangeFuture fut,
         Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException {
         for (DynamicCacheDescriptor desc : descs) {
-            if (!registeredCaches.containsKey(desc.cacheId()))
-                registeredCaches.put(desc.cacheId(), desc);
+            CacheGroupDescriptor grpDesc = desc.groupDescriptor();
+
+            if (!registeredGrps.containsKey(grpDesc.groupId()))
+                registeredGrps.put(grpDesc.groupId(), grpDesc);
         }
 
         if (crd && lateAffAssign) {
-            forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
-                @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException {
-                    CacheHolder cache = cache(fut, desc);
+            forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+                @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+                    CacheGroupHolder cache = groupHolder(fut, desc);
 
                     if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
                         List<List<ClusterNode>> assignment =
@@ -837,30 +864,28 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             });
         }
         else {
-            forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+            forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                     if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
-                        initAffinity(registeredCaches.get(aff.cacheId()), aff, fut, false);
+                        initAffinity(registeredGrps.get(aff.groupId()), aff, fut);
                 }
             });
         }
     }
 
     /**
-     * @param desc Cache descriptor.
+     * @param desc Cache group descriptor.
      * @param aff Affinity.
      * @param fut Exchange future.
-     * @param fetch Force fetch flag.
      * @throws IgniteCheckedException If failed.
      */
-    private void initAffinity(DynamicCacheDescriptor desc,
+    private void initAffinity(CacheGroupDescriptor desc,
         GridAffinityAssignmentCache aff,
-        GridDhtPartitionsExchangeFuture fut,
-        boolean fetch)
+        GridDhtPartitionsExchangeFuture fut)
         throws IgniteCheckedException {
-        assert desc != null;
+        assert desc != null : aff.cacheOrGroupName();
 
-        if (!fetch && canCalculateAffinity(desc, aff, fut)) {
+        if (canCalculateAffinity(desc, aff, fut)) {
             List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
             aff.initialize(fut.topologyVersion(), assignment);
@@ -878,26 +903,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param desc Cache descriptor.
+     * @param desc Cache group descriptor.
      * @param aff Affinity.
      * @param fut Exchange future.
      * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange.
      */
-    private boolean canCalculateAffinity(DynamicCacheDescriptor desc,
+    private boolean canCalculateAffinity(CacheGroupDescriptor desc,
         GridAffinityAssignmentCache aff,
         GridDhtPartitionsExchangeFuture fut) {
-        assert desc != null : aff.cacheName();
+        assert desc != null : aff.cacheOrGroupName();
 
         // Do not request affinity from remote nodes if affinity function is not centralized.
-        if (!aff.centralizedAffinityFunction())
+        if (!lateAffAssign && !aff.centralizedAffinityFunction())
             return true;
 
         // If local node did not initiate exchange or local node is the only cache node in grid.
-        Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheId(), fut.topologyVersion());
+        Collection<ClusterNode> affNodes = fut.discoCache().cacheGroupAffinityNodes(aff.groupId());
 
-        return fut.cacheAddedOnExchange(aff.cacheId(), desc.receivedFrom()) ||
+        return fut.cacheGroupAddedOnExchange(aff.groupId(), desc.receivedFrom()) ||
             !fut.exchangeId().nodeId().equals(cctx.localNodeId()) ||
-            (affNodes.size() == 1 && affNodes.contains(cctx.localNode()));
+            (affNodes.isEmpty() || (affNodes.size() == 1 && affNodes.contains(cctx.localNode())));
     }
 
     /**
@@ -917,13 +942,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (lateAffAssign) {
             if (locJoin) {
                 if (crd) {
-                    forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
-                        @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException {
+                    forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+                        @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                             AffinityTopologyVersion topVer = fut.topologyVersion();
 
-                            CacheHolder cache = cache(fut, cacheDesc);
+                            CacheGroupHolder cache = groupHolder(fut, desc);
 
-                            List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+                            List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
+                                fut.discoveryEvent(),
+                                fut.discoCache());
 
                             cache.affinity().initialize(topVer, newAff);
                         }
@@ -948,21 +975,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (crd && lateAffAssign) {
                 if (log.isDebugEnabled()) {
                     log.debug("Computed new affinity after node join [topVer=" + fut.topologyVersion() +
-                        ", waitCaches=" + (info != null ? cacheNames(info.waitCaches.keySet()) : null) + ']');
+                        ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
                 }
             }
         }
     }
 
     /**
-     * @param cacheIds Cache IDs.
+     * @param grpIds Cache group IDs.
      * @return Cache names.
      */
-    private String cacheNames(Collection<Integer> cacheIds) {
+    private String groupNames(Collection<Integer> grpIds) {
         StringBuilder names = new StringBuilder();
 
-        for (Integer cacheId : cacheIds) {
-            String name = registeredCaches.get(cacheId).cacheConfiguration().getName();
+        for (Integer grpId : grpIds) {
+            String name = registeredGrps.get(grpId).cacheOrGroupName();
 
             if (names.length() != 0)
                 names.append(", ");
@@ -982,21 +1009,24 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>();
 
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal())
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            DynamicCacheDescriptor cacheDesc = registeredCaches.get(cacheCtx.cacheId());
-
-            if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) {
-                List<List<ClusterNode>> assignment =
-                    cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+            if (fut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom())) {
+                List<List<ClusterNode>> assignment = grp.affinity().calculate(fut.topologyVersion(),
+                    fut.discoveryEvent(),
+                    fut.discoCache());
 
-                cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment);
+                grp.affinity().initialize(fut.topologyVersion(), assignment);
             }
             else {
+                CacheGroupDescriptor grpDesc = registeredGrps.get(grp.groupId());
+
+                assert grpDesc != null : grp.cacheOrGroupName();
+
                 GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                    cacheDesc,
+                    grpDesc,
                     topVer,
                     fut.discoCache());
 
@@ -1009,9 +1039,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         for (int i = 0; i < fetchFuts.size(); i++) {
             GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
 
-            Integer cacheId = fetchFut.cacheId();
+            Integer grpId = fetchFut.groupId();
 
-            fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
+            fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut);
         }
     }
 
@@ -1070,11 +1100,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         boolean centralizedAff;
 
         if (lateAffAssign) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
                     continue;
 
-                cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
             }
 
             centralizedAff = true;
@@ -1101,11 +1131,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private void initAffinityNoLateAssignment(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
         assert !lateAffAssign;
 
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal())
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            initAffinity(registeredCaches.get(cacheCtx.cacheId()), cacheCtx.affinity().affinityCache(), fut, false);
+            initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut);
         }
     }
 
@@ -1116,35 +1146,38 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     private IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut)
         throws IgniteCheckedException {
+        assert lateAffAssign;
+
         final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>();
 
-        forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
-            @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException {
-                CacheHolder cache = caches.get(desc.cacheId());
+        forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+            @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+                CacheGroupHolder grpHolder = grpHolders.get(desc.groupId());
 
-                if (cache != null) {
-                    if (cache.client())
-                        cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                if (grpHolder != null) {
+                    if (grpHolder.client()) // Affinity for non-client holders calculated in {@link #onServerLeft}.
+                        grpHolder.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
                     return;
                 }
 
-                final Integer cacheId = desc.cacheId();
+                // Need initialize holders and affinity if this node became coordinator during this exchange.
+                final Integer grpId = desc.groupId();
 
-                GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+                CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                if (cacheCtx == null) {
-                    cctx.io().addHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class,
+                if (grp == null) {
+                    cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class,
                         new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                             @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                                processAffinityAssignmentResponse(nodeId, res);
+                                processAffinityAssignmentResponse(grpId, nodeId, res);
                             }
                         }
                     );
 
-                    cache = CacheHolder2.create(cctx, desc, fut, null);
+                    grpHolder = CacheGroupHolder2.create(cctx, desc, fut, null);
 
-                    final GridAffinityAssignmentCache aff = cache.affinity();
+                    final GridAffinityAssignmentCache aff = grpHolder.affinity();
 
                     List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures();
 
@@ -1154,9 +1187,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         ", total=" + exchFuts.size() + ']';
 
                     final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1);
+
                     if (log.isDebugEnabled()) {
                         log.debug("Need initialize affinity on coordinator [" +
-                            "cache=" + desc.cacheConfiguration().getName() +
+                            "cacheGrp=" + desc.cacheOrGroupName() +
                             "prevAff=" + prev.topologyVersion() + ']');
                     }
 
@@ -1185,9 +1219,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     futs.add(affFut);
                 }
                 else
-                    cache = new CacheHolder1(cacheCtx, null);
+                    grpHolder = new CacheGroupHolder1(grp, null);
 
-                CacheHolder old = caches.put(cache.cacheId(), cache);
+                CacheGroupHolder old = grpHolders.put(grpHolder.groupId(), grpHolder);
 
                 assert old == null : old;
             }
@@ -1213,38 +1247,36 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @return Cache holder.
      * @throws IgniteCheckedException If failed.
      */
-    private CacheHolder cache(GridDhtPartitionsExchangeFuture fut, DynamicCacheDescriptor desc)
+    private CacheGroupHolder groupHolder(GridDhtPartitionsExchangeFuture fut, final CacheGroupDescriptor desc)
         throws IgniteCheckedException {
         assert lateAffAssign;
 
-        final Integer cacheId = desc.cacheId();
-
-        CacheHolder cache = caches.get(cacheId);
+        CacheGroupHolder cacheGrp = grpHolders.get(desc.groupId());
 
-        if (cache != null)
-            return cache;
+        if (cacheGrp != null)
+            return cacheGrp;
 
-        GridCacheContext cacheCtx = cctx.cacheContext(desc.cacheId());
+        final CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
 
-        if (cacheCtx == null) {
-            cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class,
+        if (grp == null) {
+            cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                        processAffinityAssignmentResponse(nodeId, res);
+                        processAffinityAssignmentResponse(desc.groupId(), nodeId, res);
                     }
                 }
             );
 
-            cache = CacheHolder2.create(cctx, desc, fut, null);
+            cacheGrp = CacheGroupHolder2.create(cctx, desc, fut, null);
         }
         else
-            cache = new CacheHolder1(cacheCtx, null);
+            cacheGrp = new CacheGroupHolder1(grp, null);
 
-        CacheHolder old = caches.put(cache.cacheId(), cache);
+        CacheGroupHolder old = grpHolders.put(desc.groupId(), cacheGrp);
 
         assert old == null : old;
 
-        return cache;
+        return cacheGrp;
     }
 
     /**
@@ -1255,18 +1287,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd)
         throws IgniteCheckedException {
+        assert lateAffAssign;
+
         AffinityTopologyVersion topVer = fut.topologyVersion();
 
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
         if (!crd) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
                     continue;
 
-                boolean latePrimary = cacheCtx.rebalanceEnabled();
+                boolean latePrimary = grp.rebalanceEnabled();
 
-                initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
+                initAffinityOnNodeJoin(fut, grp.affinity(), null, latePrimary, affCache);
             }
 
             return null;
@@ -1274,9 +1308,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         else {
             final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
 
-            forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
-                @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException {
-                    CacheHolder cache = cache(fut, cacheDesc);
+            forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+                @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+                    CacheGroupHolder cache = groupHolder(fut, desc);
 
                     boolean latePrimary = cache.rebalanceEnabled;
 
@@ -1309,7 +1343,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         AffinityTopologyVersion affTopVer = aff.lastVersion();
 
-        assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [cache=" + aff.cacheName() +
+        assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() +
             ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']';
 
         List<List<ClusterNode>> curAff = aff.assignments(affTopVer);
@@ -1400,7 +1434,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         if (rebalance != null)
-            rebalance.add(aff.cacheId(), part, newNodes.get(0).id(), newNodes);
+            rebalance.add(aff.groupId(), part, newNodes.get(0).id(), newNodes);
 
         return nodes0;
     }
@@ -1443,6 +1477,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
         throws IgniteCheckedException {
+        assert lateAffAssign;
+
         final AffinityTopologyVersion topVer = fut.topologyVersion();
 
         final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
@@ -1451,24 +1487,24 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         final Map<Integer, Map<Integer, List<UUID>>> assignment = new HashMap<>();
 
-        forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
-            @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException {
-                CacheHolder cache = cache(fut, cacheDesc);
+        forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+            @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+                CacheGroupHolder grpHolder = groupHolder(fut, desc);
 
-                if (!cache.rebalanceEnabled)
+                if (!grpHolder.rebalanceEnabled)
                     return;
 
-                AffinityTopologyVersion affTopVer = cache.affinity().lastVersion();
+                AffinityTopologyVersion affTopVer = grpHolder.affinity().lastVersion();
 
                 assert affTopVer.topologyVersion() > 0 && !affTopVer.equals(topVer) : "Invalid affinity version " +
-                    "[last=" + affTopVer + ", futVer=" + topVer + ", cache=" + cache.name() + ']';
+                    "[last=" + affTopVer + ", futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']';
 
-                List<List<ClusterNode>> curAssignment = cache.affinity().assignments(affTopVer);
-                List<List<ClusterNode>> newAssignment = cache.affinity().idealAssignment();
+                List<List<ClusterNode>> curAssignment = grpHolder.affinity().assignments(affTopVer);
+                List<List<ClusterNode>> newAssignment = grpHolder.affinity().idealAssignment();
 
                 assert newAssignment != null;
 
-                GridDhtPartitionTopology top = cache.topology(fut);
+                GridDhtPartitionTopology top = grpHolder.topology(fut);
 
                 Map<Integer, List<UUID>> cacheAssignment = null;
 
@@ -1482,7 +1518,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     List<ClusterNode> newNodes0 = null;
 
                     assert newPrimary == null || aliveNodes.contains(newPrimary) : "Invalid new primary [" +
-                        "cache=" + cache.name() +
+                        "grp=" + desc.cacheOrGroupName() +
                         ", node=" + newPrimary +
                         ", topVer=" + topVer + ']';
 
@@ -1491,7 +1527,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             GridDhtPartitionState state = top.partitionState(newPrimary.id(), p);
 
                             if (state != GridDhtPartitionState.OWNING) {
-                                newNodes0 = latePrimaryAssignment(cache.affinity(),
+                                newNodes0 = latePrimaryAssignment(grpHolder.affinity(),
                                     p,
                                     curPrimary,
                                     newNodes,
@@ -1506,7 +1542,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                                     ClusterNode curNode = curNodes.get(i);
 
                                     if (top.partitionState(curNode.id(), p) == GridDhtPartitionState.OWNING) {
-                                        newNodes0 = latePrimaryAssignment(cache.affinity(),
+                                        newNodes0 = latePrimaryAssignment(grpHolder.affinity(),
                                             p,
                                             curNode,
                                             newNodes,
@@ -1521,7 +1557,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                                     for (ClusterNode owner : owners) {
                                         if (aliveNodes.contains(owner)) {
-                                            newNodes0 = latePrimaryAssignment(cache.affinity(),
+                                            newNodes0 = latePrimaryAssignment(grpHolder.affinity(),
                                                 p,
                                                 owner,
                                                 newNodes,
@@ -1544,7 +1580,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 }
 
                 if (cacheAssignment != null)
-                    assignment.put(cache.cacheId(), cacheAssignment);
+                    assignment.put(grpHolder.groupId(), cacheAssignment);
             }
         });
 
@@ -1557,7 +1593,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             if (log.isDebugEnabled()) {
                 log.debug("Computed new affinity after node left [topVer=" + topVer +
-                    ", waitCaches=" + (info != null ? cacheNames(info.waitCaches.keySet()) : null) + ']');
+                    ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
             }
         }
 
@@ -1616,7 +1652,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      *
      */
-    abstract static class CacheHolder {
+    abstract static class CacheGroupHolder {
         /** */
         private final GridAffinityAssignmentCache aff;
 
@@ -1628,7 +1664,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          * @param aff Affinity cache.
          * @param initAff Existing affinity cache.
          */
-        CacheHolder(boolean rebalanceEnabled,
+        CacheGroupHolder(boolean rebalanceEnabled,
             GridAffinityAssignmentCache aff,
             @Nullable GridAffinityAssignmentCache initAff) {
             this.aff = aff;
@@ -1645,10 +1681,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         abstract boolean client();
 
         /**
-         * @return Cache ID.
+         * @return Group ID.
          */
-        int cacheId() {
-            return aff.cacheId();
+        int groupId() {
+            return aff.groupId();
         }
 
         /**
@@ -1659,13 +1695,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /**
-         * @return Cache name.
-         */
-        String name() {
-            return aff.cacheName();
-        }
-
-        /**
          * @param fut Exchange future.
          * @return Cache topology.
          */
@@ -1682,20 +1711,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * Created cache is started on coordinator.
      */
-    private class CacheHolder1 extends CacheHolder {
+    private class CacheGroupHolder1 extends CacheGroupHolder {
         /** */
-        private final GridCacheContext cctx;
+        private final CacheGroupContext grp;
 
         /**
-         * @param cctx Cache context.
+         * @param grp Cache group.
          * @param initAff Current affinity.
          */
-        CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) {
-            super(cctx.rebalanceEnabled(), cctx.affinity().affinityCache(), initAff);
+        CacheGroupHolder1(CacheGroupContext grp, @Nullable GridAffinityAssignmentCache initAff) {
+            super(grp.rebalanceEnabled(), grp.affinity(), initAff);
 
-            assert !cctx.isLocal() : cctx.name();
+            assert !grp.isLocal() : grp;
 
-            this.cctx = cctx;
+            this.grp = grp;
         }
 
         /** {@inheritDoc} */
@@ -1704,56 +1733,41 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /** {@inheritDoc} */
-        @Override public int partitions() {
-            return cctx.affinity().partitions();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return cctx.name();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int cacheId() {
-            return cctx.cacheId();
-        }
-
-        /** {@inheritDoc} */
         @Override public GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut) {
-            return cctx.topology();
+            return grp.topology();
         }
     }
 
     /**
      * Created if cache is not started on coordinator.
      */
-    private static class CacheHolder2 extends CacheHolder {
+    private static class CacheGroupHolder2 extends CacheGroupHolder {
         /** */
         private final GridCacheSharedContext cctx;
 
         /**
          * @param cctx Context.
-         * @param cacheDesc Cache descriptor.
+         * @param grpDesc Cache group descriptor.
          * @param fut Exchange future.
          * @param initAff Current affinity.
          * @return Cache holder.
          * @throws IgniteCheckedException If failed.
          */
-        static CacheHolder2 create(
+        static CacheGroupHolder2 create(
             GridCacheSharedContext cctx,
-            DynamicCacheDescriptor cacheDesc,
+            CacheGroupDescriptor grpDesc,
             GridDhtPartitionsExchangeFuture fut,
             @Nullable GridAffinityAssignmentCache initAff) throws IgniteCheckedException {
-            assert cacheDesc != null;
+            assert grpDesc != null;
             assert !cctx.kernalContext().clientNode();
 
-            CacheConfiguration<?, ?> ccfg = cacheDesc.cacheConfiguration();
+            CacheConfiguration<?, ?> ccfg = grpDesc.config();
 
-            assert ccfg != null : cacheDesc;
+            assert ccfg != null : grpDesc;
             assert ccfg.getCacheMode() != LOCAL : ccfg.getName();
 
-            assert !cctx.discovery().cacheAffinityNodes(ccfg.getName(),
-                fut.topologyVersion()).contains(cctx.localNode()) : cacheDesc.cacheName();
+            assert !cctx.discovery().cacheGroupAffinityNodes(grpDesc.groupId(),
+                fut.topologyVersion()).contains(cctx.localNode()) : grpDesc.cacheOrGroupName();
 
             AffinityFunction affFunc = cctx.cache().clone(ccfg.getAffinity());
 
@@ -1763,13 +1777,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             U.startLifecycleAware(F.asList(affFunc));
 
             GridAffinityAssignmentCache aff = new GridAffinityAssignmentCache(cctx.kernalContext(),
-                ccfg.getName(),
+                grpDesc.cacheOrGroupName(),
+                grpDesc.groupId(),
                 affFunc,
                 ccfg.getNodeFilter(),
                 ccfg.getBackups(),
                 ccfg.getCacheMode() == LOCAL);
 
-            return new CacheHolder2(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff);
+            return new CacheGroupHolder2(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff);
         }
 
         /**
@@ -1778,7 +1793,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          * @param aff Affinity.
          * @param initAff Current affinity.
          */
-        CacheHolder2(
+        CacheGroupHolder2(
             boolean rebalanceEnabled,
             GridCacheSharedContext cctx,
             GridAffinityAssignmentCache aff,
@@ -1795,7 +1810,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         /** {@inheritDoc} */
         @Override public GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut) {
-            return cctx.exchange().clientTopology(cacheId(), fut);
+            return cctx.exchange().clientTopology(groupId(), fut);
         }
     }
 
@@ -1807,7 +1822,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         private final AffinityTopologyVersion topVer;
 
         /** */
-        private Map<Integer, Map<Integer, UUID>> waitCaches;
+        private Map<Integer, Map<Integer, UUID>> waitGrps;
 
         /** */
         private Map<Integer, Map<Integer, List<ClusterNode>>> assignments;
@@ -1826,9 +1841,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          * @return {@code True} if there are partitions waiting for rebalancing.
          */
         boolean empty() {
-            if (waitCaches != null) {
-                assert !waitCaches.isEmpty();
-                assert waitCaches.size() == assignments.size();
+            if (waitGrps != null) {
+                assert !waitGrps.isEmpty();
+                assert waitGrps.size() == assignments.size();
 
                 return false;
             }
@@ -1837,34 +1852,34 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /**
-         * @param cacheId Cache ID.
+         * @param grpId Group ID.
          * @param part Partition.
          * @param waitNode Node rebalancing data.
          * @param assignment New assignment.
          */
-        void add(Integer cacheId, Integer part, UUID waitNode, List<ClusterNode> assignment) {
+        void add(Integer grpId, Integer part, UUID waitNode, List<ClusterNode> assignment) {
             assert !F.isEmpty(assignment) : assignment;
 
-            if (waitCaches == null) {
-                waitCaches = new HashMap<>();
+            if (waitGrps == null) {
+                waitGrps = new HashMap<>();
                 assignments = new HashMap<>();
                 deploymentIds = new HashMap<>();
             }
 
-            Map<Integer, UUID> cacheWaitParts = waitCaches.get(cacheId);
+            Map<Integer, UUID> cacheWaitParts = waitGrps.get(grpId);
 
             if (cacheWaitParts == null) {
-                waitCaches.put(cacheId, cacheWaitParts = new HashMap<>());
+                waitGrps.put(grpId, cacheWaitParts = new HashMap<>());
 
-                deploymentIds.put(cacheId, registeredCaches.get(cacheId).deploymentId());
+                deploymentIds.put(grpId, registeredGrps.get(grpId).deploymentId());
             }
 
             cacheWaitParts.put(part, waitNode);
 
-            Map<Integer, List<ClusterNode>> cacheAssignment = assignments.get(cacheId);
+            Map<Integer, List<ClusterNode>> cacheAssignment = assignments.get(grpId);
 
             if (cacheAssignment == null)
-                assignments.put(cacheId, cacheAssignment = new HashMap<>());
+                assignments.put(grpId, cacheAssignment = new HashMap<>());
 
             cacheAssignment.put(part, assignment);
         }
@@ -1872,7 +1887,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         /** {@inheritDoc} */
         @Override public String toString() {
             return "WaitRebalanceInfo [topVer=" + topVer +
-                ", caches=" + (waitCaches != null ? waitCaches.keySet() : null) + ']';
+                ", grps=" + (waitGrps != null ? waitGrps.keySet() : null) + ']';
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
index a30331f..6a6f40d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
@@ -31,18 +31,31 @@ public class CacheClientReconnectDiscoveryData implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
+    private final Map<Integer, CacheGroupInfo> clientCacheGrps;
+
+    /** */
     private final Map<String, CacheInfo> clientCaches;
 
     /**
      * @param clientCaches Information about caches started on re-joining client node.
+     * @param clientCacheGrps Information about cache groups started on re-joining client node.
      */
-    CacheClientReconnectDiscoveryData(Map<String, CacheInfo> clientCaches) {
+    CacheClientReconnectDiscoveryData(Map<Integer, CacheGroupInfo> clientCacheGrps,
+        Map<String, CacheInfo> clientCaches) {
+        this.clientCacheGrps = clientCacheGrps;
         this.clientCaches = clientCaches;
     }
 
     /**
      * @return Information about caches started on re-joining client node.
      */
+    Map<Integer, CacheGroupInfo> clientCacheGroups() {
+        return clientCacheGrps;
+    }
+
+    /**
+     * @return Information about caches started on re-joining client node.
+     */
     Map<String, CacheInfo> clientCaches() {
         return clientCaches;
     }
@@ -50,6 +63,53 @@ public class CacheClientReconnectDiscoveryData implements Serializable {
     /**
      *
      */
+    static class CacheGroupInfo implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final CacheConfiguration ccfg;
+
+        /** */
+        private final IgniteUuid deploymentId;
+
+        /** Flags added for future usage. */
+        private final long flags;
+
+        /**
+         * @param ccfg Cache group configuration.
+         * @param deploymentId Cache group deployment ID.
+         * @param flags Flags (for future usage).
+         */
+        CacheGroupInfo(CacheConfiguration ccfg,
+            IgniteUuid deploymentId,
+            long flags) {
+            assert ccfg != null;
+            assert deploymentId != null;
+
+            this.ccfg = ccfg;
+            this.deploymentId = deploymentId;
+            this.flags = flags;
+        }
+
+        /**
+         * @return Cache group configuration.
+         */
+        CacheConfiguration config() {
+            return ccfg;
+        }
+
+        /**
+         * @return Cache group deployment ID.
+         */
+        IgniteUuid deploymentId() {
+            return deploymentId;
+        }
+    }
+
+    /**
+     *
+     */
     static class CacheInfo implements Serializable {
         /** */
         private static final long serialVersionUID = 0L;
@@ -67,7 +127,7 @@ public class CacheClientReconnectDiscoveryData implements Serializable {
         private final boolean nearCache;
 
         /** Flags added for future usage. */
-        private final byte flags;
+        private final long flags;
 
         /**
          * @param ccfg Cache configuration.
@@ -80,7 +140,7 @@ public class CacheClientReconnectDiscoveryData implements Serializable {
             CacheType cacheType,
             IgniteUuid deploymentId,
             boolean nearCache,
-            byte flags) {
+            long flags) {
             assert ccfg != null;
             assert cacheType != null;
             assert deploymentId != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index 3e2c259..b728d96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -35,7 +35,10 @@ public class CacheData implements Serializable {
     private final CacheConfiguration cacheCfg;
 
     /** */
-    private final Integer cacheId;
+    private final int cacheId;
+
+    /** */
+    private final int grpId;
 
     /** */
     private final CacheType cacheType;
@@ -59,11 +62,12 @@ public class CacheData implements Serializable {
     private final boolean template;
 
     /** Flags added for future usage. */
-    private final byte flags;
+    private final long flags;
 
     /**
      * @param cacheCfg Cache configuration.
      * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param cacheType Cache ID.
      * @param deploymentId Cache deployment ID.
      * @param schema Query schema.
@@ -75,6 +79,7 @@ public class CacheData implements Serializable {
      */
     CacheData(CacheConfiguration cacheCfg,
         int cacheId,
+        int grpId,
         CacheType cacheType,
         IgniteUuid deploymentId,
         QuerySchema schema,
@@ -82,14 +87,16 @@ public class CacheData implements Serializable {
         boolean staticCfg,
         boolean sql,
         boolean template,
-        byte flags) {
+        long flags) {
         assert cacheCfg != null;
         assert rcvdFrom != null : cacheCfg.getName();
         assert deploymentId != null : cacheCfg.getName();
         assert template || cacheId != 0 : cacheCfg.getName();
+        assert template || grpId != 0 : cacheCfg.getName();
 
         this.cacheCfg = cacheCfg;
         this.cacheId = cacheId;
+        this.grpId = grpId;
         this.cacheType = cacheType;
         this.deploymentId = deploymentId;
         this.schema = schema;
@@ -101,9 +108,16 @@ public class CacheData implements Serializable {
     }
 
     /**
+     * @return Cache group ID.
+     */
+    public int groupId() {
+        return grpId;
+    }
+
+    /**
      * @return Cache ID.
      */
-    public Integer cacheId() {
+    public int cacheId() {
         return cacheId;
     }
 


Mime
View raw message