ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 5578
Date Mon, 24 Jul 2017 12:26:28 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 0e26a8fee -> b681ca663


5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b681ca66
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b681ca66
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b681ca66

Branch: refs/heads/ignite-5578
Commit: b681ca66378411ee5de428ed0fcd499134870758
Parents: 0e26a8f
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jul 24 15:26:20 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jul 24 15:26:20 2017 +0300

----------------------------------------------------------------------
 .../internal/IgniteDiagnosticMessage.java       |  2 +-
 .../cache/CacheAffinitySharedManager.java       | 48 +++++-----
 .../cache/ExchangeDiscoveryEvents.java          |  2 +-
 .../GridCachePartitionExchangeManager.java      | 58 ++++++++----
 .../dht/ClientCacheDhtTopologyFuture.java       | 11 ++-
 .../distributed/dht/GridDhtCacheAdapter.java    | 16 ++--
 .../dht/GridDhtPartitionTopologyImpl.java       |  4 +-
 .../distributed/dht/GridDhtTopologyFuture.java  |  3 +-
 .../GridNearAtomicSingleUpdateFuture.java       |  2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  2 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  2 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |  2 +-
 .../GridDhtPartitionsAbstractMessage.java       |  2 +-
 .../GridDhtPartitionsExchangeFuture.java        | 98 +++++++++++---------
 .../dht/preloader/GridDhtPreloader.java         |  8 +-
 .../distributed/near/GridNearLockFuture.java    |  2 +-
 ...ridNearOptimisticTxPrepareFutureAdapter.java |  2 +-
 .../GridCacheDatabaseSharedManager.java         |  2 +-
 .../datastreamer/DataStreamerImpl.java          |  2 +-
 .../distributed/CacheExchangeMergeTest.java     |  5 +-
 .../CacheLateAffinityAssignmentTest.java        |  4 +-
 21 files changed, 161 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
index bd4ec3a..8739c0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
@@ -364,7 +364,7 @@ public class IgniteDiagnosticMessage implements Message {
             List<GridDhtPartitionsExchangeFuture> futs = ctx.cache().context().exchange().exchangeFutures();
 
             for (GridDhtPartitionsExchangeFuture fut : futs) {
-                if (topVer.equals(fut.topologyVersion())) {
+                if (topVer.equals(fut.initialVersion())) {
                     sb.append("Exchange future: ").append(fut);
 
                     return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index d5b4be7..0db5d6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -686,12 +686,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      */
     public void onCacheChangeRequest(
-        final GridDhtPartitionsExchangeFuture fut,
+        GridDhtPartitionsExchangeFuture fut,
         boolean crd,
         final ExchangeActions exchActions
     ) throws IgniteCheckedException {
         assert exchActions != null && !exchActions.empty() : exchActions;
 
+        final ExchangeDiscoveryEvents evts = fut.context().events();
+
         caches.updateCachesInfo(exchActions);
 
         // Affinity did not change for existing caches.
@@ -700,7 +702,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 if (exchActions.cacheGroupStopping(aff.groupId()))
                     return;
 
-                aff.clientEventTopologyChange(fut.discoveryEvent(), fut.topologyVersion());
+                aff.clientEventTopologyChange(evts.lastEvent(), evts.topologyVersion());
             }
         });
 
@@ -751,7 +753,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     cctx.cache().prepareCacheStart(req.startCacheConfiguration(),
                         cacheDesc,
                         nearCfg,
-                        fut.topologyVersion());
+                        evts.topologyVersion());
 
                     if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
                         if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
@@ -943,10 +945,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         assert msg.topologyVersion() != null && msg.exchangeId() == null : msg;
 
-        final AffinityTopologyVersion topVer = exchFut.topologyVersion();
+        final AffinityTopologyVersion topVer = exchFut.initialVersion();
 
         if (log.isDebugEnabled()) {
-            log.debug("Process affinity change message [exchVer=" + exchFut.topologyVersion() +
+            log.debug("Process affinity change message [exchVer=" + topVer +
                 ", msgVer=" + msg.topologyVersion() + ']');
         }
 
@@ -1158,13 +1160,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (crd) {
             forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                 @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                    CacheGroupHolder cache = groupHolder(fut.topologyVersion(), desc);
+                    CacheGroupHolder cache = groupHolder(fut.initialVersion(), desc);
 
                     if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
                         List<List<ClusterNode>> assignment =
-                            cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                            cache.affinity().calculate(fut.initialVersion(), fut.discoveryEvent(), fut.discoCache());
 
-                        cache.affinity().initialize(fut.topologyVersion(), assignment);
+                        cache.affinity().initialize(fut.initialVersion(), assignment);
                     }
                 }
             });
