ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [34/39] incubator-ignite git commit: ignite-45 - Fixed cache deploy from config.
Date Mon, 16 Mar 2015 05:33:38 GMT
ignite-45 - Fixed cache deploy from config.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/84e31966
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/84e31966
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/84e31966

Branch: refs/heads/ignite-45
Commit: 84e31966d0b4e6c3671975da221b89c5a22f04e5
Parents: 41dd737
Author: Alexey Goncharuk <agoncharuk@gridgain.com>
Authored: Sun Mar 15 21:38:42 2015 -0700
Committer: Alexey Goncharuk <agoncharuk@gridgain.com>
Committed: Sun Mar 15 21:38:42 2015 -0700

----------------------------------------------------------------------
 .../cache/DynamicCacheDescriptor.java           |  33 ++++
 .../processors/cache/GridCacheContext.java      |  19 ++-
 .../processors/cache/GridCacheProcessor.java    | 158 ++++++++++++++-----
 .../GridDhtPartitionsExchangeFuture.java        |  29 +---
 .../cache/IgniteStaticCacheStartSelfTest.java   |  98 ++++++++++++
 5 files changed, 274 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/84e31966/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 4ae96a4..3a900ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -43,6 +43,12 @@ public class DynamicCacheDescriptor {
     /** Locally configured flag. */
     private boolean locCfg;
 
+    /** Statically configured flag. */
+    private boolean staticCfg;
+
+    /** Started flag. */
+    private boolean started;
+
     /**
      * @param cacheCfg Cache configuration.
      */
@@ -80,6 +86,33 @@ public class DynamicCacheDescriptor {
     }
 
     /**
+     * @return {@code True} if statically configured.
+     */
+    public boolean staticallyConfigured() {
+        return staticCfg;
+    }
+
+    /**
+     * @param staticCfg {@code True} if statically configured.
+     */
+    public void staticallyConfigured(boolean staticCfg) {
+        this.staticCfg = staticCfg;
+    }
+
+    /**
+     * @return {@code True} if started flag was flipped by this call.
+     */
+    public boolean onStart() {
+        if (!started) {
+            started = true;
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @return Cache configuration.
      */
     public CacheConfiguration cacheConfiguration() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/84e31966/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 9f30a7d..dfc1998 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -43,8 +43,6 @@ import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.cacheobject.*;
 import org.apache.ignite.internal.processors.closure.*;
 import org.apache.ignite.internal.processors.offheap.*;
-import org.apache.ignite.internal.processors.portable.*;
-import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -197,6 +195,9 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** */
     private CacheObjectContext cacheObjCtx;
 
+    /** Start topology version. */
+    private AffinityTopologyVersion startTopVer;
+
     /** Dynamic cache deployment ID. */
     private IgniteUuid dynamicDeploymentId;
 
@@ -345,6 +346,20 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @return Start topology version.
+     */
+    public AffinityTopologyVersion startTopologyVersion() {
+        return startTopVer;
+    }
+
+    /**
+     * @param startTopVer Start topology version.
+     */
+    public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
+        this.startTopVer = startTopVer;
+    }
+
+    /**
      * @return Cache default {@link ExpiryPolicy}.
      */
     @Nullable public ExpiryPolicy expiry() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/84e31966/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 993e50b..b9e766c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.datastructures.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
@@ -604,6 +605,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cfg, IgniteUuid.randomUuid());
 
             desc.locallyConfigured(true);
+            desc.staticallyConfigured(true);
 
             registeredCaches.put(maskNull(cfg.getName()), desc);
 
@@ -660,6 +662,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             // Check if validation failed on node start.
             desc.checkValid();
 
+            boolean started = desc.onStart();
+
+            assert started : "Failed to change started flag for locally configured cache: " + desc;
+
             CacheConfiguration ccfg = desc.cacheConfiguration();
 
             IgnitePredicate filter = ccfg.getNodeFilter();
@@ -1240,40 +1246,88 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param req Start request.
+     * @param reqs Requests to start.
+     * @throws IgniteCheckedException If failed to start cache.
      */
-    public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException {
-        assert req.isStart();
+    @SuppressWarnings("TypeMayBeWeakened")
+    public void prepareCachesStart(
+        Collection<DynamicCacheChangeRequest> reqs,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
+        for (DynamicCacheChangeRequest req : reqs) {
+            assert req.isStart();
+
+            prepareCacheStart(
+                req.startCacheConfiguration(),
+                req.nearCacheConfiguration(),
+                req.clientStartOnly(),
+                req.initiatingNodeId(),
+                req.deploymentId(),
+                topVer
+            );
+        }
+
+        // Start statically configured caches received from remote nodes during exchange.
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            if (desc.staticallyConfigured() && !desc.locallyConfigured()) {
+                if (desc.onStart()) {
+                    prepareCacheStart(
+                        desc.cacheConfiguration(),
+                        null,
+                        false,
+                        null,
+                        desc.deploymentId(),
+                        topVer
+                    );
+                }
+            }
+        }
+    }
 
-        CacheConfiguration ccfg = new CacheConfiguration(req.startCacheConfiguration());
+    /**
+     * @param cfg Start configuration.
+     * @param nearCfg Near configuration.
+     * @param clientStartOnly Client only start request.
+     * @param initiatingNodeId Initiating node ID.
+     * @param deploymentId Deployment ID.
+     */
+    private void prepareCacheStart(
+        CacheConfiguration cfg,
+        NearCacheConfiguration nearCfg,
+        boolean clientStartOnly,
+        UUID initiatingNodeId,
+        IgniteUuid deploymentId,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
+        CacheConfiguration ccfg = new CacheConfiguration(cfg);
 
         IgnitePredicate nodeFilter = ccfg.getNodeFilter();
 
         ClusterNode locNode = ctx.discovery().localNode();
 
-        if (req.isStart()) {
-            boolean affNodeStart = !req.clientStartOnly() && nodeFilter.apply(locNode);
-            boolean clientNodeStart = locNode.id().equals(req.initiatingNodeId());
+        boolean affNodeStart = !clientStartOnly && nodeFilter.apply(locNode);
+        boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
 
-            if (affNodeStart || clientNodeStart) {
-                if (clientNodeStart && !affNodeStart) {
-                    if (req.nearCacheConfiguration() != null)
-                        ccfg.setNearConfiguration(req.nearCacheConfiguration());
-                }
+        if (affNodeStart || clientNodeStart) {
+            if (clientNodeStart && !affNodeStart) {
+                if (nearCfg != null)
+                    ccfg.setNearConfiguration(nearCfg);
+            }
 
-                CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(null, ccfg.getName(), ccfg);
+            CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(null, ccfg.getName(), ccfg);
 
-                GridCacheContext cacheCtx = createCache(ccfg, cacheObjCtx);
+            GridCacheContext cacheCtx = createCache(ccfg, cacheObjCtx);
 
-                cacheCtx.dynamicDeploymentId(req.deploymentId());
+            cacheCtx.startTopologyVersion(topVer);
 
-                sharedCtx.addCacheContext(cacheCtx);
+            cacheCtx.dynamicDeploymentId(deploymentId);
 
-                startCache(cacheCtx.cache());
-                onKernalStart(cacheCtx.cache());
+            sharedCtx.addCacheContext(cacheCtx);
 
-                caches.put(maskNull(cacheCtx.name()), cacheCtx.cache());
-            }
+            startCache(cacheCtx.cache());
+            onKernalStart(cacheCtx.cache());
+
+            caches.put(maskNull(cacheCtx.name()), cacheCtx.cache());
         }
     }
 
@@ -1314,34 +1368,45 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * Callback invoked when first exchange future for dynamic cache is completed.
      *
-     * @param req Change request.
+     * @param topVer Completed topology version.
+     * @param reqs Change requests.
      */
     @SuppressWarnings("unchecked")
-    public void onExchangeDone(DynamicCacheChangeRequest req) {
-        String masked = maskNull(req.cacheName());
+    public void onExchangeDone(AffinityTopologyVersion topVer, Collection<DynamicCacheChangeRequest> reqs) {
+        for (GridCacheAdapter<?, ?> cache : caches.values()) {
+            GridCacheContext<?, ?> cacheCtx = cache.context();
 
-        if (req.isStart()) {
-            GridCacheAdapter<?, ?> cache = caches.get(masked);
+            if (F.eq(cacheCtx.startTopologyVersion(), topVer)) {
+                cacheCtx.preloader().onInitialExchangeComplete(null);
+
+                String masked = maskNull(cacheCtx.name());
 
-            if (cache != null)
                 jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false));
+            }
         }
-        else {
-            prepareCacheStop(req);
 
-            DynamicCacheDescriptor desc = registeredCaches.get(masked);
+        if (!F.isEmpty(reqs)) {
+            for (DynamicCacheChangeRequest req : reqs) {
+                String masked = maskNull(req.cacheName());
 
-            if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId()))
-                registeredCaches.remove(masked, desc);
-        }
+                if (req.isStop()) {
+                    prepareCacheStop(req);
 
-        DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(masked);
+                    DynamicCacheDescriptor desc = registeredCaches.get(masked);
 
-        assert req.deploymentId() != null;
-        assert fut == null || fut.deploymentId != null;
+                    if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId()))
+                        registeredCaches.remove(masked, desc);
+                }
 
-        if (fut != null && fut.deploymentId().equals(req.deploymentId()))
-            fut.onDone();
+                DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(masked);
+
+                assert req.deploymentId() != null;
+                assert fut == null || fut.deploymentId != null;
+
+                if (fut != null && fut.deploymentId().equals(req.deploymentId()))
+                    fut.onDone();
+            }
+        }
     }
 
     /**
@@ -1425,10 +1490,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                                     ccfg.getCacheMode() == LOCAL);
                             }
                         }
-                        else
-                            registeredCaches.put(maskNull(req.cacheName()), new DynamicCacheDescriptor(
+                        else {
+                            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
                                 ccfg,
-                                req.deploymentId()));
+                                req.deploymentId());
+
+                            // Received statically configured cache.
+                            if (req.initiatingNodeId() == null)
+                                desc.staticallyConfigured(true);
+
+                            registeredCaches.put(maskNull(req.cacheName()), desc);
+
+                            ctx.discovery().setCacheFilter(
+                                req.cacheName(),
+                                ccfg.getNodeFilter(),
+                                ccfg.getNearConfiguration() != null,
+                                ccfg.getCacheMode() == LOCAL);
+                        }
                     }
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/84e31966/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a5bd7b6..19c609d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
@@ -414,8 +415,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 // will return corresponding nodes.
                 U.await(evtLatch);
 
-                if (!F.isEmpty(reqs))
-                    startCaches();
+                startCaches();
 
                 assert discoEvt != null;
 
@@ -576,10 +576,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * Starts dynamic caches.
      */
     private void startCaches() throws IgniteCheckedException {
-        for (DynamicCacheChangeRequest req : reqs) {
-            if (req.isStart())
-                cctx.cache().prepareCacheStart(req);
-        }
+        cctx.cache().prepareCachesStart(F.view(reqs, new IgnitePredicate<DynamicCacheChangeRequest>() {
+            @Override public boolean apply(DynamicCacheChangeRequest req) {
+                return req.isStart();
+            }
+        }), exchId.topologyVersion());
     }
 
     /**
@@ -686,23 +687,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 if (!cacheCtx.isLocal())
                     cacheCtx.affinity().cleanUpCache(res.topologyVersion() - 10);
             }
-
-            if (!F.isEmpty(reqs)) {
-                for (DynamicCacheChangeRequest req : reqs) {
-                    if (F.eq(cacheCtx.name(), req.cacheName())) {
-                        if (req.isStart()) {
-                            if (!req.clientStartOnly() || cacheCtx.localNodeId().equals(req.initiatingNodeId()))
-                                cacheCtx.preloader().onInitialExchangeComplete(err);
-                        }
-                    }
-                }
-            }
         }
 
-        if (!F.isEmpty(reqs)) {
-            for (DynamicCacheChangeRequest req : reqs)
-                cctx.cache().onExchangeDone(req);
-        }
+        cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs);
 
         cctx.exchange().onExchangeDone(this);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/84e31966/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStaticCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStaticCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStaticCacheStartSelfTest.java
new file mode 100644
index 0000000..d34f01b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStaticCacheStartSelfTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ * Tests cache deploy on topology from static configuration.
+ */
+public class IgniteStaticCacheStartSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final String CACHE_NAME = "TestCache";
+
+    /** */
+    private boolean hasCache;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (hasCache) {
+            CacheConfiguration ccfg = new CacheConfiguration();
+
+            ccfg.setCacheMode(CacheMode.PARTITIONED);
+            ccfg.setBackups(1);
+            ccfg.setName(CACHE_NAME);
+            ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+            cfg.setCacheConfiguration(ccfg);
+        }
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployCacheOnNodeStart() throws Exception {
+        startGrids(3);
+
+        try {
+            hasCache = true;
+
+            startGrid(3);
+
+            for (int i = 0; i < 4; i++) {
+                info("Checking ignite: " + i);
+
+                Ignite ignite = ignite(i);
+
+                IgniteCache<Object, Object> jcache = ignite.jcache(CACHE_NAME);
+
+                assertNotNull(jcache);
+
+                jcache.put(i, i);
+            }
+
+            hasCache = false;
+
+            startGrid(4);
+
+            for (int i = 0; i < 5; i++) {
+                info("Checking ignite: " + i);
+
+                Ignite ignite = ignite(i);
+
+                IgniteCache<Object, Object> jcache = ignite.jcache(CACHE_NAME);
+
+                assertNotNull(jcache);
+
+                if (i != 4)
+                    assertEquals(i, jcache.get(i));
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}


Mime
View raw message