ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [15/15] ignite git commit: ignite-1534 Fixed races in dynamic cache start
Date Thu, 01 Oct 2015 09:05:59 GMT
ignite-1534 Fixed races in dynamic cache start


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e250c7f5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e250c7f5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e250c7f5

Branch: refs/heads/ignite-1534-1
Commit: e250c7f5c42fcdde8c776242314963cc2e0a63b3
Parents: 2029b7b
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Oct 1 12:03:44 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Oct 1 12:03:44 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |   2 +-
 .../cache/DynamicCacheDescriptor.java           |  17 +++
 .../GridCachePartitionExchangeManager.java      |  72 +++--------
 .../processors/cache/GridCacheProcessor.java    |   8 +-
 .../distributed/IgniteCacheCreatePutTest.java   | 125 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 6 files changed, 165 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index aec36a2..1fe45b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -550,7 +550,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         gridStartTime = getSpi().getGridStartTime();
 
                     updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()),
-                        new DiscoCache(localNode(), getSpi().getRemoteNodes()));
+                        new DiscoCache(localNode(), F.view(topSnapshot, F.remoteNodes(locNode.id()))));
 
                     startLatch.countDown();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 24df7e4..b100a31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -68,6 +68,9 @@ public class DynamicCacheDescriptor {
     /** */
     private AffinityTopologyVersion startTopVer;
 
+    /** */
+    private boolean rcvdOnDiscovery;
+
     /**
      * @param ctx Context.
      * @param cacheCfg Cache configuration.
@@ -236,6 +239,20 @@ public class DynamicCacheDescriptor {
         this.updatesAllowed = updatesAllowed;
     }
 
+    /**
+     * @return {@code True} if received in discovery data.
+     */
+    public boolean receivedOnDiscovery() {
+        return rcvdOnDiscovery;
+    }
+
+    /**
+     * @param rcvdOnDiscovery {@code True} if received in discovery data.
+     */
+    public void receivedOnDiscovery(boolean rcvdOnDiscovery) {
+        this.rcvdOnDiscovery = rcvdOnDiscovery;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 3e77e0d..adc2174 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -105,18 +105,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** Partition resend timeout after eviction. */
     private final long partResendTimeout = getLong(IGNITE_PRELOAD_RESEND_TIMEOUT, DFLT_PRELOAD_RESEND_TIMEOUT);
 
-    /** Latch which completes after local exchange future is created. */
-    private GridFutureAdapter<?> locExchFut;
-
     /** */
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
 
     /** Last partition refresh. */
     private final AtomicLong lastRefresh = new AtomicLong(-1);
 
-    /** Pending futures. */
-    private final Queue<GridDhtPartitionsExchangeFuture> pendingExchangeFuts = new ConcurrentLinkedQueue<>();
-
     /** */
     @GridToStringInclude
     private ExchangeWorker exchWorker;
@@ -229,31 +223,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
 
                 if (exchId != null) {
-                    // Start exchange process.
-                    pendingExchangeFuts.add(exchFut);
+                    if (log.isDebugEnabled())
+                        log.debug("Discovery event (will start exchange): " + exchId);
 
                     // Event callback - without this callback future will never complete.
                     exchFut.onEvent(exchId, e);
 
+                    // Start exchange process.
+                    addFuture(exchFut);
+                }
+                else {
                     if (log.isDebugEnabled())
-                        log.debug("Discovery event (will start exchange): " + exchId);
-
-                    locExchFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                        @Override public void apply(IgniteInternalFuture<?> t) {
-                            if (!enterBusy())
-                                return;
-
-                            try {
-                                // Unwind in the order of discovery events.
-                                for (GridDhtPartitionsExchangeFuture f = pendingExchangeFuts.poll(); f != null;
-                                    f = pendingExchangeFuts.poll())
-                                    addFuture(f);
-                            }
-                            finally {
-                                leaveBusy();
-                            }
-                        }
-                    });
+                        log.debug("Do not start exchange for discovery event: " + evt);
                 }
             }
             finally {
@@ -266,8 +247,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
-        locExchFut = new GridFutureAdapter<>();
-
         exchWorker = new ExchangeWorker();
 
         cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
@@ -328,12 +307,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
 
-        new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
-
-        onDiscoveryEvent(cctx.localNodeId(), fut);
+        exchWorker.futQ.addFirst(fut);
 
-        // Allow discovery events to get processed.
-        locExchFut.onDone();
+        new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
 
         if (reconnect) {
             fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@@ -382,8 +358,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
             }
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts())
-                cacheCtx.preloader().onInitialExchangeComplete(null);
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                if (cacheCtx.startTopologyVersion() == null)
+                    cacheCtx.preloader().onInitialExchangeComplete(null);
+            }
 
             if (log.isDebugEnabled())
                 log.debug("Finished waiting for initial exchange: " + fut.exchangeId());
