ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-801: properly processing cluster topology exception when atomic stamped value is being modified. Finished refactoring the tests.
Date Mon, 14 Sep 2015 10:33:25 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-801 de632ace6 -> df42d4a94


ignite-801: properly processing cluster topology exception when atomic stamped value is being modified. Finished refactoring the tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/df42d4a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/df42d4a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/df42d4a9

Branch: refs/heads/ignite-801
Commit: df42d4a944178999594fd79fbe0ce45db4596442
Parents: de632ac
Author: Denis Magda <dmagda@gridgain.com>
Authored: Mon Sep 14 13:33:16 2015 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Mon Sep 14 13:33:16 2015 +0300

----------------------------------------------------------------------
 .../datastructures/DataStructuresProcessor.java |  23 +-
 .../GridCacheCountDownLatchImpl.java            |   1 +
 ...eAbstractDataStructuresFailoverSelfTest.java | 754 +++++++------------
 ...edOffheapDataStructuresFailoverSelfTest.java |   2 -
 ...eplicatedDataStructuresFailoverSelfTest.java |   5 -
 5 files changed, 286 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index a5561e9..ef66635 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -489,21 +489,23 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         if (dataStructure != null)
             return dataStructure;
 
-        if (!create)
-            return c.applyx();
-
         while (true) {
-            try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
+            try {
+                if (!create)
+                    return c.applyx();
 
-                if (err != null)
-                    throw err;
+                try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                    err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
 
-                dataStructure = c.applyx();
+                    if (err != null)
+                        throw err;
 
-                tx.commit();
+                    dataStructure = c.applyx();
+
+                    tx.commit();
 
-                return dataStructure;
+                    return dataStructure;
+                }
             }
             catch (ClusterTopologyCheckedException e) {
                 IgniteInternalFuture<?> fut = e.retryReadyFuture();
@@ -513,6 +515,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
             catch (IgniteTxRollbackCheckedException ignore) {
                 // Safe to retry right away.
             }
+
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index cdd5f90..01d8c58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -344,6 +344,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
         @Override public Integer call() throws Exception {
             Integer val;
 
+            //REMOVE TR
             try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
                 GridCacheCountDownLatchValue latchVal = latchView.get(key);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 2fd40f6..0b12d63 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.datastructures;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteAtomicReference;
@@ -38,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.testframework.GridTestUtils;
 
@@ -65,6 +68,15 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     /** */
     private static final int TOP_CHANGE_THREAD_CNT = 3;
 
+    /** */
+    private static final int TOP_CHANGED_ERR_RETRY_CNT = 5;
+
+    /** */
+    private static final long TOP_CHANGED_ERR_RETRY_TIMEOUT = 3000;
+
+    /** */
+    private static final long READY_FUTURE_WAIT_TIMEOUT = 10_000;
+
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return TEST_TIMEOUT;
@@ -127,13 +139,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
         try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) {
             Ignite g = startGrid(NEW_GRID_NAME);
 
-            assert g.atomicLong(STRUCTURE_NAME, 10, true).get() == 10;
+            assertEquals(10, g.atomicLong(STRUCTURE_NAME, 10, false).get());
 
-            assert g.atomicLong(STRUCTURE_NAME, 10, true).addAndGet(10) == 20;
+            assertEquals(20, g.atomicLong(STRUCTURE_NAME, 10, false).addAndGet(10));
 
             stopGrid(NEW_GRID_NAME);
 
-            assert grid(0).atomicLong(STRUCTURE_NAME, 10, true).get() == 20;
+            assertEquals(20, grid(0).atomicLong(STRUCTURE_NAME, 10, true).get());
         }
     }
 
