ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/3] ignite git commit: gg-12389
Date Thu, 29 Jun 2017 11:27:23 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-gg-12389 [created] 10649c7be


http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 031c596..204f797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -26,41 +26,25 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
-import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheClientReconnectDiscoveryData;
-import org.apache.ignite.internal.processors.cache.CacheData;
-import org.apache.ignite.internal.processors.cache.CacheJoinNodeDiscoveryData;
-import org.apache.ignite.internal.processors.cache.CacheJoinNodeDiscoveryData.CacheInfo;
-import org.apache.ignite.internal.processors.cache.CacheNodeCommonDiscoveryData;
-import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
-import org.apache.ignite.internal.processors.cache.ClusterState;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
-import org.apache.ignite.internal.processors.cache.StoredCacheData;
-import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.processors.cache.StateChangeRequest;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -70,33 +54,22 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
-import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
 import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.STATE_PROC;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.processors.cache.ClusterState.ACTIVE;
-import static org.apache.ignite.internal.processors.cache.ClusterState.INACTIVE;
-import static org.apache.ignite.internal.processors.cache.ClusterState.TRANSITION;
-import static org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest.stopRequest;
 
 /**
  *
  */
 public class GridClusterStateProcessor extends GridProcessorAdapter {
-    /** Global status. */
-    private volatile ClusterState globalState;
-
-    /** Action context. */
-    private volatile ChangeGlobalStateContext lastCgsCtx;
+    /** */
+    private DiscoveryDataClusterState globalState;
 
     /** Local action future. */
     private final AtomicReference<GridChangeGlobalStateFuture> cgsLocFut = new AtomicReference<>();
@@ -109,12 +82,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     @GridToStringExclude
     private GridCacheSharedContext<?, ?> sharedCtx;
 
-    /** */
-    private final ConcurrentHashMap<String, CacheInfo> cacheData = new ConcurrentHashMap<>();
-
-    /** */
-    private volatile CacheJoinNodeDiscoveryData localCacheData;
-
     /** Listener. */
     private final GridLocalEventListener lsr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -142,68 +109,118 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
         super(ctx);
     }
 
+    public boolean publicApiActiveState() {
+        assert globalState != null;
+
+        return globalState.transition() ? !globalState.active() : globalState.active();
+    }
+
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        // Start first node as inactive if persistent enable.
-        globalState = ctx.config().isPersistentStoreEnabled() ? INACTIVE :
-            ctx.config().isActiveOnStart() ? ACTIVE : INACTIVE;
+        // Start first node as inactive if persistence is enabled.
+        boolean activeOnStart = !ctx.config().isPersistentStoreEnabled() && ctx.config().isActiveOnStart();
 
-        ctx.discovery().setCustomEventListener(
-            ChangeGlobalStateMessage.class, new CustomEventListener<ChangeGlobalStateMessage>() {
-                @Override public void onCustomEvent(
-                    AffinityTopologyVersion topVer, ClusterNode snd, ChangeGlobalStateMessage msg) {
-                    assert topVer != null;
-                    assert snd != null;
-                    assert msg != null;
+        globalState = DiscoveryDataClusterState.createState(activeOnStart);
 
-                    boolean activate = msg.activate();
+        ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+    }
 
-                    ChangeGlobalStateContext actx = lastCgsCtx;
+    public boolean onStateChangeMessage(AffinityTopologyVersion topVer,
+        ChangeGlobalStateMessage msg,
+        DiscoCache discoCache) {
+        if (globalState.transition()) {
+            if (globalState.active() != msg.activate()) {
+                GridChangeGlobalStateFuture fut = changeStateFuture(msg);
 
-                    if (actx != null && globalState == TRANSITION) {
-                        GridChangeGlobalStateFuture f = cgsLocFut.get();
+                if (fut != null)
+                    fut.onDone(concurrentStateChangeError(msg.activate()));
+            }
+            else {
+                final GridChangeGlobalStateFuture stateFut = changeStateFuture(msg);
+
+                if (stateFut != null) {
+                    // TODO GG-12389, check for result.
+                    IgniteInternalFuture<?> exchFut = ctx.cache().context().exchange().affinityReadyFuture(
+                        globalState.transitionTopologyVersion());
+
+                    if (exchFut == null)
+                        exchFut = new GridFinishedFuture<>();
+
+                    exchFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> exchFut) {
+                            stateFut.onDone();
+                        }
+                    });
+                }
+            }
+        }
+        else {
+            if (globalState.active() != msg.activate()) {
+//                if (!ctx.localNodeId().equals(msg.initiatorNodeId()))
+//                    cgsLocFut.compareAndSet(null, new GridChangeGlobalStateFuture(msg.requestId(), msg.activate(), ctx));
+// TODO GG-12389
+                Set<UUID> nodeIds = U.newHashSet(discoCache.allNodes().size());
 
-                        if (log.isDebugEnabled())
-                            log.debug("Concurrent " + prettyStr(activate) + " [id=" +
-                                ctx.localNodeId() + " topVer=" + topVer + " actx=" + actx + ", msg=" + msg + "]");
+                for (ClusterNode node : discoCache.allNodes())
+                    nodeIds.add(node.id());
 
-                        if (f != null && f.requestId.equals(msg.requestId()))
-                            f.onDone(new IgniteCheckedException(
-                                "Concurrent change state, now in progress=" + (activate)
-                                    + ", initiatingNodeId=" + actx.initiatingNodeId
-                                    + ", you try=" + (prettyStr(activate)) + ", locNodeId=" + ctx.localNodeId()
-                            ));
+                GridChangeGlobalStateFuture fut = changeStateFuture(msg);
 
-                        msg.concurrentChangeState();
-                    }
-                    else {
-                        if (log.isInfoEnabled())
-                            log.info("Create " + prettyStr(activate) + " context [id=" +
-                                ctx.localNodeId() + " topVer=" + topVer + ", reqId=" +
-                                msg.requestId() + ", initiatingNodeId=" + msg.initiatorNodeId() + "]");
-
-                        lastCgsCtx = new ChangeGlobalStateContext(
-                            msg.requestId(),
-                            msg.initiatorNodeId(),
-                            msg.getDynamicCacheChangeBatch(),
-                            msg.activate());
-
-                        globalState = TRANSITION;
-                    }
-                }
-            });
+                if (fut != null)
+                    fut.setRemaining(nodeIds, topVer.nextMinorVersion());
 
