ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5578
Date Mon, 17 Jul 2017 15:17:39 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 8a6583195 -> 3e22eac26


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3e22eac2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3e22eac2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3e22eac2

Branch: refs/heads/ignite-5578
Commit: 3e22eac26873de9da3f10b80afcc7987fed8a180
Parents: 8a65831
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jul 17 11:48:51 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jul 17 18:17:20 2017 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java   |  29 ++
 .../cache/CacheAffinitySharedManager.java       |   6 +-
 .../cache/ExchangeDiscoveryEvents.java          |  24 +-
 .../GridCachePartitionExchangeManager.java      | 107 ++++++-
 .../dht/GridDhtPartitionTopologyImpl.java       |  51 ++--
 .../GridDhtPartitionsAbstractMessage.java       |   4 +
 .../GridDhtPartitionsExchangeFuture.java        | 290 +++++++++++++------
 .../preloader/GridDhtPartitionsFullMessage.java |  12 +-
 .../dht/preloader/GridDhtPreloader.java         |   6 +-
 9 files changed, 377 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a8ac825..1142c8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -360,6 +360,15 @@ public class GridAffinityAssignmentCache {
 
         return aff.assignment();
     }
+    /**
+     * @param topVer Topology version.
+     * @return Affinity assignment.
+     */
+    public List<List<ClusterNode>> readyAssignments(AffinityTopologyVersion topVer) {
+        AffinityAssignment aff = readyAffinity(topVer);
+
+        return aff.assignment();
+    }
 
     /**
      * Gets future that will be completed after topology with version {@code topVer} is calculated.
@@ -463,6 +472,26 @@ public class GridAffinityAssignmentCache {
         return false;
     }
 
+    public AffinityAssignment readyAffinity(AffinityTopologyVersion topVer) {
+        AffinityAssignment cache = head.get();
+
+        if (!cache.topologyVersion().equals(topVer)) {
+            cache = affCache.get(topVer);
+
+            if (cache == null) {
+                throw new IllegalStateException("Affinity for topology version is " +
+                    "not initialized [locNode=" + ctx.discovery().localNode().id() +
+                    ", grp=" + cacheOrGrpName +
+                    ", topVer=" + topVer +
+                    ", head=" + head.get().topologyVersion() +
+                    ", history=" + affCache.keySet() +
+                    ']');
+            }
+        }
+
+        return cache;
+    }
+
     /**
      * Get cached affinity for specified topology version.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 4ea61a9..0ef2999 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1259,7 +1259,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                     CacheGroupHolder cache = groupHolder(topVer, desc);
 
-                    cache.affinity().calculate(topVer, evts.event(), evts.discoveryCache());
+                    cache.affinity().calculate(topVer, evts.lastEvent(), evts.discoveryCache());
                 }
             });
 
@@ -1726,7 +1726,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (addedOnExchnage) {
             if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) {
                 List<List<ClusterNode>> newAff = aff.calculate(evts.topologyVersion(),
-                    evts.event(),
+                    evts.lastEvent(),
                     evts.discoveryCache());
 
                 aff.initialize(evts.topologyVersion(), newAff);
@@ -1744,7 +1744,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         assert aff.idealAssignment() != null : "Previous assignment is not available.";
 
-        List<List<ClusterNode>> idealAssignment = aff.calculate(evts.topologyVersion(), evts.event(), evts.discoveryCache());
+        List<List<ClusterNode>> idealAssignment = aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache());
         List<List<ClusterNode>> newAssignment = null;
 
         if (latePrimary) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
index 7d3e256..f1c4bea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -19,17 +19,15 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
-import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
-import static com.sun.corba.se.impl.util.RepositoryId.cache;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -45,7 +43,7 @@ public class ExchangeDiscoveryEvents {
     private DiscoCache discoCache;
 
     /** */
-    private DiscoveryEvent evt;
+    private DiscoveryEvent lastEvt;
 
     /** */
     private List<DiscoveryEvent> evts = new ArrayList<>();
@@ -76,7 +74,7 @@ public class ExchangeDiscoveryEvents {
         evts.add(evt);
 
         this.topVer = topVer;
-        this.evt = evt;
+        this.lastEvt = evt;
         this.discoCache = cache;
 
         ClusterNode node = evt.eventNode();
@@ -92,23 +90,27 @@ public class ExchangeDiscoveryEvents {
         }
     }
 