@@ -141,97 +153,44 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicLongConstantTopologyChange() throws Exception {
-        try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-                @Override
-                public void apply() {
-                    try {
-                        for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                            String name = UUID.randomUUID().toString();
-
-                            try {
-                                Ignite g = startGrid(name);
-
-                                assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
-                            }
-                            finally {
-                                if (i != TOP_CHANGE_CNT - 1)
-                                    stopGrid(name);
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        throw F.wrap(e);
-                    }
-                }
-            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
-            long val = s.get();
-
-            while (!fut.isDone()) {
-                assert s.get() == val;
-
-                assert s.incrementAndGet() == val + 1;
-
-                val++;
-            }
-
-            fut.get();
-
-            for (Ignite g : G.allGrids())
-                assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get());
-        }
+        doTestAtomicLong(new ConstantTopologyChangeWorker());
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAtomicLongConstantMultipleTopologyChange() throws Exception {
-        try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-                @Override public void apply() {
-                    try {
-                        for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                            Collection<String> names = new GridLeanSet<>(3);
-
-                            try {
-                                for (int j = 0; j < 3; j++) {
-                                    String name = UUID.randomUUID().toString();
-
-                                    names.add(name);
+        doTestAtomicLong(new ConstantMultipleTopologyChangeWorker());
+    }
 
-                                    Ignite g = startGrid(name);
+    /**
+     * Tests IgniteAtomicLong.
+     *
+     * @param topWorker Topology change worker.
+     * @throws Exception If failed.
+     */
+    private void doTestAtomicLong(ConstantTopologyChangeWorker topWorker) throws Exception {
+        try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
+            IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+                @Override public Object apply(Ignite ignite) {
+                    assert ignite.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
 
-                                    assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
-                                }
-                            }
-                            finally {
-                                if (i != TOP_CHANGE_CNT - 1)
-                                    for (String name : names)
-                                        stopGrid(name);
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        throw F.wrap(e);
-                    }
+                    return null;
                 }
-            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+            });
 
             long val = s.get();
 
             while (!fut.isDone()) {
-                assert s.get() == val;
+                assertEquals(val, s.get());
 
-                assert s.incrementAndGet() == val + 1;
-
-                val++;
+                assertEquals(++val, s.incrementAndGet());
             }
 
             fut.get();
 
             for (Ignite g : G.allGrids())
-                assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get());
+                assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, false).get());
         }
     }
 
@@ -242,13 +201,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
         try (IgniteAtomicReference atomic = grid(0).atomicReference(STRUCTURE_NAME, 10, true)) {
             Ignite g = startGrid(NEW_GRID_NAME);
 
-            assert g.atomicReference(STRUCTURE_NAME, 10, true).get() == 10;
+            assertEquals((Integer)10, g.atomicReference(STRUCTURE_NAME, 10, false).get());
 
-            g.atomicReference(STRUCTURE_NAME, 10, true).set(20);
+            g.atomicReference(STRUCTURE_NAME, 10, false).set(20);
 
             stopGrid(NEW_GRID_NAME);
 
-            assertEquals(20, (int) grid(0).atomicReference(STRUCTURE_NAME, 10, true).get());
+            assertEquals((Integer)20, grid(0).atomicReference(STRUCTURE_NAME, 10, true).get());
         }
     }
 
