ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [12/50] [abbrv] ignite git commit: ignite-1607 Fixes for serializable txs on changing topology
Date Sun, 08 Nov 2015 17:23:08 GMT
ignite-1607 Fixes for serializable txs on changing topology


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/11d177f5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/11d177f5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/11d177f5

Branch: refs/heads/ignite-1702
Commit: 11d177f5d5d2c2066c2d1fa6e28c9b1a4052d6c6
Parents: 6ea3b56
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Nov 2 09:02:29 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Nov 2 09:02:29 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxPrepareFuture.java |  44 +--
 ...arOptimisticSerializableTxPrepareFuture.java |  12 +-
 .../cache/transactions/IgniteTxManager.java     |   4 +-
 .../CacheSerializableTransactionsTest.java      | 313 ++++++++++++-------
 4 files changed, 240 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d806801..61975d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -573,27 +573,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         if (tx.onePhaseCommit() && tx.commitOnPrepare()) {
             assert last;
 
+            Throwable prepErr = this.err.get();
+
             // Must create prepare response before transaction is committed to grab correct
return value.
-            final GridNearTxPrepareResponse res = createPrepareResponse();
+            final GridNearTxPrepareResponse res = createPrepareResponse(prepErr);
 
             onComplete(res);
 
             if (tx.commitOnPrepare()) {
                 if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
-                    IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() ==
null ?
-                        tx.commitAsync() : tx.rollbackAsync();
+                    IgniteInternalFuture<IgniteInternalTx> fut = null;
 
-                    fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>()
{
-                        @Override public void applyx(IgniteInternalFuture<IgniteInternalTx>
fut) {
-                            try {
-                                if (replied.compareAndSet(false, true))
-                                    sendPrepareResponse(res);
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.error(log, "Failed to send prepare response for transaction:
" + tx, e);
+                    if (prepErr == null)
+                        fut = tx.commitAsync();
+                    else if (!cctx.kernalContext().isStopping())
+                        fut = tx.rollbackAsync();
+
+                    if (fut != null) {
+                        fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>()
{
+                            @Override public void applyx(IgniteInternalFuture<IgniteInternalTx>
fut) {
+                                try {
+                                    if (replied.compareAndSet(false, true))
+                                        sendPrepareResponse(res);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    U.error(log, "Failed to send prepare response for transaction:
" + tx, e);
+                                }
                             }
-                        }
-                    });
+                        });
+                    }
                 }
             }
             else {
@@ -610,7 +618,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         }
         else {
             if (replied.compareAndSet(false, true)) {
-                GridNearTxPrepareResponse res = createPrepareResponse();
+                GridNearTxPrepareResponse res = createPrepareResponse(this.err.get());
 
                 try {
                     sendPrepareResponse(res);
@@ -659,12 +667,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     }
 
     /**
+     * @param prepErr Error.
      * @return Prepare response.
      */
-    private GridNearTxPrepareResponse createPrepareResponse() {
-        // Send reply back to originating near node.
-        Throwable prepErr = err.get();
-
+    private GridNearTxPrepareResponse createPrepareResponse(@Nullable Throwable prepErr)
{
         assert F.isEmpty(tx.invalidPartitions());
 
         GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
@@ -981,7 +987,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 if (err0 != null) {
                     err.compareAndSet(null, err0);
 
-                    final GridNearTxPrepareResponse res = createPrepareResponse();
+                    final GridNearTxPrepareResponse res = createPrepareResponse(err.get());
 
                     tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>()
{
                         @Override public void apply(IgniteInternalFuture<IgniteInternalTx>
fut) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 47c1d21..5488bb1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -809,18 +809,22 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                                                         remap(res);
                                                     }
                                                     catch (IgniteCheckedException e) {
+                                                        err.compareAndSet(null, e);
+
                                                         onDone(e);
                                                     }
                                                 }
                                             });
                                         }
                                         else {
-                                            ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+                                            ClusterTopologyCheckedException err0 = new ClusterTopologyCheckedException(
                                                 "Cluster topology changed while client transaction
is preparing.");
 
-                                            err.retryReadyFuture(affFut);
+                                            err0.retryReadyFuture(affFut);
+
+                                            err.compareAndSet(null, err0);
 
-                                            onDone(err);
+                                            onDone(err0);
                                         }
                                     }
                                     catch (IgniteCheckedException e) {
@@ -829,6 +833,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                                                 GridNearOptimisticSerializableTxPrepareFuture.this);
                                         }
 
+                                        err.compareAndSet(null, e);
+
                                         onDone(e);
                                     }
                                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c1e9202..1f51b8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1617,8 +1617,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     {
         for (final IgniteInternalTx tx : txs()) {
             if (nearVer.equals(tx.nearXidVersion())) {
-                TransactionState state = tx.state();
-
                 IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
 
                 if (prepFut != null && !prepFut.isDone()) {
@@ -1648,6 +1646,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     return fut0;
                 }
 
+                TransactionState state = tx.state();
+
                 if (state == PREPARED || state == COMMITTING || state == COMMITTED) {
                     if (--txNum == 0) {
                         if (fut != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 8c135ad..ae64bb4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
@@ -60,6 +61,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -115,6 +117,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         cfg.setClientMode(client);
 
         cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
@@ -187,7 +191,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 txStreamerLoad(ignite(SRVS), 10_000, cache.getName(), allowOverwrite);
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -290,7 +294,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -348,7 +352,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -414,7 +418,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -495,7 +499,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -562,7 +566,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -609,7 +613,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                     checkValue(key, null, cache.getName());
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -640,7 +644,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -718,7 +722,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -821,7 +825,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -914,7 +918,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -1007,7 +1011,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -1101,7 +1105,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -1187,7 +1191,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -1273,7 +1277,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -1398,7 +1402,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -1523,7 +1527,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -1660,7 +1664,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -1797,7 +1801,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -1968,7 +1972,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -2118,7 +2122,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                     checkValue(i, rmv ? null : i, cache.getName());
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -2208,7 +2212,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -2267,7 +2271,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -2340,7 +2344,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 checkValue(key2, 2, cache.getName());
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -2389,7 +2393,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
             }
         }
         finally {
-            ignite0.destroyCache(cacheName);
+            destroyCache(cacheName);
         }
     }
 
@@ -2475,7 +2479,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
             checkValue(key3, key3, cacheName);
         }
         finally {
-            ignite0.destroyCache(cacheName);
+            destroyCache(cacheName);
         }
     }
 
@@ -2555,7 +2559,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
             checkValue(key3, key3, cacheName);
         }
         finally {
-            ignite0.destroyCache(cacheName);
+            destroyCache(cacheName);
         }
     }
 
