ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [26/47] incubator-ignite git commit: # ignite-598: fix bugs and support remote validation.
Date Mon, 06 Apr 2015 09:07:32 GMT
# ignite-598: fix bugs and support remote validation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c280901d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c280901d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c280901d

Branch: refs/heads/ignite-gg-9998
Commit: c280901d67d7273573f4bf326d8d7304523d28be
Parents: b78dbff
Author: Artem Shutak <ashutak@gridgain.com>
Authored: Fri Apr 3 17:10:05 2015 +0300
Committer: Artem Shutak <ashutak@gridgain.com>
Committed: Fri Apr 3 17:10:05 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  2 +-
 .../ignite/internal/GridCachePluginContext.java |  6 ++
 .../processors/cache/GridCacheProcessor.java    | 64 +++++++++++++-------
 .../processors/plugin/CachePluginManager.java   | 51 ++++++++++++----
 .../ignite/plugin/CachePluginContext.java       | 11 ++++
 .../ignite/plugin/CachePluginProvider.java      |  4 +-
 6 files changed, 101 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 22148ad..f7f8e26 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -407,7 +407,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K,
V> {
     }
 
     /**
-     * Cache name. If not provided or {@code null}, then this will be considered a default
+     * Cache name, or {@code null} if not provided, in which case this will be considered a default
      * cache which can be accessed via {@link Ignite#cache(String)} method. Otherwise, if
name
      * is provided, the cache will be accessed via {@link Ignite#cache(String)} method.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/internal/GridCachePluginContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridCachePluginContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridCachePluginContext.java
index 2334bae..7c80db6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridCachePluginContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridCachePluginContext.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.plugin.*;
 
@@ -64,6 +65,11 @@ public class GridCachePluginContext<C extends CachePluginConfiguration>
implemen
     @Override public Ignite grid() {        
         return ctx.grid();
     }
+    
+    /** {@inheritDoc} */
+    @Override public ClusterNode localNode() {
+        return ctx.discovery().localNode();
+    }
 
     /** {@inheritDoc} */
     @Override public IgniteLogger log(Class<?> cls) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c366520..f21eabc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -627,6 +627,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         ClusterNode locNode = ctx.discovery().localNode();
 
+        // Init cache plugin managers.
+        final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
+
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            CacheConfiguration locCcfg = desc.cacheConfiguration();
+            
+            CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
+
+            cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
+        }
+        
         if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
             for (ClusterNode n : ctx.discovery().remoteNodes()) {
                 checkTransactionConfiguration(n);
@@ -640,8 +651,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 for (DynamicCacheDescriptor desc : registeredCaches.values()) {
                     CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
 
-                    if (rmtCfg != null)
-                        checkCache(desc.cacheConfiguration(), rmtCfg, n);
+                    if (rmtCfg != null) {
+                        CacheConfiguration locCfg = desc.cacheConfiguration();
+                        
+                        checkCache(locCfg, rmtCfg, n);
+
+                        // Check plugin cache configurations.
+                        CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
+
+                        assert pluginMgr != null : " Map=" + cache2PluginMgr;
+
+                        pluginMgr.validateRemotes(rmtCfg, n);
+                    }
                 }
             }
         }
