Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A1AC317E26 for ; Tue, 17 Nov 2015 13:34:41 +0000 (UTC) Received: (qmail 98539 invoked by uid 500); 17 Nov 2015 13:34:41 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 98403 invoked by uid 500); 17 Nov 2015 13:34:41 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 97932 invoked by uid 99); 17 Nov 2015 13:34:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Nov 2015 13:34:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9AC86E03E4; Tue, 17 Nov 2015 13:34:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.apache.org Date: Tue, 17 Nov 2015 13:34:59 -0000 Message-Id: In-Reply-To: <9a025b701a88452c9409d225d860db8d@git.apache.org> References: <9a025b701a88452c9409d225d860db8d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/46] ignite git commit: Ignite-1093 Logging & Backward compatibility failover fixes. Ignite-1093 Logging & Backward compatibility failover fixes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/67f88584 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/67f88584 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/67f88584 Branch: refs/heads/ignite-1753-1282 Commit: 67f88584a4ab330bbda956b3d0d830468d28920f Parents: 37cafb6 Author: Anton Vinogradov Authored: Tue Nov 10 16:14:15 2015 +0300 Committer: Anton Vinogradov Committed: Tue Nov 10 16:14:15 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 34 +++++++------------- .../dht/preloader/GridDhtPartitionDemander.java | 25 ++++++++++++-- 2 files changed, 34 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/67f88584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 479a0b6..5b4fee3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -617,13 +617,6 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } /** - * @return {@code True} if topology has changed. - */ - public boolean topologyChanged() { - return exchWorker.topologyChanged(); - } - - /** * @param exchFut Exchange future. * @param reassign Dummy reassign flag. */ @@ -673,7 +666,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']'); - Collection rmts = null; + Collection rmts; // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -1362,7 +1355,9 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana if (marshR != null || !rebalanceQ.isEmpty()) { if (futQ.isEmpty()) { - U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]"); + U.log(log, "Rebalancing required" + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().node().id() + ']'); if (marshR != null) try { @@ -1404,13 +1399,15 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } }, /*system pool*/ true); } - else { - U.log(log, "Obsolete exchange, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); - } - } - else { - U.log(log, "Nothing scheduled, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); + else + U.log(log, "Skipping rebalancing (obsolete exchange ID) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().node().id() + ']'); } + else + U.log(log, "Skipping rebalancing (nothing scheduled) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().node().id() + ']'); } } catch (IgniteInterruptedCheckedException e) { @@ -1425,13 +1422,6 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } } } - - /** - * @return {@code True} if another exchange future has been queued up. - */ - boolean topologyChanged() { - return !futQ.isEmpty() || busy; - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/67f88584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 29ca5f4..40d3dc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -114,6 +114,10 @@ public class GridDhtPartitionDemander { @Deprecated//Backward compatibility. To be removed in future. private final AtomicInteger dmIdx = new AtomicInteger(); + /** DemandWorker. */ + @Deprecated//Backward compatibility. To be removed in future. + private volatile DemandWorker worker; + /** Cached rebalance topics. */ private final Map rebalanceTopics; @@ -166,6 +170,11 @@ public class GridDhtPartitionDemander { rebalanceFut.onDone(false); } + DemandWorker dw = worker; + + if (dw != null) + dw.cancel(); + lastExchangeFut = null; lastTimeoutObj.set(null); @@ -426,9 +435,9 @@ public class GridDhtPartitionDemander { d.timeout(cctx.config().getRebalanceTimeout()); d.workerId(0);//old api support. - DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut); + worker = new DemandWorker(dmIdx.incrementAndGet(), fut); - dw.run(node, d); + worker.run(node, d); } } @@ -1137,6 +1146,13 @@ public class GridDhtPartitionDemander { return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx); } + /** */ + public void cancel() { + msgQ.clear(); + + msgQ.offer(new SupplyMessage(null, null)); + } + /** * @param node Node to demand from. * @param topVer Topology version. @@ -1159,7 +1175,7 @@ public class GridDhtPartitionDemander { d.topic(topic(cntr)); d.workerId(id); - if (topologyChanged(fut)) + if (fut.isDone() || topologyChanged(fut)) return; cctx.io().addOrderedHandler(d.topic(), new CI2() { @@ -1228,6 +1244,9 @@ public class GridDhtPartitionDemander { continue; // While. } + if (s.senderId() == null) + return; // Stopping now. + // Check that message was received from expected node. if (!s.senderId().equals(node.id())) { U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +