ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: ignite-5578
Date Fri, 28 Jul 2017 10:41:33 GMT
ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6f06d95
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6f06d95
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6f06d95

Branch: refs/heads/ignite-5578
Commit: a6f06d9586bfb72967ae4e80791a8dbd6de13a94
Parents: 68e4e45
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jul 28 13:39:50 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jul 28 13:39:50 2017 +0300

----------------------------------------------------------------------
 .../affinity/AffinityTopologyVersion.java       |   7 +
 .../cache/CacheAffinitySharedManager.java       |  14 +-
 .../GridCachePartitionExchangeManager.java      |   9 +-
 .../dht/GridClientPartitionTopology.java        |  14 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |   2 +-
 .../dht/GridDhtPartitionTopology.java           |  19 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 320 +++++++++++--------
 .../dht/GridDhtPartitionsReservation.java       |   2 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  12 +-
 .../dht/GridPartitionedGetFuture.java           |   2 +-
 .../dht/GridPartitionedSingleGetFuture.java     |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   8 +-
 .../GridDhtPartitionsExchangeFuture.java        |  54 ++--
 .../dht/preloader/GridDhtPreloader.java         |  12 +-
 .../distributed/near/GridNearGetFuture.java     |   2 +-
 .../cache/transactions/IgniteTxHandler.java     |   6 +-
 .../datastreamer/DataStreamerImpl.java          |   2 +-
 .../IgniteClientCacheStartFailoverTest.java     |   4 +-
 .../distributed/CacheExchangeMergeTest.java     |   1 +
 ...niteCacheClientNodeChangingTopologyTest.java |   5 +-
 ...teCacheClientNodePartitionsExchangeTest.java |   4 +-
 .../junits/common/GridCommonAbstractTest.java   |  22 +-
 .../cache/WaitMapExchangeFinishCallable.java    |   4 +-
 23 files changed, 316 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index 8669530..44b2753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -73,6 +73,13 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
     }
 
     /**
+     * @return {@code True} if this is real topology version (neither {@link #NONE} nor {@link #ZERO}.
+     */
+    public boolean initialized() {
+        return topVer > 0;
+    }
+
+    /**
      * @return Topology version with incremented minor version.
      */
     public AffinityTopologyVersion nextMinorVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index c150e00..471f2ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1298,7 +1298,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         });
     }
 
-    public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg)
+    /**
+     * @param fut Current exchange future.
+     * @param msg Message finish message.
+     * @param resTopVer Result topology version.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut,
+        GridDhtPartitionsFullMessage msg,
+        final AffinityTopologyVersion resTopVer)
         throws IgniteCheckedException {
         final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
 
@@ -1322,6 +1330,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                     List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, evts.discoveryCache());
 
+                    assert resTopVer.equals(evts.topologyVersion());
+
                     // TODO 5578 transfer ideal for fairaffinity
                     // Calculate ideal assignments.
                     if (!aff.centralizedAffinityFunction())
@@ -1333,7 +1343,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                     assert grp != null;
 
-                    grp.topology().initPartitions(fut);
+                    grp.topology().initPartitionsWhenAffinityReady(resTopVer, fut);
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7d6cc77..ca4eb44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -89,6 +89,7 @@ import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchang
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridListSet;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -1777,7 +1778,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
 
                 log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() +
-                    ", mergedFut=" + fut.initialVersion() + ']');
+                    ", mergedFut=" + fut.initialVersion() +
+                    ", evt=" + IgniteUtils.gridEventName(fut.discoveryEvent().type()) +
+                    ", evtNode=" + fut.discoveryEvent().eventNode().id()+ ']');
 
                 DiscoveryEvent evt = fut.discoveryEvent();
 
@@ -1876,7 +1879,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     }
 
                     log.info("Merge exchange future [curFut=" + curFut.initialVersion() +
-                        ", mergedFut=" + fut.initialVersion() + ']');
+                        ", mergedFut=" + fut.initialVersion() +
+                        ", evt=" + IgniteUtils.gridEventName(fut.discoveryEvent().type()) +
+                        ", evtNode=" + fut.discoveryEvent().eventNode().id()+ ']');
 
                     curFut.context().events().addEvent(fut.initialVersion(),
                         fut.discoveryEvent(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index e3260ea..975385f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -214,7 +214,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
+    @Override public AffinityTopologyVersion readyTopologyVersion() {
         lock.readLock().lock();
 
         try {
@@ -228,6 +228,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion lastTopologyChangeVersion() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public GridDhtTopologyFuture topologyVersionFuture() {
         assert topReadyFut != null;
 
@@ -240,7 +245,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
+        GridDhtPartitionsExchangeFuture exchFut) {
         // No-op.
     }
 
@@ -857,14 +863,14 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
+    @Override public boolean detectLostPartitions(AffinityTopologyVersion affVer, DiscoveryEvent discoEvt) {
         assert false : "detectLostPartitions should never be called on client topology";
 
         return false;
     }
 
     /** {@inheritDoc} */
