Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 65BB918836 for ; Wed, 11 Nov 2015 07:13:15 +0000 (UTC) Received: (qmail 65441 invoked by uid 500); 11 Nov 2015 07:13:15 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 65396 invoked by uid 500); 11 Nov 2015 07:13:15 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 65381 invoked by uid 99); 11 Nov 2015 07:13:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Nov 2015 07:13:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 35D57E024E; Wed, 11 Nov 2015 07:13:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Wed, 11 Nov 2015 07:13:15 -0000 Message-Id: <6b26fc1a0c0f4cce820939b06a76df13@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/9] ignite git commit: Ignite-1093 Logging & Backward compatibility failover fixes. Repository: ignite Updated Branches: refs/heads/ignite-1758-debug 45af5a939 -> 70ed06ea1 Ignite-1093 Logging & Backward compatibility failover fixes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/67f88584 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/67f88584 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/67f88584 Branch: refs/heads/ignite-1758-debug Commit: 67f88584a4ab330bbda956b3d0d830468d28920f Parents: 37cafb6 Author: Anton Vinogradov Authored: Tue Nov 10 16:14:15 2015 +0300 Committer: Anton Vinogradov Committed: Tue Nov 10 16:14:15 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 34 +++++++------------- .../dht/preloader/GridDhtPartitionDemander.java | 25 ++++++++++++-- 2 files changed, 34 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/67f88584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 479a0b6..5b4fee3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -617,13 +617,6 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } /** - * @return {@code True} if topology has changed. - */ - public boolean topologyChanged() { - return exchWorker.topologyChanged(); - } - - /** * @param exchFut Exchange future. * @param reassign Dummy reassign flag. */ @@ -673,7 +666,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']'); - Collection rmts = null; + Collection rmts; // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -1362,7 +1355,9 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana if (marshR != null || !rebalanceQ.isEmpty()) { if (futQ.isEmpty()) { - U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]"); + U.log(log, "Rebalancing required" + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().node().id() + ']'); if (marshR != null) try { @@ -1404,13 +1399,15 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } }, /*system pool*/ true); } - else { - U.log(log, "Obsolete exchange, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); - } - } - else { - U.log(log, "Nothing scheduled, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); + else + U.log(log, "Skipping rebalancing (obsolete exchange ID) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().node().id() + ']'); } + else + U.log(log, "Skipping rebalancing (nothing scheduled) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().node().id() + ']'); } } catch (IgniteInterruptedCheckedException e) { @@ -1425,13 +1422,6 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } } } - - /** - * @return {@code True} if another exchange future has been queued up. - */ - boolean topologyChanged() { - return !futQ.isEmpty() || busy; - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/67f88584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 29ca5f4..40d3dc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -114,6 +114,10 @@ public class GridDhtPartitionDemander { @Deprecated//Backward compatibility. To be removed in future. private final AtomicInteger dmIdx = new AtomicInteger(); + /** DemandWorker. */ + @Deprecated//Backward compatibility. To be removed in future. + private volatile DemandWorker worker; + /** Cached rebalance topics. */ private final Map rebalanceTopics; @@ -166,6 +170,11 @@ public class GridDhtPartitionDemander { rebalanceFut.onDone(false); } + DemandWorker dw = worker; + + if (dw != null) + dw.cancel(); + lastExchangeFut = null; lastTimeoutObj.set(null); @@ -426,9 +435,9 @@ public class GridDhtPartitionDemander { d.timeout(cctx.config().getRebalanceTimeout()); d.workerId(0);//old api support. - DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut); + worker = new DemandWorker(dmIdx.incrementAndGet(), fut); - dw.run(node, d); + worker.run(node, d); } } @@ -1137,6 +1146,13 @@ public class GridDhtPartitionDemander { return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx); } + /** */ + public void cancel() { + msgQ.clear(); + + msgQ.offer(new SupplyMessage(null, null)); + } + /** * @param node Node to demand from. * @param topVer Topology version. @@ -1159,7 +1175,7 @@ public class GridDhtPartitionDemander { d.topic(topic(cntr)); d.workerId(id); - if (topologyChanged(fut)) + if (fut.isDone() || topologyChanged(fut)) return; cctx.io().addOrderedHandler(d.topic(), new CI2() { @@ -1228,6 +1244,9 @@ public class GridDhtPartitionDemander { continue; // While. } + if (s.senderId() == null) + return; // Stopping now. + // Check that message was received from expected node. if (!s.senderId().equals(node.id())) { U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +