Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CF663178C0 for ; Tue, 1 Sep 2015 05:59:11 +0000 (UTC) Received: (qmail 84223 invoked by uid 500); 1 Sep 2015 05:59:11 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 84193 invoked by uid 500); 1 Sep 2015 05:59:11 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 84184 invoked by uid 99); 1 Sep 2015 05:59:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Sep 2015 05:59:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3ADDFDFB8A; Tue, 1 Sep 2015 05:59:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <152e7f40839c4829a625c294f63d12b5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-1334 Fixed concurrent destroyCache/node stop. Check initFut result in GridDhtPartitionsExchangeFuture. Date: Tue, 1 Sep 2015 05:59:11 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/master 2814d0e91 -> afce26996 ignite-1334 Fixed concurrent destroyCache/node stop. Check initFut result in GridDhtPartitionsExchangeFuture. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/afce2699 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/afce2699 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/afce2699 Branch: refs/heads/master Commit: afce2699644c8af1e50eb5ef595ed299734c68e5 Parents: 2814d0e Author: sboikov Authored: Tue Sep 1 08:59:01 2015 +0300 Committer: sboikov Committed: Tue Sep 1 08:59:01 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 27 ++++-- .../GridDhtPartitionsExchangeFuture.java | 94 +++++++++++-------- .../cache/IgniteDynamicCacheAndNodeStop.java | 95 ++++++++++++++++++++ .../testsuites/IgniteCacheTestSuite2.java | 3 +- 4 files changed, 175 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/afce2699/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index bf203b8..c5f8168 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -156,6 +156,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** */ private final Map> caches; + /** Caches stopped from onKernalStop callback. */ + private final Map stoppedCaches = new ConcurrentHashMap<>(); + /** Map of proxies. */ private final Map> jCacheProxies; @@ -893,14 +896,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { @SuppressWarnings("unchecked") @Override public void stop(boolean cancel) throws IgniteCheckedException { for (String cacheName : stopSeq) { - GridCacheAdapter cache = caches.remove(maskNull(cacheName)); + GridCacheAdapter cache = stoppedCaches.remove(maskNull(cacheName)); if (cache != null) stopCache(cache, cancel); } - for (GridCacheAdapter cache : caches.values()) - stopCache(cache, cancel); + for (GridCacheAdapter cache : stoppedCaches.values()) { + if (cache == stoppedCaches.remove(maskNull(cache.name()))) + stopCache(cache, cancel); + } List> mgrs = sharedCtx.managers(); @@ -932,15 +937,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { cacheStartedLatch.countDown(); for (String cacheName : stopSeq) { - GridCacheAdapter cache = caches.get(maskNull(cacheName)); + GridCacheAdapter cache = caches.remove(maskNull(cacheName)); + + if (cache != null) { + stoppedCaches.put(maskNull(cacheName), cache); - if (cache != null) onKernalStop(cache, cancel); + } } for (Map.Entry> entry : caches.entrySet()) { - if (!stopSeq.contains(entry.getKey())) + GridCacheAdapter cache = entry.getValue(); + + if (cache == caches.remove(entry.getKey())) { + stoppedCaches.put(entry.getKey(), cache); + onKernalStop(entry.getValue(), cancel); + } } List> sharedMgrs = sharedCtx.managers(); @@ -3457,4 +3470,4 @@ public class GridCacheProcessor extends GridProcessorAdapter { // No-op. } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/afce2699/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 414a152..865bbdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1146,52 +1146,54 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter>() { - @Override public void apply(IgniteInternalFuture t) { + @Override public void apply(IgniteInternalFuture f) { try { - if (!t.get()) // Just to check if there was an error. + if (!f.get()) return; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize exchange future: " + this, e); - ClusterNode loc = cctx.localNode(); + return; + } - singleMsgs.put(nodeId, msg); + ClusterNode loc = cctx.localNode(); - boolean match = true; + singleMsgs.put(nodeId, msg); - // Check if oldest node has changed. - if (!oldestNode.get().equals(loc)) { - match = false; + boolean match = true; - synchronized (mux) { - // Double check. - if (oldestNode.get().equals(loc)) - match = true; - } + // Check if oldest node has changed. + if (!oldestNode.get().equals(loc)) { + match = false; + + synchronized (mux) { + // Double check. + if (oldestNode.get().equals(loc)) + match = true; } + } - if (match) { - boolean allReceived; + if (match) { + boolean allReceived; - synchronized (rcvdIds) { - if (rcvdIds.add(nodeId)) - updatePartitionSingleMap(msg); + synchronized (rcvdIds) { + if (rcvdIds.add(nodeId)) + updatePartitionSingleMap(msg); - allReceived = allReceived(); - } + allReceived = allReceived(); + } - // If got all replies, and initialization finished, and reply has not been sent yet. - if (allReceived && ready.get() && replied.compareAndSet(false, true)) { - spreadPartitions(); + // If got all replies, and initialization finished, and reply has not been sent yet. + if (allReceived && ready.get() && replied.compareAndSet(false, true)) { + spreadPartitions(); - onDone(exchId.topologyVersion()); - } - else if (log.isDebugEnabled()) - log.debug("Exchange future full map is not sent [allReceived=" + allReceived() + - ", ready=" + ready + ", replied=" + replied.get() + ", init=" + init.get() + - ", fut=" + this + ']'); + onDone(exchId.topologyVersion()); } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to initialize exchange future: " + this, e); + else if (log.isDebugEnabled()) + log.debug("Exchange future full map is not sent [allReceived=" + allReceived() + + ", ready=" + ready + ", replied=" + replied.get() + ", init=" + init.get() + + ", fut=" + this + ']'); } } }); @@ -1254,7 +1256,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter>() { - @Override public void apply(IgniteInternalFuture t) { + @Override public void apply(IgniteInternalFuture f) { + try { + if (!f.get()) + return; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize exchange future: " + this, e); + + return; + } + ClusterNode curOldest = oldestNode.get(); if (!nodeId.equals(curOldest.id())) { @@ -1343,8 +1355,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter>() { - @Override public void apply(IgniteInternalFuture f) { + initFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture f) { + try { + if (!f.get()) + return; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize exchange future: " + this, e); + + return; + } + if (isDone()) return; @@ -1571,4 +1593,4 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter fut1 = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + barrier.await(); + + ignite.destroyCache(null); + + return null; + } + }); + + IgniteInternalFuture fut2 = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + barrier.await(); + + stopGrid(1); + + return null; + } + }); + + fut1.get(); + fut2.get(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/afce2699/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index e903115..9b9bbba 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -238,7 +238,8 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(IgniteAtomicCacheEntryProcessorNodeJoinTest.class)); suite.addTest(new TestSuite(GridCacheNearTxForceKeyTest.class)); suite.addTest(new TestSuite(CrossCacheTxRandomOperationsTest.class)); + suite.addTest(new TestSuite(IgniteDynamicCacheAndNodeStop.class)); return suite; } -} \ No newline at end of file +}