-        ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+                globalState = DiscoveryDataClusterState.createTransitionState(msg.activate(), topVer, nodeIds);
+
+                ExchangeActions exchangeActions = new ExchangeActions();
+
+                StateChangeRequest req = new StateChangeRequest(msg, topVer.nextMinorVersion());
+
+                exchangeActions.stateChangeRequest(req);
+
+                ctx.cache().onStateChangeRequest(exchangeActions);
+
+                msg.exchangeActions(exchangeActions);
+
+                return true;
+            }
+            else {
+                GridChangeGlobalStateFuture stateFut = changeStateFuture(msg);
+
+                if (stateFut != null)
+                    stateFut.onDone();
+            }
+        }
+
+        return false;
     }
 
     /**
-     * @param data Joining node discovery data.
+     * @return Current cluster state, should be called only from discovery thread.
      */
-    public void cacheProcessorStarted(CacheJoinNodeDiscoveryData data) {
-        assert data != null;
+    public DiscoveryDataClusterState clusterState() {
+        return globalState;
+    }
+
+    @Nullable private GridChangeGlobalStateFuture changeStateFuture(ChangeGlobalStateMessage msg) {
+        if (msg.initiatorNodeId().equals(ctx.localNodeId())) {
+            GridChangeGlobalStateFuture fut = cgsLocFut.get();
+
+            if (fut != null && fut.requestId.equals(msg.requestId()))
+                return fut;
+        }
 
-        localCacheData = data;
+        return null;
+    }
+
+    private IgniteCheckedException concurrentStateChangeError(boolean activate) {
+        return new IgniteCheckedException("Failed to " + prettyStr(activate) +
+            ", because another state change operation is currently in progress: " + prettyStr(!activate));
+    }
 
+    /**
+     *
+     */
+    public void cacheProcessorStarted() {
         cacheProc = ctx.cache();
         sharedCtx = cacheProc.context();
 
@@ -221,49 +238,16 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
         super.stop(cancel);
 
         sharedCtx.io().removeHandler(false, 0, GridChangeGlobalStateMessageResponse.class);
+
         ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
-        IgniteCheckedException stopErr = new IgniteInterruptedCheckedException(
+        IgniteCheckedException stopErr = new IgniteCheckedException(
             "Node is stopping: " + ctx.igniteInstanceName());
 
         GridChangeGlobalStateFuture f = cgsLocFut.get();
 
         if (f != null)
             f.onDone(stopErr);
-
-        cgsLocFut.set(null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
-
-        if (ctx.isDaemon())
-            return;
-
-        List<ClusterNode> nodes = ctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
-
-        assert localCacheData != null;
-
-        // First node started (coordinator).
-        if (nodes.isEmpty() || nodes.get(0).isLocal())
-            cacheData.putAll(localCacheData.caches());
-
-        if (globalState == INACTIVE) { // Accept inactivate state after join.
-            if (log != null && log.isInfoEnabled())
-                log.info("Got inactivate state from cluster during node join.");
-
-            // Revert start action if get INACTIVE state on join.
-            sharedCtx.snapshot().onDeActivate(ctx);
-
-            if (sharedCtx.pageStore() != null)
-                sharedCtx.pageStore().onDeActivate(ctx);
-
-            if (sharedCtx.wal() != null)
-                sharedCtx.wal().onDeActivate(ctx);
-
-            sharedCtx.database().onDeActivate(ctx);
-        }
     }
 
     /** {@inheritDoc} */
@@ -279,394 +263,85 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
-        ClusterState state = (ClusterState)data.commonData();
+        DiscoveryDataClusterState state = (DiscoveryDataClusterState)data.commonData();
+
+        assert state != null : data;
 
-        if (state != null)
-            globalState = state;
+        globalState = state;
     }
 
     /**
      *
      */
     public IgniteInternalFuture<?> changeGlobalState(final boolean activate) {
-        if (ctx.isDaemon()) {
-            GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
-
-            sendCompute(activate, fut);
-
-            return fut;
+        if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) +
+                " cluster (must invoke the method outside of an active transaction)."));
         }
 
