ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [36/50] [abbrv] ignite git commit: IGNITE-264 - Fixed affinity topology version handling for nested internal transaction.
Date Thu, 03 Sep 2015 01:03:45 GMT
IGNITE-264 - Fixed affinity topology version handling for nested internal transaction.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d7602975
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d7602975
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d7602975

Branch: refs/heads/ignite-264
Commit: d7602975835ea790a3a03736ce62d0b52c064097
Parents: cc98802
Author: Alexey Goncharuk <agoncharuk@gridgain.com>
Authored: Wed Aug 26 17:55:53 2015 -0700
Committer: Alexey Goncharuk <agoncharuk@gridgain.com>
Committed: Wed Aug 26 17:55:53 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      |   2 +-
 .../processors/cache/GridCacheIoManager.java    |   2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   2 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   8 +
 .../distributed/near/GridNearLockFuture.java    |   8 +
 .../near/GridNearOptimisticTxPrepareFuture.java |   8 +
 .../cache/transactions/IgniteTxAdapter.java     |   1 +
 .../cache/transactions/IgniteTxManager.java     |   4 +-
 .../datastructures/DataStructuresProcessor.java | 100 ++++---
 ...gniteAtomicLongChangingTopologySelfTest.java | 283 +++++++++++++++++++
 .../IgniteCacheFailoverTestSuite.java           |   1 +
 11 files changed, 377 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 5f17746..e1a38c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -440,7 +440,7 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return {@code True} if should use system transactions which are isolated from user