-    DiscoCache discoveryCache() {
+    public List<DiscoveryEvent> events() {
+        return evts;
+    }
+
+    public DiscoCache discoveryCache() {
         return discoCache;
     }
 
-    DiscoveryEvent event() {
-        return evt;
+    public DiscoveryEvent lastEvent() {
+        return lastEvt;
     }
 
-    AffinityTopologyVersion topologyVersion() {
+    public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
-    boolean serverJoin() {
+    public boolean serverJoin() {
         return srvJoin;
     }
 
-    boolean serverLeft() {
+    public boolean serverLeft() {
         return srvLeft;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 90ea1fe..1db4b09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -44,6 +44,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -1236,14 +1237,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param exchFut Exchange.
+     * @param topVer Topology version.
      * @param err Error.
      */
-    public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable err) {
-        AffinityTopologyVersion topVer = exchFut.topologyVersion();
-
+    public void onExchangeDone(AffinityTopologyVersion topVer, @Nullable Throwable err) {
         if (log.isDebugEnabled())
-            log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
+            log.debug("Exchange done [topVer=" + topVer + ", err=" + err + ']');
 
         if (err == null) {
             while (true) {
@@ -1284,7 +1283,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             int skipped = 0;
 
             for (GridDhtPartitionsExchangeFuture fut : exchFuts0.values()) {
-                if (exchFut.exchangeId().topologyVersion().compareTo(fut.exchangeId().topologyVersion()) < 0)
+                if (topVer.compareTo(fut.exchangeId().topologyVersion()) < 0)
                     continue;
 
                 skipped++;
@@ -1755,11 +1754,43 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         this.exchMergeTestWaitVer = exchMergeTestWaitVer;
     }
 
-    public boolean mergeExchanges(GridDhtPartitionsExchangeFuture curFut) {
+    public void mergeExchanges(GridDhtPartitionsExchangeFuture curFut, AffinityTopologyVersion resVer)
+        throws IgniteInterruptedCheckedException {
+        exchWorker.waitForExchangeFuture(resVer);
+
+        for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+            if (task instanceof GridDhtPartitionsExchangeFuture) {
+                GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture) task;
+
+                if (fut.topologyVersion().compareTo(resVer) > 0)
+                    break;
+
+                log.info("Merge exchange future on finish [curFut=" + curFut.topologyVersion() +
+                    ", mergedFut=" + fut.topologyVersion() + ']');
+
+                curFut.context().events().addEvent(fut.topologyVersion(),
+                    fut.discoveryEvent(),
+                    fut.discoCache());
+
+                exchWorker.futQ.remove(fut);
+            }
+        }
+
+        ExchangeDiscoveryEvents evts = curFut.context().events();
+
+        assert evts.topologyVersion().equals(resVer) : "Invalid exchange merge result [ver=" + evts.topologyVersion()
+            + ", expVer=" + resVer + ']';
+    }
+
+    /**
+     * @param curFut Current active exchange future.
+     * @return {@code False} if need wait messages for merged exchanges.
+     */
+    public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFut) {
         AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer;
 
         if (exchMergeTestWaitVer != null) {
-            log.info("Coalesce test, waiting for version [exch=" + curFut.topologyVersion() +
+            log.info("Exchange merge test, waiting for version [exch=" + curFut.topologyVersion() +
                 ", waitVer=" + exchMergeTestWaitVer + ']');
 
             long end = U.currentTimeMillis() + 10_000;
@@ -1772,7 +1803,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
 
                         if (exchMergeTestWaitVer.equals(fut.topologyVersion())) {
-                            log.info("Coalesce test, found awaited version: " + exchMergeTestWaitVer);
+                            log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);
 
                             found = true;
 
@@ -1808,8 +1839,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (!supportsMergeExchanges(node))
                         break;
 
-                    if (evt.type() == EVT_NODE_JOINED && !CU.clientNode(node))
-                        fut.mergeServerJoinExchange(curFut);
+                    log.info("Merge exchange future [curFut=" + curFut.topologyVersion() +
+                        ", mergedFut=" + fut.topologyVersion() + ']');
+
+                    curFut.context().events().addEvent(fut.topologyVersion(),
+                        fut.discoveryEvent(),
+                        fut.discoCache());
+
+                    if (evt.type() == EVT_NODE_JOINED && !CU.clientNode(node)) {
+                        if (fut.mergeServerJoinExchange(curFut))
+                            awaited++;
+                    }
 
                     exchWorker.futQ.remove(fut);
                 }
@@ -1830,6 +1870,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> futQ =
             new LinkedBlockingDeque<>();
 
+        /** */
+        private AffinityTopologyVersion lastFutVer;
+
         /** Busy flag used as performance optimization to stop current preloading. */
         private volatile boolean busy;
 
@@ -1868,10 +1911,39 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             futQ.offer(exchFut);
 
+            synchronized (this) {
+                lastFutVer = exchFut.topologyVersion();
+
+                notifyAll();
+            }
+
             if (log.isDebugEnabled())
                 log.debug("Added exchange future to exchange worker: " + exchFut);
         }
 
+        private void waitForExchangeFuture(AffinityTopologyVersion resVer) throws IgniteInterruptedCheckedException {
+            synchronized (this) {
+                while (lastFutVer.compareTo(resVer) < 0)
+                    U.wait(this);
+            }
+        }
+
+        private void onExchangeDone(AffinityTopologyVersion resVer, GridDhtPartitionsExchangeFuture exchFut)
+            throws IgniteInterruptedCheckedException {
+            if (resVer.compareTo(exchFut.exchangeId().topologyVersion()) != 0) {
+                waitForExchangeFuture(resVer);
+
+                for (CachePartitionExchangeWorkerTask task : futQ) {
+                    if (task instanceof GridDhtPartitionsExchangeFuture) {
+                        GridDhtPartitionsExchangeFuture fut0 = (GridDhtPartitionsExchangeFuture)task;
+
+                        if (resVer.compareTo(fut0.topologyVersion()) >= 0)
+                            futQ.remove(fut0);
+                    }
+                }
+            }
+        }
+
         /** {@inheritDoc} */
         @Override public void cancel() {
             synchronized (interruptLock) {
@@ -2005,6 +2077,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     GridDhtPartitionsExchangeFuture exchFut = null;
 
+                    AffinityTopologyVersion resVer = null;
+
                     try {
                         if (isCancelled())
                             break;
@@ -2038,7 +2112,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                             while (true) {
                                 try {
-                                    exchFut.get(futTimeout, TimeUnit.MILLISECONDS);
+                                    resVer = exchFut.get(futTimeout, TimeUnit.MILLISECONDS);
 
                                     break;
                                 }
@@ -2067,6 +2141,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 }
                             }
 
+                            onExchangeDone(resVer, exchFut);
 
                             if (log.isDebugEnabled())
                                 log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
@@ -2175,7 +2250,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                         if (assignsCancelled) { // Pending exchange.
                             U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
-                                "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+                                "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
                                 ", node=" + exchId.nodeId() + ']');
                         }
                         else if (r != null) {
@@ -2185,19 +2260,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                             if (!hasPendingExchange()) {
                                 U.log(log, "Rebalancing started " +
-                                    "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+                                    "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
                                     ", node=" + exchId.nodeId() + ']');
 
                                 r.run(); // Starts rebalancing routine.
                             }
                             else
                                 U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
-                                    "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+                                    "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
                                     ", node=" + exchId.nodeId() + ']');
                         }
                         else
                             U.log(log, "Skipping rebalancing (nothing scheduled) " +
-                                "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+                                "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
                                 ", node=" + exchId.nodeId() + ']');
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5ef499c..e477a82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -60,6 +61,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST;
@@ -297,7 +300,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param updateSeq Update sequence.
      */
     private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
-        List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion());
+        AffinityTopologyVersion resTopVer = exchFut.context().events().topologyVersion();
+
+        List<List<ClusterNode>> aff = grp.affinity().readyAssignments(resTopVer);
 
         if (grp.affinityNode()) {
             ClusterNode loc = ctx.localNode();
@@ -306,15 +311,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
-            assert topVer.equals(exchFut.topologyVersion()) :
-                "Invalid topology [topVer=" + topVer +
-                    ", grp=" + grp.cacheOrGroupName() +
-                    ", futVer=" + exchFut.topologyVersion() +
-                    ", fut=" + exchFut + ']';
-            assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) :
+            assert grp.affinity().lastVersion().equals(resTopVer) :
                 "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
                     ", grp=" + grp.cacheOrGroupName() +
-                    ", futVer=" + exchFut.topologyVersion() +
+                    ", futVer=" + resTopVer +
                     ", fut=" + exchFut + ']';
 
             int num = grp.affinity().partitions();
@@ -433,18 +433,22 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     if (stopping)
                         return;
 
-                    GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+                    ExchangeDiscoveryEvents evts = exchFut.context().events();
 
-                    assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer +
-                        ", exchId=" + exchId + ']';
+                    assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology version [topVer=" + topVer +
+                        ", exchId=" + exchFut.exchangeId() + ']';
 
-                    if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent())
-                        removeNode(exchId.nodeId());
+                    topVer = evts.topologyVersion();
+
+                    for (DiscoveryEvent evt : evts.events()) {
+                        if ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && !CU.clientNode(evt.eventNode()))
+                            removeNode(evt.eventNode().id());
+                    }
     
                     ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
                     if (log.isDebugEnabled()) {
-                        log.debug("Partition map beforeExchange [exchId=" + exchId +
+                        log.debug("Partition map beforeExchange [exchId=" + exchFut.exchangeId() +
                             ", fullMap=" + fullMapString() + ']');
                     }
 
@@ -461,7 +465,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                             if (log.isDebugEnabled())
                                 log.debug("Created brand new full topology map on oldest node [exchId=" +
-                                    exchId + ", fullMap=" + fullMapString() + ']');
+                                    exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
                         }
                         else if (!node2part.valid()) {
                             node2part = new GridDhtPartitionFullMap(oldest.id(),
@@ -471,7 +475,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 false);
 
                             if (log.isDebugEnabled()) {
-                                log.debug("Created new full topology map on oldest node [exchId=" + exchId +
+                                log.debug("Created new full topology map on oldest node [exchId=" + exchFut.exchangeId() +
                                     ", fullMap=" + node2part + ']');
                             }
                         }
@@ -484,7 +488,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                             if (log.isDebugEnabled()) {
                                 log.debug("Copied old map into new map on oldest node (previous oldest node left) [" +
-                                    "exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
+                                    "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
                             }
                         }
                     }
@@ -504,7 +508,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     consistencyCheck();
 
                     if (log.isDebugEnabled()) {
-                        log.debug("Partition map after beforeExchange [exchId=" + exchId +
+                        log.debug("Partition map after beforeExchange [exchId=" + exchFut.exchangeId() +
                             ", fullMap=" + fullMapString() + ']');
                     }
                 }
@@ -533,7 +537,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         int num = grp.affinity().partitions();
 
-        AffinityTopologyVersion topVer = exchFut.topologyVersion();
+        AffinityTopologyVersion topVer = exchFut.context().events().topologyVersion();
 
         assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " +
             "[topVer=" + topVer +
@@ -546,9 +550,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (stopping)
                 return false;
 
-            assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology version [topVer=" +
-                topVer + ", exchId=" + exchFut.exchangeId() + ']';
-
             if (log.isDebugEnabled())
                 log.debug("Partition map before afterExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" +
                     fullMapString() + ']');
@@ -1113,6 +1114,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (stopping)
                 return false;
 
+            if (exchangeVer != null) {
+                assert exchangeVer.compareTo(topVer) >= 0 : exchangeVer;
+
+                topVer = exchangeVer;
+            }
+
             if (cntrMap != null) {
                 // update local map partition counters
                 for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 20b33e7..466ec03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -100,6 +100,10 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
         return exchId;
     }
 
+    public void exchangeId(GridDhtPartitionExchangeId exchId) {
+        this.exchId = exchId;
+    }
+
     /**
      * @param grpId Cache group ID.
      * @return Parition update counters.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 9b2a82b..190a417 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.CacheEvent;
@@ -298,6 +298,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         this.affChangeMsg = affChangeMsg;
     }
 
+    private AffinityTopologyVersion initTopologyVersion() {
+        return exchId.topologyVersion();
+    }
+
     /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
         return exchId.topologyVersion();
@@ -520,8 +524,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (exchCtx.canMergeExchanges()) {
                     if (cctx.kernalContext().clientNode() || CU.clientNode(discoEvt.eventNode()))
                         exchange = onClientNodeEvent(crdNode);
-                    else
+                    else {
+                        if (localJoinExchange())
+                            onServerNodeEvent(crdNode);
+
                         exchange = ExchangeType.ALL_2;
+                    }
                 }
                 else {
                     exchange = CU.clientNode(discoEvt.eventNode()) ? onClientNodeEvent(crdNode) :
@@ -1235,6 +1243,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         GridDhtPartitionsFullMessage msg = createPartitionsMessage(true);
 
+        if (exchCtx.canMergeExchanges())
+            msg.resultTopologyVersion(exchCtx.events().topologyVersion());
+
         GridDhtPartitionsFullMessage msgWithAff = null;
 
         assert !nodes.contains(cctx.localNode());
@@ -1256,8 +1267,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id());
 
                     if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) {
-                        if (msgWithAff == null)
-                            msgWithAff = msg.copyWithAffinity(cachesAff);
+                        if (msgWithAff == null) {
+                            msgWithAff = msg.copy();
+
+                            msgWithAff.cachesAffinity(cachesAff);
+                        }
 
                         sndMsg = msgWithAff;
                     }
@@ -1265,6 +1279,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             }
 
             try {
+                GridDhtPartitionsSingleMessage mergedMsg = mergedJoinExchMsgs.get(node.id());
+
+                if (mergedMsg != null) {
+                    sndMsg = sndMsg.copy();
+
+                    sndMsg.exchangeId(mergedMsg.exchangeId());
+                }
+
                 cctx.io().send(node, sndMsg, SYSTEM_POOL);
             }
             catch (IgniteCheckedException e) {
@@ -1310,6 +1332,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (!done.compareAndSet(false, true))
             return false;
 
+        log.info("Finish exchange future [startVer=" + topologyVersion() + ", resVer=" + res + ']');
+
+        assert res != null || err != null;
+
         if (err == null &&
             !cctx.kernalContext().clientNode() &&
             (serverNodeDiscoveryEvent() || affChangeMsg != null)) {
@@ -1317,7 +1343,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (!cacheCtx.affinityNode() || cacheCtx.isLocal())
                     continue;
 
-                cacheCtx.continuousQueries().flushBackupQueue(exchId.topologyVersion());
+                cacheCtx.continuousQueries().flushBackupQueue(res);
             }
        }
 
@@ -1341,7 +1367,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 if (drCacheCtx.isDrEnabled()) {
                     try {
-                        drCacheCtx.dr().onExchange(topologyVersion(), exchId.isLeft());
+                        drCacheCtx.dr().onExchange(res, exchId.isLeft());
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to notify DR: " + e, e);
@@ -1365,9 +1391,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         tryToPerformLocalSnapshotOperation();
 
-        cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err);
+        cctx.cache().onExchangeDone(res, exchActions, err);
 
-        cctx.exchange().onExchangeDone(this, err);
+        cctx.exchange().onExchangeDone(res, err);
 
         if (exchActions != null && err == null)
             exchActions.completeRequestFutures(cctx);
@@ -1394,7 +1420,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (err == null) {
             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                 if (!grp.isLocal())
-                    grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()), false);
+                    grp.topology().onExchangeDone(grp.affinity().readyAffinity(res), false);
             }
         }
 
@@ -1459,31 +1485,49 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private GridDhtPartitionsExchangeFuture mergedWith;
 
     /** */
-    private GridDhtPartitionsSingleMessage pendingSingleMsg;
+    private GridDhtPartitionsSingleMessage pendingSrvJoinMsg;
 
     /** */
     private Map<ClusterNode, GridDhtPartitionsSingleMessage> pendingClientMsgs;
 
-    private void addMergedJoinExchange(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
+    private boolean addMergedJoinExchange(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
         if (mergedJoinExchMsgs == null)
             mergedJoinExchMsgs = new LinkedHashMap<>();
 
-        if (msg != null)
+        boolean wait = false;
+
+        if (msg != null) {
+            log.info("Merge server join exchange, message received [curFut=" + topologyVersion() +
+                ", node=" + nodeId + ']');
+
             mergedJoinExchMsgs.put(nodeId, msg);
+        }
         else {
-            if (cctx.discovery().alive(nodeId))
+            if (cctx.discovery().alive(nodeId)) {
+                log.info("Merge server join exchange, wait for message [curFut=" + topologyVersion() +
+                    ", node=" + nodeId + ']');
+
+                wait = true;
+
                 awaitMergedMsgs++;
-            else
+            }
+            else {
+                log.info("Merge server join exchange, awaited node left [curFut=" + topologyVersion() +
+                    ", node=" + nodeId + ']');
+
                 mergedJoinExchMsgs.put(nodeId, null);
+            }
         }
+
+        return wait;
     }
 
     /**
      * @param fut Current exchange to merge with.
      */
     @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-    public void mergeServerJoinExchange(final GridDhtPartitionsExchangeFuture fut) {
-        log.info("Merge exchange future [fut=" + topologyVersion() + ", mergeWith=" + fut.topologyVersion() + ']');
+    public boolean mergeServerJoinExchange(final GridDhtPartitionsExchangeFuture fut) {
+        boolean wait;
 
         synchronized (this) {
             assert !isDone();
@@ -1493,17 +1537,31 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             mergedWith = fut;
 
-            fut.addMergedJoinExchange(discoEvt.eventNode().id(), pendingSingleMsg);
+            wait = fut.addMergedJoinExchange(discoEvt.eventNode().id(), pendingSrvJoinMsg);
 
             // TODO 5578 client messages.
         }
+
+        return wait;
     }
 
+    /**
+     * @param node
+     * @param msg
+     */
     void onReceiveMerged(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
         boolean done = false;
 
         synchronized (this) {
-            if (mergedJoinExchMsgs != null && !mergedJoinExchMsgs.containsKey(node.id())) {
+            boolean process = mergedJoinExchMsgs != null && !mergedJoinExchMsgs.containsKey(node.id());
+
+            log.info("Merge server join exchange, received message [curFut=" + topologyVersion() +
+                ", node=" + node.id() +
+                ", msgVer=" + msg.exchangeId().topologyVersion() +
+                ", process=" + process +
+                ", awaited=" + awaitMergedMsgs + ']');
+
+            if (process) {
                 mergedJoinExchMsgs.put(node.id(), msg);
 
                 assert awaitMergedMsgs > 0 : awaitMergedMsgs;
@@ -1514,9 +1572,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             }
         }
 
-        if (done) {
-
-        }
+        if (done)
+            finishExchangeOnCoordinator();
     }
 
     /**
@@ -1549,9 +1606,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 else {
                     if (msg.client()) {
                         assert false;
+
+                        // TODO IGNITE-5578
+                    }
+                    else if (exchangeId().isJoined() && node.id().equals(exchId.nodeId())) {
+                        assert !CU.clientNode(node) : node;
+
+                        pendingSrvJoinMsg = msg;
                     }
-                    else if (exchangeId().isJoined() && node.id().equals(exchId.nodeId()))
-                        pendingSingleMsg = msg;
                 }
             }
 
@@ -1869,8 +1931,30 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             }
 
             if (exchCtx.canMergeExchanges()) {
-                cctx.exchange().mergeExchanges(this);
+                boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this);
+
+                if (!finish)
+                    return;
+            }
+
+            finishExchangeOnCoordinator();
+        }
+        catch (IgniteCheckedException e) {
+            if (reconnectOnError(e))
+                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+            else
+                onDone(e);
+        }
+    }
+
+    /**
+     *
+     */
+    private void finishExchangeOnCoordinator() {
+        try {
+            log.info("finishExchangeOnCoordinator [topVer=" + topologyVersion() + ", resVer=" + exchCtx.events().topologyVersion() + ']');
 
+            if (exchCtx.canMergeExchanges()) {
                 cctx.affinity().onTopologyChange(this, true);
 
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
@@ -1881,6 +1965,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
+            synchronized (this) {
+                if (mergedJoinExchMsgs != null) {
+                    for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) {
+                        if (e.getValue() != null)
+                            msgs.put(e.getKey(), e.getValue());
+                    }
+                }
+            }
+
+            AffinityTopologyVersion resTopVer = exchCtx.events().topologyVersion();
+
             Map<Integer, CacheGroupAffinityMessage> cachesAff = null;
 
             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
@@ -1905,7 +2000,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 if (affReq != null) {
                     cachesAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
-                        topologyVersion(),
+                        resTopVer,
                         affReq,
                         cachesAff);
 
@@ -1920,12 +2015,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                             if (partMap == null) {
                                 partMap = new GridDhtPartitionMap(nodeId,
                                     1L,
-                                    topologyVersion(),
+                                    resTopVer,
                                     new GridPartitionStateMap(),
                                     false);
                             }
 
-                            AffinityAssignment aff = cctx.affinity().affinity(grpId).cachedAffinity(topologyVersion());
+                            AffinityAssignment aff = cctx.affinity().affinity(grpId).cachedAffinity(resTopVer);
 
                             for (int p = 0; p < aff.assignment().size(); p++) {
                                 if (aff.getIds(p).contains(nodeId))
@@ -1987,6 +2082,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     srvNodes.remove(cctx.localNode());
 
                     nodes = new ArrayList<>(srvNodes);
+
+                    if (mergedJoinExchMsgs != null) {
+                        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) {
+                            if (e.getValue() != null) {
+                                ClusterNode node = cctx.discovery().node(e.getKey());
+
+                                if (node != null)
+                                    nodes.add(node);
+                            }
+                        }
+                    }
                 }
 
                 IgniteCheckedException err = null;
@@ -2016,7 +2122,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (!nodes.isEmpty())
                     sendAllPartitions(nodes, cachesAff != null ? cachesAff.values() : null);
 
-                onDone(exchangeId().topologyVersion(), err);
+                onDone(exchCtx.events().topologyVersion(), err);
             }
         }
         catch (IgniteCheckedException e) {
@@ -2166,98 +2272,100 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param msg Message.
      */
     private void processFullMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
-        assert exchId.equals(msg.exchangeId()) : msg;
-        assert msg.lastVersion() != null : msg;
+        try {
+            assert exchId.equals(msg.exchangeId()) : msg;
+            assert msg.lastVersion() != null : msg;
 
-        synchronized (this) {
-            if (crd == null || finishState != null)
-                return;
+            synchronized (this) {
+                if (crd == null || finishState != null)
+                    return;
 
-            if (!crd.equals(node)) {
-                if (log.isDebugEnabled())
-                    log.debug("Received full partition map from unexpected node [oldest=" + crd.id() +
-                        ", nodeId=" + node.id() + ']');
+                if (!crd.equals(node)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received full partition map from unexpected node [oldest=" + crd.id() +
+                            ", nodeId=" + node.id() + ']');
 
-                if (node.order() > crd.order())
-                    fullMsgs.put(node, msg);
+                    if (node.order() > crd.order())
+                        fullMsgs.put(node, msg);
 
-                return;
+                    return;
+                }
+
+                finishState = new FinishState(crd.id(), msg.resultTopologyVersion());
             }
 
-            finishState = new FinishState(crd.id(), msg.resultTopologyVersion());
-        }
+            if (exchCtx.canMergeExchanges()) {
+                if (msg.resultTopologyVersion() != null && !initTopologyVersion().equals(msg.resultTopologyVersion())) {
+                    log.info("Received full message, need merge [curFut=" + topologyVersion() +
+                        ", resVer=" + msg.resultTopologyVersion() + ']');
 
-        if (exchCtx.canMergeExchanges()) {
-            try {
-                onServerNodeEvent(true);
+                    cctx.exchange().mergeExchanges(this, msg.resultTopologyVersion());
+                }
 
-                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                    if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
-                        continue;
+                if (localJoinExchange()) {
+                    Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
 
-                    grp.topology().beforeExchange(this, true);
-                }
-            }
-            catch (IgniteCheckedException e) {
-                // TODO 5578.
-                U.error(log, "Failed: " + e, e);
-            }
-        }
+                    ExchangeDiscoveryEvents evts = exchCtx.events();
 
-        Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
+                    Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
 
-        if (localJoinExchange() && affReq != null) {
-            Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+                    Collection<CacheGroupAffinityMessage> cachesAff = msg.cachesAffinity();
 
-            Collection<CacheGroupAffinityMessage> cachesAff = msg.cachesAffinity();
+                    assert !F.isEmpty(cachesAff) : msg;
+                    assert cachesAff.size() >= affReq.size();
 
-            assert !F.isEmpty(cachesAff) : msg;
-            assert cachesAff.size() >= affReq.size();
+                    int cnt = 0;
 
-            int cnt = 0;
+                    for (CacheGroupAffinityMessage aff : cachesAff) {
+                        if (affReq.contains(aff.groupId())) {
+                            CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
 
-            for (CacheGroupAffinityMessage aff : cachesAff) {
-                if (affReq.contains(aff.groupId())) {
-                    CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
+                            assert grp != null : aff.groupId();
+                            assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion());
 
-                    assert grp != null : aff.groupId();
-                    assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion());
+                            List<List<ClusterNode>> assignments = aff.createAssignments(nodesByOrder, evts.discoveryCache());
 
-                    List<List<ClusterNode>> assignments = aff.createAssignments(nodesByOrder, discoCache);
+                            // Calculate ideal assignments.
+                            if (!grp.affinity().centralizedAffinityFunction())
+                                grp.affinity().calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache());
 
-                    // Calculate ideal assignments.
-                    if (!grp.affinity().centralizedAffinityFunction())
-                        grp.affinity().calculate(topologyVersion(), discoEvt, discoCache);
+                            grp.affinity().initialize(evts.topologyVersion(), assignments);
 
-                    grp.affinity().initialize(topologyVersion(), assignments);
+                            grp.topology().initPartitions(this);
 
-                    try {
-                        grp.topology().initPartitions(this);
+                            cnt++;
+                        }
                     }
-                    catch (IgniteInterruptedCheckedException e) {
-                        U.warn(log, "Interrupted when initialize local partitions.");
 
-                        return;
-                    }
+                    assert affReq.size() == cnt : cnt;
+                }
+                else {
+                    cctx.affinity().onTopologyChange(this, false);
+
+                    for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                        if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+                            continue;
 
-                    cnt++;
+                        grp.topology().beforeExchange(this, true);
+                    }
                 }
             }
 
-            assert affReq.size() == cnt : cnt;
-        }
+            updatePartitionFullMap(msg);
 
-        updatePartitionFullMap(msg);
+            IgniteCheckedException err = null;
 
-        IgniteCheckedException err = null;
+            if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
+                err = new IgniteCheckedException("Cluster state change failed");
 
-        if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
-            err = new IgniteCheckedException("Cluster state change failed");
+                cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
+            }
 
-            cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
+            onDone(exchCtx.events().topologyVersion(), err);
+        }
+        catch (IgniteCheckedException e) {
+            onDone(e);
         }
-
-        onDone(exchId.topologyVersion(), err);
     }
 
     /**
@@ -2280,7 +2388,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
             if (grp != null) {
-                grp.topology().update(topologyVersion(),
+                grp.topology().update(exchCtx.events().topologyVersion(),
                     entry.getValue(),
                     cntrMap,
                     msg.partsToReload(cctx.localNodeId(), grpId));
@@ -2289,7 +2397,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal()) {
-                    cctx.exchange().clientTopology(grpId, this).update(topologyVersion(),
+                    cctx.exchange().clientTopology(grpId, this).update(exchCtx.events().topologyVersion(),
                         entry.getValue(),
                         cntrMap,
                         Collections.<Integer>emptySet());

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 8a5dbbb..8c11ff2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -152,6 +152,11 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         cp.partsToReloadBytes = partsToReloadBytes;
         cp.topVer = topVer;
         cp.cachesAff = cachesAff;
+        cp.resTopVer = resTopVer;
+    }
+
+    public void resultTopologyVersion(AffinityTopologyVersion resTopVer) {
+        this.resTopVer = resTopVer;
     }
 
     AffinityTopologyVersion resultTopologyVersion() {
@@ -159,18 +164,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     }
 
     /**
-     * @param cachesAff Affinity.
      * @return Message copy.
      */
-    GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinityMessage> cachesAff) {
-        assert !F.isEmpty(cachesAff) : cachesAff;
-
+    GridDhtPartitionsFullMessage copy() {
         GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
 
         copyStateTo(cp);
 
-        cp.cachesAff = cachesAff;
-
         return cp;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 7efd4aa..37de068 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -188,7 +188,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         int partCnt = grp.affinity().partitions();
 
-        assert exchFut == null || exchFut.topologyVersion().equals(top.topologyVersion()) :
+        assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.topologyVersion()) :
             "Topology version mismatch [exchId=" + exchId +
                 ", grp=" + grp.name() +
                 ", topVer=" + top.topologyVersion() + ']';
@@ -242,7 +242,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     if (msg == null) {
                         assigns.put(histSupplier, msg = new GridDhtPartitionDemandMessage(
                             top.updateSequence(),
-                            exchId.topologyVersion(),
+                            assigns.topologyVersion(),
                             grp.groupId()));
                     }
 
@@ -309,7 +309,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     if (msg == null) {
                         assigns.put(n, msg = new GridDhtPartitionDemandMessage(
                             top.updateSequence(),
-                            exchId.topologyVersion(),
+                            assigns.topologyVersion(),
                             grp.groupId()));
                     }
 


Mime
View raw message