-    @Override public void resetLostPartitions() {
+    @Override public void resetLostPartitions(AffinityTopologyVersion affVer) {
         assert false : "resetLostPartitions should never be called on client topology";
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 171871b..1f67c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -433,7 +433,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             // While we are holding read lock, register lock future for partition release future.
             IgniteUuid lockId = IgniteUuid.fromUuid(ctx.localNodeId());
 
-            topVer = top.topologyVersion();
+            topVer = top.readyTopologyVersion();
 
             MultiUpdateFuture fut = new MultiUpdateFuture(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d3879cc..8911aa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -73,7 +73,12 @@ public interface GridDhtPartitionTopology {
      *
      * @return Topology version.
      */
-    public AffinityTopologyVersion topologyVersion();
+    public AffinityTopologyVersion readyTopologyVersion();
+
+    /**
+     * @return Start topology version of last exchange.
+     */
+    public AffinityTopologyVersion lastTopologyChangeVersion();
 
     /**
      * Gets a future that will be completed when partition exchange map for this
@@ -107,7 +112,8 @@ public interface GridDhtPartitionTopology {
      * @param exchFut Exchange future.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException;
+    public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut)
+        throws IgniteInterruptedCheckedException;
 
     /**
      * Post-initializes this topology.
@@ -243,7 +249,7 @@ public interface GridDhtPartitionTopology {
     public void onRemoved(GridDhtCacheEntry e);
 
     /**
-     * @param exchangeVer Topology version from exchange. Value should be greater than previously passed. Null value
+     * @param exchangeResVer Result topology version for exchange. Value should be greater than previously passed. Null value
      *      means full map received is not related to exchange
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
@@ -253,7 +259,7 @@ public interface GridDhtPartitionTopology {
      * @return {@code True} if local state was changed.
      */
     public boolean update(
-        @Nullable AffinityTopologyVersion exchangeVer,
+        @Nullable AffinityTopologyVersion exchangeResVer,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, T2<Long, Long>> cntrMap,
         Set<Integer> partsToReload,
@@ -279,15 +285,16 @@ public interface GridDhtPartitionTopology {
      * <p>
      * This method should be called on topology coordinator after all partition messages are received.
      *
+     * @param resTopVer Exchange result version.
      * @param discoEvt Discovery event for which we detect lost partitions.
      * @return {@code True} if partitions state got updated.
      */
-    public boolean detectLostPartitions(DiscoveryEvent discoEvt);
+    public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt);
 
     /**
      * Resets the state of all LOST partitions to OWNING.
      */
-    public void resetLostPartitions();
+    public void resetLostPartitions(AffinityTopologyVersion resTopVer);
 
     /**
      * @return Collection of lost partitions, if any.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1b4bbcc..02790c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -112,10 +112,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE;
 
     /** */
-    private AffinityTopologyVersion lastExchangeVer;
+    //private AffinityTopologyVersion lastExchangeVer;
 
     /** */
-    private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+    // private volatile AffinityTopologyVersion readyTopVer = AffinityTopologyVersion.NONE;
+
+    /** */
+    private volatile AffinityTopologyVersion readyTopVer = AffinityTopologyVersion.NONE;
+
+    /** */
+    private volatile AffinityTopologyVersion lastTopChangeVer = AffinityTopologyVersion.NONE;
 
     /** Discovery cache. */
     private volatile DiscoCache discoCache;
@@ -173,8 +179,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             diffFromAffinity.clear();
 
-            lastExchangeVer = null;
-
             updateSeq.set(1);
 
             topReadyFut = null;
@@ -183,7 +187,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             rebalancedTopVer = AffinityTopologyVersion.NONE;
 
-            topVer = AffinityTopologyVersion.NONE;
+            readyTopVer = AffinityTopologyVersion.NONE;
+
+            lastTopChangeVer = AffinityTopologyVersion.NONE;
 
             discoCache = ctx.discovery().discoCache();
         }
@@ -232,7 +238,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
 
-            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
+            assert exchTopVer.compareTo(readyTopVer) > 0 : "Invalid topology version [topVer=" + readyTopVer +
                 ", exchTopVer=" + exchTopVer +
                 ", fut=" + exchFut + ']';
 
@@ -244,7 +250,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             rebalancedTopVer = AffinityTopologyVersion.NONE;
 
-            topVer = exchTopVer;
+            lastTopChangeVer = exchTopVer;
 
             this.discoCache = discoCache;
         }
@@ -254,8 +260,18 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        AffinityTopologyVersion topVer = this.topVer;
+    @Override public AffinityTopologyVersion readyTopologyVersion() {
+        AffinityTopologyVersion topVer = this.readyTopVer;
+
+        assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
+            ", group=" + grp.cacheOrGroupName() + ']';
+
+        return topVer;
+    }
+
+    @Override
+    public AffinityTopologyVersion lastTopologyChangeVersion() {
+        AffinityTopologyVersion topVer = this.lastTopChangeVer;
 
         assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
             ", group=" + grp.cacheOrGroupName() + ']';
@@ -276,8 +292,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void initPartitions(
-        GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException {
+    @Override public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
+        GridDhtPartitionsExchangeFuture exchFut)
+        throws IgniteInterruptedCheckedException
+    {
         U.writeLock(lock);
 
         try {
@@ -286,9 +304,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            topVer = exchFut.context().events().topologyVersion();
-
-            initPartitions0(exchFut, updateSeq);
+            initPartitions0(affVer, exchFut, updateSeq);
 
             consistencyCheck();
         }
@@ -298,13 +314,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /**
+     * @param affVer Affinity version to use.
      * @param exchFut Exchange future.
      * @param updateSeq Update sequence.
      */
-    private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
-        AffinityTopologyVersion resTopVer = exchFut.context().events().topologyVersion();
-
-        List<List<ClusterNode>> aff = grp.affinity().readyAssignments(resTopVer);
+    private void initPartitions0(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
+        List<List<ClusterNode>> aff = grp.affinity().readyAssignments(affVer);
 
         if (grp.affinityNode()) {
             ClusterNode loc = ctx.localNode();
@@ -313,10 +328,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
-            assert grp.affinity().lastVersion().equals(resTopVer) :
+            assert grp.affinity().lastVersion().equals(affVer) :
                 "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
                     ", grp=" + grp.cacheOrGroupName() +
-                    ", futVer=" + resTopVer +
+                    ", affVer=" + affVer +
                     ", fut=" + exchFut + ']';
 
             int num = grp.affinity().partitions();
@@ -341,18 +356,18 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                             if (log.isDebugEnabled())
                                 log.debug("Owned partition for oldest node: " + locPart);
 
-                            updateSeq = updateLocal(p, locPart.state(), updateSeq);
+                            updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
                         }
                     }
                 }
                 else
-                    createPartitions(aff, updateSeq);
+                    createPartitions(affVer, aff, updateSeq);
             }
             else {
                 // If preloader is disabled, then we simply clear out
                 // the partitions this node is not responsible for.
                 for (int p = 0; p < num; p++) {
-                    GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true, false);
+                    GridDhtLocalPartition locPart = localPartition0(p, affVer, false, true, false);
 
                     boolean belongs = localNode(p, aff);
 
@@ -363,7 +378,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                             if (state.active()) {
                                 locPart.rent(false);
 
-                                updateSeq = updateLocal(p, locPart.state(), updateSeq);
+                                updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Evicting partition with rebalancing disabled " +
@@ -378,13 +393,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                         locPart.own();
 
-                        updateLocal(p, locPart.state(), updateSeq);
+                        updateLocal(p, locPart.state(), updateSeq, affVer);
                     }
                 }
             }
 
             if (node2part != null && node2part.valid())
-                checkEvictions(updateSeq, aff);
+                checkEvictions(updateSeq, affVer, aff);
         }
 
         updateRebalanceVersion(aff);
@@ -394,7 +409,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param aff Affinity assignments.
      * @param updateSeq Update sequence.
      */
-    private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
+    private void createPartitions(AffinityTopologyVersion affVer, List<List<ClusterNode>> aff, long updateSeq) {
         if (!grp.affinityNode())
             return;
 
@@ -407,7 +422,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     // will be created in MOVING state.
                     GridDhtLocalPartition locPart = createPartition(p);
 
-                    updateSeq = updateLocal(p, locPart.state(), updateSeq);
+                    updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
                 }
             }
             // If this node's map is empty, we pre-create local partitions,
@@ -435,12 +450,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     if (stopping)
                         return;
 
-                    ExchangeDiscoveryEvents evts = exchFut.context().events();
-
-                    assert topVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + topVer +
+                    assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer +
                         ", exchId=" + exchFut.exchangeId() + ']';
 
-                    topVer = evts.topologyVersion();
+                    ExchangeDiscoveryEvents evts = exchFut.context().events();
 
                     for (DiscoveryEvent evt : evts.events()) {
                         if ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && !CU.clientNode(evt.eventNode()))
@@ -498,12 +511,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (grpStarted ||
                             exchFut.discoveryEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
                             exchFut.serverNodeDiscoveryEvent()) {
-                            if (affReady)
-                                initPartitions0(exchFut, updateSeq);
+                            if (affReady) {
+                                assert grp.affinity().lastVersion().equals(evts.topologyVersion());
+
+                                initPartitions0(evts.topologyVersion(), exchFut, updateSeq);
+                            }
                             else {
+                                assert !exchFut.context().mergeExchanges();
+
                                 List<List<ClusterNode>> aff = grp.affinity().idealAssignment();
 
-                                createPartitions(aff, updateSeq);
+                                createPartitions(exchFut.initialVersion(), aff, updateSeq);
                             }
                         }
                     }
