ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [43/49] ignite git commit: ignite-gg-12221 more test, collect receive active/inactive flag first.
Date Fri, 02 Jun 2017 17:13:56 GMT
ignite-gg-12221 more test, collect receive active/inactive flag first.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8100f52b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8100f52b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8100f52b

Branch: refs/heads/ignite-5398
Commit: 8100f52b4ee4e2dd783e2d9c000ec8a6f656a7b8
Parents: 5c4d2a7
Author: Dmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
Authored: Wed May 31 14:20:56 2017 +0300
Committer: Dmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
Committed: Wed May 31 14:20:56 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  24 ++++-
 .../processors/cache/GridCacheProcessor.java    | 107 +++++++++++--------
 .../cluster/GridClusterStateProcessor.java      |  36 ++++++-
 3 files changed, 117 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8100f52b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index f7a82ba..81e5cc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.service.GridServiceProcessor;
@@ -88,7 +89,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -662,7 +662,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
 
                 Map<Integer, Serializable> data = new HashMap<>();
 
+                Serializable val = ctx.state().collectDiscoveryData(nodeId);
+
+                int type = ctx.state().discoveryDataType().ordinal();
+
+                assert val != null;
+
+                data.put(type, val);
+
                 for (GridComponent comp : ctx.components()) {
+                    if (comp instanceof GridClusterStateProcessor)
+                        continue;
+
                     Serializable compData = comp.collectDiscoveryData(nodeId);
 
                     if (compData != null) {
@@ -676,6 +687,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
             }
 
             @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer,
Serializable> data) {
+                GridClusterStateProcessor stateProc = ctx.state();
+
+                int type = stateProc.discoveryDataType().ordinal();
+
+                Serializable data0 = data.get(type);
+
+                if (data0 != null)
+                    stateProc.onDiscoveryDataReceived(joiningNodeId, nodeId, data0);
+
                 for (Map.Entry<Integer, Serializable> e : data.entrySet()) {
                     GridComponent comp = null;
 
@@ -687,7 +707,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
                         }
                     }
 
-                    if (comp != null)
+                    if (comp != null && !(comp instanceof GridClusterStateProcessor))
                         comp.onDiscoveryDataReceived(joiningNodeId, nodeId, e.getValue());
                     else {
                         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/8100f52b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 1df7e42..fa9353e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -600,7 +600,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheSharedManager mgr : sharedCtx.managers())
             mgr.start(sharedCtx);
 
-        //if inActivate on start then skip registrate caches
+        // If inActivate on start then skip registrate caches.
         if (!activeOnStart)
             return;
 
