ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [39/50] [abbrv] ignite git commit: ignite-5682 Added stale version check for GridDhtPartFullMessage not related to exchange.
Date Fri, 28 Jul 2017 12:07:51 GMT
ignite-5682 Added stale version check for GridDhtPartFullMessage not related to exchange.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eb9d06d9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eb9d06d9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eb9d06d9

Branch: refs/heads/ignite-5658
Commit: eb9d06d9ff89388318cc3a857276c154675bc6f1
Parents: 3a3650f
Author: Dmitry Pavlov <dpavlov.spb@gmail.com>
Authored: Thu Jul 27 14:51:25 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Jul 27 14:51:25 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  5 +-
 .../processors/cache/GridCacheIoManager.java    |  2 +-
 .../GridCachePartitionExchangeManager.java      | 44 ++++++++++-----
 .../dht/GridClientPartitionTopology.java        | 12 +++-
 .../dht/GridDhtPartitionTopology.java           |  9 ++-
 .../dht/GridDhtPartitionTopologyImpl.java       | 34 +++++++-----
 .../preloader/GridDhtPartitionExchangeId.java   |  2 +-
 .../GridDhtPartitionsExchangeFuture.java        | 36 ++++++++----
 .../preloader/GridDhtPartitionsFullMessage.java |  4 +-
 ...cingDelayedPartitionMapExchangeSelfTest.java | 58 ++++++++++++++++----
 .../junits/common/GridCommonAbstractTest.java   |  6 +-
 11 files changed, 151 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0f46a90..5a7f634 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -442,7 +442,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                                 grp.topology().update(topVer,
                                     clientTop.partitionMap(true),
                                     clientTop.updateCounters(false),
-                                    Collections.<Integer>emptySet());
+                                    Collections.<Integer>emptySet(),
+                                    null);
                             }
 
                             grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity());
@@ -504,7 +505,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
 
-                grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet());
+                grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(),
null);
 
                 topFut.validate(grp, discoCache.allNodes());
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 9f1873e..981c6e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -533,7 +533,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter
{
     }
 
     /**
-     * @param nodeId Node ID.
+     * @param nodeId Sender Node ID.
      * @param cacheMsg Cache message.
      * @param c Handler closure.
      * @param plc Message policy.

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d4fe93f..6a7258f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -129,7 +129,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private static final int EXCHANGE_HISTORY_SIZE =
         IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE,
1_000);
 
-    /** Atomic reference for pending timeout object. */
+    /** Atomic reference for pending partition resend timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
 
     /** Partition resend timeout after eviction. */
@@ -150,7 +150,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private final ConcurrentMap<Integer, GridClientPartitionTopology> clientTops =
new ConcurrentHashMap8<>();
 
     /** */
-    private volatile GridDhtPartitionsExchangeFuture lastInitializedFut;
+    @Nullable private volatile GridDhtPartitionsExchangeFuture lastInitializedFut;
 
     /** */
     private final AtomicReference<GridDhtTopologyFuture> lastFinishedFut = new AtomicReference<>();
@@ -877,6 +877,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /**
      * Partition refresh callback.
+     * For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send,
+     * for non coordinator -  {@link GridDhtPartitionsSingleMessage SingleMessages} send
      */
     private void refreshPartitions() {
         ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
@@ -914,7 +916,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             if (log.isDebugEnabled())
                 log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
 
-            sendAllPartitions(rmts);
+            sendAllPartitions(rmts, rmtTopVer);
         }
         else {
             if (log.isDebugEnabled())
@@ -927,10 +929,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /**
      * @param nodes Nodes.
+     * @param msgTopVer Topology version. Will be added to full message.
      */
-    private void sendAllPartitions(Collection<ClusterNode> nodes) {
+    private void sendAllPartitions(Collection<ClusterNode> nodes,
+        AffinityTopologyVersion msgTopVer) {
         GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, null, null, null,
null);
 
+        m.topologyVersion(msgTopVer);
+
         if (log.isDebugEnabled())
             log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" +
m + ']');
 
@@ -956,6 +962,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      *     finishUnmarshall methods are called).
      * @param exchId Non-null exchange ID if message is created for exchange.
      * @param lastVer Last version.
+     * @param partHistSuppliers
+     * @param partsToReload
      * @return Message.
      */
     public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@@ -1064,8 +1072,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param node Node.