@@ -553,6 +571,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (stopping)
                 return false;
 
+            assert readyTopVer.topologyVersion() > 0 : readyTopVer;
+            assert lastTopChangeVer.equals(readyTopVer);
+
             if (log.isDebugEnabled())
                 log.debug("Partition map before afterExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" +
                     fullMapString() + ']');
@@ -585,7 +606,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 assert owned : "Failed to own partition [grp=" + grp.cacheOrGroupName() + ", locPart=" +
                                     locPart + ']';
 
-                                updateSeq = updateLocal(p, locPart.state(), updateSeq);
+                                updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
 
                                 changed = true;
 
@@ -607,7 +628,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                     locPart + ", owners = " + owners + ']');
                         }
                         else
-                            updateSeq = updateLocal(p, locPart.state(), updateSeq);
+                            updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
                     }
                 }
                 else {
@@ -617,7 +638,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (state == MOVING) {
                             locPart.rent(false);
 
-                            updateSeq = updateLocal(p, locPart.state(), updateSeq);
+                            updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
 
                             changed = true;
 
@@ -628,7 +649,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            updateRebalanceVersion(grp.affinity().assignments(topVer));
+            updateRebalanceVersion(grp.affinity().readyAssignments(topVer));
 
             consistencyCheck();
         }
@@ -748,7 +769,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (!belongs)
                     throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
                         "(often may be caused by inconsistent 'key.hashCode()' implementation) " +
-                        "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
+                        "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.readyTopVer + ']');
             }
             else if (loc != null && state == RENTING && !showRenting)
                 throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted " +
@@ -758,7 +779,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (!belongs)
                     throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " +
                         "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
-                        "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
+                        "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.readyTopVer + ']');
 
                 locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
 
@@ -837,7 +858,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
          * ===================================================
          */
 
-        GridDhtLocalPartition loc = localPartition(e.partition(), topologyVersion(), false);
+        GridDhtLocalPartition loc = localPartition(e.partition(), readyTopVer, false);
 
         if (loc != null)
             loc.onRemoved(e);
@@ -861,7 +882,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             return new GridDhtPartitionMap(ctx.localNodeId(),
                 updateSeq.get(),
-                topVer,
+                readyTopVer,
                 map,
                 true);
         }
@@ -921,7 +942,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer +
-                ", topVer2=" + this.topVer +
+                ", topVer2=" + this.readyTopVer +
                 ", node=" + ctx.igniteInstanceName() +
                 ", grp=" + grp.cacheOrGroupName() +
                 ", node2part=" + node2part + ']';
