ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [15/17] ignite git commit: Moved logic related to caches discovery data handling to ClusterCachesInfo. Start of statically configured caches in the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.
Date Thu, 18 May 2017 08:54:33 GMT
Moved logic related to cache discovery data handling to ClusterCachesInfo.
Statically configured caches are now started the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86002002
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86002002
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86002002

Branch: refs/heads/ignite-5075
Commit: 86002002fda063eb33431e8bfd3f3a0df047325b
Parents: 20f4d18
Author: sboikov <sboikov@gridgain.com>
Authored: Thu May 18 11:33:25 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu May 18 11:33:25 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |   11 +-
 .../affinity/AffinityTopologyVersion.java       |    9 +
 .../cache/CacheAffinitySharedManager.java       |  286 ++--
 .../CacheClientReconnectDiscoveryData.java      |  133 ++
 .../internal/processors/cache/CacheData.java    |  157 ++
 .../cache/CacheJoinNodeDiscoveryData.java       |  147 ++
 .../cache/CacheNodeCommonDiscoveryData.java     |   82 +
 .../processors/cache/ClusterCachesInfo.java     |  903 +++++++++++
 .../cache/DynamicCacheChangeBatch.java          |   92 +-
 .../cache/DynamicCacheChangeRequest.java        |  102 +-
 .../cache/DynamicCacheDescriptor.java           |  171 +--
 .../processors/cache/ExchangeActions.java       |  338 +++++
 .../processors/cache/GridCacheContext.java      |   26 +-
 .../GridCachePartitionExchangeManager.java      |   54 +-
 .../processors/cache/GridCacheProcessor.java    | 1413 +++++-------------
 .../dht/GridClientPartitionTopology.java        |    2 +-
 .../dht/GridDhtAffinityAssignmentRequest.java   |   36 +-
 .../dht/GridDhtAffinityAssignmentResponse.java  |   36 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |   63 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   12 +-
 .../GridDhtAtomicAbstractUpdateRequest.java     |    2 +
 .../GridDhtPartitionsExchangeFuture.java        |  160 +-
 .../dht/preloader/GridDhtPreloader.java         |    4 +-
 .../processors/cache/local/GridLocalCache.java  |   31 +-
 .../cache/local/GridLocalLockFuture.java        |   41 +-
 .../cache/query/GridCacheQueryManager.java      |    3 +-
 .../continuous/CacheContinuousQueryHandler.java |   42 +-
 .../continuous/CacheContinuousQueryManager.java |   11 +-
 .../cluster/GridClusterStateProcessor.java      |   26 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |    2 +-
 .../ignite/spi/discovery/DiscoveryDataBag.java  |   18 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |    4 +-
 .../core/src/test/config/examples.properties    |    1 +
 ...ityFunctionBackupFilterAbstractSelfTest.java |   13 +-
 .../internal/GridNodeMetricsLogSelfTest.java    |   13 +-
 .../GridCacheAbstractLocalStoreSelfTest.java    |   41 +-
 ...gniteCacheP2pUnmarshallingNearErrorTest.java |    6 +-
 .../processors/cache/IgniteCacheStartTest.java  |  191 +++
 .../cache/IgniteDynamicCacheStartSelfTest.java  |    2 +-
 ...niteTopologyValidatorGridSplitCacheTest.java |    8 +-
 .../CacheDiscoveryDataConcurrentJoinTest.java   |  198 +++
 .../CacheLateAffinityAssignmentTest.java        |    4 +-
 .../cache/distributed/CacheStartOnJoinTest.java |   10 +
 .../IgniteCrossCacheTxStoreSelfTest.java        |   12 +-
 ...ePartitionedBasicStoreMultiNodeSelfTest.java |   22 +-
 .../loadtests/hashmap/GridCacheTestContext.java |    4 +
 .../testsuites/IgniteCacheTestSuite4.java       |    4 +
 .../jdbc/CacheJdbcPojoStoreFactorySelfTest.java |    2 +-
 48 files changed, 3198 insertions(+), 1750 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index e5f2278..24c7283 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -315,8 +315,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         boolean nearEnabled,
         CacheMode cacheMode
     ) {
-        if (!registeredCaches.containsKey(cacheName))
+        if (!registeredCaches.containsKey(cacheName)) {
+            if (cacheMode == CacheMode.REPLICATED)
+                nearEnabled = false;
+            
             registeredCaches.put(cacheName, new CachePredicate(filter, nearEnabled, cacheMode));
+        }
     }
 
     /**
@@ -2737,7 +2741,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @return {@code True} if this node is a data node for given cache.
          */
         public boolean dataNode(ClusterNode node) {
-            return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
+            return CU.affinityNode(node, cacheFilter);
         }
 
         /**
@@ -2753,9 +2757,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @return {@code True} if near cache is present on the given nodes.
          */
         public boolean nearNode(ClusterNode node) {
-            if (node.isDaemon())
-                return false;
-
             if (CU.affinityNode(node, cacheFilter))
                 return nearEnabled;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index f564e28..8669530 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -73,6 +73,15 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
     }
 
     /**
+     * @return Topology version with incremented minor version.
+     */
+    public AffinityTopologyVersion nextMinorVersion() {
+        assert topVer > 0;
+
+        return new AffinityTopologyVersion(topVer, minorTopVer + 1);
+    }
+
+    /**
      * @return Topology version.
      */
     public long topologyVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0958208..cec42a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -48,7 +49,6 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
@@ -90,8 +90,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private final Object mux = new Object();
 
     /** Pending affinity assignment futures. */
-    private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture>
-        pendingAssignmentFetchFuts = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
+        new ConcurrentHashMap8<>();
 
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -312,50 +312,59 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                        processAffinityAssignmentResponse(cacheId, nodeId, res);
+                        processAffinityAssignmentResponse(nodeId, res);
                     }
                 });
         }
     }
 
     /**
+     * @param exchActions Cache change requests to execute on exchange.
+     */
+    private void updateCachesInfo(ExchangeActions exchActions) {
+        for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
+            DynamicCacheDescriptor desc = registeredCaches.remove(action.descriptor().cacheId());
+
+            assert desc != null : action.request().cacheName();
+        }
+
+        for (ExchangeActions.ActionData action : exchActions.cacheStartRequests()) {
+            DynamicCacheChangeRequest req = action.request();
+
+            Integer cacheId = action.descriptor().cacheId();
+
+            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
+                req.startCacheConfiguration(),
+                req.cacheType(),
+                false,
+                action.descriptor().receivedFrom(),
+                action.descriptor().staticallyConfigured(),
+                req.deploymentId(),
+                req.schema());
+
+            DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
+
+            assert old == null : old;
+        }
+    }
+
+    /**
      * Called on exchange initiated for cache start/stop request.
      *
      * @param fut Exchange future.
      * @param crd Coordinator flag.
-     * @param reqs Cache change requests.
+     * @param exchActions Cache change requests.
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if client-only exchange is needed.
      */
     public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
         boolean crd,
-        Collection<DynamicCacheChangeRequest> reqs)
-        throws IgniteCheckedException {
-        assert !F.isEmpty(reqs) : fut;
-
-        for (DynamicCacheChangeRequest req : reqs) {
-            Integer cacheId = CU.cacheId(req.cacheName());
-
-            if (req.stop()) {
-                DynamicCacheDescriptor desc = registeredCaches.remove(cacheId);
-
-                assert desc != null : cacheId;
-            }
-            else if (req.start() && !req.clientStartOnly()) {
-                DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
-                    req.startCacheConfiguration(),
-                    req.cacheType(),
-                    false,
-                    req.deploymentId(),
-                    req.schema());
-
-                DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
-
-                assert old == null : old;
-            }
-        }
+        ExchangeActions exchActions)
+        throws IgniteCheckedException
+    {
+        assert exchActions != null && !exchActions.empty() : exchActions;
 
-        boolean clientOnly = true;
+        updateCachesInfo(exchActions);
 
         // Affinity did not change for existing caches.
         forAllCaches(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@@ -367,87 +376,103 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         });
 