-     * @param id ID.
+     * @param node Destination cluster node.
+     * @param id Exchange ID.
      */
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId
id) {
         GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
@@ -1091,7 +1099,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /**
      * @param targetNode Target node.
-     * @param exchangeId ID.
+     * @param exchangeId Exchange ID.
      * @param clientOnlyExchange Client exchange flag.
      * @param sndCounters {@code True} if need send partition update counters.
      * @return Message.
@@ -1297,7 +1305,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param node Node.
+     * @param node Sender cluster node.
      * @param msg Message.
      */
     private void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMessage
msg) {
@@ -1323,8 +1331,13 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                     else if (!grp.isLocal())
                         top = grp.topology();
 
-                    if (top != null)
-                        updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(),
grpId));
+                    if (top != null) {
+                        updated |= top.update(null,
+                            entry.getValue(),
+                            null,
+                            msg.partsToReload(cctx.localNodeId(), grpId),
+                            msg.topologyVersion());
+                    }
                 }
 
                 if (!cctx.kernalContext().clientNode() && updated)
@@ -1352,7 +1365,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param node Node ID.
+     * @param node Sender cluster node.
      * @param msg Message.
      */
     private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage
msg) {
@@ -1418,7 +1431,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param node Node ID.
+     * @param node Sender cluster node.
      * @param msg Message.
      */
     private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest
msg) {
@@ -2250,7 +2263,10 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** {@inheritDoc} */
+        /**
+         * @param nodeId Sender node ID.
+         * @param msg Message.
+         */
         @Override public void apply(UUID nodeId, M msg) {
             ClusterNode node = cctx.node(nodeId);
 
@@ -2268,7 +2284,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
 
         /**
-         * @param node Node.
+         * @param node Sender cluster node.
          * @param msg Message.
          */
         protected abstract void onMessage(ClusterNode node, M msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 7343dba..4b9826e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -594,8 +594,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         Map<Integer, T2<Long, Long>> cntrMap,
-        Set<Integer> partsToReload
-    ) {
+        Set<Integer> partsToReload,
+        @Nullable AffinityTopologyVersion msgTopVer) {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts="
+ fullMapString() + ']');
 
@@ -610,6 +610,14 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
                 return false;
             }
 
+            if (msgTopVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(msgTopVer)
> 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Stale topology version for full partition map update message
(will ignore) " +
+                        "[lastExchId=" + lastExchangeVer + ", topVersion=" + msgTopVer +
']');
+
+                return false;
+            }
+
             boolean fullMapUpdated = (node2part == null);
 
             if (node2part != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d9e04a6..81d92e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -243,16 +243,21 @@ public interface GridDhtPartitionTopology {
     public void onRemoved(GridDhtCacheEntry e);
 
     /**
-     * @param exchangeVer Exchange version.
+     * @param exchangeVer Topology version from exchange. Value should be greater than previously
passed. Null value
+     *      means full map received is not related to exchange
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
+     * @param partsToReload
+     * @param msgTopVer Topology version from incoming message. This value is not null only
for case message is not
+     *      related to exchange. Value should be not less than previous 'Topology version
from exchange'.
      * @return {@code True} if local state was changed.
      */
     public boolean update(
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, T2<Long, Long>> cntrMap,
-        Set<Integer> partsToReload);
+        Set<Integer> partsToReload,
+        @Nullable AffinityTopologyVersion msgTopVer);
 
     /**
      * @param exchId Exchange ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 842501e..a8e13a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -440,7 +440,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
 
                     if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent())
                         removeNode(exchId.nodeId());
-    
+
                     ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
                     if (log.isDebugEnabled()) {
@@ -1099,9 +1099,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
     @Override public boolean update(
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
-        @Nullable Map<Integer, T2<Long, Long>> cntrMap,
-        Set<Integer> partsToReload
-    ) {
+        @Nullable Map<Integer, T2<Long, Long>> incomeCntrMap,
+        Set<Integer> partsToReload,
+        @Nullable AffinityTopologyVersion msgTopVer) {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts="
+ fullMapString() + ']');
 
@@ -1113,12 +1113,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
             if (stopping)
                 return false;
 
-            if (cntrMap != null) {
+            if (incomeCntrMap != null) {
                 // update local map partition counters
-                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet())
{
-                    T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+                for (Map.Entry<Integer, T2<Long, Long>> e : incomeCntrMap.entrySet())
{
+                    T2<Long, Long> existCntr = this.cntrMap.get(e.getKey());
 
-                    if (cntr == null || cntr.get2() < e.getValue().get2())
+                    if (existCntr == null || existCntr.get2() < e.getValue().get2())
                         this.cntrMap.put(e.getKey(), e.getValue());
                 }
 
@@ -1129,7 +1129,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                     if (part == null)
                         continue;
 
-                    T2<Long, Long> cntr = cntrMap.get(part.id());
+                    T2<Long, Long> cntr = incomeCntrMap.get(part.id());
 
                     if (cntr != null)
                         part.updateCounter(cntr.get2());
@@ -1144,6 +1144,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                 return false;
             }
 
+            if (msgTopVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(msgTopVer)
> 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Stale version for full partition map update message (will
ignore) [lastExch=" +
+                        lastExchangeVer + ", topVersion=" + msgTopVer + ']');
+
+                return false;
+            }
+
             boolean fullMapUpdated = (node2part == null);
 
             if (node2part != null) {
@@ -1244,8 +1252,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
 
                         assert locPart != null;
 
-                        if (cntrMap != null) {
-                            T2<Long, Long> cntr = cntrMap.get(p);
+                        if (incomeCntrMap != null) {
+                            T2<Long, Long> cntr = incomeCntrMap.get(p);
 
                             if (cntr != null && cntr.get2() > locPart.updateCounter())
                                 locPart.updateCounter(cntr.get2());
@@ -1271,8 +1279,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                             changed = true;
                         }
 
-                        if (cntrMap != null) {
-                            T2<Long, Long> cntr = cntrMap.get(p);
+                        if (incomeCntrMap != null) {
+                            T2<Long, Long> cntr = incomeCntrMap.get(p);
 
                             if (cntr != null && cntr.get2() > locPart.updateCounter())
                                 locPart.updateCounter(cntr.get2());

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
index 0a49415..1a4dabf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
@@ -51,7 +51,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
     @GridToStringExclude
     private UUID nodeId;
 
-    /** Event. */
+    /** Event type. */
     @GridToStringExclude
     private int evt;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cdb4bb7..71e41b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -127,7 +127,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private final Set<UUID> remaining = new HashSet<>();
 
-    /** */
+    /** Guarded by this */
     @GridToStringExclude
     private int pendingSingleUpdates;
 
@@ -154,7 +154,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private final CountDownLatch evtLatch = new CountDownLatch(1);
 
-    /** */
+    /** Exchange future init method completes this future. */
     private GridFutureAdapter<Boolean> initFut;
 
     /** */
@@ -196,7 +196,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** Init timestamp. Used to track the amount of time spent to complete the future. */
     private long initTs;
 
-    /** */
+    /**
+     * Centralized affinity assignment required. Activated for node left of failed. For this
mode crd will send full
+     * partitions maps to nodes using discovery (ring) instead of communication.
+     */
     private boolean centralizedAff;
 
     /** Change global state exception. */
@@ -613,7 +616,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     top.update(topologyVersion(),
                         clientTop.partitionMap(true),
                         clientTop.updateCounters(false),
-                        Collections.<Integer>emptySet());
+                        Collections.<Integer>emptySet(),
+                        null);
                 }
             }
 