@@ -256,85 +215,36 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicReferenceConstantTopologyChange() throws Exception {
-        try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-                @Override
-                public void apply() {
-                    try {
-                        for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                            String name = UUID.randomUUID().toString();
-
-                            try {
-                                Ignite g = startGrid(name);
-
-                                assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0;
-                            }
-                            finally {
-                                if (i != TOP_CHANGE_CNT - 1)
-                                    stopGrid(name);
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        throw F.wrap(e);
-                    }
-                }
-            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
-            int val = s.get();
-
-            while (!fut.isDone()) {
-                assert s.get() == val;
-
-                s.set(++val);
-            }
-
-            fut.get();
-
-            for (Ignite g : G.allGrids())
-                assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get());
-        }
+        doTestAtomicReference(new ConstantTopologyChangeWorker());
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception {
-        try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-                @Override public void apply() {
-                    try {
-                        for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                            Collection<String> names = new GridLeanSet<>(3);
-
-                            try {
-                                for (int j = 0; j < 3; j++) {
-                                    String name = UUID.randomUUID().toString();
-
-                                    names.add(name);
+        doTestAtomicReference(new ConstantMultipleTopologyChangeWorker());
+    }
 
-                                    Ignite g = startGrid(name);
+    /**
+     * Tests atomic reference.
+     *
+     * @param topWorker Topology change worker.
+     * @throws Exception If failed.
+     */
+    private void doTestAtomicReference(ConstantTopologyChangeWorker topWorker) throws Exception {
+        try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
+            IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+                @Override public Object apply(Ignite ignite) {
+                    assert ignite.atomicReference(STRUCTURE_NAME, 1, false).get() > 0;
 
-                                    assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0;
-                                }
-                            }
-                            finally {
-                                if (i != TOP_CHANGE_CNT - 1)
-                                    for (String name : names)
-                                        stopGrid(name);
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        throw F.wrap(e);
-                    }
+                    return null;
                 }
-            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+            });
 
             int val = s.get();
 
             while (!fut.isDone()) {
-                assert s.get() == val;
+                assertEquals(val, (int)s.get());
 
                 s.set(++val);
             }
@@ -342,7 +252,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
             fut.get();
 
             for (Ignite g : G.allGrids())
-                assert g.atomicReference(STRUCTURE_NAME, 1, true).get() == val;
+                assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get());
         }
     }
 
@@ -353,19 +263,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
         try (IgniteAtomicStamped atomic = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true)) {
             Ignite g = startGrid(NEW_GRID_NAME);
 
-            IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, true).get();
+            IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, false).get();
 
-            assert t.get1() == 10;
-            assert t.get2() == 10;
+            assertEquals((Integer)10, t.get1());
+            assertEquals((Integer)10, t.get2());
 
-            g.atomicStamped(STRUCTURE_NAME, 10, 10, true).set(20, 20);
+            g.atomicStamped(STRUCTURE_NAME, 10, 10, false).set(20, 20);
 
             stopGrid(NEW_GRID_NAME);
 
-            t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true).get();
+            t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, false).get();
 
-            assert t.get1() == 20;
-            assert t.get2() == 20;
+            assertEquals((Integer)20, t.get1());
+            assertEquals((Integer)20, t.get2());
         }
     }
 
@@ -373,107 +283,44 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicStampedConstantTopologyChange() throws Exception {
-        try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-                @Override
-                public void apply() {
-                    try {
-                        for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                            String name = UUID.randomUUID().toString();
-
-                            try {
-                                Ignite g = startGrid(name);
-
-                                IgniteBiTuple<Integer, Integer> t =
-                                    g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
-
-                                assert t.get1() > 0;
-                                assert t.get2() > 0;
-                            }
-                            finally {
-                                if (i != TOP_CHANGE_CNT - 1)
-                                    stopGrid(name);
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        throw F.wrap(e);
-                    }
-                }
-            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
-            int val = s.value();
-
-            while (!fut.isDone()) {
-                IgniteBiTuple<Integer, Integer> t = s.get();
-
-                assert t.get1() == val;
-                assert t.get2() == val;
-
-                val++;
-
-                s.set(val, val);
-            }
-
-            fut.get();
-
-            for (Ignite g : G.allGrids()) {
-                IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
-
-                assert t.get1() == val;
-                assert t.get2() == val;
-            }
-        }
+        doTestAtomicStamped(new ConstantTopologyChangeWorker());
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAtomicStampedConstantMultipleTopologyChange() throws Exception {
-        try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-                @Override public void apply() {
-                    try {
-                        for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                            Collection<String> names = new GridLeanSet<>(3);
-
-                            try {
-                                for (int j = 0; j < 3; j++) {
-                                    String name = UUID.randomUUID().toString();
-
-                                    names.add(name);
+        doTestAtomicStamped(new ConstantMultipleTopologyChangeWorker());
+    }
 
-                                    Ignite g = startGrid(name);
+    /**
+     * Tests atomic stamped value.
+     *
+     * @param topWorker Topology change worker.
+     * @throws Exception If failed.
+     */
+    private void doTestAtomicStamped(ConstantTopologyChangeWorker topWorker) throws Exception {
+        try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
+            IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+                @Override public Object apply(Ignite ignite) {
+                    IgniteBiTuple<Integer, Integer> t = ignite.atomicStamped(STRUCTURE_NAME, 1, 1, false).get();
 
-                                    IgniteBiTuple<Integer, Integer> t =
-                                        g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
+                    assert t.get1() > 0;
+                    assert t.get2() > 0;
 
-                                    assert t.get1() > 0;
-                                    assert t.get2() > 0;
-                                }
-                            }
-                            finally {
-                                if (i != TOP_CHANGE_CNT - 1)
-                                    for (String name : names)
-                                        stopGrid(name);
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        throw F.wrap(e);
-                    }
+                    return null;
                 }
-            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+            });
 
             int val = s.value();
 
             while (!fut.isDone()) {
                 IgniteBiTuple<Integer, Integer> t = s.get();
 
-                assert t.get1() == val;
-                assert t.get2() == val;
+                assertEquals(val, (int)t.get1());
+                assertEquals(val, (int)t.get2());
 
-                val++;
+                ++val;
 
                 s.set(val, val);
             }
@@ -481,10 +328,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
             fut.get();
 
             for (Ignite g : G.allGrids()) {
-                IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
+                IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, false).get();
 
-                assert t.get1() == val;
-                assert t.get2() == val;
+                assertEquals(val, (int)t.get1());
+                assertEquals(val, (int)t.get2());
             }
         }
     }