@@ -2817,8 +2821,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
             }
         }
         finally {
-            ignite0.destroyCache(CACHE1);
-            ignite0.destroyCache(CACHE2);
+            destroyCache(CACHE1);
+            destroyCache(CACHE2);
         }
     }
 
@@ -2876,7 +2880,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }
             finally {
-                destroyCache(ignite0, ccfg.getName());
+                destroyCache(ccfg.getName());
             }
         }
     }
@@ -2975,30 +2979,118 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                     caches.add(client.<Integer, Integer>cache(cacheName));
             }
 
-            IgniteInternalFuture<?> restartFut = null;
+            IgniteInternalFuture<?> restartFut = restart ? restartFuture(stop, null)
: null;
+
+            for (int i = 0; i < 30; i++) {
+                final AtomicInteger cntr = new AtomicInteger();
+
+                final Integer key = i;
+
+                final AtomicInteger threadIdx = new AtomicInteger();
+
+                final int THREADS = 10;
+
+                final CyclicBarrier barrier = new CyclicBarrier(THREADS);
+
+                GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        int idx = threadIdx.getAndIncrement() % caches.size();
+
+                        IgniteCache<Integer, Integer> cache = caches.get(idx);
+
+                        Ignite ignite = cache.unwrap(Ignite.class);
 
-            if (restart) {
-                restartFut = GridTestUtils.runAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        while (!stop.get()) {
-                            stopGrid(0);
+                        IgniteTransactions txs = ignite.transactions();
 
-                            U.sleep(300);
+                        log.info("Started update thread: " + ignite.name());
+
+                        barrier.await();
 
-                            Ignite ignite = startGrid(0);
+                        for (int i = 0; i < 1000; i++) {
+                            try {
+                                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE))
{
+                                    Integer val = cache.get(key);
 
-                            assertFalse(ignite.configuration().isClientMode());
+                                    cache.put(key, val == null ? 1 : val + 1);
+
+                                    tx.commit();
+                                }
+
+                                cntr.incrementAndGet();
+                            }
+                            catch (TransactionOptimisticException ignore) {
+                                // Retry.
+                            }
+                            catch (IgniteException | CacheException e) {
+                                assertTrue("Unexpected exception [err=" + e + ", cause="
+ e.getCause() + ']',
+                                    restart && X.hasCause(e, ClusterTopologyCheckedException.class));
+                            }
                         }
 
                         return null;
                     }
