ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [19/50] [abbrv] ignite git commit: ignite-gg-11650 Stabilize 8.0.2.ea1 branch after merging activation/deactivation fix join inactive node to active cluster
Date Thu, 29 Dec 2016 09:37:23 GMT
ignite-gg-11650 Stabilize 8.0.2.ea1 branch after merging activation/deactivation fix join inactive
node to active cluster


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/094a1b8c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/094a1b8c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/094a1b8c

Branch: refs/heads/ignite-3477
Commit: 094a1b8c5d6fe47e642c13b23904714e70cdacbf
Parents: 9f9be20
Author: Dmitriy Govorukhin <dgovorukhin@gridgain.com>
Authored: Thu Dec 22 15:21:14 2016 +0300
Committer: Dmitriy Govorukhin <dgovorukhin@gridgain.com>
Committed: Thu Dec 22 15:21:14 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    | 117 ++++++++++++++-----
 .../IgniteCacheDatabaseSharedManager.java       |   7 ++
 .../cluster/GridClusterStateProcessor.java      |  11 +-
 .../datastructures/DataStructuresProcessor.java |  11 +-
 .../service/GridServiceProcessor.java           |  42 +------
 5 files changed, 113 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/094a1b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 3834189..6514287 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -569,7 +569,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     "Deployment mode for cache is not CONTINUOUS or SHARED.");
         }
 
-        initializeInternalCachesNames();
+        initializeInternalCacheNames();
 
         sharedCtx = createSharedContext(
             ctx, CU.startStoreSessionListeners(ctx, ctx.config().getCacheStoreSessionListenerFactories()));
@@ -580,12 +580,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheSharedManager mgr : sharedCtx.managers())
             mgr.start(sharedCtx);
 
-        //if inActivate on start then skip start caches
+        //if inActivate on start then skip registrate caches
         if (!activeOnStart)
             return;
 
         CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
 
+        registerCacheFromConfig(cfgs);
+
+        registerCacheFromPersistentStore(cfgs);
+
+        if (log.isDebugEnabled())
+            log.debug("Started cache processor.");
+    }
+
+    /**
+     * @param cfgs Cache configurations.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void registerCacheFromConfig(CacheConfiguration[] cfgs) throws IgniteCheckedException {
         for (int i = 0; i < cfgs.length; i++) {
             if (ctx.config().isDaemon() && !CU.isMarshallerCache(cfgs[i].getName()))
                 continue;
@@ -596,7 +609,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             registerCache(cfg);
         }
+    }
 
+    /**
+     * @param cfgs Cache configurations.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void registerCacheFromPersistentStore(CacheConfiguration[] cfgs) throws IgniteCheckedException {
         if (sharedCtx.pageStore() != null &&
             sharedCtx.database().persistenceEnabled() &&
             !ctx.config().isDaemon()) {
@@ -610,7 +629,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 savedCacheNames.remove(name);
 
             if (!F.isEmpty(savedCacheNames)) {
-                log.info("Will start persisted dynamic caches: " + savedCacheNames);
+                log.info("Registrate persistent caches: " + savedCacheNames);
 
                 for (String name : savedCacheNames) {
                     CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name);
@@ -620,9 +639,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 }
             }
         }
-
-        if (log.isDebugEnabled())
-            log.debug("Started cache processor.");
     }
 
     /**
@@ -664,8 +680,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
 
-        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheType, template,
-            IgniteUuid.randomUuid());
+        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+            ctx, cfg, cacheType, template, IgniteUuid.randomUuid());
 
         desc.locallyConfigured(true);
         desc.staticallyConfigured(true);
@@ -697,8 +713,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
 
         if (cfg.getName() == null) { // Use cache configuration with null name as template.
-            DynamicCacheDescriptor desc0 =
-                new DynamicCacheDescriptor(ctx, cfg, cacheType, true, IgniteUuid.randomUuid());
+            DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(
+                ctx, cfg, cacheType, true, IgniteUuid.randomUuid());
 
             desc0.locallyConfigured(true);
             desc0.staticallyConfigured(true);
@@ -710,7 +726,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * Initialize internal cache names
      */