@@ -1101,31 +1122,22 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Override public boolean update(
-        @Nullable AffinityTopologyVersion exchangeVer,
+        @Nullable AffinityTopologyVersion exchangeResVer,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, T2<Long, Long>> incomeCntrMap,
         Set<Integer> partsToReload,
         @Nullable AffinityTopologyVersion msgTopVer) {
         if (log.isDebugEnabled())
-            log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']');
+            log.debug("Updating full partition map [exchVer=" + exchangeResVer + ", parts=" + fullMapString() + ']');
 
         assert partMap != null;
 
         lock.writeLock().lock();
 
         try {
-            if (stopping)
+            if (stopping || lastTopChangeVer == null)
                 return false;
 
-            if (exchangeVer == null && (topReadyFut == null || !topReadyFut.isDone()))
-                return false;
-
-            if (exchangeVer != null) {
-                assert exchangeVer.compareTo(topVer) >= 0 : exchangeVer;
-
-                topVer = exchangeVer;
-            }
-
             if (incomeCntrMap != null) {
                 // update local map partition counters
                 for (Map.Entry<Integer, T2<Long, Long>> e : incomeCntrMap.entrySet()) {
@@ -1149,18 +1161,32 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            if (exchangeVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchangeVer) >= 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for full partition map update (will ignore) [lastExch=" +
-                        lastExchangeVer + ", exch=" + exchangeVer + ']');
+            if (exchangeResVer != null) {
+                // Ignore if exchange already finished or new exchange started.
+                if (readyTopVer.compareTo(exchangeResVer) >= 0 || lastTopChangeVer.compareTo(exchangeResVer) > 0) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Stale exchange id for full partition map update (will ignore) [" +
+                            "lastTopChange=" + lastTopChangeVer +
+                            ", readTopVer=" + readyTopVer +
+                            ", exchVer=" + exchangeResVer + ']');
+                    }
 
-                return false;
+                    U.warn(log, "Stale exchange id for full partition map update (will ignore) [" +
+                        "lastTopChange=" + lastTopChangeVer +
+                        ", readTopVer=" + readyTopVer +
+                        ", exchVer=" + exchangeResVer + ']');
+
+                    return false;
+                }
             }
 
-            if (msgTopVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(msgTopVer) > 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Stale version for full partition map update message (will ignore) [lastExch=" +
-                        lastExchangeVer + ", topVersion=" + msgTopVer + ']');
+            if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Stale version for full partition map update message (will ignore) [" +
+                        "lastTopChange=" + lastTopChangeVer +
+                        ", readTopVer=" + readyTopVer +
+                        ", msgVer=" + msgTopVer + ']');
+                }
 
                 return false;
             }
@@ -1174,9 +1200,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     if (shouldOverridePartitionMap(part, newPart)) {
                         fullMapUpdated = true;
 
-                        if (log.isDebugEnabled())
-                            log.debug("Overriding partition map in full update map [exchId=" + exchangeVer + ", curPart=" +
-                                mapString(part) + ", newPart=" + mapString(newPart) + ']');
+                        if (log.isDebugEnabled()) {
+                            log.debug("Overriding partition map in full update map [exchVer=" + exchangeResVer +
+                                ", curPart=" + mapString(part) +
+                                ", newPart=" + mapString(newPart) + ']');
+                        }
                     }
                     else {
                         // If for some nodes current partition has a newer map,
@@ -1208,22 +1236,26 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             }
 
             if (!fullMapUpdated) {
-                if (log.isDebugEnabled())
-                    log.debug("No updates for full partition map (will ignore) [lastExch=" +
-                            lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']');
+                if (log.isDebugEnabled()) {
+                    log.debug("No updates for full partition map (will ignore) [lastExch=" + lastTopChangeVer +
+                        ", exchVer=" + exchangeResVer +
+                        ", curMap=" + node2part +
+                        ", newMap=" + partMap + ']');
+                }
 
                 return false;
             }
 
-            if (exchangeVer != null)
-                lastExchangeVer = exchangeVer;
+            if (exchangeResVer != null) {
+                assert exchangeResVer.compareTo(readyTopVer) >= 0 && exchangeResVer.compareTo(lastTopChangeVer) >= 0;
 
-            node2part = partMap;
+                lastTopChangeVer = readyTopVer = exchangeResVer;
+            }
 
-            AffinityTopologyVersion affVer = grp.affinity().lastVersion();
+            node2part = partMap;
 
-            if (affVer.topologyVersion() > 0 && diffFromAffinityVer.compareTo(affVer) <= 0) {
-                AffinityAssignment affAssignment = grp.affinity().cachedAffinity(affVer);
+            if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
+                AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
 
                 for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
                     for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
@@ -1248,7 +1280,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     }
                 }
 
-                diffFromAffinityVer = affVer;
+                diffFromAffinityVer = readyTopVer;
             }
 
             boolean changed = false;
@@ -1322,10 +1354,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-                List<List<ClusterNode>> aff = grp.affinity().readyAssignments(topVer);
+            if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
+                List<List<ClusterNode>> aff = grp.affinity().readyAssignments(readyTopVer);
 
-                changed |= checkEvictions(updateSeq, aff);
+                changed |= checkEvictions(updateSeq, readyTopVer, aff);
 
                 updateRebalanceVersion(aff);
             }
@@ -1430,18 +1462,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 return false;
 
             if (!force) {
-                if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
+                if (lastTopChangeVer != null && exchId != null && lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) {
                     if (log.isDebugEnabled())
                         log.debug("Stale exchange id for single partition map update (will ignore) [lastExch=" +
-                            lastExchangeVer + ", exch=" + exchId.topologyVersion() + ']');
+                            lastTopChangeVer + ", exch=" + exchId.topologyVersion() + ']');
 
                     return false;
                 }
             }
 
-            if (exchId != null)
-                lastExchangeVer = exchId.topologyVersion();
-
             if (node2part == null)
                 // Create invalid partition map.
                 node2part = new GridDhtPartitionFullMap();
@@ -1471,56 +1500,57 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             node2part.put(parts.nodeId(), parts);
 
-            AffinityTopologyVersion affVer = grp.affinity().lastVersion();
-
-            if (affVer.compareTo(diffFromAffinityVer) >= 0) {
-                AffinityAssignment affAssignment = grp.affinity().readyAffinity(affVer);
+            // During exchange calculate diff after all messages are received and affinity initialized.
+            if (exchId == null) {
+                if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
+                    AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
 
-                // Add new mappings.
-                for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
-                    int p = e.getKey();
+                    // Add new mappings.
+                    for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
+                        int p = e.getKey();
 
-                    Set<UUID> diffIds = diffFromAffinity.get(p);
+                        Set<UUID> diffIds = diffFromAffinity.get(p);
 
-                    if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING)
-                        && !affAssignment.getIds(p).contains(parts.nodeId())) {
-                        if (diffIds == null)
-                            diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+                        if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING)
+                            && !affAssignment.getIds(p).contains(parts.nodeId())) {
+                            if (diffIds == null)
+                                diffFromAffinity.put(p, diffIds = U.newHashSet(3));
 
-                        if (diffIds.add(parts.nodeId()))
-                            changed = true;
-                    }
-                    else {
-                        if (diffIds != null && diffIds.remove(parts.nodeId())) {
-                            changed = true;
+                            if (diffIds.add(parts.nodeId()))
+                                changed = true;
+                        }
+                        else {
+                            if (diffIds != null && diffIds.remove(parts.nodeId())) {
+                                changed = true;
 
-                            if (diffIds.isEmpty())
-                                diffFromAffinity.remove(p);
+                                if (diffIds.isEmpty())
+                                    diffFromAffinity.remove(p);
+                            }
                         }
                     }