-                });
+                }, THREADS, "update-thread").get();
+
+                log.info("Iteration [iter=" + i + ", val=" + cntr.get() + ']');
+
+                assertTrue(cntr.get() > 0);
+
+                checkValue(key, cntr.get(), cacheName, restart);
             }
 
-            for (int i = 0; i < 30; i++) {
+            stop.set(true);
+
+            if (restartFut != null)
+                restartFut.get();
+        }
+        finally {
+            stop.set(true);
+
+            destroyCache(cacheName);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIncrementTxMultipleNodeRestart() throws Exception {
+        incrementTxMultiple(false, false, true);
+    }
+
+    /**
+     * @param nearCache If {@code true} near cache is enabled.
+     * @param store If {@code true} cache store is enabled.
+     * @param restart If {@code true} restarts one node.
+     * @throws Exception If failed.
+     */
+    private void incrementTxMultiple(boolean nearCache, boolean store, final boolean restart)
throws Exception {
+        final Ignite srv = ignite(1);
+
+        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED,
FULL_SYNC, 1, store, false);
+
+        final List<Ignite> clients = clients();
+
+        final String cacheName = srv.createCache(ccfg).getName();
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        try {
+            final List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
+
+            for (Ignite client : clients) {
+                if (nearCache)
+                    caches.add(client.createNearCache(cacheName, new NearCacheConfiguration<Integer,
Integer>()));
+                else
+                    caches.add(client.<Integer, Integer>cache(cacheName));
+            }
+
+            IgniteInternalFuture<?> restartFut = restart ? restartFuture(stop, null)
: null;
+
+            for (int i = 0; i < 20; i += 2) {
                 final AtomicInteger cntr = new AtomicInteger();
 
-                final Integer key = i;
+                final Integer key1 = i;
+                final Integer key2 = i + 1;
 
                 final AtomicInteger threadIdx = new AtomicInteger();
 
@@ -3006,6 +3098,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
 
                 final CyclicBarrier barrier = new CyclicBarrier(THREADS);
 
+                final ConcurrentSkipListSet<Integer> vals1 = new ConcurrentSkipListSet<>();
+                final ConcurrentSkipListSet<Integer> vals2 = new ConcurrentSkipListSet<>();
+
                 GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
                     @Override public Void call() throws Exception {
                         int idx = threadIdx.getAndIncrement() % caches.size();
@@ -3023,11 +3118,19 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                         for (int i = 0; i < 1000; i++) {
                             try {
                                 try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE))
{
-                                    Integer val = cache.get(key);
+                                    Integer val1 = cache.get(key1);
+                                    Integer val2 = cache.get(key2);
 
-                                    cache.put(key, val == null ? 1 : val + 1);
+                                    Integer newVal1 = val1 == null ? 1 : val1 + 1;
+                                    Integer newVal2 = val2 == null ? 1 : val2 + 1;
+
+                                    cache.put(key1, newVal1);
+                                    cache.put(key2, newVal2);
 
                                     tx.commit();
+
+                                    assertTrue(vals1.add(newVal1));
+                                    assertTrue(vals2.add(newVal2));
                                 }
 
                                 cntr.incrementAndGet();
@@ -3049,7 +3152,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
 
                 assertTrue(cntr.get() > 0);
 
-                checkValue(key, cntr.get(), cacheName, restart);
+                checkValue(key1, cntr.get(), cacheName, restart);
+                checkValue(key2, cntr.get(), cacheName, restart);
             }
 
             stop.set(true);
@@ -3060,7 +3164,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
         finally {
             stop.set(true);
 
-            destroyCache(srv, cacheName);
+            destroyCache(cacheName);
         }
     }
 
@@ -3189,7 +3293,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
             }
         }
         finally {
-            ignite0.destroyCache(cacheName);
+            destroyCache(cacheName);
         }
     }
 