@@ -497,16 +344,16 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
             try {
                 Ignite g = startGrid(NEW_GRID_NAME);
 
-                assert g.countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 20;
+                assertEquals(20, g.countDownLatch(STRUCTURE_NAME, 20, true, false).count());
 
-                g.countDownLatch(STRUCTURE_NAME, 20, true, true).countDown(10);
+                g.countDownLatch(STRUCTURE_NAME, 20, true, false).countDown(10);
 
                 stopGrid(NEW_GRID_NAME);
 
-                assert grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 10;
+                assertEquals(10, grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).count());
             }
             finally {
-                grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).countDownAll();
+                grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).countDownAll();
             }
         }
     }
@@ -515,102 +362,45 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testCountDownLatchConstantTopologyChange() throws Exception {
-        try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) {
-            try {
-                IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-                    @Override public void apply() {
-                        try {
-                            for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                                String name = UUID.randomUUID().toString();
-
-                                try {
-                                    Ignite g = startGrid(name);
-
-                                    assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null;
-                                }
-                                finally {
-                                    if (i != TOP_CHANGE_CNT - 1)
-                                        stopGrid(name);
-                                }
-                            }
-                        }
-                        catch (Exception e) {
-                            throw F.wrap(e);
-                        }
-                    }
-                }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
-                int val = s.count();
-
-                while (!fut.isDone()) {
-                    assert s.count() == val;
-
-                    assert s.countDown() == val - 1;
-
-                    val--;
-                }
-
-                fut.get();
-
-                for (Ignite g : G.allGrids())
-                    assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count() == val;
-            }
-            finally {
-                grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).countDownAll();
-            }
-        }
+        doTestCountDownLatch(new ConstantTopologyChangeWorker());
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testCountDownLatchConstantMultipleTopologyChange() throws Exception {
+        doTestCountDownLatch(new ConstantMultipleTopologyChangeWorker());
+    }
+
+    /**
+     * Tests distributed count down latch.
+     *
+     * @param topWorker Topology change worker.
+     * @throws Exception If failed.
+     */
+    private void doTestCountDownLatch(ConstantTopologyChangeWorker topWorker) throws Exception {
         try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) {
             try {
-                IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-                    @Override public void apply() {
-                        try {
-                            for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                                Collection<String> names = new GridLeanSet<>(3);
-
-                                try {
-                                    for (int j = 0; j < 3; j++) {
-                                        String name = UUID.randomUUID().toString();
-
-                                        names.add(name);
-
-                                        Ignite g = startGrid(name);
+                IgniteInternalFuture<?> fut = topWorker.startChangingTopology(
+                    new IgniteClosure<Ignite, Object>() {
+                        @Override public Object apply(Ignite ignite) {
+                            assert ignite.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count() > 0;
 
-                                        assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null;
-                                    }
-                                }
-                                finally {
-                                    if (i != TOP_CHANGE_CNT - 1)
-                                        for (String name : names)
-                                            stopGrid(name);
-                                }
-                            }
-                        }
-                        catch (Exception e) {
-                            throw F.wrap(e);
+                            return null;
                         }
-                    }
-                }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+                    });
 
                 int val = s.count();
 
                 while (!fut.isDone()) {
-                    assert s.count() == val;
-
-                    assert s.countDown() == val - 1;
-
-                    val--;
+                    assertEquals(val, s.count());
+                    assertEquals(--val, s.countDown());
                 }
 
                 fut.get();
 
                 for (Ignite g : G.allGrids())
-                    assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count());
+                    assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count());
             }
             finally {
                 grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll();
@@ -627,13 +417,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
 
             Ignite g = startGrid(NEW_GRID_NAME);
 
-            assert g.<Integer>queue(STRUCTURE_NAME, 0, null).poll() == 10;
+            assertEquals(10, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).poll());
 
             g.queue(STRUCTURE_NAME, 0, null).put(20);
 
             stopGrid(NEW_GRID_NAME);
 