-        if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null)
-            throw new IgniteException("Failed to " + prettyStr(activate) + " cluster (must invoke the " +
-                "method outside of an active transaction).");
+        DiscoveryDataClusterState curState = globalState;
 
-        if ((globalState == ACTIVE && activate) || (globalState == INACTIVE && !activate))
+        if (!curState.transition() && curState.active() == activate)
             return new GridFinishedFuture<>();
 
-        final UUID requestId = UUID.randomUUID();
-
-        final GridChangeGlobalStateFuture cgsFut = new GridChangeGlobalStateFuture(requestId, activate, ctx);
-
-        if (!cgsLocFut.compareAndSet(null, cgsFut)) {
-            GridChangeGlobalStateFuture locF = cgsLocFut.get();
-
-            if (locF.activate == activate)
-                return locF;
-
-            return new GridFinishedFuture<>(new IgniteException(
-                "Failed to " + prettyStr(activate) + ", because another state change operation is currently " +
-                    "in progress: " + prettyStr(locF.activate)));
-        }
-
-        if (globalState == ACTIVE && !activate && ctx.cache().context().snapshot().snapshotOperationInProgress()){
-            return new GridFinishedFuture<>(new IgniteException(
-                "Failed to " + prettyStr(activate) + ", because snapshot operation in progress."));
-        }
-
-        if (ctx.clientNode())
-            sendCompute(activate, cgsFut);
-        else {
-            try {
-                List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
-
-                DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest(
-                    requestId, activate ? ACTIVE : INACTIVE, ctx.localNodeId());
-
-                reqs.add(changeGlobalStateReq);
+        GridChangeGlobalStateFuture startedFut = null;
 
-                List<DynamicCacheChangeRequest> cacheReqs = activate ? startAllCachesRequests() : stopAllCachesRequests();
+        GridChangeGlobalStateFuture fut = cgsLocFut.get();
 
-                reqs.addAll(cacheReqs);
+        while (fut == null) {
+            fut = new GridChangeGlobalStateFuture(UUID.randomUUID(), activate, ctx);
 
-                printCacheInfo(cacheReqs, activate);
-
-                ChangeGlobalStateMessage changeGlobalStateMsg = new ChangeGlobalStateMessage(
-                    requestId, ctx.localNodeId(), activate, new DynamicCacheChangeBatch(reqs));
-
-                try {
-                    ctx.discovery().sendCustomEvent(changeGlobalStateMsg);
-
-                    if (ctx.isStopping())
-                        cgsFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " +
-                            "node is stopping."));
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to create or send global state change request: " + cgsFut, e);
+            if (cgsLocFut.compareAndSet(null, fut)) {
+                startedFut = fut;
 
-                    cgsFut.onDone(e);
-                }
-            }
-            catch (IgniteCheckedException e) {
-                cgsFut.onDone(e);
+                break;
             }
+            else
+                fut = cgsLocFut.get();
         }
 
