ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [9/9] ignite git commit: ignite-5075
Date Fri, 28 Apr 2017 12:40:18 GMT
ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c66d0e0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c66d0e0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c66d0e0

Branch: refs/heads/ignite-5075-cacheStart
Commit: 0c66d0e01b81f9a33efe067c31712725f8dee498
Parents: ea022d0
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Apr 28 11:12:33 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Apr 28 15:31:17 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  11 +-
 .../affinity/AffinityTopologyVersion.java       |   1 -
 .../cache/CacheAffinitySharedManager.java       |  80 ++++--
 .../internal/processors/cache/CacheData.java    |   8 +-
 .../processors/cache/ClusterCachesInfo.java     | 164 ++++++++----
 .../cache/DynamicCacheChangeBatch.java          |  10 +-
 .../cache/DynamicCacheChangeRequest.java        |  11 +-
 .../cache/DynamicCacheDescriptor.java           |  13 +
 .../processors/cache/ExchangeActions.java       |  82 +++++-
 .../processors/cache/GridCacheContext.java      |   3 +
 .../processors/cache/GridCacheProcessor.java    | 256 ++++---------------
 .../ignite/spi/discovery/DiscoveryDataBag.java  |   4 +
 .../processors/cache/IgniteCacheStartTest.java  | 176 +++++++++++++
 .../cache/IgniteDynamicCacheStartSelfTest.java  |   2 +-
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 15 files changed, 524 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index e5f2278..24c7283 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -315,8 +315,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         boolean nearEnabled,
         CacheMode cacheMode
     ) {
-        if (!registeredCaches.containsKey(cacheName))
+        if (!registeredCaches.containsKey(cacheName)) {
+            if (cacheMode == CacheMode.REPLICATED)
+                nearEnabled = false;
+            
             registeredCaches.put(cacheName, new CachePredicate(filter, nearEnabled, cacheMode));
+        }
     }
 
     /**
@@ -2737,7 +2741,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @return {@code True} if this node is a data node for given cache.
          */
         public boolean dataNode(ClusterNode node) {
-            return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
+            return CU.affinityNode(node, cacheFilter);
         }
 
         /**
@@ -2753,9 +2757,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @return {@code True} if near cache is present on the given nodes.
          */
         public boolean nearNode(ClusterNode node) {
-            if (node.isDaemon())
-                return false;
-
             if (CU.affinityNode(node, cacheFilter))
                 return nearEnabled;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index 0a6d965..8669530 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.nio.ByteBuffer;
-
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index e599231..6b99e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -318,21 +319,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
     }
 