-                }
 
-                // Remove obsolete mappings.
-                if (cur != null) {
-                    for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
-                        Set<UUID> ids = diffFromAffinity.get(p);
+                    // Remove obsolete mappings.
+                    if (cur != null) {
+                        for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
+                            Set<UUID> ids = diffFromAffinity.get(p);
 
-                        if (ids != null && ids.remove(parts.nodeId())) {
-                            changed = true;
+                            if (ids != null && ids.remove(parts.nodeId())) {
+                                changed = true;
 
-                            if (ids.isEmpty())
-                                diffFromAffinity.remove(p);
+                                if (ids.isEmpty())
+                                    diffFromAffinity.remove(p);
+                            }
                         }
                     }
-                }
 
-                diffFromAffinityVer = affVer;
+                    diffFromAffinityVer = readyTopVer;
+                }
             }
 
-            if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-                List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
+            if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
+                List<List<ClusterNode>> aff = grp.affinity().assignments(readyTopVer);
 
-                changed |= checkEvictions(updateSeq, aff);
+                changed |= checkEvictions(updateSeq, readyTopVer, aff);
 
                 updateRebalanceVersion(aff);
             }
@@ -1545,6 +1575,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         lock.writeLock().lock();
 
         try {
+            assert assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion());
+
+            readyTopVer = lastTopChangeVer = assignment.topologyVersion();
+
             if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
                 rebuildDiff(assignment);
 
@@ -1587,13 +1621,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 UUID nodeId = e.getKey();
 
                 for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
-                    int p0 = e0.getKey();
+                    Integer p0 = e0.getKey();
 
                     GridDhtPartitionState state = e0.getValue();
 
                     Set<UUID> ids = diffFromAffinity.get(p0);
 