-        Set<Integer> stoppedCaches = null;
+        for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
+            DynamicCacheDescriptor cacheDesc = action.descriptor();
 
-        for (DynamicCacheChangeRequest req : reqs) {
-            if (!(req.clientStartOnly() || req.close()))
-                clientOnly = false;
+            DynamicCacheChangeRequest req = action.request();
 
-            Integer cacheId = CU.cacheId(req.cacheName());
+            boolean startCache;
 
-            if (req.start()) {
-                cctx.cache().prepareCacheStart(req, fut.topologyVersion());
+            NearCacheConfiguration nearCfg = null;
 
-                if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
+            if (cctx.localNodeId().equals(req.initiatingNodeId())) {
+                startCache = true;
+
+                nearCfg = req.nearCacheConfiguration();
+            }
+            else {
+                startCache = cctx.cacheContext(action.descriptor().cacheId()) == null &&
+                    CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter());
+            }
+
+            if (startCache) {
+                cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion());
+
+                if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
                     if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
                         U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
                 }
+            }
 
-                if (!crd || !lateAffAssign) {
-                    GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+            if (!crd || !lateAffAssign) {
+                GridCacheContext cacheCtx = cctx.cacheContext(cacheDesc.cacheId());
 
-                    if (cacheCtx != null && !cacheCtx.isLocal()) {
-                        boolean clientCacheStarted =
-                            req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
+                if (cacheCtx != null && !cacheCtx.isLocal()) {
+                    boolean clientCacheStarted =
+                        req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
 
-                        if (clientCacheStarted)
-                            initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
-                        else if (!req.clientStartOnly()) {
-                            assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
+                    if (clientCacheStarted)
+                        initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
+                    else if (!req.clientStartOnly()) {
+                        assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
-                            GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
+                        GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
 
-                            assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
+                        assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
 
-                            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
-                                fut.discoveryEvent(), fut.discoCache());
+                        List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
+                            fut.discoveryEvent(), fut.discoCache());
 
-                            aff.initialize(fut.topologyVersion(), assignment);
-                        }
+                        aff.initialize(fut.topologyVersion(), assignment);
                     }
                 }
-                else
-                    initStartedCacheOnCoordinator(fut, cacheId);
             }
-            else if (req.stop() || req.close()) {
-                cctx.cache().blockGateway(req);
+            else
+                initStartedCacheOnCoordinator(fut, cacheDesc.cacheId());
+        }
 
-                if (crd) {
-                    boolean rmvCache = false;
+        for (DynamicCacheChangeRequest req : exchActions.closeRequests(cctx.localNodeId())) {
+            Integer cacheId = CU.cacheId(req.cacheName());
 
-                    if (req.close() && req.initiatingNodeId().equals(cctx.localNodeId())) {
-                        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+            cctx.cache().blockGateway(req);
 
-                        rmvCache = cacheCtx != null && !cacheCtx.affinityNode();
-                    }
-                    else if (req.stop())
-                        rmvCache = true;
+            if (crd) {
+                GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
-                    if (rmvCache) {
-                        CacheHolder cache = caches.remove(cacheId);
+                // Client cache was stopped, need create 'client' CacheHolder.
+                if (cacheCtx != null && !cacheCtx.affinityNode()) {
+                    CacheHolder cache = caches.remove(cacheId);
 
-                        if (cache != null) {
-                            if (!req.stop()) {
-                                assert !cache.client();
+                    assert !cache.client() : cache;
 
-                                cache = CacheHolder2.create(cctx,
-                                    cctx.cache().cacheDescriptor(cacheId),
-                                    fut,
-                                    cache.affinity());
+                    cache = CacheHolder2.create(cctx,
+                        cctx.cache().cacheDescriptor(cacheId),
+                        fut,
+                        cache.affinity());
 
-                                caches.put(cacheId, cache);
-                            }
-                            else {
-                                if (stoppedCaches == null)
-                                    stoppedCaches = new HashSet<>();
+                    caches.put(cacheId, cache);
+                }
+            }
+        }
 
-                                stoppedCaches.add(cache.cacheId());
+        Set<Integer> stoppedCaches = null;
 
-                                cctx.io().removeHandler(cacheId, GridDhtAffinityAssignmentResponse.class);
-                            }
-                        }
-                    }
-                }
+        for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
+            DynamicCacheDescriptor desc = action.descriptor();
+
+            cctx.cache().blockGateway(action.request());
+
+            if (crd && lateAffAssign && desc.cacheConfiguration().getCacheMode() != LOCAL) {
+                CacheHolder cache = caches.remove(desc.cacheId());
+
+                assert cache != null : action.request();
+
+                if (stoppedCaches == null)
+                    stoppedCaches = new HashSet<>();
+
+                stoppedCaches.add(cache.cacheId());
+
+                cctx.io().removeHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class);
             }
         }
 
@@ -479,7 +504,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         }
 
-        return clientOnly;
+        return exchActions.clientOnlyExchange();
     }
 
     /**
@@ -578,7 +603,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 assert affTopVer.topologyVersion() > 0 : affTopVer;
 
-                IgniteUuid deploymentId = registeredCaches.get(aff.cacheId()).deploymentId();
+                DynamicCacheDescriptor desc = registeredCaches.get(aff.cacheId());
+
+                assert desc != null : aff.cacheName();
+
+                IgniteUuid deploymentId = desc.deploymentId();
 
                 if (!deploymentId.equals(deploymentIds.get(aff.cacheId()))) {
                     aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
@@ -659,7 +688,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 });
             }
             else
-                initCachesAffinity(fut);
+                initAffinityNoLateAssignment(fut);
         }
     }
 
@@ -667,7 +696,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param fut Future to add.
      */
     public void addDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
-        GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.key(), fut);
+        GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.id(), fut);
 
         assert old == null : "More than one thread is trying to fetch partition assignments [fut=" + fut +
             ", allFuts=" + pendingAssignmentFetchFuts + ']';
@@ -677,27 +706,23 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param fut Future to remove.
      */
     public void removeDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
-        boolean rmv = pendingAssignmentFetchFuts.remove(fut.key(), fut);
+        boolean rmv = pendingAssignmentFetchFuts.remove(fut.id(), fut);
 
-        assert rmv : "Failed to remove assignment fetch future: " + fut.key();
+        assert rmv : "Failed to remove assignment fetch future: " + fut.id();
     }
 
     /**
-     * @param cacheId Cache ID.
      * @param nodeId Node ID.
      * @param res Response.
      */
