Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6E588177A8 for ; Thu, 9 Apr 2015 17:34:38 +0000 (UTC) Received: (qmail 9423 invoked by uid 500); 9 Apr 2015 17:34:38 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 9352 invoked by uid 500); 9 Apr 2015 17:34:38 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 9334 invoked by uid 99); 9 Apr 2015 17:34:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Apr 2015 17:34:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 09 Apr 2015 17:34:36 +0000 Received: (qmail 7322 invoked by uid 99); 9 Apr 2015 17:34:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Apr 2015 17:34:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CD9DFE000D; Thu, 9 Apr 2015 17:34:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 09 Apr 2015 17:34:14 -0000 Message-Id: In-Reply-To: <4294682043474807a0fe5f496b891a35@git.apache.org> References: <4294682043474807a0fe5f496b891a35@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/50] [abbrv] incubator-ignite git commit: # ignite-694 X-Virus-Checked: Checked by ClamAV on apache.org # ignite-694 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/327a1086 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/327a1086 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/327a1086 Branch: refs/heads/ignite-692 Commit: 327a1086d60b46c49fe3fbfc73af49c21f1238f4 Parents: 76d80f4 Author: sboikov Authored: Wed Apr 8 12:13:37 2015 +0300 Committer: sboikov Committed: Wed Apr 8 12:13:37 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 44 +++++++++++++------- .../processors/cache/GridCacheMessage.java | 7 ++++ .../GridCachePartitionExchangeManager.java | 12 +++++- .../GridDhtPartitionsAbstractMessage.java | 15 ++++--- 4 files changed, 57 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 09fe2e0..6fefdfd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -84,27 +84,43 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { final GridCacheMessage cacheMsg = (GridCacheMessage)msg; - AffinityTopologyVersion locAffVer = cctx.exchange().topologyVersion(); - AffinityTopologyVersion rmtAffVer = cacheMsg.topologyVersion(); + IgniteInternalFuture fut = null; - if (locAffVer.compareTo(rmtAffVer) < 0) { - if (log.isDebugEnabled()) - log.debug("Received message has higher topology version [msg=" + msg + - ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']'); + if (cacheMsg.partitionExchangeMessage()) { + long locTopVer = cctx.discovery().topologyVersion(); + long rmtTopVer = cacheMsg.topologyVersion().topologyVersion(); - IgniteInternalFuture topFut = cctx.exchange().affinityReadyFuture(rmtAffVer); + if (locTopVer < rmtTopVer) { + if (log.isDebugEnabled()) + log.debug("Received message has higher topology version [msg=" + msg + + ", locTopVer=" + locTopVer + ", rmtTopVer=" + rmtTopVer + ']'); - if (topFut != null && !topFut.isDone()) { - topFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture t) { - handleMessage(nodeId, cacheMsg); - } - }); + fut = cctx.discovery().topologyFuture(rmtTopVer); + } + } + else { + AffinityTopologyVersion locAffVer = cctx.exchange().readyAffinityVersion(); + AffinityTopologyVersion rmtAffVer = cacheMsg.topologyVersion(); - return; + if (locAffVer.compareTo(rmtAffVer) < 0) { + if (log.isDebugEnabled()) + log.debug("Received message has higher affinity topology version [msg=" + msg + + ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']'); + + fut = cctx.exchange().affinityReadyFuture(rmtAffVer); } } + if (fut != null && !fut.isDone()) { + fut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture t) { + handleMessage(nodeId, cacheMsg); + } + }); + + return; + } + handleMessage(nodeId, cacheMsg); } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index b9bce3e..fefd582 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -86,6 +86,13 @@ public abstract class GridCacheMessage implements Message { } /** + * @return {@code True} if this message is partition exchange message. + */ + public boolean partitionExchangeMessage() { + return false; + } + + /** * @return {@code True} if class loading errors should be ignored, false otherwise. */ public boolean ignoreClassErrors() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index e8e3ea1..0955328 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -372,13 +372,15 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana /** * @param cacheId Cache ID. + * @return Client partition topology. */ public GridClientPartitionTopology clearClientTopology(int cacheId) { return clientTops.remove(cacheId); } /** - * Gets topology version of last completed partition exchange. + * Gets topology version of last partition exchange, it is possible that last partition exchange + * is not completed yet. * * @return Topology version. */ @@ -390,6 +392,13 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } /** + * @return Topology version of latest completed partition exchange. + */ + public AffinityTopologyVersion readyAffinityVersion() { + return readyTopVer.get(); + } + + /** * @return Last completed topology future. */ public GridDhtTopologyFuture lastTopologyFuture() { @@ -796,7 +805,6 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana if (top != null) updated |= top.update(null, entry.getValue()) != null; - } if (updated) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 6af0072..5a8616d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -46,11 +46,6 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { // No-op. } - /** {@inheritDoc} */ - @Override public boolean allowForStartup() { - return true; - } - /** * @param exchId Exchange ID. * @param lastVer Last version. @@ -60,6 +55,16 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { this.lastVer = lastVer; } + /** {@inheritDoc} */ + @Override public boolean allowForStartup() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean partitionExchangeMessage() { + return true; + } + /** * @return Exchange ID. */