-            assert grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek() == 20;
+            assertEquals(20, (int)grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek());
         }
         finally {
             grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).close();
@@ -644,31 +434,33 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testQueueConstantTopologyChange() throws Exception {
+        doTestQueue(new ConstantTopologyChangeWorker());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueueConstantMultipleTopologyChange() throws Exception {
+        doTestQueue(new ConstantMultipleTopologyChangeWorker());
+    }
+
+    /**
+     * Tests the queue.
+     *
+     * @param topWorker Topology change worker.
+     * @throws Exception If failed.
+     */
+    private void doTestQueue(ConstantTopologyChangeWorker topWorker) throws Exception {
         try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
             s.put(1);
 
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-                @Override public void apply() {
-                    try {
-                        for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                            String name = UUID.randomUUID().toString();
-
-                            try {
-                                Ignite g = startGrid(name);
+            IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+                @Override public Object apply(Ignite ignite) {
+                    assert ignite.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
 
-                                assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
-                            }
-                            finally {
-                                if (i != TOP_CHANGE_CNT - 1)
-                                    stopGrid(name);
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        throw F.wrap(e);
-                    }
+                    return null;
                 }
-            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+            });
 
             int val = s.peek();
 
@@ -680,71 +472,71 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
             fut.get();
 
             for (Ignite g : G.allGrids())
-                assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal;
+                assertEquals(origVal, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).peek());
         }
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testQueueConstantMultipleTopologyChange() throws Exception {
-        try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
-            s.put(1);
+    public void testAtomicSequenceInitialization() throws Exception {
+        int threadCnt = 3;
 
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-                @Override public void apply() {
-                    try {
-                        for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                            Collection<String> names = new GridLeanSet<>(3);
+        final AtomicInteger idx = new AtomicInteger(gridCount());
 
-                            try {
-                                for (int j = 0; j < 3; j++) {
-                                    String name = UUID.randomUUID().toString();
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+            @Override public void apply() {
+                int id = idx.getAndIncrement();
 
-                                    names.add(name);
+                try {
+                    startGrid(id);
 
-                                    Ignite g = startGrid(name);
+                    Thread.sleep(1000);
 
-                                    assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
-                                }
-                            }
-                            finally {
-                                if (i != TOP_CHANGE_CNT - 1)
-                                    for (String name : names)
-                                        stopGrid(name);
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        throw F.wrap(e);
-                    }
                 }
-            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+                catch (Exception e) {
+                    throw F.wrap(e);
+                }
+                finally {
+                    stopGrid(id);
 
-            int val = s.peek();
+                    info("Thread finished.");
+                }
+            }
+        }, threadCnt, "test-thread");
 
-            int origVal = val;
+        while (!fut.isDone()) {
+            grid(0).compute().call(new IgniteCallable<Object>() {
+                /** */
+                @IgniteInstanceResource
+                private Ignite g;
 
-            while (!fut.isDone())
-                s.put(++val);
+                @Override public Object call() throws Exception {
+                    IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
 
-            fut.get();
+                    assert seq != null;
 
-            for (Ignite g : G.allGrids())
-                assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal;
+                    for (int i = 0; i < 1000; i++)
+                        seq.getAndIncrement();
+
+                    return null;
+                }
+            });
         }
+
+        fut.get();
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAtomicSequenceTopologyChange() throws Exception {
-        try (IgniteAtomicSequence s = grid().atomicSequence(STRUCTURE_NAME, 10, true)) {
+        try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 10, true)) {
             Ignite g = startGrid(NEW_GRID_NAME);
 
-            assert g.atomicSequence(STRUCTURE_NAME, 10, false).get() == 1010;
+            assertEquals(1010, g.atomicSequence(STRUCTURE_NAME, 10, false).get());
 
-            assert g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10) == 1020;
+            assertEquals(1020, g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10));
 
             stopGrid(NEW_GRID_NAME);
         }