-    private void processAffinityAssignmentResponse(Integer cacheId, UUID nodeId, GridDhtAffinityAssignmentResponse res) {
+    private void processAffinityAssignmentResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
         if (log.isDebugEnabled())
             log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
 
-        for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) {
-            if (fut.key().get1().equals(cacheId)) {
-                fut.onResponse(nodeId, res);
+        GridDhtAssignmentFetchFuture fut = pendingAssignmentFetchFuts.get(res.futureId());
 
-                break;
-            }
-        }
+        if (fut != null)
+            fut.onResponse(nodeId, res);
     }
 
     /**
@@ -773,7 +798,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * Initialized affinity started on this exchange.
+     * Initialized affinity for cache received from node joining on this exchange.
      *
      * @param crd Coordinator flag.
      * @param fut Exchange future.
@@ -782,12 +807,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     public void initStartedCaches(boolean crd,
         final GridDhtPartitionsExchangeFuture fut,
-        @Nullable Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException {
-        if (descs != null) {
-            for (DynamicCacheDescriptor desc : descs) {
-                if (!registeredCaches.containsKey(desc.cacheId()))
-                    registeredCaches.put(desc.cacheId(), desc);
-            }
+        Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException {
+        for (DynamicCacheDescriptor desc : descs) {
+            if (!registeredCaches.containsKey(desc.cacheId()))
+                registeredCaches.put(desc.cacheId(), desc);
         }
 
         if (crd && lateAffAssign) {
@@ -808,28 +831,34 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                     if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
-                        initAffinity(aff, fut, false);
+                        initAffinity(registeredCaches.get(aff.cacheId()), aff, fut, false);
                 }
             });
         }
     }
 
     /**
+     * @param desc Cache descriptor.
      * @param aff Affinity.
      * @param fut Exchange future.
      * @param fetch Force fetch flag.
      * @throws IgniteCheckedException If failed.
      */
-    private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch)
+    private void initAffinity(DynamicCacheDescriptor desc,
+        GridAffinityAssignmentCache aff,
+        GridDhtPartitionsExchangeFuture fut,
+        boolean fetch)
         throws IgniteCheckedException {
-        if (!fetch && canCalculateAffinity(aff, fut)) {
+        assert desc != null;
+
+        if (!fetch && canCalculateAffinity(desc, aff, fut)) {
             List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
             aff.initialize(fut.topologyVersion(), assignment);
         }
         else {
             GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                aff.cacheName(),
+                desc,
                 fut.topologyVersion(),
                 fut.discoCache());
 
@@ -840,11 +869,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param desc Cache descriptor.
      * @param aff Affinity.
      * @param fut Exchange future.
      * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange.
      */
-    private boolean canCalculateAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut) {
+    private boolean canCalculateAffinity(DynamicCacheDescriptor desc,
+        GridAffinityAssignmentCache aff,
+        GridDhtPartitionsExchangeFuture fut) {
+        assert desc != null : aff.cacheName();
+
         // Do not request affinity from remote nodes if affinity function is not centralized.
         if (!aff.centralizedAffinityFunction())
             return true;
@@ -852,13 +886,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         // If local node did not initiate exchange or local node is the only cache node in grid.
         Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheId(), fut.topologyVersion());
 
-        DynamicCacheDescriptor cacheDesc = registeredCaches.get(aff.cacheId());
-
-        assert cacheDesc != null : aff.cacheName();
-
-        return fut.cacheStarted(aff.cacheId()) ||
+        return fut.cacheAddedOnExchange(aff.cacheId(), desc.receivedFrom()) ||
             !fut.exchangeId().nodeId().equals(cctx.localNodeId()) ||
-            cctx.localNodeId().equals(cacheDesc.receivedFrom()) ||
             (affNodes.size() == 1 && affNodes.contains(cctx.localNode()));
     }
 
@@ -898,7 +927,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd);
         }
         else
-            initCachesAffinity(fut);
+            initAffinityNoLateAssignment(fut);
 
         synchronized (mux) {
             affCalcVer = fut.topologyVersion();
@@ -958,7 +987,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
             else {
                 GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                    cacheCtx.name(),
+                    cacheDesc,
                     topVer,
                     fut.discoCache());
 
@@ -971,7 +1000,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         for (int i = 0; i < fetchFuts.size(); i++) {
             GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
 
-            Integer cacheId = fetchFut.key().get1();
+            Integer cacheId = fetchFut.cacheId();
 
             fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
         }
@@ -1042,7 +1071,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             centralizedAff = true;
         }
         else {
-            initCachesAffinity(fut);
+            initAffinityNoLateAssignment(fut);
 
             centralizedAff = false;
         }
@@ -1060,14 +1089,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param fut Exchange future.
      * @throws IgniteCheckedException If failed.
      */
-    private void initCachesAffinity(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+    private void initAffinityNoLateAssignment(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
         assert !lateAffAssign;
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal())
                 continue;
 
-            initAffinity(cacheCtx.affinity().affinityCache(), fut, false);
+            initAffinity(registeredCaches.get(cacheCtx.cacheId()), cacheCtx.affinity().affinityCache(), fut, false);
         }
     }
 
@@ -1099,7 +1128,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     cctx.io().addHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class,
                         new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                             @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                                processAffinityAssignmentResponse(cacheId, nodeId, res);
+                                processAffinityAssignmentResponse(nodeId, res);
                             }
                         }
                     );
@@ -1125,7 +1154,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
 
                     GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                        aff.cacheName(),
+                        desc,
                         prev.topologyVersion(),
                         prev.discoCache());
 
@@ -1192,7 +1221,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                        processAffinityAssignmentResponse(cacheId, nodeId, res);
+                        processAffinityAssignmentResponse(nodeId, res);
                     }
                 }
             );
@@ -1714,7 +1743,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             assert ccfg != null : cacheDesc;
             assert ccfg.getCacheMode() != LOCAL : ccfg.getName();
 
-            assert !cctx.discovery().cacheAffinityNodes(ccfg.getName(), fut.topologyVersion()).contains(cctx.localNode());
+            assert !cctx.discovery().cacheAffinityNodes(ccfg.getName(),
+                fut.topologyVersion()).contains(cctx.localNode()) : cacheDesc.cacheName();
 
             AffinityFunction affFunc = cctx.cache().clone(ccfg.getAffinity());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