-    private void initializeInternalCachesNames() {
+    private void initializeInternalCacheNames() {
         FileSystemConfiguration[] igfsCfgs = ctx.grid().configuration().getFileSystemConfiguration();
 
         if (igfsCfgs != null) {
@@ -734,13 +750,43 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         try {
             checkConsistency();
 
-            //must be here, because we must start database before start first cache
-            if (activeOnStart) {
+            boolean currentStatus = ctx.state().active();
+
+            //if we start as inactive node, and join to active cluster, we must registrate all caches which
+            //was receive on join
+            if (!ctx.isDaemon() && currentStatus && !activeOnStart) {
+                CacheConfiguration[] cacheCfgs = ctx.config().getCacheConfiguration();
+
+                CacheConfiguration[] newCacheCfg = new CacheConfiguration[cacheCfgs.length];
+
+                for (int i = 0; i < cacheCfgs.length; i++) {
+                    CacheConfiguration conf = cacheCfgs[i];
+
+                    for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                        CacheConfiguration c = desc.cacheConfiguration();
+                        IgnitePredicate filter = conf.getNodeFilter();
+
+                        if (c.getName().equals(conf.getName()) &&
+                            (desc.receivedOnDiscovery() || CU.isSystemCache(c.getName()))) {
+
+                            newCacheCfg[i] = c;
+
+                            break;
+                        }
+                    }
+                }
+
+                ctx.config().setCacheConfiguration(newCacheCfg);
+
+                activeOnStart = ctx.state().active();
+            }
+
+            if (activeOnStart)
                 if (!ctx.clientNode())
                     sharedCtx.database().lock();
 
-                sharedCtx.database().onKernalStart(false);
-            }
+            //must start database before start first cache
+            sharedCtx.database().onKernalStart(false);
 
             // Start dynamic caches received from collect discovery data.
             for (DynamicCacheDescriptor desc : registeredCaches.values()) {
@@ -783,6 +829,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 }
             }
         }
+        //todo if in active caches not started on start
         finally {
             cacheStartedLatch.countDown();
         }
@@ -792,6 +839,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             if (sharedCtx.database() != mgr)
                 mgr.onKernalStart(false);
 
+        // Escape if start active on start false
         if (!activeOnStart)
             return;
 
@@ -811,7 +859,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             IgnitePredicate filter = cfg.getNodeFilter();
 
-            if (desc.locallyConfigured() || desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter)) {
+            if (desc.locallyConfigured() || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
                 GridCacheAdapter cache = caches.get(maskNull(cfg.getName()));
 
                 if (cache != null) {
@@ -843,7 +891,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 DeploymentMode locDepMode = ctx.config().getDeploymentMode();
                 DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
 
-                CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+                CU.checkAttributeMismatch(
+                    log, null, n.id(), "deploymentMode", "Deployment mode",
                     locDepMode, rmtDepMode, true);
 
                 for (DynamicCacheDescriptor desc : registeredCaches.values()) {
@@ -1784,6 +1833,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
 
             cacheCtx.startTopologyVersion(topVer);
+
             cacheCtx.dynamicDeploymentId(deploymentId);
 
             GridCacheAdapter cache = cacheCtx.cache();
@@ -2024,7 +2074,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size() + 1);
 
             for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), desc.cacheConfiguration().getName(), null);
+                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
+                    UUID.randomUUID(), desc.cacheConfiguration().getName(), null);
 
                 req.startCacheConfiguration(desc.cacheConfiguration());
 
@@ -2038,7 +2089,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
 
             for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
-                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), desc.cacheConfiguration().getName(), null);
+                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
+                    UUID.randomUUID(), desc.cacheConfiguration().getName(), null);
 
                 req.startCacheConfiguration(desc.cacheConfiguration());
 
@@ -2551,29 +2603,29 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     public Collection<DynamicCacheChangeRequest> startAllCachesRequests() throws IgniteCheckedException {
         List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
 
-        if (!ctx.config().isDaemon() && sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) {
+        if (!ctx.config().isDaemon() &&
+            sharedCtx.pageStore() != null &&
+            sharedCtx.database().persistenceEnabled()) {
             Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames();
 
             for (String name : savedCacheNames) {
                 CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name);
 
                 if (cfg != null)
-                    reqs.add(createRequest(cfg));
+                    reqs.add(createRequest(cfg, false));
             }
 
             for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) {
                 if (!savedCacheNames.contains(cfg.getName()))
-                    reqs.add(createRequest(cfg));
+                    reqs.add(createRequest(cfg, true));
             }
-
-            return reqs;
         }
         else {
             for (CacheConfiguration cfg : ctx.config().getCacheConfiguration())
-                reqs.add(createRequest(cfg));
-
-            return reqs;
+                reqs.add(createRequest(cfg, true));
         }
+
+        return reqs;
     }
 
     /**
@@ -2601,14 +2653,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @param cfg Cache configuration.
      */