@@ -754,29 +546,31 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testAtomicSequenceConstantTopologyChange() throws Exception {
-        try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-                @Override public void apply() {
-                    try {
-                        String name = UUID.randomUUID().toString();
+        doTestAtomicSequence(new ConstantTopologyChangeWorker());
+    }
 
-                        for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                            try {
-                                Ignite g = startGrid(name);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception {
+        doTestAtomicSequence(new ConstantMultipleTopologyChangeWorker());
+    }
 
-                                assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
-                            }
-                            finally {
-                                if (i != TOP_CHANGE_CNT - 1)
-                                    stopGrid(name);
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        throw F.wrap(e);
-                    }
+    /**
+     * Tests atomic sequence.
+     *
+     * @param topWorker Topology change worker.
+     * @throws Exception If failed.
+     */
+    private void doTestAtomicSequence(ConstantTopologyChangeWorker topWorker) throws Exception {
+        try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
+            IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+                @Override public Object apply(Ignite ignite) {
+                    assertTrue(ignite.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
+
+                    return null;
                 }
-            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+            });
 
             long old = s.get();
 
@@ -797,135 +591,131 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     /**
      * @throws Exception If failed.
      */
-    public void testAtomicSequenceInitialization() throws Exception {
-        int threadCnt = 3;
+    public void testUncommitedTxLeave() throws Exception {
+        final int val = 10;
 
-        final AtomicInteger idx = new AtomicInteger(gridCount());
+        grid(0).atomicLong(STRUCTURE_NAME, val, true);
 
-        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
-            @Override public void apply() {
-                int id = idx.getAndIncrement();
+        GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                Ignite g = startGrid(NEW_GRID_NAME);
 
                 try {
-                    startGrid(id);
+                    g.transactions().txStart();
 
-                    Thread.sleep(1000);
+                    g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1);
 
-                }
-                catch (Exception e) {
-                    throw F.wrap(e);
+                    assertEquals(val + 1, g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet());
                 }
                 finally {
-                    stopGrid(id);
-
-                    info("Thread finished.");
+                    stopGrid(NEW_GRID_NAME);
                 }
-            }
-        }, threadCnt, "test-thread");
-
-        while (!fut.isDone()) {
-            grid(0).compute().call(new IgniteCallable<Object>() {
-                    /** */
-                    @IgniteInstanceResource
-                    private Ignite g;
-
-                    @Override public Object call() throws Exception {
-                        IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
 
-                        assert seq != null;
-
-                        for (int i = 0; i < 1000; i++)
-                            seq.getAndIncrement();
+                return null;
+            }
+        }).get();
 
-                        return null;
-                    }
-                });
-        }
+        waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()]));
 