@@ -661,7 +682,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             if (filter.apply(locNode)) {
                 CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(null,
ccfg.getName(), ccfg);
 
-                GridCacheContext ctx = createCache(ccfg, cacheObjCtx);
+                CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
+                
+                assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                
+                GridCacheContext ctx = createCache(ccfg, pluginMgr, cacheObjCtx);
 
                 ctx.dynamicDeploymentId(desc.deploymentId());
 
@@ -975,17 +1000,24 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * @param cfg Cache configuration to use to create cache.
+     * @param pluginMgr Cache plugin manager.
      * @return Cache context.
      * @throws IgniteCheckedException If failed to create cache.
      */
     @SuppressWarnings({"unchecked"})
-    private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, CacheObjectContext
cacheObjCtx) throws IgniteCheckedException {
+    private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, @Nullable CachePluginManager
pluginMgr,
+        CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
         assert cfg != null;
 
         CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create()
: null;
 
         validate(ctx.config(), cfg, cfgStore);
 
+        if (pluginMgr == null)
+            pluginMgr = new CachePluginManager(ctx, cfg);
+
+        pluginMgr.validate();
+
         CacheJtaManagerAdapter jta = JTA.create(cfg.getTransactionManagerLookupClassName()
== null);
 
         jta.createTmLookup(cfg);
@@ -1017,16 +1049,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager();
         CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager();
         GridCacheTtlManager ttlMgr = new GridCacheTtlManager();
-        CachePluginManager pluginMgr = new CachePluginManager(ctx, cfg);
         
-        pluginMgr.validate();
-
-        CacheConflictResolutionManager rslvrMgr = pluginMgr.createComponent(ctx, cfg,
-            CacheConflictResolutionManager.class);
-
-        GridCacheDrManager drMgr = pluginMgr.createComponent(ctx, cfg,GridCacheDrManager.class);
-
-        CacheStoreManager storeMgr = pluginMgr.createComponent(ctx, cfg, CacheStoreManager.class);
+        CacheConflictResolutionManager rslvrMgr = pluginMgr.createComponent(CacheConflictResolutionManager.class);
+        GridCacheDrManager drMgr = pluginMgr.createComponent(GridCacheDrManager.class);
+        CacheStoreManager storeMgr = pluginMgr.createComponent(CacheStoreManager.class);
 
         storeMgr.initialize(cfgStore, sesHolders);
 
@@ -1156,7 +1182,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             evictMgr = new GridCacheEvictionManager();
             evtMgr = new GridCacheEventManager();
             pluginMgr = new CachePluginManager(ctx, cfg);
-            drMgr = pluginMgr.createComponent(ctx, cfg, GridCacheDrManager.class);
+            drMgr = pluginMgr.createComponent(GridCacheDrManager.class);
 
             cacheCtx = new GridCacheContext(
                 ctx,
@@ -1358,7 +1384,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(null, ccfg.getName(),
ccfg);
 
-            GridCacheContext cacheCtx = createCache(ccfg, cacheObjCtx);
+            GridCacheContext cacheCtx = createCache(ccfg, null, cacheObjCtx);
 
             cacheCtx.startTopologyVersion(topVer);
 
@@ -2028,14 +2054,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     true);
             }
         }
-        
-        // TODO 10006: implement remote configs validation.
-        // Check plugin configurations.
-//        for (CachePluginConfiguration locPluginCcfg : locCfg.getPluginConfigurations())
{
-//            CachePluginProvider provider = ...;
-//
-//            provider.validateRemote(locCfg, locPluginCcfg, rmtCfg, rmtNode);
-//        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
index 580ff49..640271a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.plugin;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -31,60 +32,71 @@ import java.util.*;
  * Cache plugin manager.
  */
 public class CachePluginManager extends GridCacheManagerAdapter {
+    /** Providers list. To have providers order. */
+    private final List<CachePluginProvider> providersList = new ArrayList<>();
+
+    /** */
+    private final Map<CachePluginContext, CachePluginProvider> providersMap = new HashMap<>();
+
     /** */
-    private final List<CachePluginProvider> providers = new ArrayList<>();
+    private final GridKernalContext ctx;
+
+    /** */
+    private final CacheConfiguration cfg;
 
     /**
      * @param ctx Context.
      * @param cfg Cache config.
      */
     public CachePluginManager(GridKernalContext ctx, CacheConfiguration cfg) {
+        this.ctx = ctx;
+        this.cfg = cfg;
+        
         if (cfg.getPluginConfigurations() != null) {
             for (CachePluginConfiguration cachePluginCfg : cfg.getPluginConfigurations())
{
                 CachePluginContext pluginCtx = new GridCachePluginContext(ctx, cfg, cachePluginCfg);
 
                 CachePluginProvider provider = cachePluginCfg.createProvider(pluginCtx);
 
-                providers.add(provider);
+                providersList.add(provider);
+                providersMap.put(pluginCtx, provider);
             }
         }
     }
 
     /** {@inheritDoc} */
     @Override protected void onKernalStart0() throws IgniteCheckedException {
-        for (CachePluginProvider provider : providers)
+        for (CachePluginProvider provider : providersList)
             provider.onIgniteStart();
     }
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
-        for (ListIterator<CachePluginProvider> iter = providers.listIterator(); iter.hasPrevious();)
+        for (ListIterator<CachePluginProvider> iter = providersList.listIterator();
iter.hasPrevious();)
             iter.previous().onIgniteStop(cancel);
     }
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
-        for (CachePluginProvider provider : providers)
+        for (CachePluginProvider provider : providersList)
             provider.start();
     }
 
     /** {@inheritDoc} */
     @Override protected void stop0(boolean cancel) {
-        for (ListIterator<CachePluginProvider> iter = providers.listIterator(); iter.hasPrevious();)
+        for (ListIterator<CachePluginProvider> iter = providersList.listIterator();
iter.hasPrevious();)
             iter.previous().stop(cancel);
     }
 
     /**
      * Creates optional component.
      *
-     * @param ctx Kernal context.
-     * @param cfg Cache configuration.
      * @param cls Component class.
      * @return Created component.
      */
     @SuppressWarnings("unchecked")