-        return cgsFut;
-    }
-
-    /**
-     *
-     */
-    private void sendCompute(boolean activate, final GridFutureAdapter<Void> res) {
-        AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
-
-        IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();
-
-        if (log.isInfoEnabled())
-            log.info("Sending " + prettyStr(activate) + " request from node [id=" +
-                ctx.localNodeId() + " topVer=" + topVer + " isClient=" + ctx.isDaemon() +
-                " isDaemon" + ctx.isDaemon() + "]");
-
-        IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate));
-
-        fut.listen(new CI1<IgniteFuture>() {
-            @Override public void apply(IgniteFuture fut) {
-                try {
-                    fut.get();
-
-                    res.onDone();
-                }
-                catch (Exception e) {
-                    res.onDone(e);
-                }
-            }
-        });
-    }
-    /**
-     * @param reqs Requests to print.
-     * @param active Active flag.
-     */
-    private void printCacheInfo(List<DynamicCacheChangeRequest> reqs, boolean active) {
-        assert reqs != null;
-
-        StringBuilder sb = new StringBuilder();
-
-        sb.append("[");
-
-        for (int i = 0; i < reqs.size() - 1; i++)
-            sb.append(reqs.get(i).cacheName()).append(", ");
-
-        sb.append(reqs.get(reqs.size() - 1).cacheName());
-
-        sb.append("]");
-
-        sb.append(" ").append(reqs.size())
-            .append(" caches will be ")
-            .append(active ? "started" : "stopped");
-
-        if (log.isInfoEnabled())
-            log.info(sb.toString());
-    }
-
-    /**
-     * @param req Cache being started.
-     */
-    public void onCacheStart(DynamicCacheChangeRequest req) {
-        CacheInfo cacheInfo = cacheData.get(req.cacheName());
-
-        if (cacheInfo == null)
-            cacheData.put(req.cacheName(),
-                new CacheInfo(
-                    new StoredCacheData(req.startCacheConfiguration()),
-                    req.cacheType(), req.sql(),
-                    0L)
-            );
-    }
-
-    /**
-     * @param req Cache being stopped.
-     */
-    public void onCacheStop(DynamicCacheChangeRequest req) {
-        CacheInfo cacheInfo = cacheData.get(req.cacheName());
-
-        if (cacheInfo != null)
-            cacheData.remove(req.cacheName());
-    }
-
-    /**
-     * @return All caches map.
-     */
-    private Map<String, CacheConfiguration> allCaches() {
-        Map<String, CacheConfiguration> cfgs = new HashMap<>();
-
-        for (Map.Entry<String, CacheInfo> entry : cacheData.entrySet())
-            if (cfgs.get(entry.getKey()) == null)
-                cfgs.put(entry.getKey(), entry.getValue().cacheData().config());
-
-        return cfgs;
-    }
-
-    /**
-     * @return Collection of all caches start requests.
-     * @throws IgniteCheckedException If failed to create requests.
-     */
-    private List<DynamicCacheChangeRequest> startAllCachesRequests() throws IgniteCheckedException {
-        assert !ctx.config().isDaemon();
-
-        Collection<CacheConfiguration> cacheCfgs = allCaches().values();
-
-        final List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
-
-        if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) {
-            Map<String, StoredCacheData> ccfgs = sharedCtx.pageStore().readCacheConfigurations();
-
-            for (Map.Entry<String, StoredCacheData> entry : ccfgs.entrySet())
-                reqs.add(createRequest(entry.getValue().config()));
-
-            for (CacheConfiguration cfg : cacheCfgs)
-                if (!ccfgs.keySet().contains(cfg.getName()))
-                    reqs.add(createRequest(cfg));
-
-            return reqs;
-        }
-        else {
-            for (CacheConfiguration cfg : cacheCfgs)
-                reqs.add(createRequest(cfg));
-
-            return reqs;
-        }
-    }
-
-    /**
-     * @return Collection of requests to stop caches.
-     */
-    private List<DynamicCacheChangeRequest> stopAllCachesRequests() {
-        Collection<CacheConfiguration> cacheCfgs = allCaches().values();
-
-        List<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheCfgs.size());
-
-        for (CacheConfiguration cfg : cacheCfgs) {
-            DynamicCacheChangeRequest req = stopRequest(ctx, cfg.getName(), false, false);
-
-            reqs.add(req);
-        }
-
-        return reqs;
-    }
-
-    /**
-     * @param cfg Configuration to create request for.
-     * @return Dynamic cache change request.
-     */
-    private DynamicCacheChangeRequest createRequest(CacheConfiguration cfg) {
-        assert cfg != null;
-        assert cfg.getName() != null;
-
-        String cacheName = cfg.getName();
-
-        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
-            UUID.randomUUID(), cacheName, ctx.localNodeId());
-
-        req.startCacheConfiguration(cfg);
-        req.template(cfg.getName().endsWith("*"));
-        req.nearCacheConfiguration(cfg.getNearConfiguration());
-        req.deploymentId(IgniteUuid.randomUuid());
-        req.schema(new QuerySchema(cfg.getQueryEntities()));
-        req.cacheType(cacheProc.cacheType(cacheName));
-
-        return req;
-    }
-
-    /**
-     *
-     */
-    public boolean active() {
-        ChangeGlobalStateContext actx = lastCgsCtx;
-
-        if (actx != null && !actx.activate && globalState == TRANSITION)
-            return true;
-
-        if (actx != null && actx.activate && globalState == TRANSITION)
-            return false;
-
-        return globalState == ACTIVE;
-    }
-
-    /**
-     * @param cacheName Cache name to check.
-     * @return Locally configured flag.
-     */
-    public boolean isLocallyConfigured(String cacheName){
-        assert localCacheData != null;
-
-        return localCacheData.caches().containsKey(cacheName) || localCacheData.templates().containsKey(cacheName);
-    }
-
-    /**
-     * Invoked if cluster is inactive.
-     *
-     * @param dataBag Bag to collect data to.
-     */
-    public void collectGridNodeData0(DiscoveryDataBag dataBag) {
-        if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
-            dataBag.addGridCommonData(CACHE_PROC.ordinal(), cacheData);
-    }
-
-    /**
-     * @param data Joining node discovery data.
-     */
-    public void onJoiningNodeDataReceived0(JoiningNodeDiscoveryData data) {
-        if (data.hasJoiningNodeData()) {
-            if (data.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) {
-                CacheJoinNodeDiscoveryData data0 = (CacheJoinNodeDiscoveryData)data.joiningNodeData();
-
-                cacheData.putAll(data0.caches());
-            }
-            else if (data.joiningNodeData() instanceof CacheClientReconnectDiscoveryData) {
-                CacheClientReconnectDiscoveryData data0 = (CacheClientReconnectDiscoveryData)data.joiningNodeData();
-
-                // No-op.
+        if (startedFut == null) {
+            if (fut.activate != activate) {
+                return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) +
+                    ", because another state change operation is currently in progress: " + prettyStr(fut.activate)));
             }
+            else
+                return fut;
         }