-                    if ((state == MOVING || state == OWNING) && !affAssignment.getIds(p0).contains(nodeId)) {
+                    if ((state == MOVING || state == OWNING || state == RENTING) && !affAssignment.getIds(p0).contains(nodeId)) {
                         if (ids == null)
                             diffFromAffinity.put(p0, ids = U.newHashSet(3));
 
@@ -1611,7 +1645,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
+    @Override public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt) {
         lock.writeLock().lock();
 
         try {
@@ -1647,13 +1681,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 for (Integer part : lost) {
                     long updSeq = updateSeq.incrementAndGet();
 
-                    GridDhtLocalPartition locPart = localPartition(part, topVer, false);
+                    GridDhtLocalPartition locPart = localPartition(part, resTopVer, false);
 
                     if (locPart != null) {
                         boolean marked = plc == PartitionLossPolicy.IGNORE ? locPart.own() : locPart.markLost();
 
                         if (marked)
-                            updateLocal(locPart.id(), locPart.state(), updSeq);
+                            updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
 
                         changed |= marked;
                     }
@@ -1689,7 +1723,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void resetLostPartitions() {
+    @Override public void resetLostPartitions(AffinityTopologyVersion resTopVer) {
         lock.writeLock().lock();
 
         try {
@@ -1702,18 +1736,18 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                     e0.setValue(OWNING);
 
-                    GridDhtLocalPartition locPart = localPartition(e0.getKey(), topVer, false);
+                    GridDhtLocalPartition locPart = localPartition(e0.getKey(), resTopVer, false);
 
                     if (locPart != null && locPart.state() == LOST) {
                         boolean marked = locPart.own();
 
                         if (marked)
-                            updateLocal(locPart.id(), locPart.state(), updSeq);
+                            updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
                     }
                 }
             }
 
-            checkEvictions(updSeq, grp.affinity().assignments(topVer));
+            checkEvictions(updSeq, resTopVer, grp.affinity().readyAssignments(resTopVer));
 
             grp.needsRecovery(false);
         }
@@ -1814,7 +1848,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param aff Affinity assignments.
      * @return Checks if any of the local partitions need to be evicted.
      */
-    private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
+    private boolean checkEvictions(long updateSeq, AffinityTopologyVersion affVer, List<List<ClusterNode>> aff) {
         boolean changed = false;
 
         UUID locId = ctx.localNodeId();
@@ -1831,7 +1865,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 List<ClusterNode> affNodes = aff.get(p);
 
                 if (!affNodes.contains(ctx.localNode())) {
-                    List<ClusterNode> nodes = nodes(p, topVer, OWNING, null);
+                    List<ClusterNode> nodes = nodes(p, affVer, OWNING, null);
                     Collection<UUID> nodeIds = F.nodeIds(nodes);
 
                     // If all affinity nodes are owners, then evict partition from local node.
@@ -1840,7 +1874,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                         part.rent(false);
 
-                        updateSeq = updateLocal(part.id(), part.state(), updateSeq);
+                        updateSeq = updateLocal(part.id(), part.state(), updateSeq, affVer);
 
                         changed = true;
 
@@ -1865,7 +1899,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                                     part.rent(false);
 
-                                    updateSeq = updateLocal(part.id(), part.state(), updateSeq);
+                                    updateSeq = updateLocal(part.id(), part.state(), updateSeq, affVer);
 
                                     changed = true;
 
@@ -1894,7 +1928,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @return Update sequence.
      */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
+    private long updateLocal(int p, GridDhtPartitionState state, long updateSeq, AffinityTopologyVersion affVer) {
         assert lock.isWriteLockedByCurrentThread();
 
         ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
@@ -1936,14 +1970,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (map == null) {
                 map = new GridDhtPartitionMap(locNodeId,
                     updateSeq,
-                    topVer,
+                    affVer,
                     GridPartitionStateMap.EMPTY,
                     false);
 
                 node2part.put(locNodeId, map);
             }
 
-            map.updateSequence(updateSeq, topVer);
+            map.updateSequence(updateSeq, affVer);
 
             map.put(p, state);
 
@@ -2007,7 +2041,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             if (part.own()) {
-                updateLocal(part.id(), part.state(), updateSeq.incrementAndGet());
+                assert lastTopChangeVer.initialized() : lastTopChangeVer;
+
+                updateLocal(part.id(), part.state(), updateSeq.incrementAndGet(), lastTopChangeVer);
 
                 consistencyCheck();
 
@@ -2038,7 +2074,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (part.reload())
                 part = createPartition(part.id());
 
-            updateLocal(part.id(), part.state(), seq);
+            assert lastTopChangeVer.initialized() : lastTopChangeVer;
+
+            updateLocal(part.id(), part.state(), seq, lastTopChangeVer);
 
             consistencyCheck();
         }
@@ -2107,7 +2145,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
-        AffinityTopologyVersion curTopVer = this.topVer;
+        AffinityTopologyVersion curTopVer = this.readyTopVer;
 
         return curTopVer.equals(topVer) && curTopVer.equals(rebalancedTopVer);
     }
@@ -2191,7 +2229,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param aff Affinity assignments.
      */
     private void updateRebalanceVersion(List<List<ClusterNode>> aff) {
-        if (!rebalancedTopVer.equals(topVer)) {
+        if (!rebalancedTopVer.equals(readyTopVer)) {
             if (node2part == null || !node2part.valid())
                 return;
 
@@ -2226,7 +2264,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     return;
             }
 
-            rebalancedTopVer = topVer;
+            rebalancedTopVer = readyTopVer;
 
             if (log.isDebugEnabled())
                 log.debug("Updated rebalanced version [cache=" + grp.cacheOrGroupName() + ", ver=" + rebalancedTopVer + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
index 2f51c5a..de58188 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
@@ -205,7 +205,7 @@ public class GridDhtPartitionsReservation implements GridReservable {
             if (reservations.compareAndSet(r, r - 1)) {
                 // If it was the last reservation and topology version changed -> attempt to evict partitions.
                 if (r == 1 && !cctx.kernalContext().isStopping() &&
-                    !topVer.equals(cctx.topology().topologyVersion()))
+                    !topVer.equals(cctx.topology().lastTopologyChangeVersion()))
                     tryEvict(parts.get());
 
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 73942ff..daedda8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -850,17 +850,17 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                 }
 
                 try {
-                    if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+                    if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) {
                         if (log.isDebugEnabled()) {
                             log.debug("Client topology version mismatch, need remap lock request [" +
                                 "reqTopVer=" + req.topologyVersion() +
-                                ", locTopVer=" + top.topologyVersion() +
+                                ", locTopVer=" + top.readyTopologyVersion() +
                                 ", req=" + req + ']');
                         }
 
                         GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
                             req,
-                            top.topologyVersion());
+                            top.lastTopologyChangeVersion());
 
                         return new GridFinishedFuture<>(res);
                     }
@@ -949,17 +949,17 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                     }
 
                     try {
-                        if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+                        if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) {
                             if (log.isDebugEnabled()) {
                                 log.debug("Client topology version mismatch, need remap lock request [" +
                                     "reqTopVer=" + req.topologyVersion() +
-                                    ", locTopVer=" + top.topologyVersion() +
+                                    ", locTopVer=" + top.readyTopologyVersion() +
                                     ", req=" + req + ']');
                             }
 
                             GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
                                 req,
-                                top.topologyVersion());
+                                top.lastTopologyChangeVersion());
 
                             return new GridFinishedFuture<>(res);
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index deae466..7e7a1c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -551,7 +551,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                     return true;
                 }
 
-                boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
+                boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().lastTopologyChangeVersion());
 
                 // Entry not found, do not continue search if topology did not change and there is no store.
                 if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index d66afca..b001f2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -442,7 +442,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                     return true;
                 }
 
-                boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
+                boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().lastTopologyChangeVersion());
 
                 // Entry not found, complete future with null result if topology did not change and there is no store.
                 if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 93d22aa..3e43e2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1799,7 +1799,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     // Do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.readyTopologyVersion())) {
                         DhtAtomicUpdateResult updRes = update(node, locked, req, res);
 
                         dhtFut = updRes.dhtFuture();
@@ -1808,7 +1808,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
                     else
                         // Should remap all keys.
-                        res.remapTopologyVersion(top.topologyVersion());
+                        res.remapTopologyVersion(top.lastTopologyChangeVersion());
                 }
                 finally {
                     top.readUnlock();
@@ -1842,7 +1842,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
 
-            res.remapTopologyVersion(ctx.topology().topologyVersion());
+            res.remapTopologyVersion(ctx.topology().lastTopologyChangeVersion());
         }
         catch (Throwable e) {
             // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is
@@ -1945,7 +1945,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean hasNear = req.nearCache();
 
         // Assign next version for update inside entries lock.
-        GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
+        GridCacheVersion ver = ctx.versions().next(top.readyTopologyVersion());
 
         if (hasNear)
             res.nearVersion(ver);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e2e62ba..1cd72e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1373,9 +1373,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return {@code True} if exchange triggered by server node join or fail.
      */
     public boolean serverNodeDiscoveryEvent() {
-        assert discoEvt != null;
+        assert exchCtx != null;
 
-        return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode());
+        return exchCtx.events().serverJoin() || exchCtx.events().serverLeft();
     }
 
     /** {@inheritDoc} */