-    public <T> T createComponent(GridKernalContext ctx, CacheConfiguration cfg, Class<T>
cls) {
-        for (CachePluginProvider provider : providers) {
+    public <T> T createComponent(Class<T> cls) {
+        for (CachePluginProvider provider : providersList) {
             T res = (T)provider.createComponent(cls);
             
             if (res != null)
@@ -107,7 +119,24 @@ public class CachePluginManager extends GridCacheManagerAdapter {
      * @throws IgniteCheckedException If validation failed.
      */
     public void validate() throws IgniteCheckedException {
-        for (CachePluginProvider provider : providers)
+        for (CachePluginProvider provider : providersList)
             provider.validate();
     }
+
+    /**
+     * Checks that the remote cache has a configuration compatible with the local one.
+     *    
+     * @param rmtCfg Remote cache configuration.
+     * @param rmtNode Remote node.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void validateRemotes(CacheConfiguration rmtCfg, ClusterNode rmtNode) throws IgniteCheckedException
{
+        for (Map.Entry<CachePluginContext, CachePluginProvider> entry : providersMap.entrySet())
{
+            CachePluginContext cctx = entry.getKey();
+            CachePluginProvider provider = entry.getValue();
+            
+            provider.validateRemote(cctx.igniteCacheConfiguration(), cctx.cacheConfiguration(),
rmtCfg, rmtNode);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginContext.java b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginContext.java
index 580b40d..f034312 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginContext.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.plugin;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
 
 /**
  * Cache plugin context.
@@ -45,6 +47,15 @@ public interface CachePluginContext<C extends CachePluginConfiguration>
{
     public Ignite grid();
 
     /**
+     * Gets local grid node. Instance of local node is provided by underlying {@link DiscoverySpi}
+     * implementation used.
+     *
+     * @return Local grid node.
+     * @see DiscoverySpi
+     */
+    public ClusterNode localNode();
+
+    /**
      * Gets logger for given class.
      *
      * @param cls Class to get logger for.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c280901d/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java
b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java
index 4006a70..ea5f795 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java
@@ -77,8 +77,8 @@ public interface CachePluginProvider<C extends CachePluginConfiguration>
{
      * @param locCfg Local configuration.
      * @param locPluginCcfg Local plugin configuration.
      * @param rmtCfg Remote configuration.
-     * @param node Node.
+     * @param rmtNode Remote node.
      */
-    public void validateRemote(CacheConfiguration locCfg, C locPluginCcfg, CacheConfiguration
rmtCfg, ClusterNode node) 
+    public void validateRemote(CacheConfiguration locCfg, C locPluginCcfg, CacheConfiguration
rmtCfg, ClusterNode rmtNode)
         throws IgniteCheckedException;
 }


Mime
View raw message