-    /**
-     * Called on exchange initiated for cache start/stop request.
-     *
-     * @param fut Exchange future.
-     * @param crd Coordinator flag.
-     * @param exchActions Cache change requests.
-     * @throws IgniteCheckedException If failed.
-     * @return {@code True} if client-only exchange is needed.
-     */
-    public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
-        boolean crd,
-        ExchangeActions exchActions)
-        throws IgniteCheckedException {
-        assert exchActions != null && !exchActions.empty() : fut;
-
+    private void updateCachesInfo(ExchangeActions exchActions) {
         for (DynamicCacheChangeRequest req : exchActions.stopRequests()) {
             Integer cacheId = CU.cacheId(req.cacheName());
 
@@ -341,7 +328,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             assert desc != null : cacheId;
         }
 
-        for (DynamicCacheChangeRequest req : exchActions.startRequests()) {
+        for (ExchangeActions.ActionData action : exchActions.newCachesStartRequests()) {
+            DynamicCacheChangeRequest req = action.request();
+
             Integer cacheId = CU.cacheId(req.cacheName());
 
             DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
@@ -355,6 +344,25 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             assert old == null : old;
         }
+    }
+
+    /**
+     * Called on exchange initiated for cache start/stop request.
+     *
+     * @param fut Exchange future.
+     * @param crd Coordinator flag.
+     * @param exchActions Cache change requests.
+     * @throws IgniteCheckedException If failed.
+     * @return {@code True} if client-only exchange is needed.
+     */
+    public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
+        boolean crd,
+        ExchangeActions exchActions)
+        throws IgniteCheckedException
+    {
+        assert exchActions != null && !exchActions.empty() : exchActions;
+
+        updateCachesInfo(exchActions);
 
         // Affinity did not change for existing caches.
         forAllCaches(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@@ -366,10 +374,27 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         });
 
-        for (DynamicCacheChangeRequest req : exchActions.startRequests()) {
+        for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
+            DynamicCacheChangeRequest req = action.request();
+
             Integer cacheId = CU.cacheId(req.cacheName());
 
-            cctx.cache().prepareCacheStart(req, fut.topologyVersion());
+            boolean startCache;
+
+            NearCacheConfiguration nearCfg = null;
+
+            if (cctx.localNodeId().equals(req.initiatingNodeId())) {
+                startCache = true;
+
+                nearCfg = req.nearCacheConfiguration();
+            }
+            else {
+                startCache = cctx.cacheContext(action.descriptor().cacheId()) == null &&
+                    CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter());
+            }
+
+            if (startCache)
+                cctx.cache().prepareCacheStart(req, nearCfg, action.descriptor(), fut.topologyVersion());
 
             if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
                 if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
@@ -411,18 +436,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (crd) {
                 GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
-                assert cacheCtx != null : req;
+                // Client cache was stopped, need create 'client' CacheHolder.
+                if (cacheCtx != null && !cacheCtx.affinityNode()) {
+                    CacheHolder cache = caches.remove(cacheId);
 
-                CacheHolder cache = caches.remove(cacheId);
+                    assert !cache.client() : cache;
 
-                assert !cache.client();
+                    cache = CacheHolder2.create(cctx,
+                        cctx.cache().cacheDescriptor(cacheId),
+                        fut,
+                        cache.affinity());
 
-                cache = CacheHolder2.create(cctx,
-                    cctx.cache().cacheDescriptor(cacheId),
-                    fut,
-                    cache.affinity());
-
-                caches.put(cacheId, cache);
+                    caches.put(cacheId, cache);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index 39c6e90..b38e03f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -66,10 +66,10 @@ public class CacheData implements Serializable {
         boolean staticCfg,
         boolean template) {
         assert cacheCfg != null;
-        assert rcvdFrom != null;
-        assert startTopVer != null;
-        assert deploymentId != null;
-        assert template || cacheId != 0;
+        assert rcvdFrom != null : cacheCfg.getName();
+        assert startTopVer != null : cacheCfg.getName();
+        assert deploymentId != null : cacheCfg.getName();
+        assert template || cacheId != 0 : cacheCfg.getName();
 
         this.cacheCfg = cacheCfg;
         this.cacheId = cacheId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index cd2cd77..6cc09a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheExistsException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -46,6 +48,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
 
 /**
  *
@@ -61,13 +64,19 @@ class ClusterCachesInfo {
     private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
 
     /** */
+    private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+
+    /** */
     private CacheJoinNodeDiscoveryData joinDiscoData;
 
     /** */
     private CacheNodeCommonDiscoveryData gridData;
 
     /** */
-    private List<DynamicCacheDescriptor> locJoinStartCaches;
+    private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
+
+    /** */
+    private Map<UUID, CacheJoinNodeDiscoveryData> joiningNodesDiscoData = new HashMap<>();
 
     /**
      * @param ctx Context.
@@ -81,7 +90,7 @@ class ClusterCachesInfo {
     }
 
     void onKernalStart() throws IgniteCheckedException {
-
+        // TODO: validate cache configurations.
     }
 
     /**
@@ -104,7 +113,7 @@ class ClusterCachesInfo {
 
                 assert ccfg != null : req;
 
-                DynamicCacheDescriptor desc = registeredTemplates().get(req.cacheName());
+                DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName());
 
                 if (desc == null) {
                     DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
@@ -204,6 +213,8 @@ class ClusterCachesInfo {
                     }
 
                     if (needExchange) {
+                        req.clientStartOnly(true);
+
                         desc.clientCacheStartVersion(topVer.nextMinorVersion());
 
                         exchangeActions.addClientCacheToStart(req, desc);
@@ -222,39 +233,37 @@ class ClusterCachesInfo {
             else if (req.globalStateChange())
                 needExchange = true;
             else if (req.resetLostPartitions()) {
-                needExchange = desc != null;
+                if (desc != null) {
+                    needExchange = true;
 
-                if (needExchange)
                     exchangeActions.addCacheToResetLostPartitions(req, desc);
+                }
             }
-            else {
+            else if (req.stop()) {
                 assert req.stop() ^ req.close() : req;
 
                 if (desc != null) {
-                    if (req.stop()) {
-                        DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
-
-                        assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
+                    DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
 
-                        ctx.discovery().removeCacheFilter(req.cacheName());
-
-                        needExchange = true;
+                    assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
 
-                        exchangeActions.addCacheToStop(req, desc);
-                    }
-                    else {
-                        assert req.close() : req;
+                    ctx.discovery().removeCacheFilter(req.cacheName());
 
-                        needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
+                    needExchange = true;
 
-                        if (needExchange) {
-                            exchangeActions.addCacheToStop(req, desc);
+                    exchangeActions.addCacheToStop(req, desc);
+                }
+            }
+            else if (req.close()) {
+                if (desc != null) {
+                    needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
 
-                            exchangeActions.addCacheToClose(req, desc);
-                        }
-                    }
+                    if (needExchange)
+                        exchangeActions.addCacheToClose(req, desc);
                 }
             }
+            else
+                assert false : req;
 
             if (!needExchange) {
                 if (req.initiatingNodeId().equals(ctx.localNodeId()))
@@ -267,8 +276,11 @@ class ClusterCachesInfo {
         if (!F.isEmpty(addedDescs)) {
             AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer;
 
-            for (DynamicCacheDescriptor desc : addedDescs)
+            for (DynamicCacheDescriptor desc : addedDescs) {
+                assert desc.template() || incMinorTopVer;
+
                 desc.startTopologyVersion(startTopVer);
+            }
         }
 
         if (!F.isEmpty(reqsToComplete)) {
@@ -306,7 +318,15 @@ class ClusterCachesInfo {
         return incMinorTopVer;
     }
 
-    Serializable joinDiscoveryData() {
+    void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        if (!ctx.isDaemon())
+        dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), joinDiscoveryData());
+    }
+
+    /**
+     * @return Discovery date sent on local node join.
+     */
+    private Serializable joinDiscoveryData() {
         if (cachesOnDisconnect != null) {
             Map<String, CacheClientReconnectDiscoveryData.CacheInfo> cachesInfo = new HashMap<>();
 
@@ -336,16 +356,23 @@ class ClusterCachesInfo {
      *
      * @return Caches to be started when this node starts.
      */
-    List<DynamicCacheDescriptor> cachesToStartOnLocalJoin() {
+    List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() {
+        if (ctx.isDaemon())
+            return Collections.emptyList();
+
         assert locJoinStartCaches != null;
 
-        List<DynamicCacheDescriptor> locJoinStartCaches = this.locJoinStartCaches;
+        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = this.locJoinStartCaches;
 
         this.locJoinStartCaches = null;
 
         return locJoinStartCaches;
     }
 
+    /**
+     * @param joinedNodeId Joined node ID.
+     * @return New caches received from joined node.
+     */
     List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
         assert joinedNodeId != null;
 
@@ -369,7 +396,7 @@ class ClusterCachesInfo {
             }
         }
 
-        return started;
+        return started != null ? started : Collections.<DynamicCacheDescriptor>emptyList();
     }
 
     /**
@@ -380,16 +407,16 @@ class ClusterCachesInfo {
      * @param topVer Topology version.
      */
     void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
-        if (type == EVT_NODE_JOINED) {
+        if (type == EVT_NODE_JOINED && !ctx.isDaemon()) {
             if (node.id().equals(ctx.discovery().localNode().id())) {
                 if (gridData == null) { // First node starts.
                     assert registeredCaches.isEmpty();
                     assert registeredTemplates.isEmpty();
                     assert joinDiscoData != null;
-
-                    processJoiningNode(joinDiscoData, node.id());
                 }
 
+                processJoiningNode(joinDiscoData, node.id());
+
                 assert locJoinStartCaches == null;
 
                 locJoinStartCaches = new ArrayList<>();
@@ -397,14 +424,24 @@ class ClusterCachesInfo {
                 for (DynamicCacheDescriptor desc : registeredCaches.values()) {
                     CacheConfiguration cfg = desc.cacheConfiguration();
 
-                    boolean locCfg = joinDiscoData.caches().containsKey(cfg.getName());
+                    CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
+
+                    boolean affNode = CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter());
 
-                    if (locCfg || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()))
-                        locJoinStartCaches.add(desc);
+                    NearCacheConfiguration nearCfg = (!affNode && locCfg != null) ? locCfg.config().getNearConfiguration() : null;
+
+                    if (locCfg != null || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()))
+                        locJoinStartCaches.add(new T2<>(desc, nearCfg));
                 }
 
                 joinDiscoData = null;
             }
+            else {
+                CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id());
+
+                if (discoData != null)
+                    processJoiningNode(discoData, node.id());
+            }
 
             initStartVersionOnJoin(registeredCaches.values(), node, topVer);
 
@@ -412,6 +449,11 @@ class ClusterCachesInfo {
         }
     }
 
+    /**
+     * @param descs Cache descriptors.
+     * @param joinedNode Joined node.
+     * @param topVer Current topology version.
+     */
     private void initStartVersionOnJoin(Collection<DynamicCacheDescriptor> descs,
         ClusterNode joinedNode,
         AffinityTopologyVersion topVer) {
@@ -421,7 +463,18 @@ class ClusterCachesInfo {
         }
     }
 
-    CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
+    void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (ctx.isDaemon())
+            return;
+
+        if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
+            dataBag.addGridCommonData(CACHE_PROC.ordinal(), collectCommonDiscoveryData());
+    }
+
+    /**
+     * @return Information about started caches.
+     */
+    private CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
         Map<String, CacheData> caches = new HashMap<>();
 
         for (DynamicCacheDescriptor desc : registeredCaches.values()) {
@@ -435,7 +488,7 @@ class ClusterCachesInfo {
                 desc.staticallyConfigured(),
                 false);
 
-            caches.put(desc.cacheConfiguration().getName(), cacheData);
+            caches.put(desc.cacheName(), cacheData);
         }
 
         Map<String, CacheData> templates = new HashMap<>();
@@ -451,13 +504,19 @@ class ClusterCachesInfo {
                 desc.staticallyConfigured(),
                 true);
 
-            templates.put(desc.cacheConfiguration().getName(), cacheData);
+            templates.put(desc.cacheName(), cacheData);
         }
 
         return new CacheNodeCommonDiscoveryData(caches, templates, ctx.discovery().clientNodesMap());
     }
 
+    /**
+     * @param data Discovery data.
+     */
     void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        if (ctx.isDaemon() || data.commonData() == null)
+            return;
+
         assert joinDiscoData != null;
         assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data;
 
@@ -525,12 +584,17 @@ class ClusterCachesInfo {
 
             if (joiningNodeData instanceof CacheClientReconnectDiscoveryData)
                 processClientReconnectData((CacheClientReconnectDiscoveryData)joiningNodeData, data.joiningNodeId());
-            else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
-                processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId());
+            else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
+                CacheJoinNodeDiscoveryData old =
+                    joiningNodesDiscoData.put(data.joiningNodeId(), (CacheJoinNodeDiscoveryData)joiningNodeData);
+
+                assert old == null : old;
+            }
         }
     }
 
     /**
+     * @param clientData Discovery data.
      * @param clientNodeId Client node ID.
      */
     private void processClientReconnectData(CacheClientReconnectDiscoveryData clientData, UUID clientNodeId) {
@@ -594,23 +658,27 @@ class ClusterCachesInfo {
                     cfg.getCacheMode());
             }
 