@@ -1191,22 +1193,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         assert desc != null : aff.cacheOrGroupName();
 
+        ExchangeDiscoveryEvents evts = fut.context().events();
+
         if (canCalculateAffinity(desc, aff, fut)) {
-            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+            List<List<ClusterNode>> assignment = aff.calculate(evts.topologyVersion(),
+                evts.lastEvent(),
+                evts.discoveryCache());
 
-            aff.initialize(fut.topologyVersion(), assignment);
+            aff.initialize(evts.topologyVersion(), assignment);
         }
         else {
             GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
                 desc.groupId(),
-                fut.topologyVersion(),
-                fut.discoCache());
+                evts.topologyVersion(),
+                evts.discoveryCache());
 
             fetchFut.init(false);
 
-            fetchAffinity(fut.topologyVersion(),
-                fut.discoveryEvent(),
-                fut.discoCache(),
+            fetchAffinity(evts.topologyVersion(),
+                evts.lastEvent(),
+                evts.discoveryCache(),
                 aff, fetchFut);
         }
     }
@@ -1387,7 +1393,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (crd) {
                 forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                     @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                        AffinityTopologyVersion topVer = fut.topologyVersion();
+                        AffinityTopologyVersion topVer = fut.initialVersion();
 
                         CacheGroupHolder cache = groupHolder(topVer, desc);
 
@@ -1405,7 +1411,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         else
             waitRebalanceInfo = initAffinityOnNodeJoin(fut.context().events(), crd);
 
-        setWaitRebalanceInfo(waitRebalanceInfo, fut.topologyVersion(), crd);
+        setWaitRebalanceInfo(waitRebalanceInfo, fut.initialVersion(), crd);
     }
 
     private void setWaitRebalanceInfo(WaitRebalanceInfo waitRebalanceInfo, AffinityTopologyVersion topVer, boolean crd) {
@@ -1458,7 +1464,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      */
     private void fetchAffinityOnJoin(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
-        AffinityTopologyVersion topVer = fut.topologyVersion();
+        AffinityTopologyVersion topVer = fut.initialVersion();
 
         List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>();
 
@@ -1467,11 +1473,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 continue;
 
             if (fut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom())) {
-                List<List<ClusterNode>> assignment = grp.affinity().calculate(fut.topologyVersion(),
+                List<List<ClusterNode>> assignment = grp.affinity().calculate(topVer,
                     fut.discoveryEvent(),
                     fut.discoCache());
 
-                grp.affinity().initialize(fut.topologyVersion(), assignment);
+                grp.affinity().initialize(topVer, assignment);
             }
             else {
                 if (fut.context().fetchAffinityOnJoin()) {
@@ -1507,7 +1513,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             Integer grpId = fetchFut.groupId();
 
-            fetchAffinity(fut.topologyVersion(),
+            fetchAffinity(topVer,
                 fut.discoveryEvent(),
                 fut.discoCache(),
                 cctx.cache().cacheGroup(grpId).affinity(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
index db9173f..e8a278f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -65,7 +65,7 @@ public class ExchangeDiscoveryEvents {
      * @param fut Future.
      */
     ExchangeDiscoveryEvents(GridDhtPartitionsExchangeFuture fut) {
-        addEvent(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+        addEvent(fut.initialVersion(), fut.discoveryEvent(), fut.discoCache());
     }
 
     boolean groupAddedOnExchange(int grpId, UUID rcvdFrom) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d26ca0a..02cd758 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -199,6 +199,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** Events received while cluster state transition was in progress. */
     private final List<PendingDiscoveryEvent> pendingEvts = new ArrayList<>();
 
+    /** */
+    private final GridFutureAdapter<?> crdInitFut = new GridFutureAdapter();
+
     /** Discovery listener. */
     private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
         @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -294,7 +297,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class,
             new MessageHandler<GridDhtPartitionsSingleMessage>() {
-                @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+                @Override public void onMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
+                    if (!crdInitFut.isDone() && !msg.restoreState()) {
+                        GridDhtPartitionExchangeId exchId = msg.exchangeId();
+
+                        log.info("Waiting for coordinator initialization [node=" + node.id() +
+                            ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']');
+
+                        crdInitFut.listen(new CI1<IgniteInternalFuture>() {
+                            @Override public void apply(IgniteInternalFuture fut) {
+                                processSinglePartitionUpdate(node, msg);
+                            }
+                        });
+
+                        return;
+                    }
+
                     processSinglePartitionUpdate(node, msg);
                 }
             });
@@ -349,6 +367,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
     }
 
+    public void coordinatorInitialized() {
+        crdInitFut.onDone();
+    }
+
     /**
      * Callback for local join event (needed since regular event for local join is not generated).
      *
@@ -780,7 +802,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     @Nullable public IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) {
         GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut;
 
-        if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) == 0) {
+        if (lastInitializedFut0 != null && lastInitializedFut0.initialVersion().compareTo(ver) == 0) {
             if (log.isDebugEnabled())
                 log.debug("Return lastInitializedFut for topology ready future " +
                     "[ver=" + ver + ", fut=" + lastInitializedFut0 + ']');
@@ -924,7 +946,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             // No need to send to nodes which did not finish their first exchange.
             AffinityTopologyVersion rmtTopVer =
-                lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE;
+                lastFut != null ? lastFut.initialVersion() : AffinityTopologyVersion.NONE;
 
             Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer);
 
@@ -1443,7 +1465,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @throws Exception If failed.
      */
     public void dumpDebugInfo(@Nullable GridDhtPartitionsExchangeFuture exchFut) throws Exception {
-        AffinityTopologyVersion exchTopVer = exchFut != null ? exchFut.topologyVersion() : null;
+        AffinityTopologyVersion exchTopVer = exchFut != null ? exchFut.initialVersion() : null;
 
         U.warn(diagnosticLog, "Ready affinity version: " + readyTopVer.get());
 
@@ -1749,18 +1771,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             if (task instanceof GridDhtPartitionsExchangeFuture) {
                 GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture) task;
 
-                if (fut.topologyVersion().compareTo(resVer) > 0) {
-                    log.info("Merge exchange future on finish stop [curFut=" + curFut.topologyVersion() +
+                if (fut.initialVersion().compareTo(resVer) > 0) {
+                    log.info("Merge exchange future on finish stop [curFut=" + curFut.initialVersion() +
                         ", resVer=" + resVer +
-                        ", nextFutVer=" + fut.topologyVersion() + ']');
+                        ", nextFutVer=" + fut.initialVersion() + ']');
 
                     break;
                 }
 
-                log.info("Merge exchange future on finish [curFut=" + curFut.topologyVersion() +
-                    ", mergedFut=" + fut.topologyVersion() + ']');
+                log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() +
+                    ", mergedFut=" + fut.initialVersion() + ']');
 
-                curFut.context().events().addEvent(fut.topologyVersion(),
+                curFut.context().events().addEvent(fut.initialVersion(),
                     fut.discoveryEvent(),
                     fut.discoCache());
 
@@ -1782,7 +1804,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer;
 
         if (exchMergeTestWaitVer != null) {
-            log.info("Exchange merge test, waiting for version [exch=" + curFut.topologyVersion() +
+            log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() +
                 ", waitVer=" + exchMergeTestWaitVer + ']');
 
             long end = U.currentTimeMillis() + 10_000;
@@ -1794,7 +1816,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (task instanceof GridDhtPartitionsExchangeFuture) {
                         GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
 
-                        if (exchMergeTestWaitVer.equals(fut.topologyVersion())) {
+                        if (exchMergeTestWaitVer.equals(fut.initialVersion())) {
                             log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);
 
                             found = true;
@@ -1842,10 +1864,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         break;
                     }
 
-                    log.info("Merge exchange future [curFut=" + curFut.topologyVersion() +
-                        ", mergedFut=" + fut.topologyVersion() + ']');
+                    log.info("Merge exchange future [curFut=" + curFut.initialVersion() +
+                        ", mergedFut=" + fut.initialVersion() + ']');
 
-                    curFut.context().events().addEvent(fut.topologyVersion(),
+                    curFut.context().events().addEvent(fut.initialVersion(),
                         fut.discoveryEvent(),
                         fut.discoCache());
 
@@ -1923,7 +1945,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             futQ.offer(exchFut);
 
             synchronized (this) {
-                lastFutVer = exchFut.topologyVersion();
+                lastFutVer = exchFut.initialVersion();
 
                 notifyAll();
             }
@@ -1948,7 +1970,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (task instanceof GridDhtPartitionsExchangeFuture) {
                         GridDhtPartitionsExchangeFuture fut0 = (GridDhtPartitionsExchangeFuture)task;
 
-                        if (resVer.compareTo(fut0.topologyVersion()) >= 0)
+                        if (resVer.compareTo(fut0.initialVersion()) >= 0)
                             futQ.remove(fut0);
                         else
                             break;
@@ -2139,7 +2161,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 }
                                 catch (IgniteFutureTimeoutCheckedException ignored) {
                                     U.warn(diagnosticLog, "Failed to wait for partition map exchange [" +
-                                        "topVer=" + exchFut.topologyVersion() +
+                                        "topVer=" + exchFut.initialVersion() +
                                         ", node=" + cctx.localNodeId() + "]. " +
                                         "Dumping pending objects that might be the cause: ");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
index d0b2c72..96fa04a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
@@ -56,10 +56,6 @@ public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter
         onDone(e);
     }
 
-    @Override public AffinityTopologyVersion resultTopologyVersion() {
-        return topologyVersion();
-    }
-
     /**
      * @param grp Cache group.
      * @param topNodes Topology nodes.
@@ -71,12 +67,17 @@ public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter
     }
 
     /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion initialVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "ClientCacheDhtTopologyFuture [topVer=" + topologyVersion() + ']';
+        return "ClientCacheDhtTopologyFuture [topVer=" + topVer + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 960b91a..171871b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -562,11 +562,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         }
 
         // Version for all loaded entries.
-        final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion());
+        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
-        final boolean replicate = ctx.isDrEnabled();
+        final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topVer);
 
-        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+        final boolean replicate = ctx.isDrEnabled();
 
         final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry();
 
@@ -587,13 +587,13 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             return;
         }
 
+        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+
         // Version for all loaded entries.
-        final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion());
+        final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topVer);
 
         final boolean replicate = ctx.isDrEnabled();
 
-        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
-
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         ExpiryPolicy plc0 = opCtx != null ? opCtx.expiry() : null;
@@ -938,7 +938,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                             res.setContainsValue();
                     }
                     else {
-                        AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().topologyVersion();
+                        AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().initialVersion();
 
                         assert topVer.compareTo(req.topologyVersion()) > 0 : "Wrong ready topology version for " +
                             "invalid partitions response [topVer=" + topVer + ", req=" + req + ']';
@@ -1028,7 +1028,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 }
 
                 if (!F.isEmpty(fut.invalidPartitions()))
-                    res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().lastTopologyFuture().topologyVersion());
+                    res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().lastTopologyFuture().initialVersion());
 
                 try {
                     ctx.io().send(nodeId, res, ctx.ioPolicy());

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 52df4a6..a3a488a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -230,7 +230,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         U.writeLock(lock);
 
         try {
-            AffinityTopologyVersion exchTopVer = exchFut.topologyVersion();
+            AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
 
             assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
                 ", exchTopVer=" + exchTopVer +
@@ -437,7 +437,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                     ExchangeDiscoveryEvents evts = exchFut.context().events();
 
-                    assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology version [topVer=" + topVer +
+                    assert topVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + topVer +
                         ", exchId=" + exchFut.exchangeId() + ']';
 
                     topVer = evts.topologyVersion();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
index 4b5e189..965590f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -39,7 +40,7 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo
      */
     public AffinityTopologyVersion topologyVersion();
 
-    public AffinityTopologyVersion resultTopologyVersion();
+    public AffinityTopologyVersion initialVersion();
 
     /**
      * Returns error is cache topology is not valid.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index db98052..f5d89c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -420,7 +420,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 return;
             }
 
-            topVer = fut.resultTopologyVersion();
+            topVer = fut.topologyVersion();
         }
         else {
             assert !topLocked : this;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index f715486..138645d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -640,7 +640,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 return;
             }
 
-            topVer = fut.resultTopologyVersion();
+            topVer = fut.topologyVersion();
         }
         else {
             assert !topLocked : this;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 40eb371..af4af5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -690,7 +690,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
 
         if (topVer != null) {
             for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) {
-                if (fut.topologyVersion().equals(topVer)) {
+                if (fut.isDone() && fut.topologyVersion().equals(topVer)) {
                     Throwable err = fut.validateCache(cctx, recovery, read, null, keys);
 
                     if (err != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 3ead982..e25ace7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -134,7 +134,7 @@ class GridDhtPartitionSupplier {
      * @param topVer Topology version.
      */
     @SuppressWarnings("ConstantConditions")
-    public void onTopologyChanged(AffinityTopologyVersion topVer) {
+    void onTopologyChanged(AffinityTopologyVersion topVer) {
         synchronized (scMap) {
             Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 9adbf0b..77a5d9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -138,7 +138,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
         flags = restoreState ? (byte)(flags | RESTORE_STATE_FLAG_MASK) : (byte)(flags & ~RESTORE_STATE_FLAG_MASK);
     }
 
-    boolean restoreState() {
+    public boolean restoreState() {
         return (flags & RESTORE_STATE_FLAG_MASK) != 0;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3674276..8f9b5d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -246,6 +246,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private InitNewCoordinatorFuture newCrdFut;
 
+    /** */
+    @GridToStringExclude
+    private GridDhtPartitionsExchangeFuture mergedWith;
+
+    /** */
+    @GridToStringExclude
+    private GridDhtPartitionsSingleMessage pendingJoinMsg;
+
+
     /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
@@ -315,16 +324,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         this.affChangeMsg = affChangeMsg;
     }
 
-    private AffinityTopologyVersion initialVersion() {
+    public AffinityTopologyVersion initialVersion() {
         return exchId.topologyVersion();
     }
 
     /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
-        return exchId.topologyVersion();
-    }
+        assert isDone();
 
-    @Override public AffinityTopologyVersion resultTopologyVersion() {
         return exchCtx.events().topologyVersion();
     }
 
@@ -339,6 +346,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /**
      * @return Discovery cache.
+     *
+     * TODO 5578 review usages, rename initialDiscoveryEvent
      */
     public DiscoCache discoCache() {
         return discoCache;
@@ -426,6 +435,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /**
      * @return Discovery event.
+     *
+     * TODO 5578 review usages, rename initialDiscoveryEvent
      */
     public DiscoveryEvent discoveryEvent() {
         return discoEvt;
@@ -479,7 +490,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         exchCtx = new ExchangeContext(this);
 
         try {
-            AffinityTopologyVersion topVer = topologyVersion();
+            AffinityTopologyVersion topVer = initialVersion();
 
             srvNodes = new ArrayList<>(discoCache.serverNodes());
 
@@ -574,6 +585,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 if (fut != null)
                     fut.get();
+
+                cctx.exchange().coordinatorInitialized();
             }
 
             updateTopologies(crdNode);
@@ -661,7 +674,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             cctx.database().readCheckpointAndRestoreMemory(startDescs);
         }
 
-        cctx.cache().startCachesOnLocalJoin(caches, topologyVersion());
+        cctx.cache().startCachesOnLocalJoin(caches, initialVersion());
     }
 
     /**
@@ -704,7 +717,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion());
 
                 if (updateTop && clientTop != null) {
-                    top.update(topologyVersion(),
+                    top.update(initialVersion(),
                         clientTop.partitionMap(true),
                         clientTop.updateCounters(false),
                         Collections.<Integer>emptySet());
@@ -737,7 +750,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (log.isInfoEnabled()) {
                 log.info("Start activation process [nodeId=" + cctx.localNodeId() +
                     ", client=" + cctx.kernalContext().clientNode() +
-                    ", topVer=" + topologyVersion() + "]");
+                    ", topVer=" + initialVersion() + "]");
             }
 
             try {
@@ -757,13 +770,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (log.isInfoEnabled()) {
                     log.info("Successfully activated caches [nodeId=" + cctx.localNodeId() +
                         ", client=" + cctx.kernalContext().clientNode() +
-                        ", topVer=" + topologyVersion() + "]");
+                        ", topVer=" + initialVersion() + "]");
                 }
             }
             catch (Exception e) {
                 U.error(log, "Failed to activate node components [nodeId=" + cctx.localNodeId() +
                     ", client=" + cctx.kernalContext().clientNode() +
-                    ", topVer=" + topologyVersion() + "]", e);
+                    ", topVer=" + initialVersion() + "]", e);
 
                 changeGlobalStateE = e;
 
@@ -778,7 +791,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (log.isInfoEnabled()) {
                 log.info("Start deactivation process [nodeId=" + cctx.localNodeId() +
                     ", client=" + cctx.kernalContext().clientNode() +
-                    ", topVer=" + topologyVersion() + "]");
+                    ", topVer=" + initialVersion() + "]");
             }
 
             try {
@@ -792,13 +805,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     log.info("Successfully deactivated data structures, services and caches [" +
                         "nodeId=" + cctx.localNodeId() +
                         ", client=" + cctx.kernalContext().clientNode() +
-                        ", topVer=" + topologyVersion() + "]");
+                        ", topVer=" + initialVersion() + "]");
                 }
             }
             catch (Exception e) {
                 U.error(log, "Failed to deactivate node components [nodeId=" + cctx.localNodeId() +
                     ", client=" + cctx.kernalContext().clientNode() +
-                    ", topVer=" + topologyVersion() + "]", e);
+                    ", topVer=" + initialVersion() + "]", e);
 
                 changeGlobalStateE = e;
             }
@@ -899,12 +912,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     GridAffinityAssignmentCache aff = grp.affinity();
 
-                    aff.initialize(topologyVersion(), aff.idealAssignment());
+                    aff.initialize(initialVersion(), aff.idealAssignment());
                 }
             }
         }
 
-        onDone(topologyVersion());
+        onDone(initialVersion());
     }
 
     /**
@@ -1010,7 +1023,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @throws IgniteCheckedException If failed.
      */
     private void waitPartitionRelease() throws IgniteCheckedException {
-        IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(topologyVersion());
+        IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(initialVersion());
 
         // Assign to class variable so it will be included into toString() method.
         this.partReleaseFut = partReleaseFut;
@@ -1174,7 +1187,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      */
     private void dumpPendingObjects() {
         U.warn(cctx.kernalContext().cluster().diagnosticLog(),
-            "Failed to wait for partition release future [topVer=" + topologyVersion() +
+            "Failed to wait for partition release future [topVer=" + initialVersion() +
             ", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: ");
 
         try {
@@ -1379,7 +1392,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (!done.compareAndSet(false, true))
             return false;
 
-        log.info("Finish exchange future [startVer=" + topologyVersion() + ", resVer=" + res + ", err=" + err + ']');
+        log.info("Finish exchange future [startVer=" + initialVersion() +
+            ", resVer=" + res +
+            ", err=" + err + ']');
 
         assert res != null || err != null;
 
@@ -1488,7 +1503,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (discoEvt instanceof DiscoveryCustomEvent)
                 ((DiscoveryCustomEvent)discoEvt).customMessage(null);
 
-            cctx.exchange().lastFinishedFuture(this);
+            if (err == null)
+                cctx.exchange().lastFinishedFuture(this);
 
             return true;
         }
@@ -1528,12 +1544,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         }
     }
 
-    /** */
-    private GridDhtPartitionsExchangeFuture mergedWith;
-
-    /** */
-    private GridDhtPartitionsSingleMessage pendingJoinMsg;
-
     private boolean addMergedJoinExchange(ClusterNode node, @Nullable GridDhtPartitionsSingleMessage msg) {
         assert Thread.holdsLock(this);
         assert node != null;
@@ -1552,14 +1562,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 mergedJoinExchMsgs = new LinkedHashMap<>();
 
             if (msg != null) {
-                log.info("Merge server join exchange, message received [curFut=" + topologyVersion() +
+                log.info("Merge server join exchange, message received [curFut=" + initialVersion() +
                     ", node=" + nodeId + ']');
 
                 mergedJoinExchMsgs.put(nodeId, msg);
             }
             else {
                 if (cctx.discovery().alive(nodeId)) {
-                    log.info("Merge server join exchange, wait for message [curFut=" + topologyVersion() +
+                    log.info("Merge server join exchange, wait for message [curFut=" + initialVersion() +
                         ", node=" + nodeId + ']');
 
                     wait = true;
@@ -1569,7 +1579,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     awaitMergedMsgs++;
                 }
                 else {
-                    log.info("Merge server join exchange, awaited node left [curFut=" + topologyVersion() +
+                    log.info("Merge server join exchange, awaited node left [curFut=" + initialVersion() +
                         ", node=" + nodeId + ']');
                 }
             }
@@ -1621,7 +1631,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 mergedJoinExchMsgs.containsKey(node.id()) &&
                 mergedJoinExchMsgs.get(node.id()) == null;
 
-            log.info("Merge server join exchange, received message [curFut=" + topologyVersion() +
+            log.info("Merge server join exchange, received message [curFut=" + initialVersion() +
                 ", node=" + node.id() +
                 ", msgVer=" + msg.exchangeId().topologyVersion() +
                 ", process=" + process +
@@ -2104,11 +2114,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      */
     private void finishExchangeOnCoordinator() {
         try {
-            log.info("finishExchangeOnCoordinator [topVer=" + topologyVersion() +
-                ", resVer=" + exchCtx.events().topologyVersion() + ']');
-
             AffinityTopologyVersion resTopVer = exchCtx.events().topologyVersion();
 
+            log.info("finishExchangeOnCoordinator [topVer=" + initialVersion() +
+                ", resVer=" + resTopVer + ']');
+
             Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null;
 
             if (exchCtx.mergeExchanges()) {
@@ -2586,7 +2596,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             if (exchCtx.mergeExchanges()) {
                 if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion())) {
-                    log.info("Received full message, need merge [curFut=" + topologyVersion() +
+                    log.info("Received full message, need merge [curFut=" + initialVersion() +
                         ", resVer=" + msg.resultTopologyVersion() + ']');
 
                     cctx.exchange().mergeExchanges(this, msg.resultTopologyVersion());
@@ -2715,7 +2725,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                             updatePartitionFullMap(partsMsg);
                         }
 
-                        onDone(topologyVersion());
+                        onDone(initialVersion());
                     }
                     else {
                         if (log.isDebugEnabled()) {
@@ -2785,7 +2795,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (isDone() || !enterBusy())
             return;
 
-        cctx.mvcc().removeExplicitNodeLocks(node.id(), topologyVersion());
+        cctx.mvcc().removeExplicitNodeLocks(node.id(), initialVersion());
 
         try {
             onDiscoveryEvent(new IgniteRunnable() {
@@ -2871,10 +2881,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                                 for (int i = 0; i < grp.affinity().partitions(); i++)
                                     affAssignment.add(empty);
 
-                                grp.affinity().initialize(topologyVersion(), affAssignment);
+                                grp.affinity().initialize(initialVersion(), affAssignment);
                             }
 
-                            onDone(topologyVersion());
+                            onDone(initialVersion());
 
                             return;
                         }
@@ -2955,6 +2965,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private void onBecomeCoordinator(InitNewCoordinatorFuture newCrdFut) {
         boolean allRcvd = false;
 
+        cctx.exchange().coordinatorInitialized();
+
         if (newCrdFut.restoreState()) {
             GridDhtPartitionsFullMessage fullMsg = newCrdFut.fullMessage();
 
@@ -3143,14 +3155,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             if (crd != null) {
                 if (!crd.isLocal()) {
-                    diagCtx.exchangeInfo(crd.id(), topologyVersion(), "Exchange future waiting for coordinator " +
-                        "response [crd=" + crd.id() + ", topVer=" + topologyVersion() + ']');
+                    diagCtx.exchangeInfo(crd.id(), initialVersion(), "Exchange future waiting for coordinator " +
+                        "response [crd=" + crd.id() + ", topVer=" + initialVersion() + ']');
                 }
                 else if (!remaining.isEmpty()){
                     UUID nodeId = remaining.iterator().next();
 
-                    diagCtx.exchangeInfo(nodeId, topologyVersion(), "Exchange future on coordinator waiting for " +
-                        "server response [node=" + nodeId + ", topVer=" + topologyVersion() + ']');
+                    diagCtx.exchangeInfo(nodeId, initialVersion(), "Exchange future on coordinator waiting for " +
+                        "server response [node=" + nodeId + ", topVer=" + initialVersion() + ']');
                 }
             }
         }
@@ -3160,8 +3172,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return Short information string.
      */
     public String shortInfo() {
-        return "GridDhtPartitionsExchangeFuture [topVer=" + topologyVersion() +
-            ", evt=" + (discoEvt != null ? discoEvt.type() : -1) +
+        return "GridDhtPartitionsExchangeFuture [topVer=" + initialVersion() +
+            ", evt=" + (discoEvt != null ? IgniteUtils.gridEventName(discoEvt.type()) : -1) +
             ", evtNode=" + (discoEvt != null ? discoEvt.eventNode() : null) +
             ", done=" + isDone() + ']';
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 37de068..95abd16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -173,13 +173,15 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
-        supplier.onTopologyChanged(lastFut.topologyVersion());
+        supplier.onTopologyChanged(lastFut.initialVersion());
 
         demander.onTopologyChanged(lastFut);
     }
 
     /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut) {
+        assert exchFut == null || exchFut.isDone();
+
         // No assignments for disabled preloader.
         GridDhtPartitionTopology top = grp.topology();
 
@@ -190,8 +192,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.topologyVersion()) :
             "Topology version mismatch [exchId=" + exchId +
-                ", grp=" + grp.name() +
-                ", topVer=" + top.topologyVersion() + ']';
+            ", grp=" + grp.name() +
+            ", topVer=" + top.topologyVersion() + ']';
 
         GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchId, top.topologyVersion());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index db030b0..c6d7450 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -787,7 +787,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
 
         if (topVer != null) {
             for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) {
-                if (fut.topologyVersion().equals(topVer)){
+                if (fut.isDone() && fut.topologyVersion().equals(topVer)){
                     Throwable err = fut.validateCache(cctx, recovery, read, null, keys);
 
                     if (err != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index 6428df7..f09b6c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -107,7 +107,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
             }
 
             if (topFut.isDone()) {
-                topVer = topFut.resultTopologyVersion();
+                topVer = topFut.topologyVersion();
 
                 if (remap)
                     tx.onRemap(topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 8cee714..3257839 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -737,7 +737,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         if (cctx.kernalContext().query().moduleEnabled()) {
             for (GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
-                if (cacheCtx.startTopologyVersion().equals(fut.topologyVersion()) &&
+                if (cacheCtx.startTopologyVersion().equals(fut.initialVersion()) &&
                     !cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) {
                     final int cacheId = cacheCtx.cacheId();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index df51fac..147e6f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1538,7 +1538,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     if (!allowOverwrite && !topVer.equals(reqTopVer)) {
                         curFut.onDone(new IgniteCheckedException(
                             "DataStreamer will retry data transfer at stable topology. " +
-                                "[reqTop=" + reqTopVer + " ,topVer=" + topVer + ", node=local]"));
+                                "[reqTop=" + reqTopVer + ", topVer=" + topVer + ", node=local]"));
                     }
                     else if (loc || allowOverwrite || fut.isDone()) {
                         IgniteInternalFuture<Object> callFut = ctx.closure().callLocalSafe(

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 3d10b31..8b51e47 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
@@ -365,7 +366,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
         if (latch != null && !latch.await(5, TimeUnit.SECONDS))
             fail("Failed to wait for expected messages.");
 
-        stopGrid(0);
+        stopGrid(getTestIgniteInstanceName(0), true, false);
 
         fut.get();
 
@@ -654,7 +655,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         boolean wait = GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
-                return exch.lastTopologyFuture().topologyVersion().topologyVersion() >= topVer;
+                return exch.lastTopologyFuture().initialVersion().topologyVersion() >= topVer;
             }
         }, 5000);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b681ca66/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index a1a01e1..2350ed0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -1246,7 +1246,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
 
         for (GridDhtPartitionsExchangeFuture f : exFutures) {
             //Shouldn't contains staled futures.
-            assertTrue(f.topologyVersion().topologyVersion() >= nodeJoinTopVer);
+            assertTrue(f.initialVersion().topologyVersion() >= nodeJoinTopVer);
         }
     }
 
@@ -2511,7 +2511,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             for (int i = futs.size() - 1; i >= 0; i--) {
                 GridDhtPartitionsExchangeFuture fut = futs.get(i);
 
-                if (fut.topologyVersion().equals(topVer0)) {
+                if (fut.initialVersion().equals(topVer0)) {
                     evt = fut.discoveryEvent();
 
                     break;


Mime
View raw message