@@ -1092,7 +1096,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @param node Node.
+     * @param node Target Node.
      * @throws IgniteCheckedException If failed.
      */
     private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
@@ -1191,7 +1195,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @param oldestNode Oldest node.
+     * @param oldestNode Oldest node. Target node to send message to.
      */
     private void sendPartitions(ClusterNode oldestNode) {
         try {
@@ -1368,6 +1372,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Processing of received single message. Actual processing in future may be delayed
if init method was not
+     * completed, see {@link #initDone()}
+     *
      * @param node Sender node.
      * @param msg Single partition info.
      */
@@ -1409,11 +1416,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Note this method performs heavy updatePartitionSingleMap operation, this operation
is moved out from the
+     * synchronized block. Only count of such updates {@link #pendingSingleUpdates} is managed
under critical section.
+     *
      * @param node Sender node.
-     * @param msg Message.
+     * @param msg Partition single message.
      */
     private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
-        boolean allReceived = false;
+        boolean allReceived = false; // Received all expected messages.
         boolean updateSingleMap = false;
 
         synchronized (this) {
@@ -1895,7 +1905,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /**
      * @param node Sender node.
-     * @param msg Message.
+     * @param msg Message with full partition info.
      */
     private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
         assert exchId.equals(msg.exchangeId()) : msg;
@@ -1953,7 +1963,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 grp.topology().update(topologyVersion(),
                     entry.getValue(),
                     cntrMap,
-                    msg.partsToReload(cctx.localNodeId(), grpId));
+                    msg.partsToReload(cctx.localNodeId(), grpId),
+                    null);
             }
             else {
                 ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
@@ -1962,7 +1973,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     cctx.exchange().clientTopology(grpId, this).update(topologyVersion(),
                         entry.getValue(),
                         cntrMap,
-                        Collections.<Integer>emptySet());
+                        Collections.<Integer>emptySet(),
+                        null);
                 }
             }
         }