transactions.
      */
     public boolean systemTx() {
-        return cacheType == CacheType.UTILITY;
+        return cacheType == CacheType.UTILITY || (cacheType == CacheType.INTERNAL &&
transactional());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0ef190e..55e133e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -603,7 +603,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter
{
                 if (!cctx.discovery().alive(node.id()) || !cctx.discovery().pingNode(node.id()))
                     throw new ClusterTopologyCheckedException("Node left grid while sending
message to: " + node.id(), e);
 
-                if (cnt == retryCnt)
+                if (cnt == retryCnt || cctx.kernalContext().isStopping())
                     throw e;
                 else if (log.isDebugEnabled())
                     log.debug("Failed to send message to node (will retry): " + node.id());

http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 07ec808..f6bb315 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -320,7 +320,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     public void map() {
         AffinityTopologyVersion topVer = null;
 
-        IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
+        IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(null);
 
         if (tx != null && tx.topologyVersionSnapshot() != null)
             topVer = tx.topologyVersionSnapshot();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 90ca8df..7a2e717 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -518,6 +518,14 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         // Obtain the topology version to use.
         AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
+        // If there is another system transaction in progress, use it's topology version
to prevent deadlock.
+        if (topVer == null && tx != null && tx.system()) {
+            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+            if (tx0 != null)
+                topVer = tx0.topologyVersionSnapshot();
+        }
+
         if (topVer != null && tx != null)
             tx.topologyVersion(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 2815194..daec1ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -657,6 +657,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         // Obtain the topology version to use.
         AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
 
+        // If there is another system transaction in progress, use it's topology version
to prevent deadlock.
+        if (topVer == null && tx != null && tx.system()) {
+            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+            if (tx0 != null)
+                topVer = tx0.topologyVersionSnapshot();
+        }
+
         if (topVer != null && tx != null)
             tx.topologyVersion(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index e51dcb0..4111c41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -234,6 +234,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         // Obtain the topology version to use.
         AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
 
+        // If there is another system transaction in progress, use it's topology version
to prevent deadlock.
+        if (topVer == null && tx != null && tx.system()) {
+            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+            if (tx0 != null)
+                topVer = tx0.topologyVersionSnapshot();
+        }
+
         if (topVer != null) {
             tx.topologyVersion(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index f9b2437..907a251 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -184,6 +184,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new
AtomicReference<>();
 
     /** Topology version. */
+    @GridToStringInclude
     protected AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
 
     /** Mutex. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4554c6f..c96edd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -646,7 +646,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /**
      * @return Any transaction associated with the current thread.
      */
-    public IgniteInternalTx anyActiveThreadTx() {
+    public IgniteInternalTx anyActiveThreadTx(IgniteInternalTx ignore) {
         long threadId = Thread.currentThread().getId();
 
         IgniteInternalTx tx = threadMap.get(threadId);
@@ -660,7 +660,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
             tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
 
-            if (tx != null && tx.topologyVersionSnapshot() != null)
+            if (tx != null && tx != ignore && tx.topologyVersionSnapshot()
!= null)
                 return tx;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 57b16f0..87c5208 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -445,18 +445,28 @@ public final class DataStructuresProcessor extends GridProcessorAdapter
{
         if (!create)
             return c.applyx();
 
-        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ))
{
-            err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
+        while (true) {
+            try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ))
{
+                err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
 
-            if (err != null)
-                throw err;
+                if (err != null)
+                    throw err;
 
-            dataStructure = c.applyx();
+                dataStructure = c.applyx();
 
-            tx.commit();
-        }
+                tx.commit();
+
+                return dataStructure;
+            }
+            catch (ClusterTopologyCheckedException e) {
+                IgniteInternalFuture<?> fut = e.retryReadyFuture();
 
-        return dataStructure;
+                fut.get();
+            }
+            catch (IgniteTxRollbackCheckedException ignore) {
+                // Safe to retry right away.
+            }
+        }
     }
 
     /**
@@ -512,31 +522,39 @@ public final class DataStructuresProcessor extends GridProcessorAdapter
{
         if (err != null)
             throw err;
 
-        T rmvInfo;
+        while (true) {
+            try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ))
{
+                T2<Boolean, IgniteCheckedException> res =
+                    utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
 
-        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ))
{
-            T2<Boolean, IgniteCheckedException> res =
-                utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
+                err = res.get2();
 
-            err = res.get2();
+                if (err != null)
+                    throw err;
 
-            if (err != null)
-                throw err;
+                assert res.get1() != null;
 
-            assert res.get1() != null;
+                boolean exists = res.get1();
 
-            boolean exists = res.get1();
+                if (!exists)
+                    return;
 
-            if (!exists)
-                return;
+                T rmvInfo = c.applyx();
 
-            rmvInfo = c.applyx();
+                tx.commit();
 
-            tx.commit();
-        }
+                if (afterRmv != null && rmvInfo != null)
+                    afterRmv.applyx(rmvInfo);
+            }
+            catch (ClusterTopologyCheckedException e) {
+                IgniteInternalFuture<?> fut = e.retryReadyFuture();
 
-        if (afterRmv != null && rmvInfo != null)
-            afterRmv.applyx(rmvInfo);
+                fut.get();
+            }
+            catch (IgniteTxRollbackCheckedException ignore) {
+                // Safe to retry right away.
+            }
+        }
     }
 
     /**
@@ -906,27 +924,35 @@ public final class DataStructuresProcessor extends GridProcessorAdapter
{
             return c.applyx(cacheCtx);
         }
 
-        T col;
+        while (true) {
+            try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ))
{
+                T2<String, IgniteCheckedException> res =
+                    utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
 
-        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ))
{
-            T2<String, IgniteCheckedException> res =
-                utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
+                err = res.get2();
 
-            err = res.get2();
+                if (err != null)
+                    throw err;
 
-            if (err != null)
-                throw err;
+                String cacheName = res.get1();
 
-            String cacheName = res.get1();
+                final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context();
 
-            final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context();
+                T col = c.applyx(cacheCtx);
 
-            col = c.applyx(cacheCtx);
+                tx.commit();
 
-            tx.commit();
-        }
+                return col;
+            }
+            catch (ClusterTopologyCheckedException e) {
+                IgniteInternalFuture<?> fut = e.retryReadyFuture();
 
-        return col;
+                fut.get();
+            }
+            catch (IgniteTxRollbackCheckedException ignore) {
+                // Safe to retry right away.
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
new file mode 100644
index 0000000..cee54b8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstractTest {
+    /**
+     * Grid count.
+     */
+    private static final int GRID_CNT = 5;
+
+    /**
+     * Restart count.
+     */
+    private static final int RESTART_CNT = 15;
+
+    /**
+     * Atomic long name.
+     */
+    private static final String ATOMIC_LONG_NAME = "test-atomic-long";
+
+    /**
+     * Queue.
+     */
+    private final Queue<Long> queue = new ConcurrentLinkedQueue<>();
+
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        AtomicConfiguration atomicCfg = new AtomicConfiguration();
+        atomicCfg.setCacheMode(CacheMode.PARTITIONED);
+        atomicCfg.setBackups(1);
+
+        cfg.setAtomicConfiguration(atomicCfg);
+
+        return cfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        queue.clear();
+    }
+
+    /**
+     *
+     */
+    public void testQueueCreateNodesJoin() throws Exception {
+        CountDownLatch startLatch = new CountDownLatch(GRID_CNT);
+        final AtomicBoolean run = new AtomicBoolean(true);
+
+        Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        for (int i = 0; i < GRID_CNT; i++)
+            futs.add(startNodeAndCreaterThread(i, startLatch, run));
+
+        startLatch.await();
+
+        info("All nodes started.");
+
+        Thread.sleep(10_000);
+
+        run.set(false);
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get();
+
+        info("Increments: " + queue.size());
+
+        assert !queue.isEmpty();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIncrementConsistency() throws Exception {
+        startGrids(GRID_CNT);
+
+        final AtomicBoolean run = new AtomicBoolean(true);
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>()
{
+            /** {@inheritDoc} */
+            @Override
+            public Void call() throws Exception {
+                IgniteAtomicLong cntr = ignite(0).atomicLong(ATOMIC_LONG_NAME, 0, true);
+
+                while (run.get())
+                    queue.add(cntr.getAndIncrement());
+
+                return null;
+            }
+        }, 4, "increment-runner");
+
+        for (int i = 0; i < RESTART_CNT; i++) {
+            int restartIdx = ThreadLocalRandom.current().nextInt(GRID_CNT - 1) + 1;
+
+            stopGrid(restartIdx);
+
+            U.sleep(500);
+
+            startGrid(restartIdx);
+        }
+
+        run.set(false);
+
+        fut.get();
+
+        info("Increments: " + queue.size());
+
+        checkQueue();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueueClose() throws Exception {
+        startGrids(GRID_CNT);
+
+        int threads = 4;
+
+        final AtomicBoolean run = new AtomicBoolean(true);
+        final AtomicInteger idx = new AtomicInteger();
+        final AtomicReferenceArray<Exception> arr = new AtomicReferenceArray<>(threads);
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>()
{
+            /** {@inheritDoc} */
+            @Override
+            public Void call() throws Exception {
+                int base = idx.getAndIncrement();
+
+                try {
+                    int delta = 0;
+
+                    while (run.get()) {
+                        IgniteAtomicLong cntr = ignite(0).atomicLong(ATOMIC_LONG_NAME + "-"
+ base + "-" + delta, 0, true);
+
+                        for (int i = 0; i < 5; i++)
+                            queue.add(cntr.getAndIncrement());
+
+                        cntr.close();
+
+                        delta++;
+                    }
+                }
+                catch (Exception e) {
+                    arr.set(base, e);
+
+                    throw e;
+                }
+                finally {
+                    info("RUNNER THREAD IS STOPPING");
+                }
+
+                return null;
+            }
+        }, threads, "increment-runner");
+
+        for (int i = 0; i < RESTART_CNT; i++) {
+            int restartIdx = ThreadLocalRandom.current().nextInt(GRID_CNT - 1) + 1;
+
+            stopGrid(restartIdx);
+
+            U.sleep(500);
+
+            startGrid(restartIdx);
+        }
+
+        run.set(false);
+
+        fut.get();
+
+        for (int i = 0; i < threads; i++) {
+            Exception err = arr.get(i);
+
+            if (err != null)
+                throw err;
+        }
+    }
+
+    /**
+     *
+     */
+    private void checkQueue() {
+        List<Long> list = new ArrayList<>(queue);
+
+        Collections.sort(list);
+
+        boolean failed = false;
+
+        int delta = 0;
+
+        for (int i = 0; i < list.size(); i++) {
+            Long exp = (long)(i + delta);
+
+            Long actual = list.get(i);
+
+            if (!exp.equals(actual)) {
+                failed = true;
+
+                delta++;
+
+                info(">>> Expected " + exp + ", actual " + actual);
+            }
+        }
+
+        assertFalse(failed);
+    }
+
+    /**
+     * @param i Node index.
+     */
+    private IgniteInternalFuture<?> startNodeAndCreaterThread(final int i, final CountDownLatch
startLatch, final AtomicBoolean run)
+        throws Exception {
+        return multithreadedAsync(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Ignite ignite = startGrid(i);
+
+                    startLatch.countDown();
+
+                    while (run.get()) {
+                        IgniteAtomicLong cntr = ignite.atomicLong(ATOMIC_LONG_NAME, 0, true);
+
+                        queue.add(cntr.getAndIncrement());
+                    }
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 1, "grunner-" + i);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d7602975/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index b64471b..42e0f6f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -81,6 +81,7 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheSizeFailoverTest.class);
 
         suite.addTestSuite(IgniteCacheTopologySafeGetSelfTest.class);
+        suite.addTestSuite(IgniteAtomicLongChangingTopologySelfTest.class);
 
         return suite;
     }


Mime
View raw message