Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 14C74200CD3 for ; Fri, 28 Jul 2017 14:07:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1318F16CA1A; Fri, 28 Jul 2017 12:07:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BAB4316C8BD for ; Fri, 28 Jul 2017 14:07:16 +0200 (CEST) Received: (qmail 34675 invoked by uid 500); 28 Jul 2017 12:07:15 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 34474 invoked by uid 99); 28 Jul 2017 12:07:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Jul 2017 12:07:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0DBB2F3324; Fri, 28 Jul 2017 12:07:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.apache.org Date: Fri, 28 Jul 2017 12:07:51 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [39/50] [abbrv] ignite git commit: ignite-5682 Added stale version check for GridDhtPartFullMessage not related to exchange. archived-at: Fri, 28 Jul 2017 12:07:20 -0000 ignite-5682 Added stale version check for GridDhtPartFullMessage not related to exchange. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eb9d06d9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eb9d06d9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eb9d06d9 Branch: refs/heads/ignite-5658 Commit: eb9d06d9ff89388318cc3a857276c154675bc6f1 Parents: 3a3650f Author: Dmitry Pavlov Authored: Thu Jul 27 14:51:25 2017 +0300 Committer: sboikov Committed: Thu Jul 27 14:51:25 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 5 +- .../processors/cache/GridCacheIoManager.java | 2 +- .../GridCachePartitionExchangeManager.java | 44 ++++++++++----- .../dht/GridClientPartitionTopology.java | 12 +++- .../dht/GridDhtPartitionTopology.java | 9 ++- .../dht/GridDhtPartitionTopologyImpl.java | 34 +++++++----- .../preloader/GridDhtPartitionExchangeId.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 36 ++++++++---- .../preloader/GridDhtPartitionsFullMessage.java | 4 +- ...cingDelayedPartitionMapExchangeSelfTest.java | 58 ++++++++++++++++---- .../junits/common/GridCommonAbstractTest.java | 6 +- 11 files changed, 151 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 0f46a90..5a7f634 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -442,7 +442,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap grp.topology().update(topVer, clientTop.partitionMap(true), clientTop.updateCounters(false), - Collections.emptySet()); + Collections.emptySet(), + null); } grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity()); @@ -504,7 +505,7 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap grp.topology().updateTopologyVersion(topFut, discoCache, -1, false); - grp.topology().update(topVer, partMap, null, Collections.emptySet()); + grp.topology().update(topVer, partMap, null, Collections.emptySet(), null); topFut.validate(grp, discoCache.allNodes()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 9f1873e..981c6e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -533,7 +533,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** - * @param nodeId Node ID. + * @param nodeId Sender Node ID. * @param cacheMsg Cache message. * @param c Handler closure. * @param plc Message policy. http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index d4fe93f..6a7258f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -129,7 +129,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana private static final int EXCHANGE_HISTORY_SIZE = IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE, 1_000); - /** Atomic reference for pending timeout object. */ + /** Atomic reference for pending partition resend timeout object. */ private AtomicReference pendingResend = new AtomicReference<>(); /** Partition resend timeout after eviction. */ @@ -150,7 +150,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana private final ConcurrentMap clientTops = new ConcurrentHashMap8<>(); /** */ - private volatile GridDhtPartitionsExchangeFuture lastInitializedFut; + @Nullable private volatile GridDhtPartitionsExchangeFuture lastInitializedFut; /** */ private final AtomicReference lastFinishedFut = new AtomicReference<>(); @@ -877,6 +877,8 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana /** * Partition refresh callback. + * For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send, + * for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send */ private void refreshPartitions() { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); @@ -914,7 +916,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); - sendAllPartitions(rmts); + sendAllPartitions(rmts, rmtTopVer); } else { if (log.isDebugEnabled()) @@ -927,10 +929,14 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana /** * @param nodes Nodes. + * @param msgTopVer Topology version. Will be added to full message. */ - private void sendAllPartitions(Collection nodes) { + private void sendAllPartitions(Collection nodes, + AffinityTopologyVersion msgTopVer) { GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, null, null, null, null); + m.topologyVersion(msgTopVer); + if (log.isDebugEnabled()) log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); @@ -956,6 +962,8 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana * finishUnmarshall methods are called). * @param exchId Non-null exchange ID if message is created for exchange. * @param lastVer Last version. + * @param partHistSuppliers + * @param partsToReload * @return Message. */ public GridDhtPartitionsFullMessage createPartitionsFullMessage( @@ -1064,8 +1072,8 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } /** - * @param node Node. - * @param id ID. + * @param node Destination cluster node. + * @param id Exchange ID. */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node, @@ -1091,7 +1099,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana /** * @param targetNode Target node. - * @param exchangeId ID. + * @param exchangeId Exchange ID. * @param clientOnlyExchange Client exchange flag. * @param sndCounters {@code True} if need send partition update counters. * @return Message. @@ -1297,7 +1305,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } /** - * @param node Node. + * @param node Sender cluster node. * @param msg Message. */ private void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMessage msg) { @@ -1323,8 +1331,13 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana else if (!grp.isLocal()) top = grp.topology(); - if (top != null) - updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId)); + if (top != null) { + updated |= top.update(null, + entry.getValue(), + null, + msg.partsToReload(cctx.localNodeId(), grpId), + msg.topologyVersion()); + } } if (!cctx.kernalContext().clientNode() && updated) @@ -1352,7 +1365,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } /** - * @param node Node ID. + * @param node Sender cluster node. * @param msg Message. */ private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { @@ -1418,7 +1431,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } /** - * @param node Node ID. + * @param node Sender cluster node. * @param msg Message. */ private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) { @@ -2250,7 +2263,10 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana /** */ private static final long serialVersionUID = 0L; - /** {@inheritDoc} */ + /** + * @param nodeId Sender node ID. + * @param msg Message. + */ @Override public void apply(UUID nodeId, M msg) { ClusterNode node = cctx.node(nodeId); @@ -2268,7 +2284,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } /** - * @param node Node. + * @param node Sender cluster node. * @param msg Message. */ protected abstract void onMessage(ClusterNode node, M msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 7343dba..4b9826e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -594,8 +594,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, Map> cntrMap, - Set partsToReload - ) { + Set partsToReload, + @Nullable AffinityTopologyVersion msgTopVer) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); @@ -610,6 +610,14 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { return false; } + if (msgTopVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(msgTopVer) > 0) { + if (log.isDebugEnabled()) + log.debug("Stale topology version for full partition map update message (will ignore) " + + "[lastExchId=" + lastExchangeVer + ", topVersion=" + msgTopVer + ']'); + + return false; + } + boolean fullMapUpdated = (node2part == null); if (node2part != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index d9e04a6..81d92e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -243,16 +243,21 @@ public interface GridDhtPartitionTopology { public void onRemoved(GridDhtCacheEntry e); /** - * @param exchangeVer Exchange version. + * @param exchangeVer Topology version from exchange. Value should be greater than previously passed. Null value + * means full map received is not related to exchange * @param partMap Update partition map. * @param cntrMap Partition update counters. + * @param partsToReload + * @param msgTopVer Topology version from incoming message. This value is not null only for case message is not + * related to exchange. Value should be not less than previous 'Topology version from exchange'. * @return {@code True} if local state was changed. */ public boolean update( @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable Map> cntrMap, - Set partsToReload); + Set partsToReload, + @Nullable AffinityTopologyVersion msgTopVer); /** * @param exchId Exchange ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 842501e..a8e13a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -440,7 +440,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent()) removeNode(exchId.nodeId()); - + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (log.isDebugEnabled()) { @@ -1099,9 +1099,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { @Override public boolean update( @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, - @Nullable Map> cntrMap, - Set partsToReload - ) { + @Nullable Map> incomeCntrMap, + Set partsToReload, + @Nullable AffinityTopologyVersion msgTopVer) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); @@ -1113,12 +1113,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return false; - if (cntrMap != null) { + if (incomeCntrMap != null) { // update local map partition counters - for (Map.Entry> e : cntrMap.entrySet()) { - T2 cntr = this.cntrMap.get(e.getKey()); + for (Map.Entry> e : incomeCntrMap.entrySet()) { + T2 existCntr = this.cntrMap.get(e.getKey()); - if (cntr == null || cntr.get2() < e.getValue().get2()) + if (existCntr == null || existCntr.get2() < e.getValue().get2()) this.cntrMap.put(e.getKey(), e.getValue()); } @@ -1129,7 +1129,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (part == null) continue; - T2 cntr = cntrMap.get(part.id()); + T2 cntr = incomeCntrMap.get(part.id()); if (cntr != null) part.updateCounter(cntr.get2()); @@ -1144,6 +1144,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return false; } + if (msgTopVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(msgTopVer) > 0) { + if (log.isDebugEnabled()) + log.debug("Stale version for full partition map update message (will ignore) [lastExch=" + + lastExchangeVer + ", topVersion=" + msgTopVer + ']'); + + return false; + } + boolean fullMapUpdated = (node2part == null); if (node2part != null) { @@ -1244,8 +1252,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert locPart != null; - if (cntrMap != null) { - T2 cntr = cntrMap.get(p); + if (incomeCntrMap != null) { + T2 cntr = incomeCntrMap.get(p); if (cntr != null && cntr.get2() > locPart.updateCounter()) locPart.updateCounter(cntr.get2()); @@ -1271,8 +1279,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { changed = true; } - if (cntrMap != null) { - T2 cntr = cntrMap.get(p); + if (incomeCntrMap != null) { + T2 cntr = incomeCntrMap.get(p); if (cntr != null && cntr.get2() > locPart.updateCounter()) locPart.updateCounter(cntr.get2()); http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java index 0a49415..1a4dabf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java @@ -51,7 +51,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable remaining = new HashSet<>(); - /** */ + /** Guarded by this */ @GridToStringExclude private int pendingSingleUpdates; @@ -154,7 +154,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte @GridToStringExclude private final CountDownLatch evtLatch = new CountDownLatch(1); - /** */ + /** Exchange future init method completes this future. */ private GridFutureAdapter initFut; /** */ @@ -196,7 +196,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** Init timestamp. Used to track the amount of time spent to complete the future. */ private long initTs; - /** */ + /** + * Centralized affinity assignment required. Activated for node left of failed. For this mode crd will send full + * partitions maps to nodes using discovery (ring) instead of communication. + */ private boolean centralizedAff; /** Change global state exception. */ @@ -613,7 +616,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte top.update(topologyVersion(), clientTop.partitionMap(true), clientTop.updateCounters(false), - Collections.emptySet()); + Collections.emptySet(), + null); } } @@ -1092,7 +1096,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * @param node Node. + * @param node Target Node. * @throws IgniteCheckedException If failed. */ private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException { @@ -1191,7 +1195,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * @param oldestNode Oldest node. + * @param oldestNode Oldest node. Target node to send message to. */ private void sendPartitions(ClusterNode oldestNode) { try { @@ -1368,6 +1372,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Processing of received single message. Actual processing in future may be delayed if init method was not + * completed, see {@link #initDone()} + * * @param node Sender node. * @param msg Single partition info. */ @@ -1409,11 +1416,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Note this method performs heavy updatePartitionSingleMap operation, this operation is moved out from the + * synchronized block. Only count of such updates {@link #pendingSingleUpdates} is managed under critical section. + * * @param node Sender node. - * @param msg Message. + * @param msg Partition single message. */ private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) { - boolean allReceived = false; + boolean allReceived = false; // Received all expected messages. boolean updateSingleMap = false; synchronized (this) { @@ -1895,7 +1905,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param node Sender node. - * @param msg Message. + * @param msg Message with full partition info. */ private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { assert exchId.equals(msg.exchangeId()) : msg; @@ -1953,7 +1963,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte grp.topology().update(topologyVersion(), entry.getValue(), cntrMap, - msg.partsToReload(cctx.localNodeId(), grpId)); + msg.partsToReload(cctx.localNodeId(), grpId), + null); } else { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); @@ -1962,7 +1973,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.exchange().clientTopology(grpId, this).update(topologyVersion(), entry.getValue(), cntrMap, - Collections.emptySet()); + Collections.emptySet(), + null); } } } @@ -2054,7 +2066,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * + * Moves exchange future to state 'init done' using {@link #initFut}. */ private void initDone() { while (!isDone()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 75609b8..acc4dbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -109,7 +109,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * @param id Exchange ID. * @param lastVer Last version. - * @param topVer Topology version. + * @param topVer Topology version. For messages not related to exchange may be {@link AffinityTopologyVersion#NONE}. + * @param partHistSuppliers + * @param partsToReload */ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java index dc141db..f307b6a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java @@ -18,14 +18,17 @@ package org.apache.ignite.internal.processors.cache.distributed.rebalancing; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; @@ -45,12 +48,19 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri /** */ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - /** */ + /** Map of destination node ID to runnable with logic for real message sending. + * To apply real message sending use run method */ private final ConcurrentHashMap8 rs = new ConcurrentHashMap8<>(); - /** */ + /** + * Flag to redirect {@link GridDhtPartitionsFullMessage}s from real communication channel to {@link #rs} map. + * Applied only to messages not related to particular exchange + */ private volatile boolean record = false; + /** */ + private AtomicBoolean replay = new AtomicBoolean(); + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration iCfg = super.getConfiguration(igniteInstanceName); @@ -74,13 +84,26 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri final IgniteInClosure ackC) throws IgniteSpiException { final Object msg0 = ((GridIoMessage)msg).message(); + if (log.isDebugEnabled()) + log.debug("Message [thread=" + Thread.currentThread().getName() + ", msg=" + msg0 + ']'); + if (msg0 instanceof GridDhtPartitionsFullMessage && record && - ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) { - rs.putIfAbsent(node.id(), new Runnable() { + ((GridDhtPartitionsAbstractMessage)msg0).exchangeId() == null) { + if (log.isDebugEnabled()) + log.debug("Record message [toNode=" + node.id() + ", msg=" + msg + "]"); + + assert !replay.get() : "Record of message is not allowed after replay"; + + Runnable prevValue = rs.putIfAbsent(node.id(), new Runnable() { @Override public void run() { + if (log.isDebugEnabled()) + log.debug("Replay: " + msg); + DelayableCommunicationSpi.super.sendMessage(node, msg, ackC); } }); + + assert prevValue == null : "Duplicate message registered to [" + node.id() + "]"; } else try { @@ -94,10 +117,10 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri } /** - * @throws Exception e. + * @throws Exception e if failed. */ public void test() throws Exception { - IgniteKernal ignite = (IgniteKernal)startGrid(0); + IgniteEx ignite = startGrid(0); CacheConfiguration cfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); @@ -144,10 +167,7 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri awaitPartitionMapExchange(); - for (Runnable r : rs.values()) - r.run(); - - U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages. + replayMessages(); stopGrid(3); // Forces exchange at all nodes and cause assertion failure in case obsolete partition map accepted. @@ -167,6 +187,22 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri assert grid(2).context().cache().context().exchange().readyAffinityVersion().topologyVersion() > topVer2; } + /** + * Replays all saved messages from map, actual sent is performed. + * + * @throws IgniteInterruptedCheckedException If interrupted. + */ + private void replayMessages() throws IgniteInterruptedCheckedException { + record = false; + + for (Runnable r : rs.values()) + r.run(); // Causes real messages sending. + + assertTrue(replay.compareAndSet(false, true)); + + U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages. + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index dc7e89d..c2cf41c 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -520,7 +520,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { /** * @param waitEvicts If {@code true} will wait for evictions finished. * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished. - * @param nodes Optional nodes. + * @param nodes Optional nodes. If {@code null} method will wait for all nodes, for non null collection nodes will + * be filtered * @throws InterruptedException If interrupted. */ @SuppressWarnings("BusyWait") @@ -542,7 +543,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { /** * @param waitEvicts If {@code true} will wait for evictions finished. * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished. - * @param nodes Optional nodes. + * @param nodes Optional nodes. If {@code null} method will wait for all nodes, for non null collection nodes will + * be filtered * @param printPartState If {@code true} will print partition state if evictions not happened. * @throws InterruptedException If interrupted. */