@@ -2054,7 +2066,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     *
+     * Moves exchange future to state 'init done' using {@link #initFut}.
      */
     private void initDone() {
         while (!isDone()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 75609b8..acc4dbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -109,7 +109,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /**
      * @param id Exchange ID.
      * @param lastVer Last version.
-     * @param topVer Topology version.
+     * @param topVer Topology version. For messages not related to exchange may be {@link
AffinityTopologyVersion#NONE}.
+     * @param partHistSuppliers
+     * @param partsToReload
      */
     public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
         @Nullable GridCacheVersion lastVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
index dc141db..f307b6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -18,14 +18,17 @@
 package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
 
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -45,12 +48,19 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends
Gri
     /** */
     protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    /** */
+    /** Map of destination node ID to runnable with logic for real message sending.
+     * To apply real message sending use run method */
     private final ConcurrentHashMap8<UUID, Runnable> rs = new ConcurrentHashMap8<>();
 
-    /** */
+    /**
+     * Flag to redirect {@link GridDhtPartitionsFullMessage}s from real communication channel
to {@link #rs} map.
+     * Applied only to messages not related to particular exchange
+     */
     private volatile boolean record = false;
 
+    /** */
+    private AtomicBoolean replay = new AtomicBoolean();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws
Exception {
         IgniteConfiguration iCfg = super.getConfiguration(igniteInstanceName);
@@ -74,13 +84,26 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends
Gri
             final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException
{
             final Object msg0 = ((GridIoMessage)msg).message();
 
+            if (log.isDebugEnabled())
+                log.debug("Message [thread=" + Thread.currentThread().getName() + ", msg="
+ msg0 + ']');
+
             if (msg0 instanceof GridDhtPartitionsFullMessage && record &&
-                ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) {
-                rs.putIfAbsent(node.id(), new Runnable() {
+                ((GridDhtPartitionsAbstractMessage)msg0).exchangeId() == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Record message [toNode=" + node.id() + ", msg=" + msg + "]");
+
+                assert !replay.get() : "Record of message is not allowed after replay";
+
+                Runnable prevValue = rs.putIfAbsent(node.id(), new Runnable() {
                     @Override public void run() {
+                        if (log.isDebugEnabled())
+                            log.debug("Replay: " + msg);
+
                         DelayableCommunicationSpi.super.sendMessage(node, msg, ackC);
                     }
                 });
+
+                assert prevValue == null : "Duplicate message registered to [" + node.id()
+ "]";
             }
             else
                 try {
@@ -94,10 +117,10 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest
extends Gri
     }
 
     /**
-     * @throws Exception e.
+     * @throws Exception e if failed.
      */
     public void test() throws Exception {
-        IgniteKernal ignite = (IgniteKernal)startGrid(0);
+        IgniteEx ignite = startGrid(0);
 
         CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
 
@@ -144,10 +167,7 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest
extends Gri
 
         awaitPartitionMapExchange();
 
-        for (Runnable r : rs.values())
-            r.run();
-
-        U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages.
+        replayMessages();
 
         stopGrid(3); // Forces exchange at all nodes and cause assertion failure in case
obsolete partition map accepted.
 
@@ -167,6 +187,22 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest
extends Gri
         assert grid(2).context().cache().context().exchange().readyAffinityVersion().topologyVersion()
> topVer2;
     }
 
+    /**
+     * Replays all saved messages from map, actual sent is performed.
+     *
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    private void replayMessages() throws IgniteInterruptedCheckedException {
+        record = false;
+
+        for (Runnable r : rs.values())
+            r.run(); // Causes real messages sending.
+
+        assertTrue(replay.compareAndSet(false, true));
+
+        U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages.
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index dc7e89d..c2cf41c 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -520,7 +520,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest
{
     /**
      * @param waitEvicts If {@code true} will wait for evictions finished.
      * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update
finished.
-     * @param nodes Optional nodes.
+     * @param nodes Optional nodes. If {@code null} method will wait for all nodes, for non
null collection nodes will
+     *      be filtered
      * @throws InterruptedException If interrupted.
      */
     @SuppressWarnings("BusyWait")
@@ -542,7 +543,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest
{
     /**
      * @param waitEvicts If {@code true} will wait for evictions finished.
      * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update
finished.
-     * @param nodes Optional nodes.
+     * @param nodes Optional nodes. If {@code null} method will wait for all nodes, for non
null collection nodes will
+     *      be filtered
      * @param printPartState If {@code true} will print partition state if evictions not
happened.
      * @throws InterruptedException If interrupted.
      */


Mime
View raw message