@@ -414,12 +392,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         for (AffinityReadyFuture f : readyFuts.values())
             f.onDone(stopErr);
 
-        for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts)
-            f.onDone(stopErr);
-
-        if (locExchFut != null)
-            locExchFut.onDone(stopErr);
-
         U.cancel(exchWorker);
 
         if (log.isDebugEnabled())
@@ -583,22 +555,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param nodeId New node ID.
-     * @param fut Exchange future.
-     */
-    void onDiscoveryEvent(UUID nodeId, GridDhtPartitionsExchangeFuture fut) {
-        if (!enterBusy())
-            return;
-
-        try {
-            addFuture(fut);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
      * @param evt Discovery event.
      * @return Affinity topology version.
      */
@@ -1033,7 +989,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         U.warn(log, "Pending exchange futures:");
 
-        for (GridDhtPartitionsExchangeFuture fut : pendingExchangeFuts)
+        for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ)
             U.warn(log, ">>> " + fut);
 
         ExchangeFutureSet exchFuts = this.exchFuts;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5591fa6..2dad84e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -805,7 +805,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 boolean loc = desc.locallyConfigured();
 
-                if (loc || CU.affinityNode(locNode, filter)) {
+                if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
                     CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
                     CachePluginManager pluginMgr = desc.pluginManager();
@@ -1958,7 +1958,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                             if (req.initiatingNodeId() == null)
                                 desc.staticallyConfigured(true);
 
-                            registeredCaches.put(maskNull(req.cacheName()), desc);
+                            DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc);
+
+                            assert old == null : old;
+
+                            desc.receivedOnDiscovery(true);
 
                             ctx.discovery().setCacheFilter(
                                 req.cacheName(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
new file mode 100644
index 0000000..8b3d9d3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 3;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        OptimizedMarshaller marsh = new OptimizedMarshaller();
+        marsh.setRequireSerializable(false);
+
+        cfg.setMarshaller(marsh);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName("cache*");
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 3 * 60 * 1000L;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartNodes() throws Exception {
+        long stopTime = System.currentTimeMillis() + 2 * 60_000;
+
+        try {
+            int iter = 0;
+
+            while (System.currentTimeMillis() < stopTime) {
+                log.info("Iteration: " + iter++);
+
+                try {
+                    final AtomicInteger idx = new AtomicInteger();
+
+                    GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                        @Override
+                        public Void call() throws Exception {
+                            int node = idx.getAndIncrement();
+
+                            Ignite ignite = startGrid(node);
+
+                            IgniteCache<Object, Object> cache = ignite.getOrCreateCache("cache1");
+
+                            assertNotNull(cache);
+
+                            for (int i = 0; i < 100; i++)
+                                cache.put(i, i);
+
+                            return null;
+                        }
+                    }, GRID_CNT, "start");
+                }
+                finally {
+                    stopAllGrids();
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index f8c9d26..228d99c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest
 import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest;
@@ -205,6 +206,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(IgniteDynamicCacheStartNoExchangeTimeoutTest.class);
         suite.addTestSuite(CacheAffinityEarlyTest.class);
         suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest.class);
+        suite.addTestSuite(IgniteCacheCreatePutTest.class);
 
         suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);
 


Mime
View raw message