@@ -775,48 +775,45 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         try {
             checkConsistency();
 
-            boolean currStatus = ctx.state().active();
+            if (ctx.state().onJoinChanged()){
+                // Node was active on start and joint -> inActive cluster.
+                if (ctx.state().changeActiveToInactive()){
 
-            boolean changed = false;
-
-            if (currStatus != activeOnStart) {
-                activeOnStart = currStatus;
+                }
 
-                changed = true;
-            }
-            // If we start as inactive node, and join to active cluster, we must register
all caches
-            // which were received on join.
-            if (!ctx.isDaemon() && changed) {
-                List<CacheConfiguration> tmpCacheCfg = new ArrayList<>();
+                // Node was inActive on start and joint -> active cluster.
+                if (ctx.state().changeInActiveToActive()){
+                    if (!ctx.isDaemon()){
+                        List<CacheConfiguration> tmpCacheCfg = new ArrayList<>();
 
-                for (CacheConfiguration conf : ctx.config().getCacheConfiguration()) {
-                    for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                        CacheConfiguration c = desc.cacheConfiguration();
-                        IgnitePredicate filter = c.getNodeFilter();
+                        for (CacheConfiguration conf : ctx.config().getCacheConfiguration())
{
+                            for (DynamicCacheDescriptor desc : registeredCaches.values())
{
+                                CacheConfiguration c = desc.cacheConfiguration();
+                                IgnitePredicate filter = c.getNodeFilter();
 
-                        if (c.getName().equals(conf.getName()) &&
-                            ((desc.receivedOnDiscovery() && CU.affinityNode(locNode,
filter)) ||
-                                CU.isSystemCache(c.getName()))) {
+                                if (c.getName().equals(conf.getName()) &&
+                                    ((desc.receivedOnDiscovery() && CU.affinityNode(locNode,
filter)) ||
+                                        CU.isSystemCache(c.getName()))) {
 
-                            if (CU.isSystemCache(c.getName()))
-                                desc.locallyConfigured(true);
+                                    if (CU.isSystemCache(c.getName()))
+                                        desc.locallyConfigured(true);
 
-                            tmpCacheCfg.add(c);
+                                    tmpCacheCfg.add(c);
 
-                            break;
+                                    break;
+                                }
+                            }
                         }
-                    }
-                }
 
-                if (!tmpCacheCfg.isEmpty()) {
-                    CacheConfiguration[] newCacheCfg = new CacheConfiguration[tmpCacheCfg.size()];
+                        if (!tmpCacheCfg.isEmpty()) {
+                            CacheConfiguration[] newCacheCfg = new CacheConfiguration[tmpCacheCfg.size()];
 
-                    tmpCacheCfg.toArray(newCacheCfg);
+                            tmpCacheCfg.toArray(newCacheCfg);
 
-                    ctx.config().setCacheConfiguration(newCacheCfg);
+                            ctx.config().setCacheConfiguration(newCacheCfg);
+                        }
+                    }
                 }
-
-                activeOnStart = currStatus;
             }
 
             if (activeOnStart && !ctx.clientNode() && !ctx.isDaemon())
@@ -876,7 +873,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 mgr.onKernalStart(false);
 
         // Escape if start active on start false
-        if (!activeOnStart)
+        if (!ctx.state().active())
             return;
 
         for (GridCacheAdapter<?, ?> cache : caches.values())
@@ -2105,9 +2102,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
-        if (!sharedCtx.kernalContext().state().active())
-            return new DynamicCacheChangeBatch(Collections.<DynamicCacheChangeRequest>emptyList())
-                .restartingCaches(Collections.<String>emptySet());
+        if (!sharedCtx.kernalContext().state().active()) {
+            if (ctx.localNodeId().equals(nodeId)) {
+                CacheConfiguration[] cfgs = sharedCtx.gridConfig().getCacheConfiguration();
+
+                List<DynamicCacheChangeRequest> reqs = new ArrayList<>(cfgs.length);
+
+                try {
+                    for (CacheConfiguration cfg : cfgs)
+                        reqs.add(createRequest(cfg, true));
+                }
+                catch (Exception e) {
+                    // Todo
+                }
+
+                return new DynamicCacheChangeBatch(reqs).restartingCaches(Collections.<String>emptySet());
+            }
+        }
 
         boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect
!= null;
 
@@ -2214,21 +2225,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable
data) {
-        if (!ctx.state().active()) {
-            if (!ctx.localNodeId().equals(joiningNodeId)){
-                if (data instanceof DynamicCacheChangeBatch)
-                    onJoinBatches.put(rmtNodeId, (DynamicCacheChangeBatch)data);
+        if (ctx.state().onJoinChanged()){
+            if (ctx.state().changeActiveToInactive()){
+                if (ctx.localNodeId().equals(joiningNodeId)) {
+                    for (DynamicCacheDescriptor desc : registeredCaches.values())
+                        ctx.discovery().removeCacheFilter(desc.cacheConfiguration().getName());
+
+                    registeredCaches.clear();
+                    registeredTemplates.clear();
+                }
 
                 return;
-            }else {
-                registeredCaches.clear();
-                registeredTemplates.clear();
+            }
+        }
 
-                for (DynamicCacheDescriptor desc : registeredCaches.values())
-                    ctx.discovery().removeCacheFilter(desc.cacheConfiguration().getName());
+        if (!ctx.state().active()) {
+            if (data instanceof DynamicCacheChangeBatch)
+                onJoinBatches.put(rmtNodeId, (DynamicCacheChangeBatch)data);
 
-                registeredCaches.clear();
-            }
+            return;
         }
 
         if (data instanceof DynamicCacheChangeBatch) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8100f52b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index a7c5cb8..3ffbda9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -130,7 +130,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
         super.start(activeOnStart);
 
-        globalState = activeOnStart ? ACTIVE : INACTIVE;
+        globalState = state(activeOnStart);
         cacheProc = ctx.cache();
         sharedCtx = cacheProc.context();
 
@@ -215,11 +215,15 @@ public class GridClusterStateProcessor extends GridProcessorAdapter
{
 
     /** {@inheritDoc} */
     @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
+        assert globalState != null;
+
         return globalState;
     }
 
     /** {@inheritDoc} */
     @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable
data) {
+        assert data != null;
+
         if (ctx.localNodeId().equals(joiningNodeId))
             globalState = (ClusterState)data;
     }
@@ -313,6 +317,34 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param flag Flag.
+     */
+    private ClusterState state(boolean flag){
+        return flag ? ACTIVE : INACTIVE;
+    }
+
+    /**
+     *
+     */
+    public boolean onJoinChanged() {
+        return state(sharedCtx.gridConfig().isActiveOnStart()) != globalState;
+    }
+
+    /**
+     *
+     */
+    public boolean changeActiveToInactive(){
+        return state(sharedCtx.gridConfig().isActiveOnStart()) == ACTIVE && globalState
== INACTIVE;
+    }
+
+    /**
+     *
+     */
+    public boolean changeInActiveToActive(){
+        return state(sharedCtx.gridConfig().isActiveOnStart()) == INACTIVE && globalState
== ACTIVE;
+    }
+
+    /**
      *
      */
     public boolean active() {
@@ -406,7 +438,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
             //todo revert change if deactivate request fail
         }
 
-        globalState = actx.activate ? INACTIVE : ACTIVE;
+        globalState = state(actx.activate);
 
         GridChangeGlobalStateFuture af = cgsLocFut.get();
 


Mime
View raw message