-    }
-
-    public void onGridDataReceived0(DiscoveryDataBag.GridDiscoveryData data) {
-        // Receive data from active cluster.
-        if (data.commonData() instanceof CacheNodeCommonDiscoveryData) {
-            CacheNodeCommonDiscoveryData data0 = (CacheNodeCommonDiscoveryData)data.commonData();
 
-            Map<String, CacheData> caches = data0.caches();
-
-            Map<String, CacheInfo> cacheInfos = new HashMap<>();
-
-            for (Map.Entry<String, CacheData> entry : caches.entrySet()) {
-                CacheData val = entry.getValue();
-
-                CacheInfo info = new CacheInfo(
-                    new StoredCacheData(val.cacheConfiguration()),
-                    val.cacheType(),
-                    val.sql(),
-                    val.flags()
-                );
-
-                cacheInfos.put(entry.getKey(), info);
-            }
+        ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.requestId, ctx.localNodeId(), activate);
 
-            cacheData.putAll(cacheInfos);
-
-        } // Receive data from inactive cluster.
-        else if (data.commonData() instanceof Map) {
-            Map<String, CacheInfo> data0 = (Map<String, CacheInfo>)data.commonData();
+        try {
+            ctx.discovery().sendCustomEvent(msg);
 
-            cacheData.putAll(data0);
+            if (ctx.isStopping())
+                startedFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " +
+                    "node is stopping."));
         }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send global state change request: " + activate, e);
 
-        cacheData.putAll(localCacheData.caches());
-    }
-
-    /**
-     * @param exchActions Requests.
-     * @param topVer Exchange topology version.
-     */
-    public boolean changeGlobalState(
-        ExchangeActions exchActions,
-        AffinityTopologyVersion topVer
-    ) {
-        assert exchActions != null;
-        assert topVer != null;
-
-        if (exchActions.newClusterState() != null) {
-            ChangeGlobalStateContext cgsCtx = lastCgsCtx;
-
-            assert cgsCtx != null : topVer;
-
-            cgsCtx.topologyVersion(topVer);
-
-            return true;
+            startedFut.onDone(e);
         }
 
-        return false;
+        return startedFut;
     }
 
     /**
      * Invoke from exchange future.
      */
