ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1534 Fixed races in dynamic cache start
Date Thu, 01 Oct 2015 13:31:50 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1534-1 a586686c4 -> 0f083fd18


ignite-1534 Fixed races in dynamic cache start


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0f083fd1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0f083fd1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0f083fd1

Branch: refs/heads/ignite-1534-1
Commit: 0f083fd183cfdebde0f9c91244ee434180cd0ccf
Parents: a586686
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Oct 1 13:48:19 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Oct 1 16:31:00 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      |   2 +-
 .../processors/cache/GridCacheMvccManager.java  |  20 ++-
 .../cache/distributed/dht/GridDhtGetFuture.java |   4 +-
 .../dht/GridPartitionedGetFuture.java           |   5 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   9 +-
 .../distributed/near/GridNearGetFuture.java     |   2 +
 .../CacheGetFutureHangsSelfTest.java            | 156 +++++++++----------
 .../testsuites/IgniteCacheTestSuite4.java       |   3 +
 8 files changed, 108 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 5385dec..3a1cee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1848,7 +1848,7 @@ public class GridCacheContext<K, V> implements Externalizable
{
         boolean deserializePortable,
         boolean cpy) {
         assert key != null;
-        assert val != null;
+        assert val != null || skipVals;
 
         if (!keepCacheObjects) {
             Object key0 = key.value(cacheObjCtx, false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dd51da2..0960c9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -391,13 +391,14 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
     /**
      * @param futVer Future ID.
      * @param fut Future.
+     * @return {@code False} if future was forcibly completed with error.
      */
-    public void addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut)
{
+    public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?>
fut) {
         IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut);
 
         assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut
+ ", old=" + old + ']';
 
-        onFutureAdded(fut);
+        return onFutureAdded(fut);
     }
 
     /**
@@ -529,12 +530,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
 
     /**
      * @param fut Future.
+     * @return {@code False} if future was forcibly completed with error.
      */
-    private void onFutureAdded(IgniteInternalFuture<?> fut) {
-        if (stopping)
+    private boolean onFutureAdded(IgniteInternalFuture<?> fut) {
+        if (stopping) {
             ((GridFutureAdapter)fut).onDone(stopError());
-        else if (cctx.kernalContext().clientDisconnected())
+
+            return false;
+        }
+        else if (cctx.kernalContext().clientDisconnected()) {
             ((GridFutureAdapter)fut).onDone(disconnectedError(null));
+
+            return false;
+        }
+
+        return true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 76aaf72..a67b1de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -447,8 +447,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
 
                             if (v == null)
                                 it.remove();
-                            else if (!skipVals)
-                                info.value((CacheObject)v);
+                            else
+                                info.value(skipVals ? null : (CacheObject)v);
                         }
 
                         return infos;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 0202c53..abbe7b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -587,8 +587,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         if (keysSize != 0) {
             Map<K, V> map = new GridLeanMap<>(keysSize);
 
-            for (GridCacheEntryInfo info : infos)
+            for (GridCacheEntryInfo info : infos) {
+                assert skipVals == (info.value() == null);
+
                 cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable,
false);
+            }
 
             return map;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index fb2c5ad..41df53a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -825,8 +825,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                 futVer = cctx.versions().next(topVer);
 
-                if (storeFuture())
-                    cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this);
+                if (storeFuture()) {
+                    if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this))
{
+                        assert isDone() : GridNearAtomicUpdateFuture.this;
+
+                        return;
+                    }
+                }
 
                 // Assign version on near node in CLOCK ordering mode even if fastMap is
false.
                 if (updVer == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index a7875f6..d9763f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -703,6 +703,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     CacheObject val = info.value();
                     KeyCacheObject key = info.key();
 
+                    assert skipVals == (info.value() == null);
+
                     cctx.addResult(map, key, val, skipVals, false, deserializePortable, false);
                 }
                 catch (GridCacheEntryRemovedException ignore) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