-    private DynamicCacheChangeRequest createRequest(CacheConfiguration cfg) throws IgniteCheckedException {
+    private DynamicCacheChangeRequest createRequest(
+        CacheConfiguration cfg,
+        boolean needInit
+    ) throws IgniteCheckedException {
         assert cfg != null;
 
         cloneCheckSerializable(cfg);
 
-        CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
+        if (needInit){
+            CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
 
-        initialize(cfg, cacheObjCtx);
+            initialize(cfg, cacheObjCtx);
+        }
 
         String cacheName = cfg.getName();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/094a1b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
index 1fb1543..7c3d06d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
@@ -75,6 +75,13 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
     }
 
     /**
+     *
+     */
+    public void initDataBase() throws IgniteCheckedException{
+
+    }
+
+    /**
      * @return Node-global free list.
      */
     public FreeList globalFreeList() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/094a1b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index c1ff5e4..725c9f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -128,7 +128,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
         super.start(activeOnStart);
 
-        //todo get file lock if active on start true
         globalState = activeOnStart ? ACTIVE : INACTIVE;
         cacheProc = ctx.cache();
         sharedCtx = cacheProc.context();
@@ -440,8 +439,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
             if (!client) {
                 sharedCtx.database().lock();
 
+                if (sharedCtx.pageStore() != null)
+                    sharedCtx.pageStore().onActivate(ctx);
+
                 sharedCtx.wal().onActivate(ctx);
 
+                sharedCtx.database().initDataBase();
+
                 for (CacheConfiguration cfg : cfgs)
                     if (CU.isSystemCache(cfg.getName()))
                         sharedCtx.pageStore().initializeForCache(cfg);
@@ -451,9 +455,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
                         sharedCtx.pageStore().initializeForCache(cfg);
 
                 sharedCtx.database().onActivate(ctx);
-
-                if (sharedCtx.pageStore() != null)
-                    sharedCtx.pageStore().onActivate(ctx);
             }
 
             if (log.isInfoEnabled())
@@ -571,6 +572,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
                     sharedCtx.pageStore().onDeActivate(ctx);
 
                 sharedCtx.wal().onDeActivate(ctx);
+
+                sharedCtx.affinity().removeAllCacheInfo();
             }
         }
         catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/094a1b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 2f6b7ce..77fc295 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -215,16 +215,19 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException {
-        if (ctx.config().isDaemon() || !activeOnStart)
+        if (ctx.config().isDaemon() || !ctx.state().active())
             return;
 
-        onKernalStart0();
+        onKernalStart0(activeOnStart);
     }
 
     /**
      *
      */
-    private void onKernalStart0(){
+    private void onKernalStart0(boolean activeOnStart){
+        if (!activeOnStart && ctx.state().active())
+            ctx.event().addLocalEventListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
         utilityCache = (IgniteInternalCache)ctx.cache().utilityCache();
 
         utilityDataCache = (IgniteInternalCache)ctx.cache().utilityCache();
@@ -313,7 +316,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
 
         ctx.event().addLocalEventListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
-        onKernalStart0();
+        onKernalStart0(true);
 
         for (Map.Entry<GridCacheInternal, GridCacheRemovable> e : dsMap.entrySet()) {
             GridCacheRemovable v = e.getValue();

http://git-wip-us.apache.org/repos/asf/ignite/blob/094a1b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index b160f95..e65354e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -153,7 +153,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     private final List<ComputeJobContext> pendingJobCtxs = new ArrayList<>(0);
 
     /** Deployment executor service. */
-    private final ExecutorService depExe;
+    private volatile ExecutorService depExe;
 
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
@@ -244,7 +244,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException {
-        if (ctx.isDaemon() || !activeOnStart)
+        if (ctx.isDaemon() || !ctx.state().active())
             return;
 
         cache = ctx.cache().utilityCache();
@@ -368,6 +368,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             log.debug("Activate service processor [nodeId=" + ctx.localNodeId() +
                 " topVer=" + ctx.discovery().topologyVersionEx() + " ]");
 
+        depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.gridName(), "srvc-deploy"));
+
         start(true);
 
         onKernalStart(true);
@@ -379,41 +381,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             log.debug("DeActivate service processor [nodeId=" + ctx.localNodeId() +
                 " topVer=" + ctx.discovery().topologyVersionEx() + " ]");
 
-        busyLock.block();
-
-        try {
-            if (ctx.isDaemon())
-                return;
-
-            if (!ctx.clientNode())
-                ctx.event().removeLocalEventListener(topLsnr);
-
-            Collection<ServiceContextImpl> ctxs = new ArrayList<>();
-
-            synchronized (locSvcs) {
-                for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values())
-                    ctxs.addAll(ctxs0);
-            }
-
-            for (ServiceContextImpl ctx : ctxs) {
-                ctx.setCancelled(true);
-
-                Service svc = ctx.service();
-
-                if (svc != null)
-                    svc.cancel(ctx);
-            }
-
-            Exception err = new IgniteCheckedException("Operation has been cancelled (node is in active status).");
-
-            cancelFutures(depFuts, err);
-            cancelFutures(undepFuts, err);
-
-            if (log.isDebugEnabled())
-                log.debug("Deactivate service processor.");
-        }finally {
-            busyLock.unblock();
-        }
+        onKernalStop(true);
     }
 
     /** {@inheritDoc} */


Mime
View raw message