ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [11/50] [abbrv] ignite git commit: Reworked cluster activation/deactivation.
Date Fri, 07 Jul 2017 09:37:12 GMT
Reworked cluster activation/deactivation.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1337901f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1337901f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1337901f

Branch: refs/heads/master
Commit: 1337901f04c866e20093b20449c0872f089fb64b
Parents: 54572c3
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Jul 5 11:19:43 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Jul 5 11:19:43 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/GridComponent.java   |    4 +-
 .../ignite/internal/GridPluginComponent.java    |    2 +-
 .../apache/ignite/internal/IgniteKernal.java    |   33 +-
 .../internal/managers/GridManagerAdapter.java   |    2 +-
 .../internal/managers/discovery/DiscoCache.java |   17 +-
 .../discovery/DiscoveryLocalJoinData.java       |  104 ++
 .../discovery/GridDiscoveryManager.java         |  128 +-
 .../pagemem/store/IgnitePageStoreManager.java   |    3 +-
 .../processors/GridProcessorAdapter.java        |    2 +-
 .../cache/CacheAffinitySharedManager.java       |   67 +-
 .../processors/cache/CacheGroupContext.java     |    4 +-
 .../processors/cache/CacheGroupData.java        |    4 +-
 .../cache/ChangeGlobalStateMessage.java         |  120 --
 .../processors/cache/ClusterCachesInfo.java     |  490 +++++--
 .../internal/processors/cache/ClusterState.java |   38 -
 .../cache/DynamicCacheChangeRequest.java        |   52 +-
 .../processors/cache/ExchangeActions.java       |   37 +-
 .../processors/cache/GridCacheEventManager.java |    2 -
 .../cache/GridCacheEvictionManager.java         |    1 -
 .../processors/cache/GridCacheIoManager.java    |   13 +-
 .../processors/cache/GridCacheMvccManager.java  |    9 +-
 .../GridCachePartitionExchangeManager.java      |  423 +++---
 .../processors/cache/GridCacheProcessor.java    |  177 ++-
 .../cache/GridCacheSharedContext.java           |   60 +-
 .../cache/GridCacheSharedManager.java           |    6 -
 .../cache/GridCacheSharedManagerAdapter.java    |   16 -
 .../processors/cache/PendingDiscoveryEvent.java |   61 +
 .../processors/cache/StateChangeRequest.java    |   77 ++
 .../binary/CacheObjectBinaryProcessorImpl.java  |    4 +-
 .../distributed/GridCacheTxRecoveryFuture.java  |    1 -
 .../distributed/dht/GridDhtCacheAdapter.java    |    1 -
 .../cache/distributed/dht/GridDhtGetFuture.java |    1 -
 .../distributed/dht/GridDhtGetSingleFuture.java |    2 -
 .../dht/GridDhtPartitionTopologyImpl.java       |   13 +-
 .../dht/GridDhtTopologyFutureAdapter.java       |    2 +-
 .../dht/GridPartitionedSingleGetFuture.java     |    3 -
 .../GridNearAtomicAbstractUpdateFuture.java     |    1 -
 .../dht/preloader/GridDhtForceKeysFuture.java   |    1 -
 .../dht/preloader/GridDhtPartitionDemander.java |    2 +
 .../GridDhtPartitionsExchangeFuture.java        |  228 +++-
 .../preloader/GridDhtPartitionsFullMessage.java |   44 +-
 .../GridDhtPartitionsSingleMessage.java         |   38 +-
 .../dht/preloader/GridDhtPreloader.java         |    2 +-
 .../distributed/near/GridNearGetFuture.java     |    2 -
 .../near/GridNearTxPrepareRequest.java          |    1 -
 .../GridCacheDatabaseSharedManager.java         |  105 +-
 .../persistence/GridCacheOffheapManager.java    |    5 +-
 .../IgniteCacheDatabaseSharedManager.java       |   64 +-
 .../persistence/IgniteCacheSnapshotManager.java |   12 +-
 .../persistence/file/FilePageStoreManager.java  |   14 +-
 .../wal/FileWriteAheadLogManager.java           |    8 -
 .../query/GridCacheDistributedQueryManager.java |    4 +-
 .../store/GridCacheStoreManagerAdapter.java     |    1 -
 .../cache/version/GridCacheVersionManager.java  |    6 -
 .../cacheobject/IgniteCacheObjectProcessor.java |    5 -
 .../IgniteCacheObjectProcessorImpl.java         |    5 -
 .../cluster/ChangeGlobalStateFinishMessage.java |   86 ++
 .../cluster/ChangeGlobalStateMessage.java       |  140 ++
 .../processors/cluster/ClusterProcessor.java    |    3 +-
 .../cluster/DiscoveryDataClusterState.java      |  157 +++
 .../cluster/GridClusterStateProcessor.java      | 1122 ++++++---------
 .../cluster/IgniteChangeGlobalStateSupport.java |    3 +-
 .../datastructures/DataStructuresProcessor.java |    6 +-
 .../datastructures/GridCacheAtomicLongImpl.java |    2 +-
 .../GridCacheAtomicReferenceImpl.java           |    2 +-
 .../GridCacheAtomicSequenceImpl.java            |    2 +-
 .../GridCacheAtomicStampedImpl.java             |    2 +-
 .../GridCacheCountDownLatchImpl.java            |    2 +-
 .../datastructures/GridCacheLockImpl.java       |    4 +-
 .../datastructures/GridCacheQueueAdapter.java   |    1 -
 .../datastructures/GridCacheSemaphoreImpl.java  |    2 +-
 .../datastructures/GridCacheSetImpl.java        |    1 -
 .../internal/processors/igfs/IgfsImpl.java      |    2 -
 .../internal/processors/igfs/IgfsProcessor.java |    2 +-
 .../processors/query/GridQueryProcessor.java    |    4 +-
 .../processors/rest/GridRestProcessor.java      |    2 +-
 .../cluster/GridChangeStateCommandHandler.java  |    2 +-
 .../service/GridServiceProcessor.java           |    6 +-
 .../processors/task/GridTaskProcessor.java      |    2 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   12 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   10 +-
 .../internal/TestRecordingCommunicationSpi.java |   10 +
 ...GridManagerLocalMessageListenerSelfTest.java |    4 +-
 .../cache/IgniteActiveClusterTest.java          |  182 ---
 .../IgniteClusterActivateDeactivateTest.java    | 1284 ++++++++++++++++++
 ...erActivateDeactivateTestWithPersistence.java |  197 +++
 .../IgniteDaemonNodeMarshallerCacheTest.java    |   10 -
 .../pagemem/NoOpPageStoreManager.java           |   12 +-
 .../persistence/pagemem/NoOpWALManager.java     |   23 +-
 .../AbstractNodeJoinTemplate.java               |  149 +-
 .../IgniteChangeGlobalStateAbstractTest.java    |   65 +-
 .../IgniteChangeGlobalStateCacheTest.java       |    2 +-
 ...IgniteChangeGlobalStateDataStreamerTest.java |    5 +-
 ...gniteChangeGlobalStateDataStructureTest.java |    6 +-
 .../IgniteChangeGlobalStateFailOverTest.java    |   26 +-
 .../IgniteChangeGlobalStateTest.java            |  158 +--
 .../IgniteStandByClusterTest.java               |   17 +-
 .../join/JoinActiveNodeToActiveCluster.java     |   62 +-
 ...ctiveNodeToActiveClusterWithPersistence.java |   17 +
 .../IgniteStandByClientReconnectTest.java       |   13 +-
 ...eStandByClientReconnectToNewClusterTest.java |   13 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |    2 +-
 .../testframework/junits/GridAbstractTest.java  |    4 +-
 .../junits/common/GridCommonAbstractTest.java   |    3 +
 .../testsuites/IgniteStandByClusterSuite.java   |    5 +-
 .../processors/hadoop/HadoopProcessor.java      |    4 +-
 106 files changed, 4180 insertions(+), 2197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 0505929..93ffe95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -84,9 +84,11 @@ public interface GridComponent {
      * Callback that notifies that kernal has successfully started,
      * including all managers and processors.
      *
+     * @param active Cluster active flag (note: should be used carefully since state can
+     *     change concurrently).
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void onKernalStart() throws IgniteCheckedException;
+    public void onKernalStart(boolean active) throws IgniteCheckedException;
 
     /**
      * Callback to notify that kernal is about to stop.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index cc1ae71..fd59d24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -61,7 +61,7 @@ public class GridPluginComponent implements GridComponent {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
         plugin.onIgniteStart();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 31ee3e2..0c17b32 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -98,6 +98,7 @@ import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
 import org.apache.ignite.internal.managers.collision.GridCollisionManager;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
+import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.failover.GridFailoverManager;
@@ -207,7 +208,6 @@ import static org.apache.ignite.internal.IgniteComponentType.HADOOP_HELPER;
 import static org.apache.ignite.internal.IgniteComponentType.IGFS;
 import static org.apache.ignite.internal.IgniteComponentType.IGFS_HELPER;
 import static org.apache.ignite.internal.IgniteComponentType.SCHEDULE;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_ACTIVE_ON_START;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_DATE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
@@ -818,8 +818,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
         List<PluginProvider> plugins = U.allPluginProviders();
 
-        final boolean activeOnStart = cfg.isActiveOnStart();
-
         // Spin out SPIs & managers.
         try {
             ctx = new GridKernalContextImpl(log,
@@ -994,11 +992,28 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             // Suggest Operation System optimizations.
             ctx.performance().addAll(OsConfigurationSuggestions.getSuggestions());
 
+            DiscoveryLocalJoinData joinData = ctx.discovery().localJoin();
+
+            IgniteInternalFuture<Boolean> transitionWaitFut = joinData.transitionWaitFuture();
+
+            boolean active;
+
+            if (transitionWaitFut != null) {
+                if (log.isInfoEnabled()) {
+                    log.info("Join cluster while cluster state transition is in progress, " +
+                        "waiting when transition finish.");
+                }
+
+                active = transitionWaitFut.get();
+            }
+            else
+                active = joinData.active();
+
             // Notify discovery manager the first to make sure that topology is discovered.
-            ctx.discovery().onKernalStart();
+            ctx.discovery().onKernalStart(active);
 
             // Notify IO manager the second so further components can send and receive messages.
-            ctx.io().onKernalStart();
+            ctx.io().onKernalStart(active);
 
             // Start plugins.
             for (PluginProvider provider : ctx.plugins().allProviders())
@@ -1021,7 +1036,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
                 if (!skipDaemon(comp)) {
                     try {
-                        comp.onKernalStart();
+                        comp.onKernalStart(active);
                     }
                     catch (IgniteNeedReconnectException e) {
                         assert ctx.discovery().reconnectSupported();
@@ -1486,7 +1501,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         add(ATTR_MARSHALLER_USE_DFLT_SUID,
             getBoolean(IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID, OptimizedMarshaller.USE_DFLT_SUID));
         add(ATTR_LATE_AFFINITY_ASSIGNMENT, cfg.isLateAffinityAssignment());
-        add(ATTR_ACTIVE_ON_START, cfg.isActiveOnStart());
 
         if (cfg.getMarshaller() instanceof BinaryMarshaller) {
             add(ATTR_MARSHALLER_COMPACT_FOOTER, cfg.getBinaryConfiguration() == null ?
@@ -3395,7 +3409,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         guard();
 
         try {
-            return context().state().active();
+            return context().state().publicApiActiveState();
         }
         finally {
             unguard();
@@ -3694,10 +3708,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
      * @throws IgniteException if cluster in inActive state
      */
     private void checkClusterState() throws IgniteException {
-        if (!ctx.state().active())
+        if (!ctx.state().publicApiActiveState()) {
             throw new IgniteException("Can not perform the operation because the cluster is inactive. Note, that " +
                 "the cluster is considered inactive by default if Ignite Persistent Store is used to let all the nodes " +
                 "join the cluster. To activate the cluster call Ignite.activate(true).");
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 7dfeffb..a151eb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -362,7 +362,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
     }
 
     /** {@inheritDoc} */
-    @Override public final void onKernalStart() throws IgniteCheckedException {
+    @Override public final void onKernalStart(boolean active) throws IgniteCheckedException {
         for (final IgniteSpi spi : spis) {
             try {
                 spi.onContextInitialized(new IgniteSpiContext() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 2b3c4fc..4c1077b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -37,6 +38,9 @@ import org.jetbrains.annotations.Nullable;
  *
  */
 public class DiscoCache {
+    /** */
+    private final DiscoveryDataClusterState state;
+
     /** Local node. */
     private final ClusterNode loc;
 
@@ -78,6 +82,7 @@ public class DiscoCache {
     private final Set<UUID> alives = new GridConcurrentHashSet<>();
 
     /**
+     * @param state Current cluster state.
      * @param loc Local node.
      * @param rmtNodes Remote nodes.
      * @param allNodes All nodes.
@@ -91,7 +96,9 @@ public class DiscoCache {
      * @param nodeMap Node map.
      * @param alives Alive nodes.
      */
-    DiscoCache(ClusterNode loc,
+    DiscoCache(
+        DiscoveryDataClusterState state,
+        ClusterNode loc,
         List<ClusterNode> rmtNodes,
         List<ClusterNode> allNodes,
         List<ClusterNode> srvNodes,
@@ -103,6 +110,7 @@ public class DiscoCache {
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
         Map<UUID, ClusterNode> nodeMap,
         Set<UUID> alives) {
+        this.state = state;
         this.loc = loc;
         this.rmtNodes = rmtNodes;
         this.allNodes = allNodes;
@@ -117,6 +125,13 @@ public class DiscoCache {
         this.alives.addAll(alives);
     }
 
+    /**
+     * @return Current cluster state.
+     */
+    public DiscoveryDataClusterState state() {
+        return state;
+    }
+
     /** @return Local node. */
     public ClusterNode localNode() {
         return loc;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java
new file mode 100644
index 0000000..a1f2aa7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Information about local join event.
+ */
+public class DiscoveryLocalJoinData {
+    /** */
+    private final DiscoveryEvent evt;
+
+    /** */
+    private final DiscoCache discoCache;
+
+    /** */
+    private final AffinityTopologyVersion joinTopVer;
+
+    /** */
+    private final IgniteInternalFuture<Boolean> transitionWaitFut;
+
+    /** */
+    private final boolean active;
+
+    /**
+     * @param evt Event.
+     * @param discoCache Discovery data cache.
+     * @param transitionWaitFut Future if cluster state transition is in progress.
+     * @param active Cluster active status.
+     */
+    public DiscoveryLocalJoinData(DiscoveryEvent evt,
+        DiscoCache discoCache,
+        @Nullable IgniteInternalFuture<Boolean> transitionWaitFut,
+        boolean active) {
+        assert evt != null && evt.topologyVersion() > 0 : evt;
+
+        this.evt = evt;
+        this.discoCache = discoCache;
+        this.transitionWaitFut = transitionWaitFut;
+        this.active = active;
+
+        joinTopVer = new AffinityTopologyVersion(evt.topologyVersion(), 0);
+    }
+
+    /**
+     * @return Future if cluster state transition is in progress.
+     */
+    @Nullable public IgniteInternalFuture<Boolean> transitionWaitFuture() {
+        return transitionWaitFut;
+    }
+
+    /**
+     * @return Cluster state.
+     */
+    public boolean active() {
+        return active;
+    }
+
+    /**
+     * @return Event.
+     */
+    public DiscoveryEvent event() {
+        return evt;
+    }
+
+    /**
+     * @return Discovery data cache.
+     */
+    public DiscoCache discoCache() {
+        return discoCache;
+    }
+
+    /**
+     * @return Join topology version.
+     */
+    public AffinityTopologyVersion joinTopologyVersion() {
+        return joinTopVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DiscoveryLocalJoinData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index c38e37a..9f5bd3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -75,8 +75,11 @@ import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscove
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -90,7 +93,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -133,7 +135,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_ACTIVE_ON_START;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
@@ -144,6 +145,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_COMP
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SERVICES_COMPATIBILITY_MODE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_USER_NAME;
 import static org.apache.ignite.internal.IgniteVersionUtils.VER;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.processors.security.SecurityUtils.SERVICE_PERMISSIONS_SINCE;
 import static org.apache.ignite.internal.processors.security.SecurityUtils.isSecurityCompatibilityMode;
 import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.NOOP;
@@ -238,7 +240,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     private long segChkFreq;
 
     /** Local node join to topology event. */
-    private GridFutureAdapter<T2<DiscoveryEvent, DiscoCache>> locJoin = new GridFutureAdapter<>();
+    private GridFutureAdapter<DiscoveryLocalJoinData> locJoin = new GridFutureAdapter<>();
 
     /** GC CPU load. */
     private volatile double gcCpuLoad;
@@ -570,7 +572,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     if (type != EVT_NODE_SEGMENTED &&
                         type != EVT_CLIENT_NODE_DISCONNECTED &&
                         type != EVT_CLIENT_NODE_RECONNECTED &&
-                        type != DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                        type != EVT_DISCOVERY_CUSTOM_EVT) {
                         minorTopVer = 0;
 
                         verChanged = true;
@@ -586,15 +588,50 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     updateClientNodes(node.id());
                 }
 
+                DiscoCache discoCache = null;
+
+                boolean locJoinEvt = type == EVT_NODE_JOINED && node.id().equals(locNode.id());
+
+                IgniteInternalFuture<Boolean> transitionWaitFut = null;
+
+                ChangeGlobalStateFinishMessage stateFinishMsg = null;
+
+                if (locJoinEvt) {
+                    discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+
+                    transitionWaitFut = ctx.state().onLocalJoin(discoCache);
+                }
+                else if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT)
+                    stateFinishMsg = ctx.state().onNodeLeft(node);
+
                 final AffinityTopologyVersion nextTopVer;
 
-                if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                if (type == EVT_DISCOVERY_CUSTOM_EVT) {
                     assert customMsg != null;
 
-                    boolean incMinorTopVer = ctx.cache().onCustomEvent(
-                        customMsg,
-                        new AffinityTopologyVersion(topVer, minorTopVer),
-                        node);
+                    boolean incMinorTopVer;
+
+                    if (customMsg instanceof ChangeGlobalStateMessage) {
+                        incMinorTopVer = ctx.state().onStateChangeMessage(
+                            new AffinityTopologyVersion(topVer, minorTopVer),
+                            (ChangeGlobalStateMessage)customMsg,
+                            discoCache());
+                    }
+                    else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
+                        ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
+
+                        discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+
+                        topSnap.set(new Snapshot(topSnap.get().topVer, discoCache));
+
+                        incMinorTopVer = false;
+                    }
+                    else {
+                        incMinorTopVer = ctx.cache().onCustomEvent(
+                            customMsg,
+                            new AffinityTopologyVersion(topVer, minorTopVer),
+                            node);
+                    }
 
                     if (incMinorTopVer) {
                         minorTopVer++;
@@ -603,17 +640,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     }
 
                     nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
-
-                    if (verChanged)
-                        ctx.cache().onDiscoveryEvent(type, node, nextTopVer);
                 }
-                else {
+                else
                     nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
 
-                    ctx.cache().onDiscoveryEvent(type, node, nextTopVer);
-                }
+                ctx.cache().onDiscoveryEvent(type, customMsg, node, nextTopVer, ctx.state().clusterState());
 
-                if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                if (type == EVT_DISCOVERY_CUSTOM_EVT) {
                     for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
                         List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);
 
@@ -630,13 +663,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     }
                 }
 
-                final DiscoCache discoCache;
-
                 // Put topology snapshot into discovery history.
                 // There is no race possible between history maintenance and concurrent discovery
                 // event notifications, since SPI notifies manager about all events from this listener.
                 if (verChanged) {
-                    discoCache = createDiscoCache(locNode, topSnapshot);
+                    if (discoCache == null)
+                        discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
 
                     discoCacheHist.put(nextTopVer, discoCache);
 
@@ -650,8 +682,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     // Current version.
                     discoCache = discoCache();
 
+                final DiscoCache discoCache0 = discoCache;
+
                 // If this is a local join event, just save it and do not notify listeners.
-                if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) {
+                if (locJoinEvt) {
                     if (gridStartTime == 0)
                         gridStartTime = getSpi().getGridStartTime();
 
@@ -668,7 +702,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     discoEvt.topologySnapshot(topVer, new ArrayList<>(F.view(topSnapshot, FILTER_DAEMON)));
 
-                    locJoin.onDone(new T2<>(discoEvt, discoCache));
+                    discoWrk.discoCache = discoCache;
+
+                    if (!isLocDaemon && !ctx.clientDisconnected())
+                        ctx.cache().context().exchange().onLocalJoin(discoEvt, discoCache);
+
+                    locJoin.onDone(new DiscoveryLocalJoinData(discoEvt,
+                        discoCache,
+                        transitionWaitFut,
+                        ctx.state().clusterState().active()));
 
                     return;
                 }
@@ -697,7 +739,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     topHist.clear();
 
                     topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
-                        createDiscoCache(locNode, Collections.<ClusterNode>emptySet())));
+                        createDiscoCache(ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
                 }
                 else if (type == EVT_CLIENT_NODE_RECONNECTED) {
                     assert locNode.isClient() : locNode;
@@ -709,12 +751,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted);
 
+                    ctx.cache().context().exchange().onLocalJoin(localJoinEvent(), discoCache);
+
                     ctx.cluster().clientReconnectFuture().listen(new CI1<IgniteFuture<?>>() {
                         @Override public void apply(IgniteFuture<?> fut) {
                             try {
                                 fut.get();
 
-                                discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, null);
+                                discoWrk.addEvent(type, nextTopVer, node, discoCache0, topSnapshot, null);
                             }
                             catch (IgniteException ignore) {
                                 // No-op.
@@ -727,6 +771,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                 if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected())
                     discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, customMsg);
+
+                if (stateFinishMsg != null)
+                    discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT, nextTopVer, node, discoCache, topSnapshot, stateFinishMsg);
             }
         });
 
@@ -826,7 +873,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return {@code True} if should not process message.
      */
     private boolean skipMessage(int type, @Nullable DiscoveryCustomMessage customMsg) {
-        if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+        if (type == EVT_DISCOVERY_CUSTOM_EVT) {
             assert customMsg != null && customMsg.id() != null : customMsg;
 
             if (rcvdCustomMsgs.contains(customMsg.id())) {
@@ -1157,7 +1204,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             locMarshStrSerVer2;
 
         boolean locDelayAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
-        boolean locActiveOnStart = locNode.attribute(ATTR_ACTIVE_ON_START);
 
         Boolean locSrvcCompatibilityEnabled = locNode.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);
         Boolean locSecurityCompatibilityEnabled = locNode.attribute(ATTR_SECURITY_COMPATIBILITY_MODE);
@@ -1971,7 +2017,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /** @return Event that represents a local node joined to topology. */
     public DiscoveryEvent localJoinEvent() {
         try {
-            return locJoin.get().get1();
+            return locJoin.get().event();
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -1981,7 +2027,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      * @return Tuple that consists of a local join event and discovery cache at the join time.
      */
-    public T2<DiscoveryEvent, DiscoCache> localJoin() {
+    public DiscoveryLocalJoinData localJoin() {
         try {
             return locJoin.get();
         }
@@ -2016,7 +2062,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     public void clientCacheStartEvent(UUID reqId,
         @Nullable Map<String, DynamicCacheChangeRequest> startReqs,
         @Nullable Set<String> cachesToClose) {
-        discoWrk.addEvent(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
+        discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT,
             AffinityTopologyVersion.NONE,
             localNode(),
             null,
@@ -2098,11 +2144,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      * Called from discovery thread.
      *
+     * @param state Current state.
      * @param loc Local node.
      * @param topSnapshot Topology snapshot.
      * @return Newly created discovery cache.
      */
-    @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) {
+    @NotNull private DiscoCache createDiscoCache(DiscoveryDataClusterState state,
+        ClusterNode loc,
+        Collection<ClusterNode> topSnapshot) {
         HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
         HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
 
@@ -2177,6 +2226,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         return new DiscoCache(
+            state,
             loc,
             Collections.unmodifiableList(rmtNodes),
             Collections.unmodifiableList(allNodes),
@@ -2318,7 +2368,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         discoWrk.addEvent(EVT_NODE_SEGMENTED,
                             AffinityTopologyVersion.NONE,
                             node,
-                            createDiscoCache(node, empty),
+                            createDiscoCache(null, node, empty),
                             empty,
                             null);
 
@@ -2339,6 +2389,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
     /** Worker for discovery events. */
     private class DiscoveryWorker extends GridWorker {
+        /** */
+        private DiscoCache discoCache;
+
         /** Event queue. */
         private final BlockingQueue<GridTuple6<Integer, AffinityTopologyVersion, ClusterNode,
             DiscoCache, Collection<ClusterNode>, DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
@@ -2457,6 +2510,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
             boolean segmented = false;
 
+            if (evt.get4() != null)
+                discoCache = evt.get4();
+
             switch (type) {
                 case EVT_NODE_JOINED: {
                     assert !discoOrdered || topVer.topologyVersion() == node.order() : "Invalid topology version [topVer=" + topVer +
@@ -2570,8 +2626,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     break;
                 }
 
-                case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
-                    if (ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) {
+                case EVT_DISCOVERY_CUSTOM_EVT: {
+                    if (ctx.event().isRecordable(EVT_DISCOVERY_CUSTOM_EVT)) {
                         DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent();
 
                         customEvt.node(ctx.discovery().localNode());
@@ -2581,6 +2637,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         customEvt.affinityTopologyVersion(topVer);
                         customEvt.customMessage(evt.get6());
 
+                        if (evt.get4() == null) {
+                            assert discoCache != null : evt.get6();
+
+                            evt.set4(discoCache);
+                        }
+
                         ctx.event().record(customEvt, evt.get4());
                     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
index 468d35d..fa6e9e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
@@ -183,11 +183,10 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh
     public Map<String, StoredCacheData> readCacheConfigurations() throws IgniteCheckedException;
 
     /**
-     * @param grpDesc Cache group descriptor.
      * @param cacheData Cache configuration.
      * @throws IgniteCheckedException If failed.
      */
-    public void storeCacheData(CacheGroupDescriptor grpDesc, StoredCacheData cacheData) throws IgniteCheckedException;
+    public void storeCacheData(StoredCacheData cacheData) throws IgniteCheckedException;
     /**
      * @param grpId Cache group ID.
      * @return {@code True} if index store for given cache group existed before node started.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index 690ba0e..d6f78ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -65,7 +65,7 @@ public abstract class GridProcessorAdapter implements GridProcessor {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 9516f84..8d08c3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -41,6 +41,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -52,6 +53,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -108,6 +111,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /** */
     private final ThreadLocal<ClientCacheChangeDiscoveryMessage> clientCacheChanges = new ThreadLocal<>();
 
+    /** Caches initialized flag (initialized when join activate cluster or after activation. */
+    private boolean cachesInitialized;
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -140,10 +146,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * Callback invoked from discovery thread when discovery message is received.
      *
      * @param type Event type.
+     * @param customMsg Custom message instance.
      * @param node Event node.
      * @param topVer Topology version.
+     * @param state Cluster state.
      */
-    void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+    void onDiscoveryEvent(int type,
+        @Nullable DiscoveryCustomMessage customMsg,
+        ClusterNode node,
+        AffinityTopologyVersion topVer,
+        DiscoveryDataClusterState state) {
+        if (state.transition() || !state.active())
+            return;
+
         if (type == EVT_NODE_JOINED && node.isLocal()) {
             // Clean-up in case of client reconnect.
             caches.clear();
@@ -153,6 +168,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             lastAffVer = null;
 
             caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors());
+
+            cachesInitialized = true;
+        }
+        else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
+            if (!cachesInitialized && ((ChangeGlobalStateFinishMessage)customMsg).clusterActive()) {
+                caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors());
+
+                cachesInitialized = true;
+            }
         }
 
         if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) {
@@ -404,7 +428,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 DynamicCacheChangeRequest startReq = startReqs.get(desc.cacheName());
 
-                cctx.cache().prepareCacheStart(desc, startReq.nearCacheConfiguration(), topVer);
+                cctx.cache().prepareCacheStart(desc.cacheConfiguration(),
+                    desc,
+                    startReq.nearCacheConfiguration(),
+                    topVer);
 
                 startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null);
 
@@ -683,19 +710,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             NearCacheConfiguration nearCfg = null;
 
-            if (exchActions.newClusterState() == ClusterState.ACTIVE) {
-                if (CU.isSystemCache(req.cacheName()))
-                    startCache = true;
-                else if (!cctx.localNode().isClient()) {
-                    startCache = cctx.cacheContext(action.descriptor().cacheId()) == null &&
-                        CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter());
-
-                    nearCfg = req.nearCacheConfiguration();
-                }
-                else // Only static cache configured on client must be started.
-                    startCache = cctx.kernalContext().state().isLocallyConfigured(req.cacheName());
-            }
-            else if (cctx.localNodeId().equals(req.initiatingNodeId())) {
+            if (req.locallyConfigured() || (cctx.localNodeId().equals(req.initiatingNodeId()) && !exchActions.activate())) {
                 startCache = true;
 
                 nearCfg = req.nearCacheConfiguration();
@@ -703,7 +718,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             else {
                 // Cache should not be started
                 assert cctx.cacheContext(cacheDesc.cacheId()) == null
-                        : "Starting cache has not null context: " + cacheDesc.cacheName();
+                    : "Starting cache has not null context: " + cacheDesc.cacheName();
 
                 IgniteCacheProxy cacheProxy = cctx.cache().jcacheProxy(req.cacheName());
 
@@ -711,27 +726,29 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 if (cacheProxy != null) {
                     // Cache should be in restarting mode
                     assert cacheProxy.isRestarting()
-                            : "Cache has non restarting proxy " + cacheProxy;
+                        : "Cache has non restarting proxy " + cacheProxy;
 
                     startCache = true;
                 }
-                else
-                    startCache = CU.affinityNode(cctx.localNode(), cacheDesc.groupDescriptor().config().getNodeFilter());
+                else {
+                    startCache = CU.affinityNode(cctx.localNode(),
+                        cacheDesc.groupDescriptor().config().getNodeFilter());
+                }
             }
 
             try {
                 // Save configuration before cache started.
-                if (cctx.pageStore() != null && !cctx.localNode().isClient())
+                if (cctx.pageStore() != null && !cctx.kernalContext().clientNode()) {
                     cctx.pageStore().storeCacheData(
-                        cacheDesc.groupDescriptor(),
                         new StoredCacheData(req.startCacheConfiguration())
                     );
+                }
 
                 if (startCache) {
-                    cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion());
-
-                    if (exchActions.newClusterState() == null)
-                        cctx.kernalContext().state().onCacheStart(req);
+                    cctx.cache().prepareCacheStart(req.startCacheConfiguration(),
+                        cacheDesc,
+                        nearCfg,
+                        fut.topologyVersion());
 
                     if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
                         if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index c3ddc5f..14eb362 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -683,6 +683,8 @@ public class CacheGroupContext {
 
         aff.cancelFutures(err);
 
+        preldr.onKernalStop();
+
         offheapMgr.stop();
 
         ctx.io().removeCacheGroupHandlers(grpId);
@@ -853,8 +855,6 @@ public class CacheGroupContext {
             preldr = new GridCachePreloaderAdapter(this);
 
         if (ctx.kernalContext().config().getPersistentStoreConfiguration() != null) {
-            ClassLoader clsLdr = U.gridClassLoader();
-
             try {
                 offheapMgr = new GridCacheOffheapManager();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
index a290caf..99b7b1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
@@ -78,8 +78,8 @@ public class CacheGroupData implements Serializable {
         Map<String, Integer> caches,
         long flags) {
         assert cacheCfg != null;
-        assert grpId != 0;
-        assert deploymentId != null;
+        assert grpId != 0 : cacheCfg.getName();
+        assert deploymentId != null : cacheCfg.getName();
 
         this.cacheCfg = cacheCfg;
         this.grpName = grpName;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java
deleted file mode 100644
index 4d1a50b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.util.UUID;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Message represent request for change cluster global state.
- */
-public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Custom message ID. */
-    private IgniteUuid id = IgniteUuid.randomUuid();
-
-    /** Request ID */
-    private UUID requestId;
-
-    /** Initiator node ID. */
-    private UUID initiatingNodeId;
-
-    /** If true activate else deactivate. */
-    private boolean activate;
-
-    /** Batch contains all requests for start or stop caches. */
-    private DynamicCacheChangeBatch changeGlobalStateBatch;
-
-    /** If happened concurrent activate/deactivate then processed only first message, other message must be skip. */
-    private boolean concurrentChangeState;
-
-    /**
-     *
-     */
-    public ChangeGlobalStateMessage(
-        UUID requestId,
-        UUID initiatingNodeId,
-        boolean activate,
-        DynamicCacheChangeBatch changeGlobalStateBatch
-    ) {
-        this.requestId = requestId;
-        this.initiatingNodeId = initiatingNodeId;
-        this.activate = activate;
-        this.changeGlobalStateBatch = changeGlobalStateBatch;
-    }
-
-    /**
-     *
-     */
-    public DynamicCacheChangeBatch getDynamicCacheChangeBatch() {
-        return changeGlobalStateBatch;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid id() {
-        return id;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
-        return !concurrentChangeState ? changeGlobalStateBatch : null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isMutable() {
-        return false;
-    }
-
-    /**
-     *
-     */
-    public UUID initiatorNodeId() {
-        return initiatingNodeId;
-    }
-
-    /**
-     *
-     */
-    public boolean activate() {
-        return activate;
-    }
-
-    /**
-     *
-     */
-    public UUID requestId() {
-        return requestId;
-    }
-
-    /**
-     *
-     */
-    public void concurrentChangeState() {
-        this.concurrentChangeState = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(ChangeGlobalStateMessage.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 8f124b2..5452bd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -40,6 +40,9 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.processors.query.QuerySchema;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
@@ -93,10 +96,13 @@ class ClusterCachesInfo {
     private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
 
     /** */
-    private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
+    private Map<String, T2<CacheConfiguration, NearCacheConfiguration>> locCfgsForActivation;
 
     /** */
-    private volatile Exception onJoinCacheException;
+    private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
+
+    /** {@code True} if joined cluster while cluster state change was in progress. */
+    private boolean joinOnTransition;
 
     /**
      * @param ctx Context.
@@ -113,14 +119,25 @@ class ClusterCachesInfo {
      */
     void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException {
         this.joinDiscoData = joinDiscoData;
-    }
 
-    /**
-     *
-     * @return Exception if cache has conflict.
-     */
-    Exception onJoinCacheException(){
-        return onJoinCacheException;
+        Map<String, CacheConfiguration> grpCfgs = new HashMap<>();
+
+        for (CacheJoinNodeDiscoveryData.CacheInfo info : joinDiscoData.caches().values()) {
+            if (info.cacheData().config().getGroupName() == null)
+                continue;
+
+            CacheConfiguration ccfg = grpCfgs.get(info.cacheData().config().getGroupName());
+
+            if (ccfg == null)
+                grpCfgs.put(info.cacheData().config().getGroupName(), info.cacheData().config());
+            else
+                validateCacheGroupConfiguration(ccfg, info.cacheData().config());
+        }
+
+        String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true);
+
+        if (conflictErr != null)
+            throw new IgniteCheckedException("Failed to start configured cache. " + conflictErr);
     }
 
     /**
@@ -142,7 +159,9 @@ class ClusterCachesInfo {
         if (gridData != null && gridData.conflictErr != null)
             throw new IgniteCheckedException(gridData.conflictErr);
 
-        if (joinDiscoData != null && gridData != null) {
+        if (gridData != null && gridData.joinDiscoData != null) {
+            CacheJoinNodeDiscoveryData joinDiscoData = gridData.joinDiscoData;
+
             for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) {
                 CacheConfiguration locCfg = locCacheInfo.cacheData().config();
 
@@ -165,9 +184,9 @@ class ClusterCachesInfo {
             }
         }
 
-        joinDiscoData = null;
         gridData = null;
     }
+
     /**
      * Checks that remote caches has configuration compatible with the local.
      *
@@ -308,22 +327,64 @@ class ClusterCachesInfo {
             }
         }
     }
-
     /**
      * @param batch Cache change request.
      * @param topVer Topology version.
      * @return {@code True} if minor topology version should be increased.
      */
     boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
-        ExchangeActions exchangeActions = new ExchangeActions();
+        DiscoveryDataClusterState state = ctx.state().clusterState();
+
+        if (state.active() && !state.transition()) {
+            ExchangeActions exchangeActions = new ExchangeActions();
+
+            CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions,
+                batch.requests(),
+                topVer,
+                false);
 
-        boolean incMinorTopVer = false;
+            if (res.needExchange) {
+                assert !exchangeActions.empty() : exchangeActions;
 
-        List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
+                batch.exchangeActions(exchangeActions);
+            }
+
+            return res.needExchange;
+        }
+        else {
+            IgniteCheckedException err = new IgniteCheckedException("Failed to start/stop cache, cluster state change " +
+                "is in progress.");
+
+            for (DynamicCacheChangeRequest req : batch.requests()) {
+                if (req.template()) {
+                    ctx.cache().completeTemplateAddFuture(req.startCacheConfiguration().getName(),
+                        req.deploymentId());
+                }
+                else
+                    ctx.cache().completeCacheStartFuture(req, false, err);
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * @param exchangeActions Exchange actions to update.
+     * @param reqs Requests.
+     * @param topVer Topology version.
+     * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation.
+     * @return Process result.
+     */
+    private CacheChangeProcessResult processCacheChangeRequests(
+        ExchangeActions exchangeActions,
+        Collection<DynamicCacheChangeRequest> reqs,
+        AffinityTopologyVersion topVer,
+        boolean persistedCfgs) {
+        CacheChangeProcessResult res = new CacheChangeProcessResult();
 
         final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>();
 
-        for (DynamicCacheChangeRequest req : batch.requests()) {
+        for (DynamicCacheChangeRequest req : reqs) {
             if (req.template()) {
                 CacheConfiguration ccfg = req.startCacheConfiguration();
 
@@ -347,17 +408,18 @@ class ClusterCachesInfo {
 
                     assert old == null;
 
-                    addedDescs.add(templateDesc);
+                    res.addedDescs.add(templateDesc);
                 }
 
-                ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
+                if (!persistedCfgs)
+                    ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
 
                 continue;
             }
 
             assert !req.clientStartOnly() : req;
 
-            DynamicCacheDescriptor desc = req.globalStateChange() ? null : registeredCaches.get(req.cacheName());
+            DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName());
 
             boolean needExchange = false;
 
@@ -373,22 +435,32 @@ class ClusterCachesInfo {
                     if (conflictErr != null) {
                         U.warn(log, "Ignore cache start request. " + conflictErr);
 
-                        ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " +
-                            "cache. " + conflictErr));
+                        IgniteCheckedException err = new IgniteCheckedException("Failed to start " +
+                            "cache. " + conflictErr);
+
+                        if (persistedCfgs)
+                            res.errs.add(err);
+                        else
+                            ctx.cache().completeCacheStartFuture(req, false, err);
 
                         continue;
                     }
 
                     if (req.clientStartOnly()) {
+                        assert !persistedCfgs;
+
                         ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " +
                             "client cache (a cache with the given name is not started): " + req.cacheName()));
                     }
                     else {
                         SchemaOperationException err = QueryUtils.checkQueryEntityConflicts(
-                            req.startCacheConfiguration(), ctx.cache().cacheDescriptors().values());
+                            req.startCacheConfiguration(), registeredCaches.values());
 
                         if (err != null) {
-                            ctx.cache().completeCacheStartFuture(req, false, err);
+                            if (persistedCfgs)
+                                res.errs.add(err);
+                            else
+                                ctx.cache().completeCacheStartFuture(req, false, err);
 
                             continue;
                         }
@@ -430,11 +502,13 @@ class ClusterCachesInfo {
                             ccfg.getName(),
                             ccfg.getNearConfiguration() != null);
 
-                        ctx.discovery().addClientNode(req.cacheName(),
-                            req.initiatingNodeId(),
-                            req.nearCacheConfiguration() != null);
+                        if (!persistedCfgs) {
+                            ctx.discovery().addClientNode(req.cacheName(),
+                                req.initiatingNodeId(),
+                                req.nearCacheConfiguration() != null);
+                        }
 
-                        addedDescs.add(startDesc);
+                        res.addedDescs.add(startDesc);
 
                         exchangeActions.addCacheToStart(req, startDesc);
 
@@ -442,6 +516,7 @@ class ClusterCachesInfo {
                     }
                 }
                 else {
+                    assert !persistedCfgs;
                     assert req.initiatingNodeId() != null : req;
 
                     if (req.failIfExists()) {
@@ -489,8 +564,6 @@ class ClusterCachesInfo {
                     }
                 }
             }
-            else if (req.globalStateChange())
-                exchangeActions.newClusterState(req.state());
             else if (req.resetLostPartitions()) {
                 if (desc != null) {
                     needExchange = true;
@@ -559,18 +632,18 @@ class ClusterCachesInfo {
                 assert false : req;
 
             if (!needExchange) {
-                if (!clientCacheStart && req.initiatingNodeId().equals(ctx.localNodeId()))
+                if (!clientCacheStart && ctx.localNodeId().equals(req.initiatingNodeId()))
                     reqsToComplete.add(new T2<>(req, waitTopVer));
             }
             else
-                incMinorTopVer = true;
+                res.needExchange = true;
         }
 
-        if (!F.isEmpty(addedDescs)) {
-            AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer;
+        if (!F.isEmpty(res.addedDescs)) {
+            AffinityTopologyVersion startTopVer = res.needExchange ? topVer.nextMinorVersion() : topVer;
 
-            for (DynamicCacheDescriptor desc : addedDescs) {
-                assert desc.template() || incMinorTopVer;
+            for (DynamicCacheDescriptor desc : res.addedDescs) {
+                assert desc.template() || res.needExchange;
 
                 desc.startTopologyVersion(startTopVer);
             }
@@ -602,13 +675,7 @@ class ClusterCachesInfo {
             });
         }
 
-        if (incMinorTopVer) {
-            assert !exchangeActions.empty() : exchangeActions;
-
-            batch.exchangeActions(exchangeActions);
-        }
-
-        return incMinorTopVer;
+        return res;
     }
 
     /**
@@ -669,7 +736,7 @@ class ClusterCachesInfo {
             return new CacheClientReconnectDiscoveryData(cacheGrpsInfo, cachesInfo);
         }
         else {
-            assert ctx.config().isDaemon() || joinDiscoData != null || !ctx.state().active();
+            assert ctx.config().isDaemon() || joinDiscoData != null;
 
             return joinDiscoData;
         }
@@ -720,31 +787,6 @@ class ClusterCachesInfo {
         return started != null ? started : Collections.<DynamicCacheDescriptor>emptyList();
     }
 
-    public void addJoinInfo() {
-        try {
-            Map<String, CacheConfiguration> grpCfgs = new HashMap<>();
-
-            for (CacheJoinNodeDiscoveryData.CacheInfo info : joinDiscoData.caches().values()) {
-                if (info.cacheData().config().getGroupName() == null)
-                    continue;
-
-                CacheConfiguration ccfg = grpCfgs.get(info.cacheData().config().getGroupName());
-
-                if (ccfg == null)
-                    grpCfgs.put(info.cacheData().config().getGroupName(), info.cacheData().config());
-                else
-                    validateCacheGroupConfiguration(ccfg, info.cacheData().config());
-            }
-
-            String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true);
-
-            if (conflictErr != null)
-                onJoinCacheException = new IgniteCheckedException("Failed to start configured cache. " + conflictErr);
-        }catch (IgniteCheckedException e){
-            onJoinCacheException = e;
-        }
-    }
-
     /**
      * Discovery event callback, executed from discovery thread.
      *
@@ -771,10 +813,7 @@ class ClusterCachesInfo {
 
             if (node.id().equals(ctx.discovery().localNode().id())) {
                 if (gridData == null) { // First node starts.
-                    assert joinDiscoData != null || !ctx.state().active();
-
-                    if (ctx.state().active())
-                        addJoinInfo();
+                    assert joinDiscoData != null;
 
                     initStartCachesForLocalJoin(true);
                 }
@@ -864,7 +903,7 @@ class ClusterCachesInfo {
         if (ctx.isDaemon() || data.commonData() == null)
             return;
 
-        assert joinDiscoData != null || disconnectedState() || !ctx.state().active();
+        assert joinDiscoData != null || disconnectedState();
         assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data;
 
         CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData();
@@ -965,7 +1004,7 @@ class ClusterCachesInfo {
             }
         }
 
-        gridData = new GridData(cachesData, conflictErr);
+        gridData = new GridData(joinDiscoData, cachesData, conflictErr);
 
         if (!disconnectedState())
             initStartCachesForLocalJoin(false);
@@ -977,11 +1016,20 @@ class ClusterCachesInfo {
      * @param firstNode {@code True} if first node in cluster starts.
      */
     private void initStartCachesForLocalJoin(boolean firstNode) {
-        assert locJoinStartCaches == null;
+        assert F.isEmpty(locJoinStartCaches) : locJoinStartCaches;
+
+        if (ctx.state().clusterState().transition()) {
+            joinOnTransition = true;
 
-        locJoinStartCaches = new ArrayList<>();
+            return;
+        }
 
         if (joinDiscoData != null) {
+            locJoinStartCaches = new ArrayList<>();
+            locCfgsForActivation = new HashMap<>();
+
+            boolean active = ctx.state().clusterState().active();
+
             for (DynamicCacheDescriptor desc : registeredCaches.values()) {
                 if (firstNode && !joinDiscoData.caches().containsKey(desc.cacheName()))
                     continue;
@@ -997,13 +1045,13 @@ class ClusterCachesInfo {
 
                     DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
                         locCfg.cacheData().config(),
-                            desc.cacheType(),
-                            desc.groupDescriptor(),
-                            desc.template(),
-                            desc.receivedFrom(),
-                            desc.staticallyConfigured(),
-                            desc.sql(),
-                            desc.deploymentId(),
+                        desc.cacheType(),
+                        desc.groupDescriptor(),
+                        desc.template(),
+                        desc.receivedFrom(),
+                        desc.staticallyConfigured(),
+                        desc.sql(),
+                        desc.deploymentId(),
                         new QuerySchema(locCfg.cacheData().queryEntities()));
 
                     desc0.startTopologyVersion(desc.startTopologyVersion());
@@ -1016,14 +1064,126 @@ class ClusterCachesInfo {
                 if (locCfg != null ||
                     joinDiscoData.startCaches() ||
                     CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter())) {
-                    // Move system and internal caches first.
-                    if (desc.cacheType().userCache())
-                        locJoinStartCaches.add(new T2<>(desc, nearCfg));
+                    if (active) {
+                        // Move system and internal caches first.
+                        if (desc.cacheType().userCache())
+                            locJoinStartCaches.add(new T2<>(desc, nearCfg));
+                        else
+                            locJoinStartCaches.add(0, new T2<>(desc, nearCfg));
+                    }
                     else
-                        locJoinStartCaches.add(0, new T2<>(desc, nearCfg));
+                        locCfgsForActivation.put(desc.cacheName(), new T2<>(desc.cacheConfiguration(), nearCfg));
+                }
+            }
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
+        if (joinOnTransition) {
+            initStartCachesForLocalJoin(false);
+
+            joinOnTransition = false;
+        }
+    }
+
+    /**
+     * @param msg Message.
+     * @param topVer Current topology version.
+     * @return Exchange action.
+     * @throws IgniteCheckedException If configuration validation failed.
+     */
+    ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
+        ExchangeActions exchangeActions = new ExchangeActions();
+
+        if (msg.activate()) {
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                desc.startTopologyVersion(topVer);
+
+                T2<CacheConfiguration, NearCacheConfiguration> locCfg = !F.isEmpty(locCfgsForActivation) ?
+                    locCfgsForActivation.get(desc.cacheName()) : null;
+
+                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(),
+                    desc.cacheName(),
+                    msg.initiatorNodeId());
+
+                req.startCacheConfiguration(desc.cacheConfiguration());
+                req.cacheType(desc.cacheType());
+
+                if (locCfg != null) {
+                    if (locCfg.get1() != null)
+                        req.startCacheConfiguration(locCfg.get1());
+
+                    req.nearCacheConfiguration(locCfg.get2());
+
+                    req.locallyConfigured(true);
+                }
+
+                exchangeActions.addCacheToStart(req, desc);
+            }
+
+            for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values())
+                exchangeActions.addCacheGroupToStart(grpDesc);
+
+            List<StoredCacheData> storedCfgs = msg.storedCacheConfigurations();
+
+            if (storedCfgs != null) {
+                List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
+
+                IgniteUuid deplymentId = IgniteUuid.fromUuid(msg.requestId());
+
+                for (StoredCacheData storedCfg : storedCfgs) {
+                    CacheConfiguration ccfg = storedCfg.config();
+
+                    if (!registeredCaches.containsKey(ccfg.getName())) {
+                        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(),
+                            ccfg.getName(),
+                            msg.initiatorNodeId());
+
+                        req.deploymentId(deplymentId);
+                        req.startCacheConfiguration(ccfg);
+                        req.cacheType(ctx.cache().cacheType(ccfg.getName()));
+                        req.schema(new QuerySchema(storedCfg.queryEntities()));
+
+                        reqs.add(req);
+                    }
+                }
+
+                CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions, reqs, topVer, true);
+
+                if (!res.errs.isEmpty()) {
+                    IgniteCheckedException err = new IgniteCheckedException("Failed to activate cluster.");
+
+                    for (IgniteCheckedException err0 : res.errs)
+                        err.addSuppressed(err0);
+
+                    throw err;
                 }
             }
         }
+        else {
+            locCfgsForActivation = new HashMap<>();
+
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx,
+                    desc.cacheName(),
+                    desc.sql(),
+                    false);
+
+                exchangeActions.addCacheToStop(req, desc);
+
+                if (ctx.discovery().cacheClientNode(ctx.discovery().localNode(), desc.cacheName()))
+                    locCfgsForActivation.put(desc.cacheName(), new T2<>((CacheConfiguration)null, (NearCacheConfiguration)null));
+            }
+
+            for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values())
+                exchangeActions.addCacheGroupToStop(grpDesc, false);
+        }
+
+        return exchangeActions;
     }
 
     /**
@@ -1053,16 +1213,20 @@ class ClusterCachesInfo {
      * @param clientNodeId Client node ID.
      */
     private void processClientReconnectData(CacheClientReconnectDiscoveryData clientData, UUID clientNodeId) {
-        for (CacheClientReconnectDiscoveryData.CacheInfo cacheInfo : clientData.clientCaches().values()) {
-            String cacheName = cacheInfo.config().getName();
+        DiscoveryDataClusterState state = ctx.state().clusterState();
+
+        if (state.active() && !state.transition()) {
+            for (CacheClientReconnectDiscoveryData.CacheInfo cacheInfo : clientData.clientCaches().values()) {
+                String cacheName = cacheInfo.config().getName();
 
-            if (surviveReconnect(cacheName))
-                ctx.discovery().addClientNode(cacheName, clientNodeId, false);
-            else {
-                DynamicCacheDescriptor desc = registeredCaches.get(cacheName);
+                if (surviveReconnect(cacheName))
+                    ctx.discovery().addClientNode(cacheName, clientNodeId, false);
+                else {
+                    DynamicCacheDescriptor desc = registeredCaches.get(cacheName);
 
-                if (desc != null && desc.deploymentId().equals(cacheInfo.deploymentId()))
-                    ctx.discovery().addClientNode(cacheName, clientNodeId, cacheInfo.nearCache());
+                    if (desc != null && desc.deploymentId().equals(cacheInfo.deploymentId()))
+                        ctx.discovery().addClientNode(cacheName, clientNodeId, cacheInfo.nearCache());
+                }
             }
         }
     }
@@ -1371,6 +1535,7 @@ class ClusterCachesInfo {
      */
     void onDisconnect() {
         cachesOnDisconnect = new CachesOnDisconnect(
+            ctx.state().clusterState(),
             new HashMap<>(registeredCacheGrps),
             new HashMap<>(registeredCaches));
 
@@ -1382,57 +1547,82 @@ class ClusterCachesInfo {
     }
 
     /**
+     * @param active {@code True} if reconnected to active cluster.
+     * @param transition {@code True} if reconnected while state transition in progress.
      * @return Information about stopped caches and cache groups.
      */
-    ClusterCachesReconnectResult onReconnected() {
+    ClusterCachesReconnectResult onReconnected(boolean active, boolean transition) {
         assert disconnectedState();
 
         Set<String> stoppedCaches = new HashSet<>();
         Set<Integer> stoppedCacheGrps = new HashSet<>();
 
-        for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet()) {
-            CacheGroupDescriptor locDesc = e.getValue();
-
-            CacheGroupDescriptor desc;
-            boolean stopped = true;
+        if (!active) {
+            joinOnTransition = transition;
 
-            if (locDesc.sharedGroup()) {
-                desc = cacheGroupByName(locDesc.groupName());
+            if (F.isEmpty(locCfgsForActivation)) {
+                locCfgsForActivation = new HashMap<>();
 
-                if (desc != null && desc.deploymentId().equals(locDesc.deploymentId()))
-                    stopped = false;
+                for (IgniteInternalCache cache : ctx.cache().caches()) {
+                    locCfgsForActivation.put(cache.name(),
+                        new T2<>((CacheConfiguration)null, cache.configuration().getNearConfiguration()));
+                }
             }
-            else {
-                desc = nonSharedCacheGroupByCacheName(locDesc.config().getName());
 
-                if (desc != null &&
-                    (surviveReconnect(locDesc.config().getName()) || desc.deploymentId().equals(locDesc.deploymentId())))
-                    stopped = false;
-            }
+            for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet())
+                stoppedCacheGrps.add(e.getValue().groupId());
 
-            if (stopped)
-                stoppedCacheGrps.add(locDesc.groupId());
-            else
-                assert locDesc.groupId() == desc.groupId();
+            for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet())
+                stoppedCaches.add(e.getKey());
         }
+        else {
+            for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet()) {
+                CacheGroupDescriptor locDesc = e.getValue();
 
-        for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet()) {
-            DynamicCacheDescriptor desc = e.getValue();
+                CacheGroupDescriptor desc;
+                boolean stopped = true;
 
-            String cacheName = e.getKey();
+                if (locDesc.sharedGroup()) {
+                    desc = cacheGroupByName(locDesc.groupName());
 
-            boolean stopped;
+                    if (desc != null && desc.deploymentId().equals(locDesc.deploymentId()))
+                        stopped = false;
+                }
+                else {
+                    desc = nonSharedCacheGroupByCacheName(locDesc.config().getName());
 
-            if (!surviveReconnect(cacheName) || !ctx.state().active()) {
-                DynamicCacheDescriptor newDesc = registeredCaches.get(cacheName);
+                    if (desc != null &&
+                        (surviveReconnect(locDesc.config().getName()) || desc.deploymentId().equals(locDesc.deploymentId())))
+                        stopped = false;
+                }
 
-                stopped = newDesc == null || !desc.deploymentId().equals(newDesc.deploymentId());
+                if (stopped)
+                    stoppedCacheGrps.add(locDesc.groupId());
+                else
+                    assert locDesc.groupId() == desc.groupId();
             }
-            else
-                stopped = false;
 
-            if (stopped)
-                stoppedCaches.add(cacheName);
+            for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet()) {
+                DynamicCacheDescriptor desc = e.getValue();
+
+                String cacheName = e.getKey();
+
+                boolean stopped;
+
+                if (!surviveReconnect(cacheName)) {
+                    DynamicCacheDescriptor newDesc = registeredCaches.get(cacheName);
+
+                    stopped = newDesc == null || !desc.deploymentId().equals(newDesc.deploymentId());
+                }
+                else
+                    stopped = false;
+
+                if (stopped)
+                    stoppedCaches.add(cacheName);
+            }
+
+            if (!cachesOnDisconnect.clusterActive())
+                initStartCachesForLocalJoin(false);
         }
 
         if (clientReconnectReqs != null) {
@@ -1450,7 +1640,7 @@ class ClusterCachesInfo {
     /**
      * @return {@code True} if client node is currently in disconnected state.
      */
-    public boolean disconnectedState() {
+    private boolean disconnectedState() {
         return cachesOnDisconnect != null;
     }
 
@@ -1465,27 +1655,23 @@ class ClusterCachesInfo {
     /**
      *
      */
-    void clearCaches() {
-        registeredCacheGrps.clear();
-
-        registeredCaches.clear();
-    }
-
-    /**
-     *
-     */
     private static class GridData {
         /** */
+        private final CacheJoinNodeDiscoveryData joinDiscoData;
+
+        /** */
         private final CacheNodeCommonDiscoveryData gridData;
 
         /** */
         private final String conflictErr;
 
         /**
+         * @param joinDiscoData Discovery data collected for local node join.
          * @param gridData Grid data.
          * @param conflictErr Cache configuration conflict error.
          */
-        GridData(CacheNodeCommonDiscoveryData gridData, String conflictErr) {
+        GridData(CacheJoinNodeDiscoveryData joinDiscoData, CacheNodeCommonDiscoveryData gridData, String conflictErr) {
+            this.joinDiscoData = joinDiscoData;
             this.gridData = gridData;
             this.conflictErr = conflictErr;
         }
@@ -1496,18 +1682,46 @@ class ClusterCachesInfo {
      */
     private static class CachesOnDisconnect {
         /** */
+        final DiscoveryDataClusterState state;
+
+        /** */
         final Map<Integer, CacheGroupDescriptor> cacheGrps;
 
         /** */
         final Map<String, DynamicCacheDescriptor> caches;
 
         /**
+         * @param state Cluster state.
          * @param cacheGrps Cache groups.
          * @param caches Caches.
          */
-        CachesOnDisconnect(Map<Integer, CacheGroupDescriptor> cacheGrps, Map<String, DynamicCacheDescriptor> caches) {
+        CachesOnDisconnect(DiscoveryDataClusterState state,
+            Map<Integer, CacheGroupDescriptor> cacheGrps,
+            Map<String, DynamicCacheDescriptor> caches) {
+            this.state = state;
             this.cacheGrps = cacheGrps;
             this.caches = caches;
         }
+
+        /**
+         * @return {@code True} if cluster was in active state.
+         */
+        boolean clusterActive() {
+            return state.active() && !state.transition();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CacheChangeProcessResult {
+        /** */
+        private boolean needExchange;
+
+        /** */
+        private final List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
+
+        /** */
+        private final List<IgniteCheckedException> errs = new ArrayList<>();
     }
 }


Mime
View raw message