@@ -3229,6 +3333,13 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testAccountTxNodeRestart() throws Exception {
+        accountTx(false, false, false, true, TestMemoryMode.HEAP);
+    }
+
+    /**
      * @param getAll If {@code true} uses getAll/putAll in transaction.
      * @param nearCache If {@code true} near cache is enabled.
      * @param nonSer If {@code true} starts threads executing non-serializable transactions.
@@ -3421,25 +3532,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 }
             }, THREADS, "tx-thread");
 
-            IgniteInternalFuture<?> restartFut = null;
-
-            if (restart) {
-                restartFut = GridTestUtils.runAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        while (!fut.isDone()) {
-                            stopGrid(0);
-
-                            U.sleep(300);
-
-                            Ignite ignite = startGrid(0);
-
-                            assertFalse(ignite.configuration().isClientMode());
-                        }
-
-                        return null;
-                    }
-                });
-            }
+            IgniteInternalFuture<?> restartFut = restart ? restartFuture(null, fut)
: null;
 
             fut.get(testTime + 30_000);
 
@@ -3506,7 +3599,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
             }
         }
         finally {
-            srv.destroyCache(cacheName);
+            destroyCache(cacheName);
         }
     }
 
@@ -3558,21 +3651,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                 cacheNames.add(ccfg.getName());
             }
 
-            IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Object>()
{
-                @Override public Object call() throws Exception {
-                    while (!finished.get()) {
-                        stopGrid(0);
-
-                        U.sleep(300);
-
-                        Ignite ignite = startGrid(0);
-
-                        assertFalse(ignite.configuration().isClientMode());
-                    }
-
-                    return null;
-                }
-            });
+            IgniteInternalFuture<?> restartFut = restartFuture(finished, null);
 
             List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
@@ -3653,7 +3732,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
             finished.set(true);
 
             for (String cacheName : cacheNames)
-                srv.destroyCache(cacheName);
+                destroyCache(cacheName);
         }
     }
 
@@ -3710,7 +3789,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
             }
         }
         finally {
-            ignite.destroyCache(cacheName);
+            destroyCache(cacheName);
         }
     }
 
@@ -3785,27 +3864,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
 
             final AtomicBoolean finished = new AtomicBoolean();
 
-            IgniteInternalFuture<Object> fut = null;
+            IgniteInternalFuture<?> fut = restart ? restartFuture(finished, null) :
null;
 
             try {
-                if (restart) {
-                    fut = GridTestUtils.runAsync(new Callable<Object>() {
-                        @Override public Object call() throws Exception {
-                            while (!finished.get()) {
-                                stopGrid(0);
-
-                                U.sleep(300);
-
-                                Ignite ignite = startGrid(0);
-
-                                assertFalse(ignite.configuration().isClientMode());
-                            }
-
-                            return null;
-                        }
-                    });
-                }
-
                 for (int i = 0; i < 10; i++) {
                     log.info("Iteration: " + i);
 
@@ -3957,7 +4018,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
             }
         }
         finally {
-            destroyCache(srv, cacheName);
+            destroyCache(cacheName);
         }
     }
 
@@ -4152,16 +4213,20 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
     }
 
     /**
-     * @param ignite Node.
      * @param cacheName Cache name.
      */
-    private void destroyCache(Ignite ignite, String cacheName) {
+    private void destroyCache(String cacheName) {
         storeMap.clear();
 
-        ignite.destroyCache(cacheName);
+        for (Ignite ignite : G.allGrids()) {
+            try {
+                ignite.destroyCache(cacheName);
+            }
+            catch (IgniteException ignore) {
+                // No-op.                
+            }
 
-        for (Ignite ignite0 : G.allGrids()) {
-            GridTestSwapSpaceSpi spi = (GridTestSwapSpaceSpi)ignite0.configuration().getSwapSpaceSpi();
+            GridTestSwapSpaceSpi spi = (GridTestSwapSpaceSpi)ignite.configuration().getSwapSpaceSpi();
 
             spi.clearAll();
         }
@@ -4220,6 +4285,36 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * @param stop Stop flag.
+     * @param fut Future.
+     * @return Restart thread future.
+     */
+    private IgniteInternalFuture<?> restartFuture(final AtomicBoolean stop, final IgniteInternalFuture<?>
fut) {
+        return GridTestUtils.runAsync(new Callable<Object>() {
+            private boolean stop() {
+                if (stop != null)
+                    return stop.get();
+
+                return fut.isDone();
+            }
+
+            @Override public Object call() throws Exception {
+                while (!stop()) {
+                    Ignite ignite = startGrid(SRVS + CLIENTS);
+
+                    assertFalse(ignite.configuration().isClientMode());
+
+                    U.sleep(300);
+
+                    stopGrid(SRVS + CLIENTS);
+                }
+
+                return null;
+            }
+        }, "restart-thread");
+    }
+
+    /**
      *
      */
     private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>>
{


Mime
View raw message