ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [10/50] [abbrv] ignite git commit: Reworked cluster activation/deactivation.
Date Fri, 07 Jul 2017 09:37:11 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java
deleted file mode 100644
index 1e1ef71..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-/**
- *
- */
-public enum ClusterState {
-    /**
-     * Cache is inactive. No operations are allowed, no partition assignments or rebalancing is performed.
-     */
-    INACTIVE,
-
-    /**
-     * Cache is active and operations. There are no lost partitions.
-     */
-    ACTIVE,
-
-    /**
-     * Cache is inactive. But process of it activation in progress.
-     */
-    TRANSITION
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 6d5eaf3..2fd8780 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -83,15 +83,15 @@ public class DynamicCacheChangeRequest implements Serializable {
     /** */
     private UUID rcvdFrom;
 
-    /** Cache state. Set to non-null when global state is changed. */
-    private ClusterState state;
-
     /** Reset lost partitions flag. */
     private boolean resetLostPartitions;
 
     /** Dynamic schema. */
     private QuerySchema schema;
 
+    /** */
+    private transient boolean locallyConfigured;
+
     /**
      * @param reqId Unique request ID.
      * @param cacheName Cache stop name.
@@ -100,7 +100,6 @@ public class DynamicCacheChangeRequest implements Serializable {
     public DynamicCacheChangeRequest(UUID reqId, String cacheName, UUID initiatingNodeId) {
         assert reqId != null;
         assert cacheName != null;
-        assert initiatingNodeId != null;
 
         this.reqId = reqId;
         this.cacheName = cacheName;
@@ -108,21 +107,6 @@ public class DynamicCacheChangeRequest implements Serializable {
     }
 
     /**
-     * @param reqId Unique request ID.
-     * @param state New cluster state.
-     * @param initiatingNodeId Initiating node ID.
-     */
-    public DynamicCacheChangeRequest(UUID reqId, ClusterState state, UUID initiatingNodeId) {
-        assert reqId != null;
-        assert state != null;
-        assert initiatingNodeId != null;
-
-        this.reqId = reqId;
-        this.state = state;
-        this.initiatingNodeId = initiatingNodeId;
-    }
-
-    /**
      * @param ctx Context.
      * @param cacheName Cache name.
      * @return Request to reset lost partitions.
@@ -183,20 +167,6 @@ public class DynamicCacheChangeRequest implements Serializable {
     }
 
     /**
-     * @return State.
-     */
-    public ClusterState state() {
-        return state;
-    }
-
-    /**
-     * @return {@code True} if global caches state is changes.
-     */
-    public boolean globalStateChange() {
-        return state != null;
-    }
-
-    /**
      * @param template {@code True} if this is request for adding template configuration.
      */
     public void template(boolean template) {
@@ -253,7 +223,7 @@ public class DynamicCacheChangeRequest implements Serializable {
     }
 
     /**
-     *
+     * @return Destroy flag.
      */
     public boolean destroy(){
         return destroy;
@@ -420,6 +390,20 @@ public class DynamicCacheChangeRequest implements Serializable {
         this.schema = schema != null ? schema.copy() : null;
     }
 
+    /**
+     * @return Locally configured flag.
+     */
+    public boolean locallyConfigured() {
+        return locallyConfigured;
+    }
+
+    /**
+     * @param locallyConfigured Locally configured flag.
+     */
+    public void locallyConfigured(boolean locallyConfigured) {
+        this.locallyConfigured = locallyConfigured;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return "DynamicCacheChangeRequest [cacheName=" + cacheName() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 9caf9aa..e9ece5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -50,7 +50,7 @@ public class ExchangeActions {
     private Map<String, ActionData> cachesToResetLostParts;
 
     /** */
-    private ClusterState newState;
+    private StateChangeRequest stateChangeReq;
 
     /**
      * @param grpId Group ID.
@@ -89,7 +89,7 @@ public class ExchangeActions {
     /**
      * @return New caches start requests.
      */
-    Collection<ActionData> cacheStartRequests() {
+    public Collection<ActionData> cacheStartRequests() {
         return cachesToStart != null ? cachesToStart.values() : Collections.<ActionData>emptyList();
     }
 
@@ -184,19 +184,31 @@ public class ExchangeActions {
     }
 
     /**
-     * @param state New cluster state.
+     * @param stateChange Cluster state change request.
      */
-    void newClusterState(ClusterState state) {
-        assert state != null;
+    public void stateChangeRequest(StateChangeRequest stateChange) {
+        this.stateChangeReq = stateChange;
+    }
+
+    /**
+     * @return {@code True} if has deactivate request.
+     */
+    public boolean deactivate() {
+        return stateChangeReq != null && !stateChangeReq.activate();
+    }
 
-        newState = state;
+    /**
+     * @return {@code True} if has activate request.
+     */
+    public boolean activate() {
+        return stateChangeReq != null && stateChangeReq.activate();
     }
 
     /**
-     * @return New cluster state if state change was requested.
+     * @return Cluster state change request.
      */
-    @Nullable public ClusterState newClusterState() {
-        return newState;
+    @Nullable public StateChangeRequest stateChangeRequest() {
+        return stateChangeReq;
     }
 
     /**
@@ -328,13 +340,14 @@ public class ExchangeActions {
             F.isEmpty(cachesToStop) &&
             F.isEmpty(cacheGrpsToStart) &&
             F.isEmpty(cacheGrpsToStop) &&
-            F.isEmpty(cachesToResetLostParts);
+            F.isEmpty(cachesToResetLostParts) &&
+            stateChangeReq == null;
     }
 
     /**
      *
      */
-    static class ActionData {
+    public static class ActionData {
         /** */
         private final DynamicCacheChangeRequest req;
 
@@ -429,6 +442,6 @@ public class ExchangeActions {
             ", startGrps=" + startGrps +
             ", stopGrps=" + stopGrps +
             ", resetParts=" + (cachesToResetLostParts != null ? cachesToResetLostParts.keySet() : null) +
-            ", newState=" + newState + ']';
+            ", stateChangeRequest=" + stateChangeReq + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index a967305..a9692f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheEvent;
-import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.typedef.F;
@@ -32,7 +31,6 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 8ba10a2..7735f74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
-import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.eviction.EvictionFilter;
 import org.apache.ignite.cache.eviction.EvictionPolicy;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 2de3808..f9d1114 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -1405,30 +1405,33 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param cctx Context.
      * @param topic Topic.
      * @param c Handler.
      */
-    public void addOrderedCacheHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheIdMessage> c) {
-        addOrderedHandler(false, topic, c);
+    public void addOrderedCacheHandler(GridCacheSharedContext cctx, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheIdMessage> c) {
+        addOrderedHandler(cctx, false, topic, c);
     }
 
     /**
+     * @param cctx Context.
      * @param topic Topic.
      * @param c Handler.
      */
-    public void addOrderedCacheGroupHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c) {
-        addOrderedHandler(true, topic, c);
+    public void addOrderedCacheGroupHandler(GridCacheSharedContext cctx, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c) {
+        addOrderedHandler(cctx, true, topic, c);
     }
 
     /**
      * Adds ordered message handler.
      *
+     * @param cctx Context.
      * @param cacheGrp {@code True} if cache group message, {@code false} if cache message.
      * @param topic Topic.
      * @param c Handler.
      */
     @SuppressWarnings({"unchecked"})
-    private void addOrderedHandler(boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+    private void addOrderedHandler(GridCacheSharedContext cctx, boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
         MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
 
         IgniteLogger log0 = log;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 24433de..a6907b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -274,10 +274,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         pendingExplicit = GridConcurrentFactory.newMap();
     }
 
-    /** {@inheritDoc} */
-    @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
-        if (!reconnect)
-            cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
+    /**
+     * Cache futures listener must be registered after communication listener.
+     */
+    public void registerEventListener() {
+        cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 93310e3..22345d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -81,6 +82,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Ign
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridListSet;
@@ -192,6 +195,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** */
     private DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
 
+    /** Events received while cluster state transition was in progress. */
+    private final List<PendingDiscoveryEvent> pendingEvts = new ArrayList<>();
+
     /** Discovery listener. */
     private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
         @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -199,109 +205,53 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 return;
 
             try {
-                ClusterNode loc = cctx.localNode();
-
-                assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ||
-                    evt.type() == EVT_DISCOVERY_CUSTOM_EVT;
-
-                final ClusterNode n = evt.eventNode();
-
-                GridDhtPartitionExchangeId exchId = null;
-                GridDhtPartitionsExchangeFuture exchFut = null;
-
-                if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
-                    assert !loc.id().equals(n.id());
-
-                    if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
-                        assert cctx.discovery().node(n.id()) == null;
-
-                        // Avoid race b/w initial future add and discovery event.
-                        GridDhtPartitionsExchangeFuture initFut = null;
-
-                        if (readyTopVer.get().equals(AffinityTopologyVersion.NONE)) {
-                            initFut = exchangeFuture(initialExchangeId(), null, null, null, null);
-
-                            initFut.onNodeLeft(n);
-                        }
-
-                        for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) {
-                            if (f != initFut)
-                                f.onNodeLeft(n);
-                        }
-                    }
+                if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT &&
+                    (((DiscoveryCustomEvent)evt).customMessage() instanceof ChangeGlobalStateMessage)) {
+                    ChangeGlobalStateMessage stateChangeMsg =
+                        (ChangeGlobalStateMessage)((DiscoveryCustomEvent)evt).customMessage();
 
-                    assert evt.type() != EVT_NODE_JOINED || n.order() > loc.order() :
-                        "Node joined with smaller-than-local " +
-                        "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
+                    if (stateChangeMsg.exchangeActions() == null)
+                        return;
 
-                    exchId = exchangeId(n.id(),
-                        affinityTopologyVersion(evt),
-                        evt.type());
+                    onDiscoveryEvent(evt, cache);
 
-                    exchFut = exchangeFuture(exchId, evt, cache,null, null);
+                    return;
                 }
-                else {
-                    DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
+                if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT &&
+                    (((DiscoveryCustomEvent)evt).customMessage() instanceof ChangeGlobalStateFinishMessage)) {
+                    ChangeGlobalStateFinishMessage stateFinishMsg =
+                        (ChangeGlobalStateFinishMessage)((DiscoveryCustomEvent)evt).customMessage();
 
-                    if (customMsg instanceof DynamicCacheChangeBatch) {
-                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
-
-                        ExchangeActions exchActions = batch.exchangeActions();
-
-                        if (exchActions != null) {
-                            exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+                    if (stateFinishMsg.clusterActive()) {
+                        for (PendingDiscoveryEvent pendingEvt : pendingEvts) {
+                            if (log.isDebugEnabled())
+                                log.debug("Process pending event: " + pendingEvt.event());
 
-                            exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
+                            onDiscoveryEvent(pendingEvt.event(), pendingEvt.discoCache());
                         }
                     }
-                    else if (customMsg instanceof CacheAffinityChangeMessage) {
-                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
-
-                        if (msg.exchangeId() == null) {
-                            if (msg.exchangeNeeded()) {
-                                exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
-
-                                exchFut = exchangeFuture(exchId, evt, cache, null, msg);
-                            }
-                        }
-                        else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery().localJoinEvent().topologyVersion())
-                            exchangeFuture(msg.exchangeId(), null, null, null, null)
-                                .onAffinityChangeMessage(evt.eventNode(), msg);
+                    else {
+                        for (PendingDiscoveryEvent pendingEvt : pendingEvts)
+                            processEventInactive(pendingEvt.event(), pendingEvt.discoCache());
                     }
-                    else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage
-                        && ((StartSnapshotOperationAckDiscoveryMessage)customMsg).needExchange()) {
-                        exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 
-                        exchFut = exchangeFuture(exchId, evt, null, null, null);
-                    }
-                    else {
-                        // Process event as custom discovery task if needed.
-                        CachePartitionExchangeWorkerTask task =
-                            cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+                    pendingEvts.clear();
 
-                        if (task != null)
-                            exchWorker.addCustomTask(task);
-                    }
+                    return;
                 }
 
-                if (exchId != null) {
+                if (cache.state().transition()) {
                     if (log.isDebugEnabled())
-                        log.debug("Discovery event (will start exchange): " + exchId);
-
-                    // Event callback - without this callback future will never complete.
-                    exchFut.onEvent(exchId, evt, cache);
+                        log.debug("Add pending event: " + evt);
 
-                    // Start exchange process.
-                    addFuture(exchFut);
-                }
-                else {
-                    if (log.isDebugEnabled())
-                        log.debug("Do not start exchange for discovery event: " + evt);
+                    pendingEvts.add(new PendingDiscoveryEvent(evt, cache));
                 }
+                else if (cache.state().active())
+                    onDiscoveryEvent(evt, cache);
+                else
+                    processEventInactive(evt, cache);
 
-                // Notify indexing engine about node leave so that we can re-map coordinator accordingly.
-                if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
-                    exchWorker.addCustomTask(new SchemaNodeLeaveExchangeWorkerTask(evt.eventNode()));
+                notifyNodeFail(evt);
             }
             finally {
                 leaveBusy();
@@ -309,6 +259,29 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
     };
 
+    /**
+     * @param evt Event.
+     */
+    private void notifyNodeFail(DiscoveryEvent evt) {
+        if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
+            final ClusterNode n = evt.eventNode();
+
+            assert cctx.discovery().node(n.id()) == null;
+
+            for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
+                f.onNodeLeft(n);
+        }
+    }
+
+    /**
+     * @param evt Event.
+     * @param cache Discovery data cache.
+     */
+    private void processEventInactive(DiscoveryEvent evt, DiscoCache cache) {
+        if (log.isDebugEnabled())
+            log.debug("Ignore event, cluster is inactive: " + evt);
+   }
+
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
@@ -338,12 +311,158 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     processSinglePartitionRequest(node, msg);
                 }
             });
+
+        if (!cctx.kernalContext().clientNode()) {
+            for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+                final int idx = cnt;
+
+                cctx.io().addOrderedCacheGroupHandler(cctx, rebalanceTopic(cnt), new CI2<UUID, GridCacheGroupIdMessage>() {
+                    @Override public void apply(final UUID id, final GridCacheGroupIdMessage m) {
+                        if (!enterBusy())
+                            return;
+
+                        try {
+                            CacheGroupContext grp = cctx.cache().cacheGroup(m.groupId());
+
+                            if (grp != null) {
+                                if (m instanceof GridDhtPartitionSupplyMessage) {
+                                    grp.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessage) m);
+
+                                    return;
+                                }
+                                else if (m instanceof GridDhtPartitionDemandMessage) {
+                                    grp.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage) m);
+
+                                    return;
+                                }
+                            }
+
+                            U.error(log, "Unsupported message type: " + m.getClass().getName());
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * Callback for local join event (needed since regular event for local join is not generated).
+     *
+     * @param evt Event.
+     * @param cache Cache.
+     */
+    public void onLocalJoin(DiscoveryEvent evt, DiscoCache cache) {
+        discoLsnr.onEvent(evt, cache);
+    }
+
+    /**
+     * @param evt Event.
+     * @param cache Discovery data cache.
+     */
+    private void onDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) {
+        ClusterNode loc = cctx.localNode();
+
+        assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ||
+            evt.type() == EVT_DISCOVERY_CUSTOM_EVT;
+
+        final ClusterNode n = evt.eventNode();
+
+        GridDhtPartitionExchangeId exchId = null;
+        GridDhtPartitionsExchangeFuture exchFut = null;
+
+        if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
+            assert evt.type() != EVT_NODE_JOINED || n.isLocal() || n.order() > loc.order() :
+                "Node joined with smaller-than-local " +
+                    "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
+
+            exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+            exchFut = exchangeFuture(exchId, evt, cache,null, null);
+        }
+        else {
+            DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
+
+            if (customMsg instanceof ChangeGlobalStateMessage) {
+                ChangeGlobalStateMessage stateChangeMsg = (ChangeGlobalStateMessage)customMsg;
+
+                ExchangeActions exchActions = stateChangeMsg.exchangeActions();
+
+                if (exchActions != null) {
+                    exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+                    exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
+                }
+            }
+            else if (customMsg instanceof DynamicCacheChangeBatch) {
+                DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
+
+                ExchangeActions exchActions = batch.exchangeActions();
+
+                if (exchActions != null) {
+                    exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+                    exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
+                }
+            }
+            else if (customMsg instanceof CacheAffinityChangeMessage) {
+                CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
+
+                if (msg.exchangeId() == null) {
+                    if (msg.exchangeNeeded()) {
+                        exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+                        exchFut = exchangeFuture(exchId, evt, cache, null, msg);
+                    }
+                }
+                else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery().localJoinEvent().topologyVersion())
+                    exchangeFuture(msg.exchangeId(), null, null, null, null)
+                        .onAffinityChangeMessage(evt.eventNode(), msg);
+            }
+            else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage
+                && ((StartSnapshotOperationAckDiscoveryMessage)customMsg).needExchange()) {
+                exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+                exchFut = exchangeFuture(exchId, evt, null, null, null);
+            }
+            else {
+                // Process event as custom discovery task if needed.
+                CachePartitionExchangeWorkerTask task =
+                    cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+                if (task != null)
+                    exchWorker.addCustomTask(task);
+            }
+        }
+
+        if (exchId != null) {
+            if (log.isDebugEnabled())
+                log.debug("Discovery event (will start exchange): " + exchId);
+
+            // Event callback - without this callback future will never complete.
+            exchFut.onEvent(exchId, evt, cache);
+
+            // Start exchange process.
+            addFuture(exchFut);
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Do not start exchange for discovery event: " + evt);
+        }
+
+        notifyNodeFail(evt);
+
+        // Notify indexing engine about node leave so that we can re-map coordinator accordingly.
+        if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
+            exchWorker.addCustomTask(new SchemaNodeLeaveExchangeWorkerTask(evt.eventNode()));
     }
 
     /**
      * @param task Task to run in exchange worker thread.
      */
-    public void addCustomTask(CachePartitionExchangeWorkerTask task) {
+    void addCustomTask(CachePartitionExchangeWorkerTask task) {
         assert task != null;
 
         exchWorker.addCustomTask(task);
@@ -371,9 +490,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         return exchangeId(cctx.localNode().id(), startTopVer, EVT_NODE_JOINED);
     }
 
-    /** {@inheritDoc} */
-    @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
-        super.onKernalStart0(reconnect);
+    /**
+     * @param active Cluster state.
+     * @param reconnect Reconnect flag.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onKernalStart(boolean active, boolean reconnect) throws IgniteCheckedException {
+        for (ClusterNode n : cctx.discovery().remoteNodes())
+            cctx.versions().onReceived(n.id(), n.metrics().getLastDataVersion());
 
         ClusterNode loc = cctx.localNode();
 
@@ -381,79 +505,49 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         assert startTime > 0;
 
-        // Generate dummy discovery event for local node joining.
-        T2<DiscoveryEvent, DiscoCache> locJoin = cctx.discovery().localJoin();
-
-        DiscoveryEvent discoEvt = locJoin.get1();
-        DiscoCache discoCache = locJoin.get2();
-
-        GridDhtPartitionExchangeId exchId = initialExchangeId();
+        DiscoveryLocalJoinData locJoin = cctx.discovery().localJoin();
 
-        GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, discoCache, null, null);
+        GridDhtPartitionsExchangeFuture fut = null;
 
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
 
-        exchWorker.addFirstExchangeFuture(fut);
-
-        if (!cctx.kernalContext().clientNode()) {
-            for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
-                final int idx = cnt;
-
-                cctx.io().addOrderedCacheGroupHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheGroupIdMessage>() {
-                    @Override public void apply(final UUID id, final GridCacheGroupIdMessage m) {
-                        if (!enterBusy())
-                            return;
-
-                        try {
-                            CacheGroupContext grp = cctx.cache().cacheGroup(m.groupId());
-
-                            if (grp != null) {
-                                if (m instanceof GridDhtPartitionSupplyMessage) {
-                                    grp.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessage) m);
-
-                                    return;
-                                }
-                                else if (m instanceof GridDhtPartitionDemandMessage) {
-                                    grp.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage) m);
+        if (active) {
+            DiscoveryEvent discoEvt = locJoin.event();
+            DiscoCache discoCache = locJoin.discoCache();
 
-                                    return;
-                                }
-                            }
+            GridDhtPartitionExchangeId exchId = initialExchangeId();
 
-                            U.error(log, "Unsupported message type: " + m.getClass().getName());
-                        }
-                        finally {
-                            leaveBusy();
-                        }
-                    }
-                });
-            }
+            fut = exchangeFuture(exchId, discoEvt, discoCache, null, null);
         }
+        else if (reconnect)
+            reconnectExchangeFut.onDone();
 
         new IgniteThread(cctx.igniteInstanceName(), "exchange-worker", exchWorker).start();
 
         if (reconnect) {
-            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                    try {
-                        fut.get();
+            if (fut != null) {
+                fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                        try {
+                            fut.get();
 
-                        for (CacheGroupContext grp : cctx.cache().cacheGroups())
-                            grp.preloader().onInitialExchangeComplete(null);
+                            for (CacheGroupContext grp : cctx.cache().cacheGroups())
+                                grp.preloader().onInitialExchangeComplete(null);
 
-                        reconnectExchangeFut.onDone();
-                    }
-                    catch (IgniteCheckedException e) {
-                        for (CacheGroupContext grp : cctx.cache().cacheGroups())
-                            grp.preloader().onInitialExchangeComplete(e);
+                            reconnectExchangeFut.onDone();
+                        }
+                        catch (IgniteCheckedException e) {
+                            for (CacheGroupContext grp : cctx.cache().cacheGroups())
+                                grp.preloader().onInitialExchangeComplete(e);
 
-                        reconnectExchangeFut.onDone(e);
+                            reconnectExchangeFut.onDone(e);
+                        }
                     }
-                }
-            });
+                });
+            }
         }
-        else {
+        else if (fut != null) {
             if (log.isDebugEnabled())
                 log.debug("Beginning to wait on local exchange future: " + fut);
 
@@ -489,10 +583,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
             }
 
-            AffinityTopologyVersion nodeStartVer = new AffinityTopologyVersion(discoEvt.topologyVersion(), 0);
-
             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (nodeStartVer.equals(grp.localStartVersion()))
+                if (locJoin.joinTopologyVersion().equals(grp.localStartVersion()))
                     grp.preloader().onInitialExchangeComplete(null);
             }
 
@@ -1669,28 +1761,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param deque Deque to poll from.
-     * @param time Time to wait.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
-        // will always make sure that interrupted flag gets reset before going into wait conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(time, MILLISECONDS);
-    }
-
-    /**
      * Exchange future thread. All exchanges happen only by one thread and next
      * exchange will not start until previous one completes.
      */
@@ -1710,15 +1780,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
 
         /**
-         * Add first exchange future.
-         *
-         * @param exchFut Exchange future.
-         */
-        void addFirstExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
-            futQ.addFirst(exchFut);
-        }
-
-        /**
          * @param exchFut Exchange future.
          */
         void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
@@ -1946,7 +2007,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             }
                         }
 
-                        if (!exchFut.skipPreload() && cctx.kernalContext().state().active()) {
+                        if (!exchFut.skipPreload() ) {
                             assignsMap = new HashMap<>();
 
                             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0f859eb..624dec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -107,6 +107,9 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTransactio
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
 import org.apache.ignite.internal.processors.query.QuerySchema;
@@ -692,36 +695,27 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheSharedManager mgr : sharedCtx.managers())
             mgr.start(sharedCtx);
 
-        if (ctx.config().isDaemon()) {
-            ctx.state().cacheProcessorStarted(new CacheJoinNodeDiscoveryData(
-                IgniteUuid.randomUuid(),
-                Collections.<String, CacheInfo>emptyMap(),
-                Collections.<String, CacheInfo>emptyMap(),
-                false
-            ));
-
-            return;
-        }
-
-        Map<String, CacheInfo> caches = new HashMap<>();
+        if (!ctx.isDaemon()) {
+            Map<String, CacheInfo> caches = new HashMap<>();
 
-        Map<String, CacheInfo> templates = new HashMap<>();
+            Map<String, CacheInfo> templates = new HashMap<>();
 
-        addCacheOnJoinFromConfig(caches, templates);
+            addCacheOnJoinFromConfig(caches, templates);
 
-        CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
-            IgniteUuid.randomUuid(),
-            caches,
-            templates,
-            startAllCachesOnClientStart()
-        );
+            CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
+                IgniteUuid.randomUuid(),
+                caches,
+                templates,
+                startAllCachesOnClientStart()
+            );
 
-        cachesInfo.onStart(discoData);
+            cachesInfo.onStart(discoData);
 
-        if (log.isDebugEnabled())
-            log.debug("Started cache processor.");
+            if (log.isDebugEnabled())
+                log.debug("Started cache processor.");
+        }
 
-        ctx.state().cacheProcessorStarted(discoData);
+        ctx.state().cacheProcessorStarted();
     }
 
     /**
@@ -830,51 +824,38 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        boolean active = ctx.state().active();
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+        if (ctx.isDaemon())
+            return;
 
         try {
-            boolean checkConsistency =
-                !ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK);
+            boolean checkConsistency = !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK);
 
             if (checkConsistency)
                 checkConsistency();
 
-            if (active && cachesInfo.onJoinCacheException() != null)
-                throw new IgniteCheckedException(cachesInfo.onJoinCacheException());
-
             cachesInfo.onKernalStart(checkConsistency);
 
-            if (active && !ctx.clientNode() && !ctx.isDaemon())
-                sharedCtx.database().lock();
-
-            // Must start database before start first cache.
-            sharedCtx.database().onKernalStart(false);
-
             ctx.query().onCacheKernalStart();
 
-            // In shared context, we start exchange manager and wait until processed local join
-            // event, all caches which we get on join will be start.
-            for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) {
-                if (sharedCtx.database() != mgr)
-                    mgr.onKernalStart(false);
-            }
+            sharedCtx.mvcc().registerEventListener();
+
+            sharedCtx.exchange().onKernalStart(active, false);
         }
         finally {
             cacheStartedLatch.countDown();
         }
 
+        if (!ctx.clientNode())
+            addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000));
+
         // Escape if cluster inactive.
         if (!active)
             return;
 
-        if (!ctx.config().isDaemon())
-            ctx.cacheObjects().onUtilityCacheStarted();
-
         ctx.service().onUtilityCacheStarted();
 
-        final AffinityTopologyVersion startTopVer =
-            new AffinityTopologyVersion(ctx.discovery().localJoinEvent().topologyVersion(), 0);
+        final AffinityTopologyVersion startTopVer = ctx.discovery().localJoin().joinTopologyVersion();
 
         final List<IgniteInternalFuture> syncFuts = new ArrayList<>(caches.size());
 
@@ -894,15 +875,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
         });
 
-        // Avoid iterator creation.
-        //noinspection ForLoopReplaceableByForEach
         for (int i = 0, size = syncFuts.size(); i < size; i++)
             syncFuts.get(i).get();
-
-        assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
-
-        if (!ctx.clientNode() && !ctx.isDaemon())
-            addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000));
     }
 
     /**
@@ -969,8 +943,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         for (CacheGroupContext grp : cacheGrps.values())
             stopCacheGroup(grp.groupId());
-
-        cachesInfo.clearCaches();
     }
 
     /**
@@ -1097,7 +1069,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
         List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size());
 
-        ClusterCachesReconnectResult reconnectRes = cachesInfo.onReconnected();
+        DiscoveryDataClusterState state = ctx.state().clusterState();
+
+        boolean active = state.active() && !state.transition();
+
+        ClusterCachesReconnectResult reconnectRes = cachesInfo.onReconnected(active, state.transition());
 
         final List<GridCacheAdapter> stoppedCaches = new ArrayList<>();
 
@@ -1135,7 +1111,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 grp.onReconnected();
         }
 
-        sharedCtx.onReconnected();
+        sharedCtx.onReconnected(active);
 
         for (GridCacheAdapter cache : reconnected)
             cache.context().gate().reconnected(false);
@@ -1750,17 +1726,26 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @return Caches to be started when this node starts.
+     */
+    public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() {
+        return cachesInfo.cachesToStartOnLocalJoin();
+    }
+
+    /**
+     * @param caches Caches to start.
      * @param exchTopVer Current exchange version.
      * @throws IgniteCheckedException If failed.
      */
-    public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException {
-        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches = cachesInfo.cachesToStartOnLocalJoin();
-
+    public void startCachesOnLocalJoin(List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches,
+        AffinityTopologyVersion exchTopVer)
+        throws IgniteCheckedException {
         if (!F.isEmpty(caches)) {
             for (T2<DynamicCacheDescriptor, NearCacheConfiguration> t : caches) {
                 DynamicCacheDescriptor desc = t.get1();
 
                 prepareCacheStart(
+                    desc.cacheConfiguration(),
                     desc,
                     t.get2(),
                     exchTopVer
@@ -1787,6 +1772,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
                     prepareCacheStart(
+                        desc.cacheConfiguration(),
                         desc,
                         null,
                         exchTopVer
@@ -1799,17 +1785,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param startCfg Cache configuration to use.
      * @param desc Cache descriptor.
      * @param reqNearCfg Near configuration if specified for client cache start request.
      * @param exchTopVer Current exchange version.
      * @throws IgniteCheckedException If failed.
      */
-    public void prepareCacheStart(
+    void prepareCacheStart(
+        CacheConfiguration startCfg,
         DynamicCacheDescriptor desc,
         @Nullable NearCacheConfiguration reqNearCfg,
         AffinityTopologyVersion exchTopVer
     ) throws IgniteCheckedException {
-        CacheConfiguration startCfg = desc.cacheConfiguration();
         assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
 
         CacheConfiguration ccfg = new CacheConfiguration(startCfg);
@@ -2003,7 +1990,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             sharedCtx.removeCacheContext(ctx);
 
-            onKernalStop(cache, destroy);
+            onKernalStop(cache, true);
 
             stopCache(cache, true, destroy);
 
@@ -2017,9 +2004,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param startTopVer Cache start version.
      * @param err Cache start error if any.
      */
-    void initCacheProxies(
-        AffinityTopologyVersion startTopVer, @Nullable
-        Throwable err) {
+    void initCacheProxies(AffinityTopologyVersion startTopVer, @Nullable Throwable err) {
         for (GridCacheAdapter<?, ?> cache : caches.values()) {
             GridCacheContext<?, ?> cacheCtx = cache.context();
 
@@ -2122,7 +2107,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (exchActions == null)
             return;
 
-        if (exchActions.systemCachesStarting() && exchActions.newClusterState() == null)
+        if (exchActions.systemCachesStarting() && exchActions.stateChangeRequest() == null)
             ctx.dataStructures().restoreStructuresState(ctx);
 
         if (err == null) {
@@ -2143,9 +2128,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 try {
                     prepareCacheStop(action.request().cacheName(), action.request().destroy());
-
-                    if (exchActions.newClusterState() == null)
-                        ctx.state().onCacheStop(action.request());
                 }
                 finally {
                     sharedCtx.database().checkpointReadUnlock();
@@ -2166,6 +2148,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             if (!sharedCtx.kernalContext().clientNode())
                 sharedCtx.database().onCacheGroupsStopped(stoppedGroups);
+
+            if (exchActions.deactivate())
+                sharedCtx.deactivate();
         }
     }
 
@@ -2204,10 +2189,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * @param req Request to complete future for.
+     * @param success Future result.
      * @param err Error if any.
      */
     void completeCacheStartFuture(DynamicCacheChangeRequest req, boolean success, @Nullable Throwable err) {
-        if (req.initiatingNodeId().equals(ctx.localNodeId())) {
+        if (ctx.localNodeId().equals(req.initiatingNodeId())) {
             DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
 
             if (fut != null)
@@ -2304,30 +2290,35 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
-        if (ctx.state().active())
-            cachesInfo.collectGridNodeData(dataBag);
-        else
-            ctx.state().collectGridNodeData0(dataBag);
+        cachesInfo.collectGridNodeData(dataBag);
     }
 
     /** {@inheritDoc} */
     @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
-        if (ctx.state().active())
-            cachesInfo.onJoiningNodeDataReceived(data);
-
-        ctx.state().onJoiningNodeDataReceived0(data);
+        cachesInfo.onJoiningNodeDataReceived(data);
     }
 
     /** {@inheritDoc} */
     @Override public void onGridDataReceived(GridDiscoveryData data) {
-        if (ctx.state().active()) {
-            if (!cachesInfo.disconnectedState())
-                cachesInfo.addJoinInfo();
+        cachesInfo.onGridDataReceived(data);
+    }
 
-            cachesInfo.onGridDataReceived(data);
-        }
+    /**
+     * @param msg State change finish message; passed through to {@code cachesInfo}.
+     */
+    public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
+        cachesInfo.onStateChangeFinish(msg);
+    }
 
-        ctx.state().onGridDataReceived0(data);
+    /**
+     * @param msg State change request message.
+     * @param topVer Current topology version.
+     * @throws IgniteCheckedException If configuration validation failed.
+     * @return Exchange actions returned by {@code cachesInfo.onStateChangeRequest} for this request.
+     */
+    public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
+        return cachesInfo.onStateChangeRequest(msg, topVer);
     }
 
     /**
@@ -2929,13 +2920,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * @param type Event type.
+     * @param customMsg Custom message instance.
      * @param node Event node.
      * @param topVer Topology version.
+     * @param state Cluster state.
      */
-    public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+    public void onDiscoveryEvent(int type,
+        @Nullable DiscoveryCustomMessage customMsg,
+        ClusterNode node,
+        AffinityTopologyVersion topVer,
+        DiscoveryDataClusterState state) {
         cachesInfo.onDiscoveryEvent(type, node, topVer);
 
-        sharedCtx.affinity().onDiscoveryEvent(type, node, topVer);
+        sharedCtx.affinity().onDiscoveryEvent(type, customMsg, node, topVer, state);
     }
 
     /**
@@ -3214,7 +3211,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     proxy = new IgniteCacheProxy(cacheAdapter.context(), cacheAdapter, null, false);
             }
 
-            assert proxy != null;
+            assert proxy != null : name;
 
             return proxy.internalProxy();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 75d03d7..9adca8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -45,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheSnapshotManager;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionMetricsAdapter;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
+import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -156,6 +157,9 @@ public class GridCacheSharedContext<K, V> {
     /** Concurrent DHT atomic updates counters. */
     private AtomicIntegerArray dhtAtomicUpdCnt;
 
+    /** */
+    private final List<IgniteChangeGlobalStateSupport> stateAwareMgrs;
+
     /**
      * @param kernalCtx  Context.
      * @param txMgr Transaction manager.
@@ -207,6 +211,49 @@ public class GridCacheSharedContext<K, V> {
         txFinishMsgLog = kernalCtx.log(CU.TX_MSG_FINISH_LOG_CATEGORY);
         txLockMsgLog = kernalCtx.log(CU.TX_MSG_LOCK_LOG_CATEGORY);
         txRecoveryMsgLog = kernalCtx.log(CU.TX_MSG_RECOVERY_LOG_CATEGORY);
+
+        stateAwareMgrs = new ArrayList<>();
+
+        if (pageStoreMgr != null)
+            stateAwareMgrs.add(pageStoreMgr);
+
+        if (walMgr != null)
+            stateAwareMgrs.add(walMgr);
+
+        stateAwareMgrs.add(dbMgr);
+
+        stateAwareMgrs.add(snpMgr);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void activate() throws IgniteCheckedException {
+        if (!kernalCtx.clientNode())
+            dbMgr.lock(); // Taken on non-client nodes only; released below if activation does not complete.
+
+        boolean success = false;
+
+        try {
+            for (IgniteChangeGlobalStateSupport mgr : stateAwareMgrs)
+                mgr.onActivate(kernalCtx); // Managers are notified in list order.
+
+            success = true;
+        }
+        finally {
+            if (!success) { // The loop above did not finish normally: undo the lock taken at the start.
+                if (!kernalCtx.clientNode())
+                    dbMgr.unLock();
+            }
+        }
+    }
+
+    /**
+     * Calls {@code onDeActivate} on every state-aware manager, walking the list in reverse order.
+     */
+    public void deactivate() {
+        for (int i = stateAwareMgrs.size() - 1; i >= 0; i--)
+            stateAwareMgrs.get(i).onDeActivate(kernalCtx);
     }
 
     /**
@@ -272,12 +319,15 @@ public class GridCacheSharedContext<K, V> {
             if (restartOnDisconnect(mgr))
                 mgr.stop(true);
         }
+
+        deactivate();
     }
 
     /**
+     * @param active Active flag.
      * @throws IgniteCheckedException If failed.
      */
-    void onReconnected() throws IgniteCheckedException {
+    void onReconnected(boolean active) throws IgniteCheckedException {
         List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>();
 
         setManagers(mgrs, txMgr,
@@ -303,8 +353,10 @@ public class GridCacheSharedContext<K, V> {
 
         kernalCtx.query().onCacheReconnect();
 
-        for (GridCacheSharedManager<?, ?> mgr : mgrs)
-            mgr.onKernalStart(true);
+        if (!active)
+            affinity().removeAllCacheInfo();
+
+        exchMgr.onKernalStart(active, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
index e0e4090..bc1bbb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
@@ -40,12 +40,6 @@ public interface GridCacheSharedManager<K, V> {
     public void stop(boolean cancel);
 
     /**
-     * @param reconnect {@code True} if manager restarted after client reconnect.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void onKernalStart(boolean reconnect) throws IgniteCheckedException;
-
-    /**
      * @param cancel Cancel flag.
      */
     public void onKernalStop(boolean cancel);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
index f6f79e4..90ae670 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
@@ -112,14 +112,6 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
     }
 
     /** {@inheritDoc} */
-    @Override public final void onKernalStart(boolean reconnect) throws IgniteCheckedException {
-        onKernalStart0(reconnect);
-
-        if (!reconnect && log != null && log.isDebugEnabled())
-            log.debug(kernalStartInfo());
-    }
-
-    /** {@inheritDoc} */
     @Override public final void onKernalStop(boolean cancel) {
         if (!starting.get())
             // Ignoring attempt to stop manager that has never been started.
@@ -132,14 +124,6 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
     }
 
     /**
-     * @param reconnect {@code True} if manager restarted after client reconnect.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
-        // No-op.
-    }
-
-    /**
      * @param cancel Cancel flag.
      */
     protected void onKernalStop0(boolean cancel) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java
new file mode 100644
index 0000000..b4274f7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Holder for a discovery event together with a discovery data cache; both references are final.
+ */
+public class PendingDiscoveryEvent {
+    /** Discovery event. */
+    private final DiscoveryEvent evt;
+
+    /** Discovery data cache. */
+    private final DiscoCache cache;
+
+    /**
+     * @param evt Event.
+     * @param cache Discovery data cache.
+     */
+    public PendingDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) {
+        this.evt = evt;
+        this.cache = cache;
+    }
+
+    /**
+     * @return Event passed to the constructor.
+     */
+    public DiscoveryEvent event() {
+        return evt;
+    }
+
+    /**
+     * @return Discovery data cache passed to the constructor.
+     */
+    public DiscoCache discoCache() {
+        return cache;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(PendingDiscoveryEvent.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java
new file mode 100644
index 0000000..2d35e81
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Wraps a {@link ChangeGlobalStateMessage} together with a topology version; accessors delegate to the message.
+ */
+public class StateChangeRequest {
+    /** State change message. */
+    private final ChangeGlobalStateMessage msg;
+
+    /** State change topology version. */
+    private final AffinityTopologyVersion topVer;
+
+    /**
+     * @param msg Message.
+     * @param topVer State change topology version.
+     */
+    public StateChangeRequest(ChangeGlobalStateMessage msg,
+        AffinityTopologyVersion topVer) {
+        this.msg = msg;
+        this.topVer = topVer;
+    }
+
+    /**
+     * @return State change exchange version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return State change request ID, as reported by the wrapped message.
+     */
+    public UUID requestId() {
+        return msg.requestId();
+    }
+
+    /**
+     * @return New state, as returned by {@link ChangeGlobalStateMessage#activate()}.
+     */
+    public boolean activate() {
+        return msg.activate();
+    }
+
+    /**
+     * @return Node initiated state change process, as reported by the wrapped message.
+     */
+    public UUID initiatorNodeId() {
+        return msg.initiatorNodeId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StateChangeRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index c2c71ea..0065e41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -258,8 +258,8 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+        super.onKernalStart(active);
 
         discoveryStarted = true;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 1c97de2..d6b45b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -29,7 +29,6 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.GridLeanMap;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 38d0108..960b91a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
-import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCachePreloader;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 123d26b..0ea48e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -27,7 +27,6 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 6392d0a..439bb9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -19,13 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index c205c3b..57ce323 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -424,11 +423,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         throws IgniteCheckedException {
         DiscoveryEvent discoEvt = exchFut.discoveryEvent();
 
-        ClusterState newState = exchFut.newClusterState();
-
-        treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE)
-            || (ctx.kernalContext().state().active()
-            && discoEvt.type() == EventType.EVT_NODE_JOINED
+        treatAllPartAsLoc = exchFut.activateCluster()
+            || (discoEvt.type() == EventType.EVT_NODE_JOINED
             && discoEvt.eventNode().isLocal()
             && !ctx.kernalContext().clientNode()
         );
@@ -611,7 +607,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     if (locPart != null) {
                         GridDhtPartitionState state = locPart.state();
 
-                        if (state == MOVING && ctx.kernalContext().state().active()) {
+                        if (state == MOVING) {
                             locPart.rent(false);
 
                             updateSeq = updateLocal(p, locPart.state(), updateSeq);
@@ -1773,9 +1769,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @return Checks if any of the local partitions need to be evicted.
      */
     private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
-        if (!ctx.kernalContext().state().active())
-            return false;
-
         boolean changed = false;
 
         UUID locId = ctx.localNodeId();

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index e70f383..d04870a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -80,7 +80,7 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
         if (err != null)
             return err;
 
-        if (!cctx.shared().kernalContext().state().active())
+        if (!cctx.shared().kernalContext().state().publicApiActiveState())
             return new CacheInvalidStateException(
                 "Failed to perform cache operation (cluster is not activated): " + cctx.name());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index cfecb1c..d66afca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -18,16 +18,13 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteDiagnosticAware;
-import org.apache.ignite.internal.IgniteDiagnosticMessage;
 import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 1bd8ec5..6fe96a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 763b43b..fe216a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 774f0ce..e7e95b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -161,6 +161,8 @@ public class GridDhtPartitionDemander {
         lastExchangeFut = null;
 
         lastTimeoutObj.set(null);
+
+        syncFut.onDone();
     }
 
     /**


Mime
View raw message