-    public Exception onChangeGlobalState() {
-        GridChangeGlobalStateFuture f = cgsLocFut.get();
-
-        ChangeGlobalStateContext cgsCtx = lastCgsCtx;
-
-        assert cgsCtx != null;
-
-        if (f != null)
-            f.setRemaining(cgsCtx.topVer);
-
-        return cgsCtx.activate ? onActivate(cgsCtx) : onDeActivate(cgsCtx);
+    public Exception changeGlobalState(StateChangeRequest req) {
+        return req.activate() ? onActivate(req.topologyVersion()) : onDeActivate(req.topologyVersion());
     }
 
     /**
      * @param exs Exs.
      */
-    public void onFullResponseMessage(Map<UUID, Exception> exs) {
+    public void onStateChangeError(Map<UUID, Exception> exs, StateChangeRequest req) {
         assert !F.isEmpty(exs);
 
-        ChangeGlobalStateContext actx = lastCgsCtx;
-
-        actx.setFail();
-
         // Revert change if activation request fail.
-        if (actx.activate) {
+        if (req.activate()) {
             try {
                 cacheProc.onKernalStopCaches(true);
 
@@ -697,13 +372,12 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
             //todo https://issues.apache.org/jira/browse/IGNITE-5480
         }
 
-        globalState = actx.activate ? INACTIVE : ACTIVE;
-
+        // TODO GG-12389.
         GridChangeGlobalStateFuture af = cgsLocFut.get();
 
-        if (af != null && af.requestId.equals(actx.requestId)) {
+        if (af != null && af.requestId.equals(req.requestId())) {
             IgniteCheckedException e = new IgniteCheckedException(
-                "Fail " + prettyStr(actx.activate),
+                "Fail " + prettyStr(req.activate()),
                 null,
                 false
             );
@@ -718,19 +392,12 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    private Exception onActivate(ChangeGlobalStateContext cgsCtx) {
+    private Exception onActivate(AffinityTopologyVersion topVer) {
         final boolean client = ctx.clientNode();
 
         if (log.isInfoEnabled())
             log.info("Start activation process [nodeId=" + ctx.localNodeId() + ", client=" + client +
-                ", topVer=" + cgsCtx.topVer + "]");
-
-        Collection<StoredCacheData> cfgs = new ArrayList<>();
-
-        for (DynamicCacheChangeRequest req : cgsCtx.batch.requests()) {
-            if (req.startCacheConfiguration() != null)
-                cfgs.add(new StoredCacheData(req.startCacheConfiguration()));
-        }
+                ", topVer=" + topVer + "]");
 
         try {
             if (!client)
@@ -750,13 +417,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
 
             if (log.isInfoEnabled())
                 log.info("Successfully activated persistence managers [nodeId="
-                    + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
+                    + ctx.localNodeId() + ", client=" + client + ", topVer=" + topVer + "]");
 
             return null;
         }
         catch (Exception e) {
             U.error(log, "Failed to activate persistence managers [nodeId=" + ctx.localNodeId() + ", client=" + client +
-                ", topVer=" + cgsCtx.topVer + "]", e);
+                ", topVer=" + topVer + "]", e);
 
             if (!client)
                 sharedCtx.database().unLock();
@@ -768,12 +435,12 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    public Exception onDeActivate(ChangeGlobalStateContext cgsCtx) {
+    public Exception onDeActivate(AffinityTopologyVersion topVer) {
         final boolean client = ctx.clientNode();
 
         if (log.isInfoEnabled())
             log.info("Starting deactivation [id=" + ctx.localNodeId() + ", client=" +
-                client + ", topVer=" + cgsCtx.topVer + "]");
+                client + ", topVer=" + topVer + "]");
 
         try {
             ctx.dataStructures().onDeActivate(ctx);
@@ -782,13 +449,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
 
             if (log.isInfoEnabled())
                 log.info("Successfully deactivated persistence processors [id=" + ctx.localNodeId() + ", client=" +
-                    client + ", topVer=" + cgsCtx.topVer + "]");
+                    client + ", topVer=" + topVer + "]");
 
             return null;
         }
         catch (Exception e) {
             U.error(log, "Failed to execute deactivation callback [nodeId=" + ctx.localNodeId() + ", client=" + client +
-                ", topVer=" + cgsCtx.topVer + "]", e);
+                ", topVer=" + topVer + "]", e);
 
             return e;
         }
@@ -797,17 +464,14 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    private void onFinalActivate(final ChangeGlobalStateContext cgsCtx) {
-        IgniteInternalFuture<?> asyncActivateFut = ctx.closure().runLocalSafe(new Runnable() {
+    private void onFinalActivate(final StateChangeRequest req) {
+        ctx.closure().runLocalSafe(new Runnable() {
             @Override public void run() {
                 boolean client = ctx.clientNode();
 
                 Exception e = null;
 
                 try {
-                    if (!ctx.config().isDaemon())
-                        ctx.cacheObjects().onUtilityCacheStarted();
-
                     ctx.service().onUtilityCacheStarted();
 
                     ctx.service().onActivate(ctx);
@@ -816,36 +480,30 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
 
                     if (log.isInfoEnabled())
                         log.info("Successfully performed final activation steps [nodeId="
-                            + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
+                            + ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]");
                 }
                 catch (Exception ex) {
                     e = ex;
 
                     U.error(log, "Failed to perform final activation steps [nodeId=" + ctx.localNodeId() +
-                        ", client=" + client + ", topVer=" + lastCgsCtx.topVer + "]", ex);
+                        ", client=" + client + ", topVer=" + req.topologyVersion() + "]", ex);
                 }
                 finally {
-                    globalState = ACTIVE;
-
-                    sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, e);
-
-                    lastCgsCtx = null;
+                    sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), e);
                 }
             }
         });
-
-        cgsCtx.setAsyncActivateFut(asyncActivateFut);
     }
 
     /**
      *
      */
-    public void onFinalDeActivate(ChangeGlobalStateContext cgsCtx) {
+    private void onFinalDeActivate(final StateChangeRequest req) {
         final boolean client = ctx.clientNode();
 
         if (log.isInfoEnabled())
             log.info("Successfully performed final deactivation steps [nodeId="
-                + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
+                + ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]");
 
         Exception ex = null;
 
@@ -865,31 +523,21 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
         catch (Exception e) {
             ex = e;
         }
-        finally {
-            globalState = INACTIVE;
-        }
-
-        sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, ex);
 
-        lastCgsCtx = null;
+        sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), ex);
     }
 
     /**
      *
      */
-    public void onExchangeDone() {
-        ChangeGlobalStateContext cgsCtx = lastCgsCtx;
-
-        assert cgsCtx != null;
-
-        if (!cgsCtx.isFail()) {
-            if (cgsCtx.activate)
-                onFinalActivate(cgsCtx);
+    public void onExchangeDone(boolean fail, StateChangeRequest req) {
+        // TODO GG-12389 pass correct fail flag.
+        if (!fail) {
+            if (req.activate())
+                onFinalActivate(req);
             else
-                onFinalDeActivate(cgsCtx);
+                onFinalDeActivate(req);
         }
-        else
-            lastCgsCtx = null;
     }
 
     /**
@@ -918,44 +566,44 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param nodeId Node ID.
      * @param msg Message.
      */
     private void processChangeGlobalStateResponse(final UUID nodeId, final GridChangeGlobalStateMessageResponse msg) {
         assert nodeId != null;
         assert msg != null;
 
-        if (log.isDebugEnabled())
+        if (log.isDebugEnabled()) {
             log.debug("Received activation response [requestId=" + msg.getRequestId() +
                 ", nodeId=" + nodeId + "]");
-
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null) {
-            U.warn(log, "Received activation response from unknown node (will ignore) [requestId=" +
-                msg.getRequestId() + ']');
-
-            return;
         }
 
         UUID requestId = msg.getRequestId();
 
         final GridChangeGlobalStateFuture fut = cgsLocFut.get();
 
-        if (fut != null && !fut.isDone() && requestId.equals(fut.requestId)) {
-            fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> f) {
-                    fut.onResponse(nodeId, msg);
-                }
-            });
+        if (fut != null && requestId.equals(fut.requestId)) {
+            if (fut.initFut.isDone())
+                fut.onResponse(nodeId, msg);
+            else {
+                fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        // initFut is completed from discovery thread, process response from other thread.
+                        ctx.getSystemExecutorService().execute(new Runnable() {
+                            @Override public void run() {
+                                fut.onResponse(nodeId, msg);
+                            }
+                        });
+                    }
+                });
+            }
         }
     }
 
-
-
     /**
      * @param activate Activate.
      */
-    private String prettyStr(boolean activate) {
+    private static String prettyStr(boolean activate) {
         return activate ? "activate" : "deactivate";
     }
 
@@ -1033,21 +681,16 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
         /**
          *
          */
-        public void setRemaining(AffinityTopologyVersion topVer) {
-            Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer);
-
-            List<UUID> ids = new ArrayList<>(nodes.size());
-
-            for (ClusterNode n : nodes)
-                ids.add(n.id());
-
-            if (log.isDebugEnabled())
-                log.debug("Setup remaining node [id=" + ctx.localNodeId() + ", client=" +
-                    ctx.clientNode() + ", topVer=" + ctx.discovery().topologyVersionEx() +
-                    ", nodes=" + Arrays.toString(ids.toArray()) + "]");
+        void setRemaining(Set<UUID> nodesIds, AffinityTopologyVersion topVer) {
+            if (log.isDebugEnabled()) {
+                log.debug("Setup remaining node [id=" + ctx.localNodeId() +
+                    ", client=" + ctx.clientNode() +
+                    ", topVer=" + topVer +
+                    ", nodes=" + nodesIds + "]");
+            }
 
             synchronized (mux) {
-                remaining.addAll(ids);
+                remaining.addAll(nodesIds);
             }
 
             initFut.onDone();
@@ -1101,96 +744,18 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
 
         /** {@inheritDoc} */
         @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
-            ctx.state().cgsLocFut.set(null);
-
-            return super.onDone(res, err);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(GridChangeGlobalStateFuture.class, this);
-        }
-    }
-
-    /**
-     *
-     *
-     */
-    private static class ChangeGlobalStateContext {
-        /** Request id. */
-        private final UUID requestId;
-
-        /** Initiating node id. */
-        private final UUID initiatingNodeId;
-
-        /** Batch requests. */
-        private final DynamicCacheChangeBatch batch;
+            if (super.onDone(res, err)) {
+                ctx.state().cgsLocFut.compareAndSet(this, null);
 
-        /** Activate. */
-        private final boolean activate;
-
-        /** Topology version. */
-        private AffinityTopologyVersion topVer;
-
-        /** Fail. */
-        private boolean fail;
-
-        /** Async activate future. */
-        private IgniteInternalFuture<?> asyncActivateFut;
-
-        /**
-         *
-         */
-        ChangeGlobalStateContext(
-            UUID requestId,
-            UUID initiatingNodeId,
-            DynamicCacheChangeBatch batch,
-            boolean activate
-        ) {
-            this.requestId = requestId;
-            this.batch = batch;
-            this.activate = activate;
-            this.initiatingNodeId = initiatingNodeId;
-        }
-
-        /**
-         * @param topVer Topology version.
-         */
-        public void topologyVersion(AffinityTopologyVersion topVer) {
-            this.topVer = topVer;
-        }
-
-        /**
-         *
-         */
-        private void setFail() {
-            fail = true;
-        }
-
-        /**
-         *
-         */
-        private boolean isFail() {
-            return fail;
-        }
-
-        /**
-         *
-         */
-        public IgniteInternalFuture<?> getAsyncActivateFut() {
-            return asyncActivateFut;
-        }
+                return true;
+            }
 
-        /**
-         * @param asyncActivateFut Async activate future.
-         */
-        public void setAsyncActivateFut(IgniteInternalFuture<?> asyncActivateFut) {
-            this.asyncActivateFut = asyncActivateFut;
+            return false;
         }
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(ChangeGlobalStateContext.class, this);
+            return S.toString(GridChangeGlobalStateFuture.class, this);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 52cc9e9..cbca773 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -175,8 +175,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        if (ctx.config().isDaemon() || !ctx.state().active())
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+        if (ctx.config().isDaemon() || !active)
             return;
 
         onKernalStart0();

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 2f6abb6..c567ac4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteQueue;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index e336474..c27770f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -35,7 +35,6 @@ import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSet;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 8712756..7eb61d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -101,8 +101,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
 import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
-import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
-import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
 import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 3c2f64d..244820f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -177,7 +177,7 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
         if (ctx.config().isDaemon())
             return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index 716adf7..f528184 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -503,7 +503,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
         if (isRestEnabled()) {
             for (GridRestProtocol proto : protos)
                 proto.onKernalStart();

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
index 909b524..6236026 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
@@ -64,7 +64,7 @@ public class GridChangeStateCommandHandler extends GridRestCommandHandlerAdapter
 
         try {
             if (req.command().equals(CLUSTER_CURRENT_STATE)) {
-                Boolean currentState = ctx.state().active();
+                Boolean currentState = ctx.state().publicApiActiveState();
 
                 res.setResponse(currentState);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 12be63b..06f5244 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -211,8 +211,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        if (ctx.isDaemon() || !ctx.state().active())
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+        if (ctx.isDaemon() || !active)
             return;
 
         onKernalStart0();

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 569a719..7a859a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -153,7 +153,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
         tasksMetaCache = ctx.security().enabled() && !ctx.isDaemon() ?
             ctx.cache().<GridTaskNameHashKey, String>utilityCache() : null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
index 5e85b62..b88eef9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
@@ -128,7 +128,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
 
         mgr.start();
 
-        mgr.onKernalStart();
+        mgr.onKernalStart(true);
 
         assertTrue(mgr.enabled());
     }
@@ -143,7 +143,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
 
         assertTrue(mgr.enabled());
 
-        mgr.onKernalStart();
+        mgr.onKernalStart(true);
 
         mgr.onKernalStop(false);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
index 4dfe69b..a890cf6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
@@ -152,7 +152,7 @@ public class NoOpPageStoreManager implements IgnitePageStoreManager {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart(boolean reconnect) throws IgniteCheckedException {
+    @Override public void onKernalStart(boolean active, boolean reconnect) throws IgniteCheckedException {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index 0ef593f..13f1eb4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -92,7 +92,7 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart(boolean reconnect) throws IgniteCheckedException {
+    @Override public void onKernalStart(boolean active, boolean reconnect) throws IgniteCheckedException {
 
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10649c7b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 90af25e..bc07028 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -479,7 +479,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
 
         timeoutProcessor.start();
 
-        timeoutProcessor.onKernalStart();
+        timeoutProcessor.onKernalStart(true);
 
         for (int i = 0; i < getSpiCount(); i++) {
             CommunicationSpi<Message> spi = newCommunicationSpi();


Mime
View raw message