index 8e8447e..e8622aa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
@@ -18,23 +18,21 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
@@ -45,22 +43,14 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest
{
     /** Grid count. */
     private static final int GRID_CNT = 8;
 
-    /** Grids. */
-    private static Ignite[] grids;
+    /** */
+    private AtomicReferenceArray<Ignite> nodes;
 
-    /** Ids. */
-    private static String[] ids;
-
-    /** Flags. */
-    private static AtomicBoolean[] flags;
-
-    /** Futs. */
-    private static Collection<IgniteInternalFuture> futs;
-
-    /** Alive grids. */
-    private static Set<Integer> aliveGrids;
+    /** */
+    private volatile boolean done;
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
@@ -81,17 +71,27 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest
{
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 5 * 60_000;
+    }
+
     /**
      * @throws Exception If failed.
      */
-    public void testFailover() throws Exception {
-        int cnt = 10;
+    public void testContainsKeyFailover() throws Exception {
+        int cnt = 3;
 
         for (int i = 0; i < cnt; i++) {
             try {
-                U.debug("*** Iteration " + (i + 1) + '/' + cnt);
-
-                init();
+                log.info("Iteration: " + (i + 1) + '/' + cnt);
 
                 doTestFailover();
             }
@@ -102,54 +102,34 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest
{
     }
 
     /**
-     * Initializes test.
-     */
-    private void init() {
-        grids = new Ignite[GRID_CNT + 1];
-
-        ids = new String[GRID_CNT + 1];
-
-        aliveGrids = new HashSet<>();
-
-        flags = new AtomicBoolean[GRID_CNT + 1];
-
-        futs = new ArrayList<>();
-    }
-
-    /**
      * Executes one test iteration.
+     * @throws Exception If failed.
      */
     private void doTestFailover() throws Exception {
         try {
-            for (int i = 0; i < GRID_CNT + 1; i++) {
-                final IgniteEx grid = startGrid(i);
+            done = false;
 
-                grids[i] = grid;
+            nodes = new AtomicReferenceArray<>(GRID_CNT);
 
-                ids[i] = grid.localNode().id().toString();
+            startGridsMultiThreaded(GRID_CNT, false);
 
-                aliveGrids.add(i);
+            for (int i = 0; i < GRID_CNT ; i++)
+                assertTrue(nodes.compareAndSet(i, null, ignite(i)));
 
-                flags[i] = new AtomicBoolean();
-            }
+            List<IgniteInternalFuture> futs = new ArrayList<>();
 
             for (int i = 0; i < GRID_CNT + 1; i++) {
-                final int gridIdx = i;
-
                 futs.add(multithreadedAsync(new Runnable() {
                     @Override public void run() {
-                        IgniteCache cache = grids[gridIdx].cache(null);
+                        T2<Ignite, Integer> ignite;
 
-                        while (!flags[gridIdx].get()) {
-                            int idx = ThreadLocalRandom.current().nextInt(GRID_CNT + 1);
+                        while ((ignite = randomNode()) != null) {
+                            IgniteCache<Object, Object> cache = ignite.get1().cache(null);
 
-                            String id = ids[idx];
+                            for (int i = 0; i < 100; i++)
+                                cache.containsKey(ThreadLocalRandom.current().nextInt(100_000));
 
-                            if (id != null /*&& grids[gridIdx] != null*/) {
-                                //U.debug("!!! Grid containsKey start " + gridIdx);
-                                cache.containsKey(id);
-                                //U.debug("!!! Grid containsKey finished " + gridIdx);
-                            }
+                            assertTrue(nodes.compareAndSet(ignite.get2(), null, ignite.get1()));
 
                             try {
                                 Thread.sleep(ThreadLocalRandom.current().nextLong(50));
@@ -163,18 +143,15 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest
{
 
                 futs.add(multithreadedAsync(new Runnable() {
                     @Override public void run() {
-                        IgniteCache cache = grids[gridIdx].cache(null);
+                        T2<Ignite, Integer> ignite;
 
-                        while (!flags[gridIdx].get()) {
-                            int idx = ThreadLocalRandom.current().nextInt(GRID_CNT + 1);
+                        while ((ignite = randomNode()) != null) {
+                            IgniteCache<Object, Object> cache = ignite.get1().cache(null);
 
-                            String id = ids[idx];
+                            for (int i = 0; i < 100; i++)
+                                cache.put(ThreadLocalRandom.current().nextInt(100_000), UUID.randomUUID());
 
-                            if (id != null /*&& grids[gridIdx] != null*/) {
-                                //U.debug("!!! Grid put start " + gridIdx);
-                                cache.put(id, UUID.randomUUID());
-                                //U.debug("!!! Grid put finished " + gridIdx);
-                            }
+                            assertTrue(nodes.compareAndSet(ignite.get2(), null, ignite.get1()));
 
                             try {
                                 Thread.sleep(ThreadLocalRandom.current().nextLong(50));
@@ -187,35 +164,50 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest
{
                 }, 1, "put-thread-" + i));
             }
 
-            while (aliveGrids.size() > 1) {
-                final int gridToKill = ThreadLocalRandom.current().nextInt(GRID_CNT) + 1;
+            try {
+                int aliveGrids = GRID_CNT;
 
-                if (gridToKill > 0 && grids[gridToKill] != null) {
-                    U.debug("!!! Trying to kill grid " + gridToKill);
+                while (aliveGrids > 0) {
+                    T2<Ignite, Integer> ignite = randomNode();
 
-                    //synchronized (mons[gridToKill]) {
-                        U.debug("!!! Grid stop start " + gridToKill);
+                    assert ignite != null;
 
-                        grids[gridToKill].close();
+                    Ignite ignite0 = ignite.get1();
 
-                        aliveGrids.remove(gridToKill);
+                    log.info("Stop node: " + ignite0.name());
 
-                        grids[gridToKill] = null;
+                    ignite0.close();
 
-                        flags[gridToKill].set(true);
+                    log.info("Node stop finished: " + ignite0.name());
 
-                        U.debug("!!! Grid stop finished " + gridToKill);
-                    //}
+                    aliveGrids--;
                 }
             }
+            finally {
+                done = true;
+            }
 
-            Thread.sleep(ThreadLocalRandom.current().nextLong(100));
+            for (IgniteInternalFuture fut : futs)
+                fut.get();
         }
         finally {
-            flags[0].set(true);
+            done = true;
+        }
+    }
 
-            for (IgniteInternalFuture fut : futs)
-                fut.get();
+    /**
+     * @return Random node and its index.
+     */
+    @Nullable private T2<Ignite, Integer> randomNode() {
+        while (!done) {
+            int idx = ThreadLocalRandom.current().nextInt(GRID_CNT);
+
+            Ignite ignite = nodes.get(idx);
+
+            if (ignite != null && nodes.compareAndSet(idx, ignite, null))
+                return new T2<>(ignite, idx);
         }
+
+        return null;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 228d99c..b89bffd 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransaction
 import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest;
@@ -280,6 +281,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(CrossCacheLockTest.class);
         suite.addTestSuite(IgniteCrossCacheTxSelfTest.class);
 
+        suite.addTestSuite(CacheGetFutureHangsSelfTest.class);
+
         return suite;
     }
 }
\ No newline at end of file


Mime
View raw message