-            ctx.discovery().addClientNode(cfg.getName(),
-                nodeId,
-                cfg.getNearConfiguration() != null);
+            ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null);
         }
     }
 
+    /**
+     * @return Registered caches.
+     */
     ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches() {
         return registeredCaches;
     }
 
+    /**
+     * @return Registered cache templates.
+     */
     ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates() {
         return registeredTemplates;
     }
 
-    /** */
-    private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
-
+    /**
+     *
+     */
     void onDisconnect() {
         cachesOnDisconnect = new HashMap<>(registeredCaches);
 
@@ -618,6 +686,9 @@ class ClusterCachesInfo {
         registeredTemplates.clear();
     }
 
+    /**
+     * @return Stopped caches names.
+     */
     Set<String> onReconnected() {
         assert cachesOnDisconnect != null;
 
@@ -651,6 +722,9 @@ class ClusterCachesInfo {
         return CU.isUtilityCache(cacheName) || CU.isAtomicsCache(cacheName);
     }
 
+    /**
+     *
+     */
     void clearCaches() {
         registeredCaches.clear();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 56639b7..e27d5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -39,7 +39,7 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     @GridToStringInclude
     private Collection<DynamicCacheChangeRequest> reqs;
 
-    /** */
+    /** Cache updates to be executed on exchange. */
     private transient ExchangeActions exchangeActions;
 
     /**
@@ -77,13 +77,19 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
      * @return {@code True} if request should trigger partition exchange.
      */
     public boolean exchangeNeeded() {
-        return exchangeActions != null && !exchangeActions.empty();
+        return exchangeActions != null;
     }
 
+    /**
+     * @return Cache updates to be executed on exchange.
+     */
     ExchangeActions exchangeActions() {
         return exchangeActions;
     }
 
+    /**
+     * @param exchangeActions Cache updates to be executed on exchange.
+     */
     void exchangeActions(ExchangeActions exchangeActions) {
         assert !exchangeActions.empty() : exchangeActions;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index ee316ab..e4c95a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.QuerySchema;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -106,6 +105,11 @@ public class DynamicCacheChangeRequest implements Serializable {
         this.initiatingNodeId = initiatingNodeId;
     }
 
+    /**
+     * @param ctx Context.
+     * @param cacheName Cache name.
+     * @return Request to reset lost partitions.
+     */
     static DynamicCacheChangeRequest resetLostPartitions(GridKernalContext ctx, String cacheName) {
         DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
 
@@ -114,6 +118,11 @@ public class DynamicCacheChangeRequest implements Serializable {
         return req;
     }
 
+    /**
+     * @param ctx Context.
+     * @param cfg0 Template configuration.
+     * @return Request to add template.
+     */
     static DynamicCacheChangeRequest addTemplateRequest(GridKernalContext ctx, CacheConfiguration<?, ?> cfg0) {
         CacheConfiguration<?, ?> cfg = new CacheConfiguration<>(cfg0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 536da79..bae711a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -123,6 +124,9 @@ public class DynamicCacheDescriptor {
 
         cacheId = CU.cacheId(cacheCfg.getName());
 
+        if (cacheCfg.getCacheMode() == CacheMode.REPLICATED)
+            cacheCfg.setNearConfiguration(null);
+
         synchronized (schemaMux) {
             this.schema = schema.copy();
         }
@@ -206,6 +210,15 @@ public class DynamicCacheDescriptor {
     }
 
     /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        assert cacheCfg != null : this;
+
+        return cacheCfg.getName();
+    }
+
+    /**
      * @return Cache configuration.
      */
     public CacheConfiguration cacheConfiguration() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 3d14f23..6de02b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.F;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -56,6 +58,10 @@ public class ExchangeActions {
             F.isEmpty(cachesToResetLostParts);
     }
 
+    /**
+     * @param nodeId Local node ID.
+     * @return Close cache requests.
+     */
     public List<DynamicCacheChangeRequest> closeRequests(UUID nodeId) {
         List<DynamicCacheChangeRequest> res = null;
 
@@ -73,19 +79,35 @@ public class ExchangeActions {
         return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
     }
 
-    public List<DynamicCacheChangeRequest> startRequests() {
-        List<DynamicCacheChangeRequest> res = null;
+    /**
+     * @return Start cache requests.
+     */
+    Collection<ActionData> newAndClientCachesStartRequests() {
+        if (cachesToStart != null || clientCachesToStart != null) {
+            List<ActionData> res = new ArrayList<>();
 
-        if (cachesToStart != null) {
-            res = new ArrayList<>(cachesToStart.size());
+            if (cachesToStart != null)
+                res.addAll(cachesToStart.values());
 
-            for (ActionData req : cachesToStart.values())
-                res.add(req.req);
+            if (clientCachesToStart != null)
+                res.addAll(clientCachesToStart.values());
+
+            return res;
         }
 
-        return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return Start cache requests.
+     */
+    Collection<ActionData> newCachesStartRequests() {
+        return cachesToStart != null ? cachesToStart.values() : Collections.<ActionData>emptyList();
     }
 
+    /**
+     * @return Stop cache requests.
+     */
     public List<DynamicCacheChangeRequest> stopRequests() {
         List<DynamicCacheChangeRequest> res = null;
 
@@ -99,11 +121,14 @@ public class ExchangeActions {
         return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
     }
 
+    /**
+     * @param ctx Context.
+     */
     public void completeRequestFutures(GridCacheSharedContext ctx) {
         completeRequestFutures(cachesToStart, ctx);
-        completeRequestFutures(clientCachesToStart, ctx);
         completeRequestFutures(cachesToStop, ctx);
         completeRequestFutures(cachesToClose, ctx);
+        completeRequestFutures(clientCachesToStart, ctx);
         completeRequestFutures(cachesToResetLostParts, ctx);
     }
 
@@ -114,10 +139,16 @@ public class ExchangeActions {
         }
     }
 
+    /**
+     * @return {@code True} if have cache stop requests.
+     */
     public boolean hasStop() {
         return !F.isEmpty(cachesToStop);
     }
 
+    /**
+     * @return
+     */
     public Set<String> cachesToResetLostPartitions() {
         Set<String> caches = null;
         
@@ -149,6 +180,10 @@ public class ExchangeActions {
         return false;
     }
 
+    /**
+     * @param nodeId Local node ID.
+     * @return {@code True} if client cache was started.
+     */
     public boolean clientCacheStarted(UUID nodeId) {
         if (clientCachesToStart != null) {
             for (ActionData cache : clientCachesToStart.values()) {
@@ -160,7 +195,10 @@ public class ExchangeActions {
         return false;
     }
 
-    public ClusterState newClusterState() {
+    /**
+     * @return New cluster state if state change was requested.
+     */
+    @Nullable public ClusterState newClusterState() {
         return newState;
     }
 
@@ -179,25 +217,38 @@ public class ExchangeActions {
     }
 
     void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.start() : req;
+
         cachesToStart = add(cachesToStart, req, desc);
     }
 
     void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.start() : req;
+
         clientCachesToStart = add(clientCachesToStart, req, desc);
     }
 
     void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.stop() : req;
+
         cachesToStop = add(cachesToStop, req, desc);
     }
 
     void addCacheToClose(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.close() : req;
+
         cachesToClose = add(cachesToClose, req, desc);
     }
 
     void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.resetLostPartitions() : req;
+
         cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
     }
 
+    /**
+     * @return {@code True} if there are no cache change actions.
+     */
     public boolean empty() {
         return F.isEmpty(cachesToStart) &&
             F.isEmpty(clientCachesToStart) &&
@@ -216,9 +267,20 @@ public class ExchangeActions {
         /** */
         private DynamicCacheDescriptor desc;
 
-        public ActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        ActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+            assert req != null;
+            assert desc != null;
+
             this.req = req;
             this.desc = desc;
         }
+
+        public DynamicCacheChangeRequest request() {
+            return req;
+        }
+
+        public DynamicCacheDescriptor descriptor() {
+            return desc;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 1f6391b..72adeaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -461,6 +461,9 @@ public class GridCacheContext<K, V> implements Externalizable {
         this.startTopVer = startTopVer;
     }
 
+    /**
+     * @param cacheStartTopVer Global cache start topology version.
+     */
     public void cacheStartTopologyVersion(AffinityTopologyVersion cacheStartTopVer) {
         this.cacheStartTopVer = cacheStartTopVer;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d486d3a..0951676 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -117,6 +117,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -657,6 +658,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
         Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates) throws IgniteCheckedException {
         CU.validateCacheName(cfg.getName());
+
         cloneCheckSerializable(cfg);
 
         CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
@@ -674,7 +676,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             CacheType cacheType;
 
-        if (CU.isUtilityCache(cfg.getName()))
+            if (CU.isUtilityCache(cfg.getName()))
                 cacheType = CacheType.UTILITY;
             else if (internalCaches.contains(cfg.getName()))
                 cacheType = CacheType.INTERNAL;
@@ -748,96 +750,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param cfg Cache configuration.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void registerCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException {
-//        cloneCheckSerializable(cfg);
-//
-//        CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
-//
-//        // Initialize defaults.
-//        initialize(cfg, cacheObjCtx);
-//
-//        String masked = maskNull(cfg.getName());
-//
-//        if (cacheDescriptor(cfg.getName()) != null) {
-//            String cacheName = cfg.getName();
-//
-//            if (cacheName != null)
-//                throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
-//                    "assign unique name to each cache): " + U.maskName(cacheName));
-//            else
-//                throw new IgniteCheckedException("Default cache has already been configured (check configuration and " +
-//                    "assign unique name to each cache).");
-//        }
-//
-//        CacheType cacheType;
-//
-//        if (CU.isUtilityCache(cfg.getName()))
-//            cacheType = CacheType.UTILITY;
-//        else if (internalCaches.contains(maskNull(cfg.getName())))
-//            cacheType = CacheType.INTERNAL;
-//        else
-//            cacheType = CacheType.USER;
-//
-//        if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
-//            cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
-//
-//        boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
-//
-//        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
-//            cfg,
-//            cacheType,
-//            template,
-//            IgniteUuid.randomUuid(),
-//            new QuerySchema(cfg.getQueryEntities()));
-//
-//        desc.locallyConfigured(true);
-//        desc.staticallyConfigured(true);
-//        desc.receivedFrom(ctx.localNodeId());
-//
-//        if (!template) {
-//            cacheDescriptor(cfg.getName(), desc);
-//
-//            ctx.discovery().setCacheFilter(
-//                cfg.getName(),
-//                cfg.getNodeFilter(),
-//                cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
-//                cfg.getCacheMode());
-//
-//            ctx.discovery().addClientNode(cfg.getName(),
-//                ctx.localNodeId(),
-//                cfg.getNearConfiguration() != null);
-//
-//            if (!cacheType.userCache())
-//                stopSeq.addLast(cfg.getName());
-//            else
-//                stopSeq.addFirst(cfg.getName());
-//        }
-//        else {
-//            if (log.isDebugEnabled())
-//                log.debug("Use cache configuration as template: " + cfg);
-//
-//            registeredTemplates.put(masked, desc);
-//        }
-//
-//        if (cfg.getName() == null) { // Use cache configuration with null name as template.
-//            DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
-//                cfg,
-//                cacheType,
-//                true,
-//                IgniteUuid.randomUuid(),
-//                new QuerySchema(cfg.getQueryEntities()));
-//
-//            desc0.locallyConfigured(true);
-//            desc0.staticallyConfigured(true);
-//
-//            registeredTemplates.put(masked, desc0);
-//        }
-    }
-
-    /**
      * Initialize internal cache names
      */
     private void initializeInternalCacheNames() {
@@ -908,57 +820,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             ctx.query().onCacheKernalStart();
 
-            // Start dynamic caches received from collect discovery data.
-//            for (DynamicCacheDescriptor desc : cacheDescriptors()) {
-//                if (ctx.config().isDaemon())
-//                    continue;
-//
-//                desc.clearRemoteConfigurations();
-//
-//                CacheConfiguration ccfg = desc.cacheConfiguration();
-//
-//                IgnitePredicate filter = ccfg.getNodeFilter();
-//
-//                boolean loc = desc.locallyConfigured();
-//
-//                if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
-//                    boolean started = desc.onStart();
-//
-//                    assert started : "Failed to change started flag for locally configured cache: " + desc;
-//
-//                    CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
-//
-//                    CachePluginManager pluginMgr = desc.pluginManager();
-//
-//                    GridCacheContext ctx = createCache(
-//                        ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
-//
-//                    ctx.dynamicDeploymentId(desc.deploymentId());
-//
-//                    sharedCtx.addCacheContext(ctx);
-//
-//                    GridCacheAdapter cache = ctx.cache();
-//
-//                    String name = ccfg.getName();
-//
-//                    caches.put(name, cache);
-//
-//                    startCache(cache, desc.schema());
-//
-//                    jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false));
-//                }
-//            }
+            // Must call onKernalStart on shared managers after creation of fetched caches.
+            for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) {
+                if (sharedCtx.database() != mgr)
+                    mgr.onKernalStart(false);
+            }
         }
         finally {
             cacheStartedLatch.countDown();
         }
 
-        // Must call onKernalStart on shared managers after creation of fetched caches.
-        for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) {
-            if (sharedCtx.database() != mgr)
-                mgr.onKernalStart(false);
-        }
-
         // Escape if start active on start false
         if (!activeOnStart)
             return;
@@ -1221,7 +1092,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 }
             }
         }
-//
+// TODO
 //        if (clientReconnectReqs != null) {
 //            for (Map.Entry<UUID, DynamicCacheChangeBatch> e : clientReconnectReqs.entrySet())
 //                processClientReconnectData(e.getKey(), e.getValue());
@@ -1835,39 +1706,44 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * @param req Cache start request.
-     * @param topVer Topology version.
+     * @param nearCfg Near cache configuration.
+     * @param desc Cache descriptor.
+     * @param exchTopVer Current exchange version.
      * @throws IgniteCheckedException If failed.
      */
-    public void prepareCacheStart(DynamicCacheChangeRequest req, AffinityTopologyVersion topVer)
+    void prepareCacheStart(DynamicCacheChangeRequest req,
+        @Nullable NearCacheConfiguration nearCfg,
+        DynamicCacheDescriptor desc,
+        AffinityTopologyVersion exchTopVer)
         throws IgniteCheckedException {
         assert req.start() : req;
         assert req.cacheType() != null : req;
 
-        DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
-
         prepareCacheStart(
             req.startCacheConfiguration(),
-            req.nearCacheConfiguration(),
+            nearCfg,
             req.cacheType(),
-            req.clientStartOnly(),
-            req.initiatingNodeId(),
             req.deploymentId(),
             desc.startTopologyVersion(),
-            topVer,
-            desc != null ? desc.schema() : null
+            exchTopVer,
+            desc.schema()
         );
     }
 
+    /**
+     * @param exchTopVer Current exchange version.
+     * @throws IgniteCheckedException If failed.
+     */
     public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException {
-        List<DynamicCacheDescriptor> caches = cachesInfo.cachesToStartOnLocalJoin();
+        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches = cachesInfo.cachesToStartOnLocalJoin();
+
+        for (T2<DynamicCacheDescriptor, NearCacheConfiguration> t : caches) {
+            DynamicCacheDescriptor desc = t.get1();
 
-        for (DynamicCacheDescriptor desc : caches) {
             prepareCacheStart(
                 desc.cacheConfiguration(),
-                null,
+                t.get2(),
                 desc.cacheType(),
-                false,
-                null,
                 desc.deploymentId(),
                 desc.startTopologyVersion(),
                 exchTopVer,
@@ -1894,8 +1770,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     desc.cacheConfiguration(),
                     null,
                     desc.cacheType(),
-                    false,
-                    null,
                     desc.deploymentId(),
                     desc.startTopologyVersion(),
                     exchTopVer,
@@ -1911,8 +1785,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cfg Start configuration.
      * @param nearCfg Near configuration.
      * @param cacheType Cache type.
-     * @param clientStartOnly Client only start request.
-     * @param initiatingNodeId Initiating node ID.
      * @param deploymentId Deployment ID.
      * @param cacheStartTopVer Cache start topology version.
      * @param exchTopVer Current exchange version.
@@ -1923,51 +1795,37 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         CacheConfiguration cfg,
         NearCacheConfiguration nearCfg,
         CacheType cacheType,
-        boolean clientStartOnly,
-        UUID initiatingNodeId,
         IgniteUuid deploymentId,
         AffinityTopologyVersion cacheStartTopVer,
         AffinityTopologyVersion exchTopVer,
         @Nullable QuerySchema schema
     ) throws IgniteCheckedException {
-        CacheConfiguration ccfg = new CacheConfiguration(cfg);
-
-        IgnitePredicate nodeFilter = ccfg.getNodeFilter();
-
-        ClusterNode locNode = ctx.discovery().localNode();
+        assert !caches.containsKey(cfg.getName()) : cfg.getName();
 
-        boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter);
-        boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
+        CacheConfiguration ccfg = new CacheConfiguration(cfg);
 
-        assert !caches.containsKey(ccfg.getName()) : ccfg.getName();
+        if (nearCfg != null)
+            ccfg.setNearConfiguration(nearCfg);
 
-        if (affNodeStart || clientNodeStart || CU.isSystemCache(cfg.getName())) {
-            if (clientNodeStart && !affNodeStart) {
-                if (nearCfg != null)
-                    ccfg.setNearConfiguration(nearCfg);
-                else
-                    ccfg.setNearConfiguration(null);
-            }
+        CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
-            CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+        GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
 
-            GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
+        cacheCtx.startTopologyVersion(exchTopVer);
 
-            cacheCtx.startTopologyVersion(exchTopVer);
-            cacheCtx.cacheStartTopologyVersion(cacheStartTopVer);
+        cacheCtx.cacheStartTopologyVersion(cacheStartTopVer);
 
-            cacheCtx.dynamicDeploymentId(deploymentId);
+        cacheCtx.dynamicDeploymentId(deploymentId);
 
-            GridCacheAdapter cache = cacheCtx.cache();
+        GridCacheAdapter cache = cacheCtx.cache();
 
-            sharedCtx.addCacheContext(cacheCtx);
+        sharedCtx.addCacheContext(cacheCtx);
 
-            caches.put(cacheCtx.name(), cache);
+        caches.put(cacheCtx.name(), cache);
 
-            startCache(cache, schema != null ? schema : new QuerySchema());
+        startCache(cache, schema != null ? schema : new QuerySchema());
 
-            onKernalStart(cache);
-        }
+        onKernalStart(cache);
     }
 
     /**
@@ -2079,9 +1937,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
     }
 
-    void completeTemplateAddFuture(String name, IgniteUuid deploymentId) {
+    /**
+     * @param cacheName Cache name.
+     * @param deploymentId
+     */
+    void completeTemplateAddFuture(String cacheName, IgniteUuid deploymentId) {
         GridCacheProcessor.TemplateConfigurationFuture fut =
-            (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(name);
+            (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(cacheName);
 
         if (fut != null && fut.deploymentId().equals(deploymentId))
             fut.onDone();
@@ -2089,8 +1951,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * @param req Request to complete future for.
+     * @param err Error if any.
      */
-    public void completeCacheStartFuture(DynamicCacheChangeRequest req, Exception err) {
+    public void completeCacheStartFuture(DynamicCacheChangeRequest req, @Nullable Exception err) {
         if (req.initiatingNodeId().equals(ctx.localNodeId())) {
             DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
 
@@ -2155,13 +2018,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
-        dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), cachesInfo.joinDiscoveryData());
+        cachesInfo.collectJoiningNodeData(dataBag);
     }
 
     /** {@inheritDoc} */
     @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
-        if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
-            dataBag.addGridCommonData(CACHE_PROC.ordinal(), cachesInfo.collectCommonDiscoveryData());
+        cachesInfo.collectGridNodeData(dataBag);
     }
 
     /** {@inheritDoc} */
@@ -2474,7 +2336,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cacheName Cache name to close.
      * @return Future that will be completed when cache is closed.
      */
-    public IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
+    IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
         assert cacheName != null;
 
         IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName);
@@ -2734,18 +2596,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param req Cache change request.
-     */
-    private void initReceivedCacheConfiguration(DynamicCacheChangeRequest req) {
-        if (req.startCacheConfiguration() != null) {
-            CacheConfiguration ccfg = req.startCacheConfiguration();
-
-            if (ccfg.isStoreKeepBinary() == null)
-                ccfg.setStoreKeepBinary(CacheConfiguration.DFLT_STORE_KEEP_BINARY);
-        }
-    }
-
-    /**
      * Checks that preload-order-dependant caches has SYNC or ASYNC preloading mode.
      *
      * @param cfgs Caches.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
index 96df255..1ca4ac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -218,6 +218,10 @@ public class DiscoveryDataBag {
         return newJoinerData;
     }
 
+    void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+
+    }
+
     /**
      * @param cmpId component ID.
      * @param data Data.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
new file mode 100644
index 0000000..da34424
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteCacheStartTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private CacheConfiguration ccfg;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        if (ccfg != null)
+            cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartAndNodeJoin() throws Exception {
+        Ignite node0 = startGrid(0);
+
+        checkCache(0, "c1", false);
+
+        node0.createCache(cacheConfiguration("c1"));
+
+        checkCache(0, "c1", true);
+
+        startGrid(1);
+
+        checkCache(0, "c1", true);
+        checkCache(1, "c1", true);
+
+        client = true;
+
+        startGrid(2);
+
+        checkCache(0, "c1", true);
+        checkCache(1, "c1", true);
+        checkCache(2, "c1", false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartFromJoiningNode1() throws Exception {
+        checkStartFromJoiningNode(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartFromJoiningNode2() throws Exception {
+        checkStartFromJoiningNode(true);
+    }
+
+    /**
+     * @param joinClient {@code True} if client node joins.
+     * @throws Exception If failed.
+     */
+    private void checkStartFromJoiningNode(boolean joinClient) throws Exception {
+        startGrid(0);
+        startGrid(1);
+
+        client = true;
+
+        startGrid(2);
+
+        ccfg = cacheConfiguration("c1");
+        client = joinClient;
+
+        startGrid(3);
+
+        checkCache(0, "c1", true);
+        checkCache(1, "c1", true);
+        checkCache(2, "c1", false);
+        checkCache(3, "c1", true);
+
+        client = false;
+        ccfg = null;
+
+        startGrid(4);
+
+        checkCache(0, "c1", true);
+        checkCache(1, "c1", true);
+        checkCache(2, "c1", false);
+        checkCache(3, "c1", true);
+        checkCache(4, "c1", true);
+
+        client = true;
+
+        startGrid(5);
+
+        checkCache(0, "c1", true);
+        checkCache(1, "c1", true);
+        checkCache(2, "c1", false);
+        checkCache(3, "c1", true);
+        checkCache(4, "c1", true);
+        checkCache(5, "c1", false);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(cacheName);
+
+        return ccfg;
+    }
+
+    /**
+     * @param idx Node index.
+     * @param cacheName Cache name.
+     * @param expCache {@code True} if cache should be created.
+     */
+    private void checkCache(int idx, String cacheName, boolean expCache) {
+        IgniteKernal node = (IgniteKernal)ignite(idx);
+
+        if (expCache)
+            assertNotNull(node.context().cache().cache(cacheName));
+        else
+            assertNull(node.context().cache().cache(cacheName));
+
+        assertNotNull(node.context().cache().cache(CU.UTILITY_CACHE_NAME));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 4a34a1d..e7c5ca5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -1027,7 +1027,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
      * @param nearOnly Near only flag.
      * @throws Exception If failed.
      */
-    public void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
+    private void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
         try {
             final AtomicInteger cnt = new AtomicInteger(nodeCount());
             final AtomicReference<Throwable> err = new AtomicReference<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 8340cd7..72f13d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughS
 import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughStoreCallTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheStartTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxLocalPeekModesTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxNearPeekModesTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxPeekModesTest;
@@ -209,6 +210,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
 
         suite.addTestSuite(IgniteCacheTxPreloadNoWriteTest.class);
 
+        suite.addTestSuite(IgniteCacheStartTest.class);
         suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class);
         suite.addTestSuite(IgniteDynamicCacheWithConfigStartSelfTest.class);
         suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class);


Mime
View raw message