@@ -1402,12 +1402,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         if (err == null) {
             if (centralizedAff) {
+                assert !exchCtx.mergeExchanges();
+
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (grp.isLocal())
                         continue;
 
                     try {
-                        grp.topology().initPartitions(this);
+                        grp.topology().initPartitionsWhenAffinityReady(res, this);
                     }
                     catch (IgniteInterruptedCheckedException e) {
                         U.error(log, "Failed to initialize partitions.", e);
@@ -1428,11 +1430,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            if (serverNodeDiscoveryEvent() &&
-                (discoEvt.type() == EVT_NODE_LEFT ||
-                discoEvt.type() == EVT_NODE_FAILED ||
-                discoEvt.type() == EVT_NODE_JOINED))
-                detectLostPartitions();
+            if (serverNodeDiscoveryEvent())
+                detectLostPartitions(res);
 
             Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
 
@@ -2072,8 +2071,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /**
      * Detect lost partitions.
+     *
+     * @param resTopVer Result topology version.
      */
-    private void detectLostPartitions() {
+    private void detectLostPartitions(AffinityTopologyVersion resTopVer) {
         boolean detected = false;
 
         synchronized (cctx.exchange().interruptLock()) {
@@ -2082,7 +2083,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                 if (!grp.isLocal()) {
-                    boolean detectedOnGrp = grp.topology().detectLostPartitions(discoEvt);
+                    boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, discoEvt);
 
                     detected |= detectedOnGrp;
                 }
@@ -2097,6 +2098,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param cacheNames Cache names.
      */
     private void resetLostPartitions(Collection<String> cacheNames) {
+        assert !exchCtx.mergeExchanges();
+
         synchronized (cctx.exchange().interruptLock()) {
             if (Thread.currentThread().isInterrupted())
                 return;
@@ -2107,7 +2110,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 for (String cacheName : cacheNames) {
                     if (grp.hasCache(cacheName)) {
-                        grp.topology().resetLostPartitions();
+                        grp.topology().resetLostPartitions(initialVersion());
 
                         break;
                     }
@@ -2274,7 +2277,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     assignPartitionsStates();
 
                 if (exchCtx.events().serverLeft())
-                    detectLostPartitions();
+                    detectLostPartitions(resTopVer);
             }
 
             updateLastVersion(cctx.versions().last());
@@ -2653,16 +2656,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             else
                 assert node == null : node;
 
+            AffinityTopologyVersion resTopVer = initialVersion();
+
             if (exchCtx.mergeExchanges()) {
                 if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion())) {
                     log.info("Received full message, need merge [curFut=" + initialVersion() +
                         ", resVer=" + msg.resultTopologyVersion() + ']');
 
+                    resTopVer = msg.resultTopologyVersion();
+
                     cctx.exchange().mergeExchanges(this, msg);
+
+                    assert resTopVer.equals(exchCtx.events().topologyVersion()) :  "Unexpected result version [" +
+                        "msgVer=" + resTopVer +
+                        ", locVer=" + exchCtx.events().topologyVersion() + ']';
                 }
 
                 if (localJoinExchange())
-                    cctx.affinity().onLocalJoin(this, msg);
+                    cctx.affinity().onLocalJoin(this, msg, resTopVer);
                 else {
                     if (exchCtx.events().serverLeft())
                         cctx.affinity().mergeExchangesOnServerLeft(this, msg);
@@ -2678,9 +2689,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            if (cctx.localNode().order() == 3L)
-                System.out.println();
-            updatePartitionFullMap(msg);
+            updatePartitionFullMap(resTopVer, msg);
 
             IgniteCheckedException err = null;
 
@@ -2690,7 +2699,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
             }
 
-            onDone(exchCtx.events().topologyVersion(), err);
+            onDone(resTopVer, err);
         }
         catch (IgniteCheckedException e) {
             onDone(e);
@@ -2700,9 +2709,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /**
      * Updates partition map in all caches.
      *
+     * @param resTopVer Result topology version.
      * @param msg Partitions full messages.
      */
-    private void updatePartitionFullMap(GridDhtPartitionsFullMessage msg) {
+    private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPartitionsFullMessage msg) {
         cctx.versions().onExchange(msg.lastVersion().order());
 
         assert partHistSuppliers.isEmpty();
@@ -2717,7 +2727,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
             if (grp != null) {
-                grp.topology().update(exchCtx.events().topologyVersion(),
+                grp.topology().update(resTopVer,
                     entry.getValue(),
                     cntrMap,
                     msg.partsToReload(cctx.localNodeId(), grpId),
@@ -2775,6 +2785,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     assert centralizedAff;
 
                     if (crd.equals(node)) {
+                        AffinityTopologyVersion resTopVer = initialVersion();
+
                         cctx.affinity().onExchangeChangeAffinityMessage(GridDhtPartitionsExchangeFuture.this,
                             crd.isLocal(),
                             msg);
@@ -2785,10 +2797,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                             assert partsMsg != null : msg;
                             assert partsMsg.lastVersion() != null : partsMsg;
 
-                            updatePartitionFullMap(partsMsg);
+                            updatePartitionFullMap(resTopVer, partsMsg);
                         }
 
-                        onDone(initialVersion());
+                        onDone(resTopVer);
                     }
                     else {
                         if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index de21773..11d9c77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -185,18 +185,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         GridDhtPartitionTopology top = grp.topology();
 
         if (!grp.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments(exchId, top.topologyVersion());
+            return new GridDhtPreloaderAssignments(exchId, top.readyTopologyVersion());
 
         int partCnt = grp.affinity().partitions();
 
-        assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.topologyVersion()) :
+        AffinityTopologyVersion topVer = top.readyTopologyVersion();
+
+        assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.readyTopologyVersion()) :
             "Topology version mismatch [exchId=" + exchId +
             ", grp=" + grp.name() +
-            ", topVer=" + top.topologyVersion() + ']';
-
-        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchId, top.topologyVersion());
+            ", topVer=" + top.readyTopologyVersion() + ']';
 
-        AffinityTopologyVersion topVer = assigns.topologyVersion();
+        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchId, topVer);
 
         AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index a49812e..3ee9734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -626,7 +626,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                     return true;
                 }
                 else {
-                    boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
+                    boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().lastTopologyChangeVersion());
 
                     // Entry not found, do not continue search if topology did not change and there is no store.
                     return !cctx.readThroughConfigured() && (topStable || partitionOwned(part));

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 54cfc2b..753f8d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -381,13 +381,13 @@ public class IgniteTxHandler {
             }
 
             try {
-                if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req)) {
+                if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion(), req)) {
                     if (txPrepareMsgLog.isDebugEnabled()) {
                         txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" +
                             "txId=" + req.version() +
                             ", node=" + nearNode.id() +
                             ", reqTopVer=" + req.topologyVersion() +
-                            ", locTopVer=" + top.topologyVersion() +
+                            ", locTopVer=" + top.readyTopologyVersion() +
                             ", req=" + req + ']');
                     }
 
@@ -400,7 +400,7 @@ public class IgniteTxHandler {
                         req.version(),
                         null,
                         null,
-                        top.topologyVersion(),
+                        top.lastTopologyChangeVersion(),
                         req.onePhaseCommit(),
                         req.deployInfo() != null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 147e6f0..25b319f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -770,7 +770,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             try {
                 AffinityTopologyVersion topVer = allowOverwrite() || cctx.isLocal() ?
                         ctx.cache().context().exchange().readyAffinityVersion() :
-                        cctx.topology().topologyVersion();
+                        cctx.topology().readyTopologyVersion();
 
                 for (DataStreamerEntry entry : entries) {
                     List<ClusterNode> nodes;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
index f32e15f..eda0a49 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
@@ -287,7 +287,9 @@ public class IgniteClientCacheStartFailoverTest extends GridCommonAbstractTest {
             for (String cacheName : cacheNames) {
                 GridDhtPartitionTopology top = node.context().cache().internalCache(cacheName).context().topology();
 
-                assertEquals(topVer, top.topologyVersion());
+                waitForReadyTopology(top, topVer);
+
+                assertEquals(topVer, top.readyTopologyVersion());
 
                 assertFalse(top.rebalanceFinished(topVer));
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 16bac3e..6d6da28 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -181,6 +181,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
                     Ignite node = startGrid(idx.getAndIncrement());
 
+                    // TODO 5578 non-intersecting keys.
 //                    if (getTestIgniteInstanceName(0).equals(node.name()))
 //                        checkNodeCaches(node);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 9fe41f2..c82b4b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -755,7 +755,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         AffinityTopologyVersion topVer1 = new AffinityTopologyVersion(4, 0);
 
-        assertEquals(topVer1, ignite0.context().cache().internalCache(DEFAULT_CACHE_NAME).context().topology().topologyVersion());
+        assertEquals(topVer1,
+            ignite0.context().cache().internalCache(DEFAULT_CACHE_NAME).context().topology().readyTopologyVersion());
 
         TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
 
@@ -796,7 +797,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         ignite0.context().cache().context().exchange().affinityReadyFuture(topVer2).get();
 
-        assertEquals(topVer2, ignite0.context().cache().internalCache(DEFAULT_CACHE_NAME).context().topology().topologyVersion());
+        assertEquals(topVer2, ignite0.context().cache().internalCache(DEFAULT_CACHE_NAME).context().topology().readyTopologyVersion());
 
         GridCacheAffinityManager aff = ignite0.context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
index 1f850b2..d03ae9e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
@@ -446,9 +446,11 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
             for (IgniteInternalCache cache : kernal.context().cache().caches()) {
                 GridDhtPartitionTopology top = cache.context().topology();
 
+                waitForReadyTopology(top, topVer);
+
                 assertEquals("Unexpected topology version [node=" + ignite.name() + ", cache=" + cache.name() + ']',
                     topVer,
-                    top.topologyVersion());
+                    top.readyTopologyVersion());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6f06d95/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index c2cf41c..52cf25a 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -659,7 +659,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                         "igniteInstanceName=" + g.name() +
                                         ", cache=" + cfg.getName() +
                                         ", cacheId=" + dht.context().cacheId() +
-                                        ", topVer=" + top.topologyVersion() +
+                                        ", topVer=" + top.readyTopologyVersion() +
                                         ", p=" + p +
                                         ", affNodesCnt=" + affNodesCnt +
                                         ", ownersCnt=" + ownerNodesCnt +
@@ -676,7 +676,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                     "igniteInstanceName=" + g.name() +
                                     ", cache=" + cfg.getName() +
                                     ", cacheId=" + dht.context().cacheId() +
-                                    ", topVer=" + top.topologyVersion() +
+                                    ", topVer=" + top.readyTopologyVersion() +
                                     ", started=" + dht.context().started() +
                                     ", p=" + p +
                                     ", readVer=" + readyVer +
@@ -697,7 +697,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                         "igniteInstanceName=" + g.name() +
                                         ", cache=" + cfg.getName() +
                                         ", cacheId=" + dht.context().cacheId() +
-                                        ", topVer=" + top.topologyVersion() +
+                                        ", topVer=" + top.readyTopologyVersion() +
                                         ", p=" + p +
                                         ", readVer=" + readyVer +
                                         ", locNode=" + g.cluster().localNode() + ']');
@@ -737,7 +737,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                             "igniteInstanceName=" + g.name() +
                                             ", cache=" + cfg.getName() +
                                             ", cacheId=" + dht.context().cacheId() +
-                                            ", topVer=" + top.topologyVersion() +
+                                            ", topVer=" + top.readyTopologyVersion() +
                                             ", locNode=" + g.cluster().localNode() + ']');
                                     }
 
@@ -765,6 +765,20 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
     }
 
     /**
+     * @param top Topology.
+     * @param topVer Version to wait for.
+     * @throws Exception If failed.
+     */
+    protected final void waitForReadyTopology(final GridDhtPartitionTopology top, final AffinityTopologyVersion topVer)
+        throws Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return topVer.compareTo(top.readyTopologyVersion()) <= 0;
+            }
+        }, 5000);
+    }
+
+    /**
      * @param c Cache proxy.
      */
     protected void printPartitionState(IgniteCache<?, ?> c) {


Mime
View raw message