new file mode 100644
index 0000000..a30331f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Discovery data sent from client reconnecting to cluster.
+ */
+public class CacheClientReconnectDiscoveryData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final Map<String, CacheInfo> clientCaches;
+
+    /**
+     * @param clientCaches Information about caches started on re-joining client node.
+     */
+    CacheClientReconnectDiscoveryData(Map<String, CacheInfo> clientCaches) {
+        this.clientCaches = clientCaches;
+    }
+
+    /**
+     * @return Information about caches started on re-joining client node.
+     */
+    Map<String, CacheInfo> clientCaches() {
+        return clientCaches;
+    }
+
+    /**
+     *
+     */
+    static class CacheInfo implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final CacheConfiguration ccfg;
+
+        /** */
+        private final CacheType cacheType;
+
+        /** */
+        private final IgniteUuid deploymentId;
+
+        /** */
+        private final boolean nearCache;
+
+        /** Flags added for future usage. */
+        private final byte flags;
+
+        /**
+         * @param ccfg Cache configuration.
+         * @param cacheType Cache type.
+         * @param deploymentId Cache deployment ID.
+         * @param nearCache Near cache flag.
+         * @param flags Flags (for future usage).
+         */
+        CacheInfo(CacheConfiguration ccfg,
+            CacheType cacheType,
+            IgniteUuid deploymentId,
+            boolean nearCache,
+            byte flags) {
+            assert ccfg != null;
+            assert cacheType != null;
+            assert deploymentId != null;
+
+            this.ccfg = ccfg;
+            this.cacheType = cacheType;
+            this.deploymentId = deploymentId;
+            this.nearCache = nearCache;
+            this.flags = flags;
+        }
+
+        /**
+         * @return Cache configuration.
+         */
+        CacheConfiguration config() {
+            return ccfg;
+        }
+
+        /**
+         * @return Cache type.
+         */
+        CacheType cacheType() {
+            return cacheType;
+        }
+
+        /**
+         * @return Cache deployment ID.
+         */
+        IgniteUuid deploymentId() {
+            return deploymentId;
+        }
+
+        /**
+         * @return Near cache flag.
+         */
+        boolean nearCache() {
+            return nearCache;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CacheInfo.class, this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheClientReconnectDiscoveryData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
new file mode 100644
index 0000000..4768a9a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Cache information sent in discovery data to joining node.
+ */
+public class CacheData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final CacheConfiguration cacheCfg;
+
+    /** */
+    private final Integer cacheId;
+
+    /** */
+    private final CacheType cacheType;
+
+    /** */
+    private final IgniteUuid deploymentId;
+
+    /** */
+    private final QuerySchema schema;
+
+    /** */
+    private final UUID rcvdFrom;
+
+    /** */
+    private final boolean staticCfg;
+
+    /** */
+    private final boolean template;
+
+    /** Flags added for future usage. */
+    private final byte flags;
+
+    /**
+     * @param cacheCfg Cache configuration.
+     * @param cacheId Cache ID.
+     * @param cacheType Cache type.
+     * @param deploymentId Cache deployment ID.
+     * @param schema Query schema.
+     * @param rcvdFrom Node ID cache was started from.
+     * @param staticCfg {@code True} if cache was statically configured.
+     * @param template {@code True} if this is cache template.
+     * @param flags Flags (added for future usage).
+     */
+    CacheData(CacheConfiguration cacheCfg,
+        int cacheId,
+        CacheType cacheType,
+        IgniteUuid deploymentId,
+        QuerySchema schema,
+        UUID rcvdFrom,
+        boolean staticCfg,
+        boolean template,
+        byte flags) {
+        assert cacheCfg != null;
+        assert rcvdFrom != null : cacheCfg.getName();
+        assert deploymentId != null : cacheCfg.getName();
+        assert template || cacheId != 0 : cacheCfg.getName();
+
+        this.cacheCfg = cacheCfg;
+        this.cacheId = cacheId;
+        this.cacheType = cacheType;
+        this.deploymentId = deploymentId;
+        this.schema = schema;
+        this.rcvdFrom = rcvdFrom;
+        this.staticCfg = staticCfg;
+        this.template = template;
+        this.flags = flags;
+    }
+
+    /**
+     * @return Cache ID.
+     */
+    public Integer cacheId() {
+        return cacheId;
+    }
+
+    /**
+     * @return {@code True} if this is template configuration.
+     */
+    public boolean template() {
+        return template;
+    }
+
+    /**
+     * @return Cache type.
+     */
+    public CacheType cacheType() {
+        return cacheType;
+    }
+
+    /**
+     * @return Cache deployment ID.
+     */
+    public IgniteUuid deploymentId() {
+        return deploymentId;
+    }
+
+    /**
+     * @return {@code True} if statically configured.
+     */
+    public boolean staticallyConfigured() {
+        return staticCfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    public CacheConfiguration cacheConfiguration() {
+        return cacheCfg;
+    }
+
+    /**
+     * @return Schema.
+     */
+    public QuerySchema schema() {
+        return schema.copy();
+    }
+
+    /**
+     * @return ID of the node that provided the cache configuration.
+     */
+    public UUID receivedFrom() {
+        return rcvdFrom;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheData.class, this, "cacheName", cacheCfg.getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
new file mode 100644
index 0000000..ea24140
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Information about configured caches sent from joining node.
+ */
+class CacheJoinNodeDiscoveryData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    @GridToStringInclude
+    private final Map<String, CacheInfo> caches;
+
+    /** */
+    @GridToStringInclude
+    private final Map<String, CacheInfo> templates;
+
+    /** */
+    @GridToStringInclude
+    private final IgniteUuid cacheDeploymentId;
+
+    /** */
+    private final boolean startCaches;
+
+    /**
+     * @param cacheDeploymentId Deployment ID for started caches.
+     * @param caches Caches.
+     * @param templates Templates.
+     * @param startCaches {@code True} if required to start all caches on joining node.
+     */
+    CacheJoinNodeDiscoveryData(
+        IgniteUuid cacheDeploymentId,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates,
+        boolean startCaches) {
+        this.cacheDeploymentId = cacheDeploymentId;
+        this.caches = caches;
+        this.templates = templates;
+        this.startCaches = startCaches;
+    }
+
+    /**
+     * @return {@code True} if required to start all caches on joining node.
+     */
+    boolean startCaches() {
+        return startCaches;
+    }
+
+    /**
+     * @return Deployment ID assigned on joining node.
+     */
+    IgniteUuid cacheDeploymentId() {
+        return cacheDeploymentId;
+    }
+
+    /**
+     * @return Templates configured on joining node.
+     */
+    Map<String, CacheInfo> templates() {
+        return templates;
+    }
+
+    /**
+     * @return Caches configured on joining node.
+     */
+    Map<String, CacheInfo> caches() {
+        return caches;
+    }
+
+    /**
+     *
+     */
+    static class CacheInfo implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        @GridToStringInclude
+        private final CacheConfiguration ccfg;
+
+        /** */
+        @GridToStringInclude
+        private final CacheType cacheType;
+
+        /** Flags added for future usage. */
+        private final byte flags;
+
+        /**
+         * @param ccfg Cache configuration.
+         * @param cacheType Cache type.
+         * @param flags Flags (for future usage).
+         */
+        CacheInfo(CacheConfiguration ccfg, CacheType cacheType, byte flags) {
+            this.ccfg = ccfg;
+            this.cacheType = cacheType;
+            this.flags = flags;
+        }
+
+        /**
+         * @return Cache configuration.
+         */
+        CacheConfiguration config() {
+            return ccfg;
+        }
+
+        /**
+         * @return Cache type.
+         */
+        CacheType cacheType() {
+            return cacheType;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CacheInfo.class, this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheJoinNodeDiscoveryData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
new file mode 100644
index 0000000..84a33dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Cache information sent in discovery data to joining node.
+ */
+class CacheNodeCommonDiscoveryData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    @GridToStringInclude
+    private final Map<String, CacheData> caches;
+
+    /** */
+    @GridToStringInclude
+    private final Map<String, CacheData> templates;
+
+    /** */
+    private final Map<String, Map<UUID, Boolean>> clientNodesMap;
+
+    /**
+     * @param caches Started caches.
+     * @param templates Configured templates.
+     * @param clientNodesMap Information about cache client nodes.
+     */
+    CacheNodeCommonDiscoveryData(Map<String, CacheData> caches,
+        Map<String, CacheData> templates,
+        Map<String, Map<UUID, Boolean>> clientNodesMap) {
+        this.caches = caches;
+        this.templates = templates;
+        this.clientNodesMap = clientNodesMap;
+    }
+
+    /**
+     * @return Started caches.
+     */
+    Map<String, CacheData> caches() {
+        return caches;
+    }
+
+    /**
+     * @return Configured templates.
+     */
+    Map<String, CacheData> templates() {
+        return templates;
+    }
+
+    /**
+     * @return Information about cache client nodes.
+     */
+    Map<String, Map<UUID, Boolean>> clientNodesMap() {
+        return clientNodesMap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheNodeCommonDiscoveryData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
new file mode 100644
index 0000000..28ec600
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -0,0 +1,903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheExistsException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
+
+/**
+ * Logic related to cache discovery data processing.
+ */
+class ClusterCachesInfo {
+    /** */
+    private final GridKernalContext ctx;
+
+    /** Dynamic caches. */
+    private final ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
+
+    /** Cache templates. */
+    private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+
+    /** */
+    private CacheJoinNodeDiscoveryData joinDiscoData;
+
+    /** */
+    private CacheNodeCommonDiscoveryData gridData;
+
+    /** */
+    private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
+
+    /** */
+    private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
+
+    /**
+     * @param ctx Context.
+     */
+    ClusterCachesInfo(GridKernalContext ctx) {
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
+    }
+
+    /**
+     * @param joinDiscoData Information about configured caches and templates.
+     */
+    void onStart(CacheJoinNodeDiscoveryData joinDiscoData) {
+        this.joinDiscoData = joinDiscoData;
+
+        processJoiningNode(joinDiscoData, ctx.localNodeId());
+    }
+
+    /**
+     * @param checkConsistency {@code True} if need check cache configurations consistency.
+     * @throws IgniteCheckedException If failed.
+     */
+    void onKernalStart(boolean checkConsistency) throws IgniteCheckedException {
+        if (checkConsistency && joinDiscoData != null && gridData != null) {
+            for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) {
+                CacheConfiguration locCfg = locCacheInfo.config();
+
+                CacheData cacheData = gridData.caches().get(locCfg.getName());
+
+                if (cacheData != null)
+                    checkCache(locCfg, cacheData.cacheConfiguration(), cacheData.receivedFrom());
+            }
+        }
+
+        joinDiscoData = null;
+        gridData = null;
+    }
+    /**
+     * Checks that the remote cache has a configuration compatible with the local one.
+     *
+     * @param locCfg Local configuration.
+     * @param rmtCfg Remote configuration.
+     * @param rmt Remote node.
+     * @throws IgniteCheckedException If check failed.
+     */
+    private void checkCache(CacheConfiguration<?, ?> locCfg, CacheConfiguration<?, ?> rmtCfg, UUID rmt)
+        throws IgniteCheckedException {
+        GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg);
+        GridCacheAttributes locAttr = new GridCacheAttributes(locCfg);
+
+        CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode",
+            locAttr.cacheMode(), rmtAttr.cacheMode(), true);
+
+        if (rmtAttr.cacheMode() != LOCAL) {
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor",
+                locAttr.interceptorClassName(), rmtAttr.interceptorClassName(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "atomicityMode",
+                "Cache atomicity mode", locAttr.atomicityMode(), rmtAttr.atomicityMode(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode",
+                "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true);
+
+            ClusterNode rmtNode = ctx.discovery().node(rmt);
+
+            if (CU.affinityNode(ctx.discovery().localNode(), locCfg.getNodeFilter())
+                && rmtNode != null && CU.affinityNode(rmtNode, rmtCfg.getNodeFilter())) {
+                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory",
+                    locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true);
+            }
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity", "Cache affinity",
+                    locAttr.cacheAffinityClassName(), rmtAttr.cacheAffinityClassName(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinityMapper",
+                "Cache affinity mapper", locAttr.cacheAffinityMapperClassName(),
+                rmtAttr.cacheAffinityMapperClassName(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityPartitionsCount",
+                "Affinity partitions count", locAttr.affinityPartitionsCount(),
+                rmtAttr.affinityPartitionsCount(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionFilter", "Eviction filter",
+                locAttr.evictionFilterClassName(), rmtAttr.evictionFilterClassName(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicy", "Eviction policy",
+                locAttr.evictionPolicyClassName(), rmtAttr.evictionPolicyClassName(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "transactionManagerLookup",
+                "Transaction manager lookup", locAttr.transactionManagerLookupClassName(),
+                rmtAttr.transactionManagerLookupClassName(), false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultLockTimeout",
+                "Default lock timeout", locAttr.defaultLockTimeout(), rmtAttr.defaultLockTimeout(), false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "preloadBatchSize",
+                "Preload batch size", locAttr.rebalanceBatchSize(), rmtAttr.rebalanceBatchSize(), false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeSynchronizationMode",
+                "Write synchronization mode", locAttr.writeSynchronization(), rmtAttr.writeSynchronization(),
+                true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindBatchSize",
+                "Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(),
+                false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled",
+                "Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushFrequency",
+                "Write behind flush frequency", locAttr.writeBehindFlushFrequency(),
+                rmtAttr.writeBehindFlushFrequency(), false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushSize",
+                "Write behind flush size", locAttr.writeBehindFlushSize(), rmtAttr.writeBehindFlushSize(),
+                false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushThreadCount",
+                "Write behind flush thread count", locAttr.writeBehindFlushThreadCount(),
+                rmtAttr.writeBehindFlushThreadCount(), false);
+
+            if (locAttr.cacheMode() == PARTITIONED) {
+                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "nearEvictionPolicy",
+                    "Near eviction policy", locAttr.nearEvictionPolicyClassName(),
+                    rmtAttr.nearEvictionPolicyClassName(), false);
+
+                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityIncludeNeighbors",
+                    "Affinity include neighbors", locAttr.affinityIncludeNeighbors(),
+                    rmtAttr.affinityIncludeNeighbors(), true);
+
+                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityKeyBackups",
+                    "Affinity key backups", locAttr.affinityKeyBackups(),
+                    rmtAttr.affinityKeyBackups(), true);
+            }
+        }
+    }
+
+    /**
+     * @param batch Cache change request.
+     * @param topVer Topology version.
+     * @return {@code True} if minor topology version should be increased.
+     */
+    boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
+        // Accumulates everything the exchange has to do for this batch.
+        ExchangeActions exchangeActions = new ExchangeActions();
+
+        // Becomes true as soon as at least one request in the batch requires an exchange.
+        boolean incMinorTopVer = false;
+
+        // Descriptors (templates and caches) registered while processing this batch.
+        List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
+
+        // Locally initiated requests which do not need an exchange, paired with the version to wait for (may be null).
+        final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>();
+
+        for (DynamicCacheChangeRequest req : batch.requests()) {
+            // Template requests only touch the templates registry and never request an exchange.
+            if (req.template()) {
+                CacheConfiguration ccfg = req.startCacheConfiguration();
+
+                assert ccfg != null : req;
+
+                DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName());
+
+                // Register the template only if it is not known yet.
+                if (desc == null) {
+                    DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
+                        ccfg,
+                        req.cacheType(),
+                        true,
+                        req.initiatingNodeId(),
+                        false,
+                        req.deploymentId(),
+                        req.schema());
+
+                    DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc);
+
+                    assert old == null;
+
+                    addedDescs.add(templateDesc);
+                }
+
+                // The template-add future is completed regardless of whether the template was newly registered.
+                ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
+
+                continue;
+            }
+
+            // Global state change requests are not looked up in the caches registry.
+            DynamicCacheDescriptor desc = req.globalStateChange() ? null : registeredCaches.get(req.cacheName());
+
+            boolean needExchange = false;
+
+            // Version passed to affinityReadyFuture() before completing a request that needs no exchange.
+            AffinityTopologyVersion waitTopVer = null;
+
+            if (req.start()) {
+                if (desc == null) {
+                    // Client-only start of a cache which is not registered: fail the request.
+                    if (req.clientStartOnly()) {
+                        ctx.cache().completeCacheStartFuture(req, new IgniteCheckedException("Failed to start " +
+                            "client cache (a cache with the given name is not started): " + req.cacheName()));
+                    }
+                    else {
+                        // New cache: register descriptor, discovery filter and the initiating node as client node.
+                        CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration();
+
+                        assert req.cacheType() != null : req;
+                        assert F.eq(ccfg.getName(), req.cacheName()) : req;
+
+                        DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
+                            ccfg,
+                            req.cacheType(),
+                            false,
+                            req.initiatingNodeId(),
+                            false,
+                            req.deploymentId(),
+                            req.schema());
+
+                        DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc);
+
+                        assert old == null;
+
+                        ctx.discovery().setCacheFilter(
+                            ccfg.getName(),
+                            ccfg.getNodeFilter(),
+                            ccfg.getNearConfiguration() != null,
+                            ccfg.getCacheMode());
+
+                        ctx.discovery().addClientNode(req.cacheName(),
+                            req.initiatingNodeId(),
+                            req.nearCacheConfiguration() != null);
+
+                        addedDescs.add(startDesc);
+
+                        exchangeActions.addCacheToStart(req, startDesc);
+
+                        needExchange = true;
+                    }
+                }
+                else {
+                    assert req.initiatingNodeId() != null : req;
+
+                    // Cache already exists, exchange is needed only if client cache should be created.
+                    ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
+
+                    // Initiator is still known to discovery and is not an affinity node for this cache.
+                    boolean clientReq = node != null &&
+                        !ctx.discovery().cacheAffinityNode(node, req.cacheName());
+
+                    if (req.clientStartOnly()) {
+                        needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+                            req.initiatingNodeId(),
+                            req.nearCacheConfiguration() != null);
+                    }
+                    else {
+                        if (req.failIfExists()) {
+                            ctx.cache().completeCacheStartFuture(req,
+                                new CacheExistsException("Failed to start cache " +
+                                    "(a cache with the same name is already started): " + req.cacheName()));
+                        }
+                        else {
+                            needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+                                req.initiatingNodeId(),
+                                req.nearCacheConfiguration() != null);
+                        }
+                    }
+
+                    if (needExchange) {
+                        // The request is converted to a client-only start of the existing cache.
+                        req.clientStartOnly(true);
+
+                        desc.clientCacheStartVersion(topVer.nextMinorVersion());
+
+                        exchangeActions.addClientCacheToStart(req, desc);
+                    }
+                }
+
+                // Existing cache and no exchange: pick the version to wait for before completing the request.
+                if (!needExchange && desc != null) {
+                    if (desc.clientCacheStartVersion() != null)
+                        waitTopVer = desc.clientCacheStartVersion();
+                    else
+                        waitTopVer = desc.startTopologyVersion();
+                }
+            }
+            else if (req.globalStateChange())
+                // NOTE(review): needExchange stays false in this branch - confirm that is intended.
+                exchangeActions.newClusterState(req.state());
+            else if (req.resetLostPartitions()) {
+                if (desc != null) {
+                    needExchange = true;
+
+                    exchangeActions.addCacheToResetLostPartitions(req, desc);
+                }
+            }
+            else if (req.stop()) {
+                assert req.stop() ^ req.close() : req;
+
+                // Stop of an unknown cache is a no-op.
+                if (desc != null) {
+                    DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
+
+                    assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
+
+                    ctx.discovery().removeCacheFilter(req.cacheName());
+
+                    needExchange = true;
+
+                    exchangeActions.addCacheToStop(req, desc);
+                }
+            }
+            else if (req.close()) {
+                if (desc != null) {
+                    // Exchange is needed only if discovery reports so for this client cache close.
+                    needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
+
+                    if (needExchange)
+                        exchangeActions.addCacheToClose(req, desc);
+                }
+            }
+            else
+                assert false : req;
+
+            if (!needExchange) {
+                // Only the initiating node schedules completion of a request which needs no exchange.
+                if (req.initiatingNodeId().equals(ctx.localNodeId()))
+                    reqsToComplete.add(new T2<>(req, waitTopVer));
+            }
+            else
+                incMinorTopVer = true;
+        }
+
+        // All descriptors added by this batch get the same start topology version.
+        if (!F.isEmpty(addedDescs)) {
+            AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer;
+
+            for (DynamicCacheDescriptor desc : addedDescs) {
+                // A non-template descriptor can be added only together with an exchange request.
+                assert desc.template() || incMinorTopVer;
+
+                desc.startTopologyVersion(startTopVer);
+            }
+        }
+
+        // Completion of the collected requests is handed to ctx.closure().callLocalSafe(...) instead of being done inline.
+        if (!F.isEmpty(reqsToComplete)) {
+            ctx.closure().callLocalSafe(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t :reqsToComplete) {
+                        final DynamicCacheChangeRequest req = t.get1();
+                        AffinityTopologyVersion waitTopVer = t.get2();
+
+                        IgniteInternalFuture<?> fut = waitTopVer != null ?
+                            ctx.cache().context().exchange().affinityReadyFuture(waitTopVer) : null;
+
+                        // Complete at once if there is nothing to wait for, otherwise when the future is done.
+                        if (fut == null || fut.isDone())
+                            ctx.cache().completeCacheStartFuture(req, null);
+                        else {
+                            fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> fut) {
+                                    ctx.cache().completeCacheStartFuture(req, null);
+                                }
+                            });
+                        }
+                    }
+
+                    return null;
+                }
+            });
+        }
+
+        // Exchange actions are attached to the batch only if some request requires an exchange.
+        if (incMinorTopVer) {
+            assert !exchangeActions.empty() : exchangeActions;
+
+            batch.exchangeActions(exchangeActions);
+        }
+
+        return incMinorTopVer;
+    }
+
+    /**
+     * Puts cache join data of the local node into the given bag. Nothing is added on daemon nodes.
+     *
+     * @param dataBag Discovery data bag.
+     */
+    void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        if (ctx.isDaemon())
+            return;
+
+        Serializable joinData = joinDiscoveryData();
+
+        dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), joinData);
+    }
+
+    /**
+     * Prepares the data this node sends on join. If caches were remembered on disconnect, the result
+     * describes the caches currently started on this node; otherwise the stored join data is returned.
+     *
+     * @return Discovery data sent on local node join.
+     */
+    private Serializable joinDiscoveryData() {
+        if (cachesOnDisconnect == null) {
+            assert ctx.config().isDaemon() || joinDiscoData != null || !ctx.state().active();
+
+            return joinDiscoData;
+        }
+
+        Map<String, CacheClientReconnectDiscoveryData.CacheInfo> cachesInfo = new HashMap<>();
+
+        // Wildcard type instead of the raw type: only non-generic members of the cache are used.
+        for (IgniteInternalCache<?, ?> cache : ctx.cache().caches()) {
+            String cacheName = cache.name();
+
+            // Descriptors are taken from the snapshot made on disconnect.
+            DynamicCacheDescriptor desc = cachesOnDisconnect.get(cacheName);
+
+            assert desc != null : cacheName;
+
+            cachesInfo.put(cacheName, new CacheClientReconnectDiscoveryData.CacheInfo(desc.cacheConfiguration(),
+                desc.cacheType(),
+                desc.deploymentId(),
+                cache.context().isNear(),
+                (byte)0));
+        }
+
+        return new CacheClientReconnectDiscoveryData(cachesInfo);
+    }
+
+    /**
+     * Called from exchange worker. Hands over the prepared list and resets the stored reference;
+     * daemon nodes always get an empty list.
+     *
+     * @return Caches to be started when this node starts.
+     */
+    List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() {
+        if (ctx.isDaemon())
+            return Collections.emptyList();
+
+        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> res = locJoinStartCaches;
+
+        assert res != null;
+
+        locJoinStartCaches = null;
+
+        return res;
+    }
+
+    /**
+     * Selects statically configured caches whose descriptors were received from the given node.
+     *
+     * @param joinedNodeId Joined node ID.
+     * @return New caches received from joined node.
+     */
+    List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
+        assert joinedNodeId != null;
+
+        if (ctx.isDaemon())
+            return Collections.<DynamicCacheDescriptor>emptyList();
+
+        // Allocated lazily, only if at least one matching descriptor is found.
+        List<DynamicCacheDescriptor> res = null;
+
+        for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
+            if (!cacheDesc.staticallyConfigured())
+                continue;
+
+            assert cacheDesc.receivedFrom() != null : cacheDesc;
+
+            if (!joinedNodeId.equals(cacheDesc.receivedFrom()))
+                continue;
+
+            if (res == null)
+                res = new ArrayList<>();
+
+            res.add(cacheDesc);
+        }
+
+        if (res == null)
+            return Collections.<DynamicCacheDescriptor>emptyList();
+
+        return res;
+    }
+
+    /**
+     * Discovery event callback, executed from discovery thread. Only node join events on
+     * non-daemon nodes are processed.
+     *
+     * @param type Event type.
+     * @param node Event node.
+     * @param topVer Topology version.
+     */
+    void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+        if (type != EVT_NODE_JOINED || ctx.isDaemon())
+            return;
+
+        UUID joinedId = node.id();
+
+        // Stamp descriptors received from the joined node with the join topology version.
+        for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
+            if (joinedId.equals(cacheDesc.receivedFrom()))
+                cacheDesc.receivedFromStartVersion(topVer);
+        }
+
+        for (DynamicCacheDescriptor tmplDesc : registeredTemplates.values()) {
+            if (joinedId.equals(tmplDesc.receivedFrom()))
+                tmplDesc.receivedFromStartVersion(topVer);
+        }
+
+        boolean locJoin = joinedId.equals(ctx.discovery().localNode().id());
+
+        if (locJoin && gridData == null) { // First node starts.
+            assert joinDiscoData != null || !ctx.state().active();
+
+            initStartCachesForLocalJoin(true);
+        }
+    }
+
+    /**
+     * Adds common caches data to the bag unless it was already collected for the cache processor.
+     * Does nothing on daemon nodes.
+     *
+     * @param dataBag Discovery data bag.
+     */
+    void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (ctx.isDaemon() || dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
+            return;
+
+        CacheNodeCommonDiscoveryData commonData = collectCommonDiscoveryData();
+
+        dataBag.addGridCommonData(CACHE_PROC.ordinal(), commonData);
+    }
+
+    /**
+     * Converts all registered cache and template descriptors into discovery data objects.
+     *
+     * @return Information about started caches.
+     */
+    private CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
+        Map<String, CacheData> cachesData = new HashMap<>();
+
+        for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
+            cachesData.put(cacheDesc.cacheName(), new CacheData(cacheDesc.cacheConfiguration(),
+                cacheDesc.cacheId(),
+                cacheDesc.cacheType(),
+                cacheDesc.deploymentId(),
+                cacheDesc.schema(),
+                cacheDesc.receivedFrom(),
+                cacheDesc.staticallyConfigured(),
+                false,
+                (byte)0));
+        }
+
+        Map<String, CacheData> tmplsData = new HashMap<>();
+
+        // Templates are sent with zero cache ID and the template flag set.
+        for (DynamicCacheDescriptor tmplDesc : registeredTemplates.values()) {
+            tmplsData.put(tmplDesc.cacheName(), new CacheData(tmplDesc.cacheConfiguration(),
+                0,
+                tmplDesc.cacheType(),
+                tmplDesc.deploymentId(),
+                tmplDesc.schema(),
+                tmplDesc.receivedFrom(),
+                tmplDesc.staticallyConfigured(),
+                true,
+                (byte)0));
+        }
+
+        return new CacheNodeCommonDiscoveryData(cachesData, tmplsData, ctx.discovery().clientNodesMap());
+    }
+
+    /**
+     * Registers templates, caches and client nodes described by the received common data and then
+     * prepares the list of caches to start on local join.
+     *
+     * @param data Discovery data.
+     */
+    void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        if (ctx.isDaemon() || data.commonData() == null)
+            return;
+
+        assert joinDiscoData != null || disconnectedState() || !ctx.state().active();
+        assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data;
+
+        CacheNodeCommonDiscoveryData rcvd = (CacheNodeCommonDiscoveryData)data.commonData();
+
+        for (CacheData tmplData : rcvd.templates().values()) {
+            DynamicCacheDescriptor tmplDesc = new DynamicCacheDescriptor(
+                ctx,
+                tmplData.cacheConfiguration(),
+                tmplData.cacheType(),
+                true,
+                tmplData.receivedFrom(),
+                tmplData.staticallyConfigured(),
+                tmplData.deploymentId(),
+                tmplData.schema());
+
+            registeredTemplates.put(tmplData.cacheConfiguration().getName(), tmplDesc);
+        }
+
+        for (CacheData cacheData : rcvd.caches().values()) {
+            CacheConfiguration<?, ?> ccfg = cacheData.cacheConfiguration();
+
+            DynamicCacheDescriptor cacheDesc = new DynamicCacheDescriptor(
+                ctx,
+                cacheData.cacheConfiguration(),
+                cacheData.cacheType(),
+                false,
+                cacheData.receivedFrom(),
+                cacheData.staticallyConfigured(),
+                cacheData.deploymentId(),
+                cacheData.schema());
+
+            cacheDesc.receivedOnDiscovery(true);
+
+            registeredCaches.put(cacheData.cacheConfiguration().getName(), cacheDesc);
+
+            ctx.discovery().setCacheFilter(
+                ccfg.getName(),
+                ccfg.getNodeFilter(),
+                ccfg.getNearConfiguration() != null,
+                ccfg.getCacheMode());
+        }
+
+        // Replay client nodes known to the cluster: cache name -> (node ID -> near flag).
+        if (!F.isEmpty(rcvd.clientNodesMap())) {
+            for (Map.Entry<String, Map<UUID, Boolean>> cacheEntry : rcvd.clientNodesMap().entrySet()) {
+                for (Map.Entry<UUID, Boolean> nodeEntry : cacheEntry.getValue().entrySet())
+                    ctx.discovery().addClientNode(cacheEntry.getKey(), nodeEntry.getKey(), nodeEntry.getValue());
+            }
+        }
+
+        gridData = rcvd;
+
+        if (disconnectedState())
+            locJoinStartCaches = Collections.emptyList();
+        else
+            initStartCachesForLocalJoin(false);
+    }
+
+    /**
+     * Fills the list of caches to start on local join from registered descriptors and local join data.
+     * The list stays empty if there is no local join data.
+     *
+     * @param firstNode {@code True} if first node in cluster starts.
+     */
+    private void initStartCachesForLocalJoin(boolean firstNode) {
+        assert locJoinStartCaches == null;
+
+        locJoinStartCaches = new ArrayList<>();
+
+        if (joinDiscoData == null)
+            return;
+
+        for (DynamicCacheDescriptor regDesc : registeredCaches.values()) {
+            // On the first node caches missing in the local join data are skipped.
+            if (firstNode && !joinDiscoData.caches().containsKey(regDesc.cacheName()))
+                continue;
+
+            CacheConfiguration<?, ?> regCfg = regDesc.cacheConfiguration();
+
+            CacheJoinNodeDiscoveryData.CacheInfo locInfo = joinDiscoData.caches().get(regCfg.getName());
+
+            DynamicCacheDescriptor startDesc = regDesc;
+
+            NearCacheConfiguration nearCfg = null;
+
+            if (locInfo != null) {
+                nearCfg = locInfo.config().getNearConfiguration();
+
+                // Copy of the registered descriptor which carries the local configuration.
+                startDesc = new DynamicCacheDescriptor(ctx,
+                        locInfo.config(),
+                        regDesc.cacheType(),
+                        regDesc.template(),
+                        regDesc.receivedFrom(),
+                        regDesc.staticallyConfigured(),
+                        regDesc.deploymentId(),
+                        regDesc.schema());
+
+                startDesc.startTopologyVersion(regDesc.startTopologyVersion());
+                startDesc.receivedFromStartVersion(regDesc.receivedFromStartVersion());
+                startDesc.clientCacheStartVersion(regDesc.clientCacheStartVersion());
+            }
+
+            boolean start = locInfo != null ||
+                joinDiscoData.startCaches() ||
+                CU.affinityNode(ctx.discovery().localNode(), regCfg.getNodeFilter());
+
+            if (start)
+                locJoinStartCaches.add(new T2<>(startDesc, nearCfg));
+        }
+    }
+
+    /**
+     * Dispatches data received from a joining node by its type. Client reconnect data is postponed
+     * while the local node itself is in disconnected state.
+     *
+     * @param data Joining node data.
+     */
+    void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
+        if (!data.hasJoiningNodeData())
+            return;
+
+        Serializable rcvd = data.joiningNodeData();
+
+        if (rcvd instanceof CacheClientReconnectDiscoveryData) {
+            CacheClientReconnectDiscoveryData reconnectData = (CacheClientReconnectDiscoveryData)rcvd;
+
+            if (!disconnectedState()) {
+                processClientReconnectData(reconnectData, data.joiningNodeId());
+
+                return;
+            }
+
+            // Processed later, from onReconnected().
+            if (clientReconnectReqs == null)
+                clientReconnectReqs = new LinkedHashMap<>();
+
+            clientReconnectReqs.put(data.joiningNodeId(), reconnectData);
+
+            return;
+        }
+
+        if (rcvd instanceof CacheJoinNodeDiscoveryData)
+            processJoiningNode((CacheJoinNodeDiscoveryData)rcvd, data.joiningNodeId());
+    }
+
+    /**
+     * Registers the reconnected client as a client node for caches it reported. Caches surviving
+     * reconnect are always registered with the near flag off; other caches are registered only if
+     * the registered descriptor has the same deployment ID.
+     *
+     * @param clientData Discovery data.
+     * @param clientNodeId Client node ID.
+     */
+    private void processClientReconnectData(CacheClientReconnectDiscoveryData clientData, UUID clientNodeId) {
+        for (CacheClientReconnectDiscoveryData.CacheInfo info : clientData.clientCaches().values()) {
+            String name = info.config().getName();
+
+            if (surviveReconnect(name)) {
+                ctx.discovery().addClientNode(name, clientNodeId, false);
+
+                continue;
+            }
+
+            DynamicCacheDescriptor regDesc = registeredCaches.get(name);
+
+            // Cache was stopped or redeployed while the client was away.
+            if (regDesc == null || !regDesc.deploymentId().equals(info.deploymentId()))
+                continue;
+
+            ctx.discovery().addClientNode(name, clientNodeId, info.nearCache());
+        }
+    }
+
+    /**
+     * Registers templates and caches configured on the joined node which are not registered yet and
+     * records the joined node as a client node for its caches.
+     *
+     * @param joinData Joined node discovery data.
+     * @param nodeId Joined node ID.
+     */
+    private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
+        for (CacheJoinNodeDiscoveryData.CacheInfo tmplInfo : joinData.templates().values()) {
+            CacheConfiguration<?, ?> tmplCfg = tmplInfo.config();
+
+            if (registeredTemplates.containsKey(tmplCfg.getName()))
+                continue;
+
+            DynamicCacheDescriptor tmplDesc = new DynamicCacheDescriptor(ctx,
+                tmplCfg,
+                tmplInfo.cacheType(),
+                true,
+                nodeId,
+                true,
+                joinData.cacheDeploymentId(),
+                new QuerySchema(tmplCfg.getQueryEntities()));
+
+            DynamicCacheDescriptor prev = registeredTemplates.put(tmplCfg.getName(), tmplDesc);
+
+            assert prev == null : prev;
+        }
+
+        for (CacheJoinNodeDiscoveryData.CacheInfo info : joinData.caches().values()) {
+            CacheConfiguration<?, ?> ccfg = info.config();
+
+            boolean registered = registeredCaches.containsKey(ccfg.getName());
+
+            if (!registered) {
+                DynamicCacheDescriptor cacheDesc = new DynamicCacheDescriptor(ctx,
+                    ccfg,
+                    info.cacheType(),
+                    false,
+                    nodeId,
+                    true,
+                    joinData.cacheDeploymentId(),
+                    new QuerySchema(ccfg.getQueryEntities()));
+
+                DynamicCacheDescriptor prev = registeredCaches.put(ccfg.getName(), cacheDesc);
+
+                assert prev == null : prev;
+
+                ctx.discovery().setCacheFilter(
+                    ccfg.getName(),
+                    ccfg.getNodeFilter(),
+                    ccfg.getNearConfiguration() != null,
+                    ccfg.getCacheMode());
+            }
+
+            // Done for every cache of the joined node, whether or not it was just registered.
+            ctx.discovery().addClientNode(ccfg.getName(), nodeId, ccfg.getNearConfiguration() != null);
+        }
+
+        if (!joinData.startCaches())
+            return;
+
+        // Joined node starts all caches: record it for every registered cache.
+        for (DynamicCacheDescriptor regDesc : registeredCaches.values()) {
+            ctx.discovery().addClientNode(regDesc.cacheName(),
+                nodeId,
+                regDesc.cacheConfiguration().getNearConfiguration() != null);
+        }
+    }
+
+    /**
+     * Gives direct access to the internal map (not a copy), keyed by cache name.
+     *
+     * @return Registered caches.
+     */
+    ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches() {
+        return registeredCaches;
+    }
+
+    /**
+     * Gives direct access to the internal map (not a copy), keyed by template name.
+     *
+     * @return Registered cache templates.
+     */
+    ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates() {
+        return registeredTemplates;
+    }
+
+    /**
+     * Remembers currently registered caches, then clears registered caches and templates and drops
+     * postponed client reconnect requests.
+     */
+    void onDisconnect() {
+        clientReconnectReqs = null;
+
+        // The snapshot must be taken before the registry is cleared.
+        Map<String, DynamicCacheDescriptor> snapshot = new HashMap<>(registeredCaches);
+
+        cachesOnDisconnect = snapshot;
+
+        registeredCaches.clear();
+        registeredTemplates.clear();
+    }
+
+    /**
+     * Compares caches remembered on disconnect with currently registered ones, processes postponed
+     * client reconnect requests and leaves the disconnected state.
+     *
+     * @return Stopped caches names.
+     */
+    Set<String> onReconnected() {
+        assert disconnectedState();
+
+        Set<String> res = new HashSet<>();
+
+        for (Map.Entry<String, DynamicCacheDescriptor> oldEntry : cachesOnDisconnect.entrySet()) {
+            String name = oldEntry.getKey();
+
+            if (surviveReconnect(name))
+                continue;
+
+            DynamicCacheDescriptor oldDesc = oldEntry.getValue();
+            DynamicCacheDescriptor curDesc = registeredCaches.get(name);
+
+            // Cache is treated as stopped if it is gone or was redeployed with another deployment ID.
+            if (curDesc == null || !oldDesc.deploymentId().equals(curDesc.deploymentId()))
+                res.add(name);
+        }
+
+        if (clientReconnectReqs != null) {
+            for (Map.Entry<UUID, CacheClientReconnectDiscoveryData> reqEntry : clientReconnectReqs.entrySet())
+                processClientReconnectData(reqEntry.getValue(), reqEntry.getKey());
+
+            clientReconnectReqs = null;
+        }
+
+        cachesOnDisconnect = null;
+
+        return res;
+    }
+
+    /**
+     * The state is detected by presence of the caches snapshot stored on disconnect.
+     *
+     * @return {@code True} if client node is currently in disconnected state.
+     */
+    private boolean disconnectedState() {
+        return cachesOnDisconnect != null;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return {@code True} if the cache with the given name is a system cache (utility or atomics cache)
+     *      which should always survive client node disconnect.
+     */
+    private boolean surviveReconnect(String cacheName) {
+        return CU.isUtilityCache(cacheName) || CU.isAtomicsCache(cacheName);
+    }
+
+    /**
+     * Removes all registered cache descriptors. Registered templates are not touched.
+     */
+    void clearCaches() {
+        registeredCaches.clear();
+    }
+}


Mime
View raw message