-        fut.get();
+        assertEquals(val + 1, grid(0).atomicLong(STRUCTURE_NAME, val, false).get());
     }
 
     /**
-     * @throws Exception If failed.
+     *
      */
-    public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception {
-        try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
+    private class ConstantTopologyChangeWorker {
+        /** */
+        protected final AtomicBoolean failed = new AtomicBoolean(false);
+
+        /**
+         * Starts changing cluster's topology.
+         *
+         * @return Future.
+         */
+        IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
             IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
                 @Override public void apply() {
                     try {
                         for (int i = 0; i < TOP_CHANGE_CNT; i++) {
-                            Collection<String> names = new GridLeanSet<>(3);
-
-                            try {
-                                for (int j = 0; j < 3; j++) {
-                                    String name = UUID.randomUUID().toString();
+                            if (failed.get())
+                                return;
 
-                                    names.add(name);
+                            String name = UUID.randomUUID().toString();
 
-                                    Ignite g = startGrid(name);
+                            try {
+                                Ignite g = startGrid(name);
 
-                                    assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
-                                }
+                                callback.apply(g);
                             }
                             finally {
                                 if (i != TOP_CHANGE_CNT - 1)
-                                    for (String name : names)
-                                        stopGrid(name);
+                                    stopGrid(name);
                             }
                         }
                     }
                     catch (Exception e) {
+                        failed.set(true);
+
                         throw F.wrap(e);
                     }
                 }
             }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
 
-            long old = s.get();
-
-            while (!fut.isDone()) {
-                assertEquals(old, s.get());
-
-                long val = s.incrementAndGet();
-
-                assertTrue(val > old);
-
-                old = val;
-            }
-
-            fut.get();
+            return fut;
         }
     }
 
     /**
-     * @throws Exception If failed.
+     *
      */
-    public void testUncommitedTxLeave() throws Exception {
-        final int val = 10;
+    private class ConstantMultipleTopologyChangeWorker extends ConstantTopologyChangeWorker {
+        /**
+         * Starts changing cluster's topology.
+         *
+         * @return Future.
+         */
+        @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
+            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+                @Override public void apply() {
+                    try {
+                        for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+                            if (failed.get())
+                                return;
 
-        grid(0).atomicLong(STRUCTURE_NAME, val, true);
+                            Collection<String> names = new GridLeanSet<>(3);
 
-        GridTestUtils.runAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                Ignite g = startGrid(NEW_GRID_NAME);
+                            try {
+                                for (int j = 0; j < 3; j++) {
+                                    if (failed.get())
+                                        return;
 
-                try {
-                    g.transactions().txStart();
+                                    String name = UUID.randomUUID().toString();
 
+                                    Ignite g = startGrid(name);
 
-                    g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1);
+                                    names.add(name);
 
-                    assert g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet() == val + 1;
-                }
-                finally {
-                    stopGrid(NEW_GRID_NAME);
-                }
+                                    callback.apply(g);
+                                }
+                            }
+                            finally {
+                                if (i != TOP_CHANGE_CNT - 1) {
 
-                return null;
-            }
-        }).get();
+                                    for (String name : names)
+                                        stopGrid(name);
+                                }
+                            }
+                        }
+                    }
+                    catch (Exception e) {
+                        failed.set(true);
 
-        waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()]));
+                        throw F.wrap(e);
+                    }
+                }
+            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
 
-        assert grid(0).atomicLong(STRUCTURE_NAME, val, false).get() == val + 1;
+            return fut;
+        }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
index 86b763a..a9cd470 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
@@ -34,6 +34,4 @@ public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest extends G
     @Override protected CacheMemoryMode collectionMemoryMode() {
         return OFFHEAP_TIERED;
     }
-
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
index 69de7cd..902ba44 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
@@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
 public class GridCacheReplicatedDataStructuresFailoverSelfTest
     extends GridCacheAbstractDataStructuresFailoverSelfTest {
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-801");
-    }
-
-    /** {@inheritDoc} */
     @Override protected CacheMode collectionCacheMode() {
         return REPLICATED;
     }


Mime
View raw message