Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9EC7D18DD0 for ; Tue, 10 Nov 2015 09:47:59 +0000 (UTC) Received: (qmail 38338 invoked by uid 500); 10 Nov 2015 09:47:59 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 38269 invoked by uid 500); 10 Nov 2015 09:47:59 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 38201 invoked by uid 99); 10 Nov 2015 09:47:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Nov 2015 09:47:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 17FFCDFF6B; Tue, 10 Nov 2015 09:47:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dmagda@apache.org To: commits@ignite.apache.org Date: Tue, 10 Nov 2015 09:48:00 -0000 Message-Id: <745b7b2e626442d49a9d1925f00edce2@git.apache.org> In-Reply-To: <41f8bec000be4b0ca80d3342e01d5fb1@git.apache.org> References: <41f8bec000be4b0ca80d3342e01d5fb1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/50] [abbrv] ignite git commit: ignite-1607 Fixes for serializable txs on changing topology ignite-1607 Fixes for serializable txs on changing topology Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/11d177f5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/11d177f5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/11d177f5 Branch: refs/heads/ignite-801 Commit: 11d177f5d5d2c2066c2d1fa6e28c9b1a4052d6c6 Parents: 6ea3b56 Author: sboikov Authored: Mon Nov 2 09:02:29 2015 +0300 Committer: sboikov Committed: Mon Nov 2 09:02:29 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxPrepareFuture.java | 44 +-- ...arOptimisticSerializableTxPrepareFuture.java | 12 +- .../cache/transactions/IgniteTxManager.java | 4 +- .../CacheSerializableTransactionsTest.java | 313 ++++++++++++------- 4 files changed, 240 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index d806801..61975d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -573,27 +573,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture fut = this.err.get() == null ? - tx.commitAsync() : tx.rollbackAsync(); + IgniteInternalFuture fut = null; - fut.listen(new CIX1>() { - @Override public void applyx(IgniteInternalFuture fut) { - try { - if (replied.compareAndSet(false, true)) - sendPrepareResponse(res); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response for transaction: " + tx, e); + if (prepErr == null) + fut = tx.commitAsync(); + else if (!cctx.kernalContext().isStopping()) + fut = tx.rollbackAsync(); + + if (fut != null) { + fut.listen(new CIX1>() { + @Override public void applyx(IgniteInternalFuture fut) { + try { + if (replied.compareAndSet(false, true)) + sendPrepareResponse(res); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send prepare response for transaction: " + tx, e); + } } - } - }); + }); + } } } else { @@ -610,7 +618,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture>() { @Override public void apply(IgniteInternalFuture fut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 47c1d21..5488bb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -809,18 +809,22 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim remap(res); } catch (IgniteCheckedException e) { + err.compareAndSet(null, e); + onDone(e); } } }); } else { - ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( + ClusterTopologyCheckedException err0 = new ClusterTopologyCheckedException( "Cluster topology changed while client transaction is preparing."); - err.retryReadyFuture(affFut); + err0.retryReadyFuture(affFut); + + err.compareAndSet(null, err0); - onDone(err); + onDone(err0); } } catch (IgniteCheckedException e) { @@ -829,6 +833,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim GridNearOptimisticSerializableTxPrepareFuture.this); } + err.compareAndSet(null, e); + onDone(e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index c1e9202..1f51b8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1617,8 +1617,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { { for (final IgniteInternalTx tx : txs()) { if (nearVer.equals(tx.nearXidVersion())) { - TransactionState state = tx.state(); - IgniteInternalFuture prepFut = tx.currentPrepareFuture(); if (prepFut != null && !prepFut.isDone()) { @@ -1648,6 +1646,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return fut0; } + TransactionState state = tx.state(); + if (state == PREPARED || state == COMMITTING || state == COMMITTED) { if (--txNum == 0) { if (fut != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/11d177f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 8c135ad..ae64bb4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; @@ -60,6 +61,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -115,6 +117,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + cfg.setClientMode(client); cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi()); @@ -187,7 +191,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { txStreamerLoad(ignite(SRVS), 10_000, cache.getName(), allowOverwrite); } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -290,7 +294,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -348,7 +352,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -414,7 +418,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -495,7 +499,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -562,7 +566,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -609,7 +613,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { checkValue(key, null, cache.getName()); } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -640,7 +644,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -718,7 +722,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -821,7 +825,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -914,7 +918,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -1007,7 +1011,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -1101,7 +1105,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -1187,7 +1191,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -1273,7 +1277,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -1398,7 +1402,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -1523,7 +1527,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -1660,7 +1664,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -1797,7 +1801,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -1968,7 +1972,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -2118,7 +2122,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { checkValue(i, rmv ? null : i, cache.getName()); } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -2208,7 +2212,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -2267,7 +2271,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -2340,7 +2344,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { checkValue(key2, 2, cache.getName()); } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -2389,7 +2393,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - ignite0.destroyCache(cacheName); + destroyCache(cacheName); } } @@ -2475,7 +2479,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { checkValue(key3, key3, cacheName); } finally { - ignite0.destroyCache(cacheName); + destroyCache(cacheName); } } @@ -2555,7 +2559,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { checkValue(key3, key3, cacheName); } finally { - ignite0.destroyCache(cacheName); + destroyCache(cacheName); } } @@ -2817,8 +2821,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - ignite0.destroyCache(CACHE1); - ignite0.destroyCache(CACHE2); + destroyCache(CACHE1); + destroyCache(CACHE2); } } @@ -2876,7 +2880,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite0, ccfg.getName()); + destroyCache(ccfg.getName()); } } } @@ -2975,30 +2979,118 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { caches.add(client.cache(cacheName)); } - IgniteInternalFuture restartFut = null; + IgniteInternalFuture restartFut = restart ? restartFuture(stop, null) : null; + + for (int i = 0; i < 30; i++) { + final AtomicInteger cntr = new AtomicInteger(); + + final Integer key = i; + + final AtomicInteger threadIdx = new AtomicInteger(); + + final int THREADS = 10; + + final CyclicBarrier barrier = new CyclicBarrier(THREADS); + + GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Void call() throws Exception { + int idx = threadIdx.getAndIncrement() % caches.size(); + + IgniteCache cache = caches.get(idx); + + Ignite ignite = cache.unwrap(Ignite.class); - if (restart) { - restartFut = GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - while (!stop.get()) { - stopGrid(0); + IgniteTransactions txs = ignite.transactions(); - U.sleep(300); + log.info("Started update thread: " + ignite.name()); + + barrier.await(); - Ignite ignite = startGrid(0); + for (int i = 0; i < 1000; i++) { + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); - assertFalse(ignite.configuration().isClientMode()); + cache.put(key, val == null ? 1 : val + 1); + + tx.commit(); + } + + cntr.incrementAndGet(); + } + catch (TransactionOptimisticException ignore) { + // Retry. + } + catch (IgniteException | CacheException e) { + assertTrue("Unexpected exception [err=" + e + ", cause=" + e.getCause() + ']', + restart && X.hasCause(e, ClusterTopologyCheckedException.class)); + } } return null; } - }); + }, THREADS, "update-thread").get(); + + log.info("Iteration [iter=" + i + ", val=" + cntr.get() + ']'); + + assertTrue(cntr.get() > 0); + + checkValue(key, cntr.get(), cacheName, restart); } - for (int i = 0; i < 30; i++) { + stop.set(true); + + if (restartFut != null) + restartFut.get(); + } + finally { + stop.set(true); + + destroyCache(cacheName); + } + } + + /** + * @throws Exception If failed. + */ + public void testIncrementTxMultipleNodeRestart() throws Exception { + incrementTxMultiple(false, false, true); + } + + /** + * @param nearCache If {@code true} near cache is enabled. + * @param store If {@code true} cache store is enabled. + * @param restart If {@code true} restarts one node. + * @throws Exception If failed. + */ + private void incrementTxMultiple(boolean nearCache, boolean store, final boolean restart) throws Exception { + final Ignite srv = ignite(1); + + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, store, false); + + final List clients = clients(); + + final String cacheName = srv.createCache(ccfg).getName(); + + final AtomicBoolean stop = new AtomicBoolean(); + + try { + final List> caches = new ArrayList<>(); + + for (Ignite client : clients) { + if (nearCache) + caches.add(client.createNearCache(cacheName, new NearCacheConfiguration())); + else + caches.add(client.cache(cacheName)); + } + + IgniteInternalFuture restartFut = restart ? restartFuture(stop, null) : null; + + for (int i = 0; i < 20; i += 2) { final AtomicInteger cntr = new AtomicInteger(); - final Integer key = i; + final Integer key1 = i; + final Integer key2 = i + 1; final AtomicInteger threadIdx = new AtomicInteger(); @@ -3006,6 +3098,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { final CyclicBarrier barrier = new CyclicBarrier(THREADS); + final ConcurrentSkipListSet vals1 = new ConcurrentSkipListSet<>(); + final ConcurrentSkipListSet vals2 = new ConcurrentSkipListSet<>(); + GridTestUtils.runMultiThreadedAsync(new Callable() { @Override public Void call() throws Exception { int idx = threadIdx.getAndIncrement() % caches.size(); @@ -3023,11 +3118,19 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { for (int i = 0; i < 1000; i++) { try { try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Integer val = cache.get(key); + Integer val1 = cache.get(key1); + Integer val2 = cache.get(key2); - cache.put(key, val == null ? 1 : val + 1); + Integer newVal1 = val1 == null ? 1 : val1 + 1; + Integer newVal2 = val2 == null ? 1 : val2 + 1; + + cache.put(key1, newVal1); + cache.put(key2, newVal2); tx.commit(); + + assertTrue(vals1.add(newVal1)); + assertTrue(vals2.add(newVal2)); } cntr.incrementAndGet(); @@ -3049,7 +3152,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { assertTrue(cntr.get() > 0); - checkValue(key, cntr.get(), cacheName, restart); + checkValue(key1, cntr.get(), cacheName, restart); + checkValue(key2, cntr.get(), cacheName, restart); } stop.set(true); @@ -3060,7 +3164,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { finally { stop.set(true); - destroyCache(srv, cacheName); + destroyCache(cacheName); } } @@ -3189,7 +3293,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - ignite0.destroyCache(cacheName); + destroyCache(cacheName); } } @@ -3229,6 +3333,13 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testAccountTxNodeRestart() throws Exception { + accountTx(false, false, false, true, TestMemoryMode.HEAP); + } + + /** * @param getAll If {@code true} uses getAll/putAll in transaction. * @param nearCache If {@code true} near cache is enabled. * @param nonSer If {@code true} starts threads executing non-serializable transactions. @@ -3421,25 +3532,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } }, THREADS, "tx-thread"); - IgniteInternalFuture restartFut = null; - - if (restart) { - restartFut = GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - while (!fut.isDone()) { - stopGrid(0); - - U.sleep(300); - - Ignite ignite = startGrid(0); - - assertFalse(ignite.configuration().isClientMode()); - } - - return null; - } - }); - } + IgniteInternalFuture restartFut = restart ? restartFuture(null, fut) : null; fut.get(testTime + 30_000); @@ -3506,7 +3599,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - srv.destroyCache(cacheName); + destroyCache(cacheName); } } @@ -3558,21 +3651,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { cacheNames.add(ccfg.getName()); } - IgniteInternalFuture restartFut = GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - while (!finished.get()) { - stopGrid(0); - - U.sleep(300); - - Ignite ignite = startGrid(0); - - assertFalse(ignite.configuration().isClientMode()); - } - - return null; - } - }); + IgniteInternalFuture restartFut = restartFuture(finished, null); List> futs = new ArrayList<>(); @@ -3653,7 +3732,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { finished.set(true); for (String cacheName : cacheNames) - srv.destroyCache(cacheName); + destroyCache(cacheName); } } @@ -3710,7 +3789,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - ignite.destroyCache(cacheName); + destroyCache(cacheName); } } @@ -3785,27 +3864,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { final AtomicBoolean finished = new AtomicBoolean(); - IgniteInternalFuture fut = null; + IgniteInternalFuture fut = restart ? restartFuture(finished, null) : null; try { - if (restart) { - fut = GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - while (!finished.get()) { - stopGrid(0); - - U.sleep(300); - - Ignite ignite = startGrid(0); - - assertFalse(ignite.configuration().isClientMode()); - } - - return null; - } - }); - } - for (int i = 0; i < 10; i++) { log.info("Iteration: " + i); @@ -3957,7 +4018,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(srv, cacheName); + destroyCache(cacheName); } } @@ -4152,16 +4213,20 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } /** - * @param ignite Node. * @param cacheName Cache name. */ - private void destroyCache(Ignite ignite, String cacheName) { + private void destroyCache(String cacheName) { storeMap.clear(); - ignite.destroyCache(cacheName); + for (Ignite ignite : G.allGrids()) { + try { + ignite.destroyCache(cacheName); + } + catch (IgniteException ignore) { + // No-op. + } - for (Ignite ignite0 : G.allGrids()) { - GridTestSwapSpaceSpi spi = (GridTestSwapSpaceSpi)ignite0.configuration().getSwapSpaceSpi(); + GridTestSwapSpaceSpi spi = (GridTestSwapSpaceSpi)ignite.configuration().getSwapSpaceSpi(); spi.clearAll(); } @@ -4220,6 +4285,36 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } /** + * @param stop Stop flag. + * @param fut Future. + * @return Restart thread future. + */ + private IgniteInternalFuture restartFuture(final AtomicBoolean stop, final IgniteInternalFuture fut) { + return GridTestUtils.runAsync(new Callable() { + private boolean stop() { + if (stop != null) + return stop.get(); + + return fut.isDone(); + } + + @Override public Object call() throws Exception { + while (!stop()) { + Ignite ignite = startGrid(SRVS + CLIENTS); + + assertFalse(ignite.configuration().isClientMode()); + + U.sleep(300); + + stopGrid(SRVS + CLIENTS); + } + + return null; + } + }, "restart-thread"); + } + + /** * */ private static class TestStoreFactory implements Factory> {