ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [07/10] ignite git commit: ignite-6124 Merge exchanges for multiple discovery events
Date Mon, 21 Aug 2017 10:22:30 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 2ee80a6..d991c86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -38,9 +38,6 @@ import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
-import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
@@ -49,6 +46,9 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d67d81d..08eb1bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -89,6 +89,7 @@ import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchang
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridListSet;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -102,6 +103,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
@@ -126,9 +128,16 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.preloa
  */
 public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     /** Exchange history size. */
-    private static final int EXCHANGE_HISTORY_SIZE =
+    private final int EXCHANGE_HISTORY_SIZE =
         IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE, 1_000);
 
+    /** */
+    private final long IGNITE_EXCHANGE_MERGE_DELAY =
+        IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, 0);
+
+    /** */
+    private static final IgniteProductVersion EXCHANGE_PROTOCOL_2_SINCE = IgniteProductVersion.fromString("2.1.4");
+
     /** Atomic reference for pending partition resend timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
 
@@ -176,7 +185,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * discovery event. In case if remote node will retry partition exchange, completed future will indicate
      * that full partition map should be sent to requesting node right away.
      */
-    private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
+    private ExchangeFutureSet exchFuts = new ExchangeFutureSet(EXCHANGE_HISTORY_SIZE);
 
     /** */
     private volatile IgniteCheckedException stopErr;
@@ -193,6 +202,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** Events received while cluster state transition was in progress. */
     private final List<PendingDiscoveryEvent> pendingEvts = new ArrayList<>();
 
+    /** */
+    private final GridFutureAdapter<?> crdInitFut = new GridFutureAdapter();
+
+    /** For tests only. */
+    private volatile AffinityTopologyVersion exchMergeTestWaitVer;
+
     /** Discovery listener. */
     private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
         @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -288,7 +303,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class,
             new MessageHandler<GridDhtPartitionsSingleMessage>() {
-                @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+                @Override public void onMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
+                    if (!crdInitFut.isDone() && !msg.restoreState()) {
+                        GridDhtPartitionExchangeId exchId = msg.exchangeId();
+
+                        log.info("Waiting for coordinator initialization [node=" + node.id() +
+                            ", nodeOrder=" + node.order() +
+                            ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']');
+
+                        crdInitFut.listen(new CI1<IgniteInternalFuture>() {
+                            @Override public void apply(IgniteInternalFuture fut) {
+                                processSinglePartitionUpdate(node, msg);
+                            }
+                        });
+
+                        return;
+                    }
+
                     processSinglePartitionUpdate(node, msg);
                 }
             });
@@ -344,6 +375,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     *
+     */
+    public void onCoordinatorInitialized() {
+        crdInitFut.onDone();
+    }
+
+    /**
      * Callback for local join event (needed since regular event for local join is not generated).
      *
      * @param evt Event.
@@ -375,7 +413,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
 
-            exchFut = exchangeFuture(exchId, evt, cache,null, null);
+            exchFut = exchangeFuture(exchId, evt, cache, null, null);
         }
         else {
             DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
@@ -589,6 +627,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * @param ver Node version.
+     * @return Supported exchange protocol version.
+     */
+    public static int exchangeProtocolVersion(IgniteProductVersion ver) {
+        if (ver.compareToIgnoreTimestamp(EXCHANGE_PROTOCOL_2_SINCE) >= 0)
+            return 2;
+
+        return 1;
+    }
+
+    /**
      * @param idx Index.
      * @return Topic for index.
      */
@@ -598,6 +647,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
+        exchWorker.onKernalStop();
+
         cctx.gridEvents().removeDiscoveryEventListener(discoLsnr);
 
         cctx.io().removeHandler(false, 0, GridDhtPartitionsSingleMessage.class);
@@ -620,6 +671,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         // Finish all exchange futures.
         ExchangeFutureSet exchFuts0 = exchFuts;
 
+        for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+            if (task instanceof GridDhtPartitionsExchangeFuture)
+                ((GridDhtPartitionsExchangeFuture)task).onDone(stopErr);
+        }
+
         if (exchFuts0 != null) {
             for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
                 f.onDone(stopErr);
@@ -659,10 +715,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /**
      * @param grpId Cache group ID.
-     * @param exchFut Exchange future.
      * @return Topology.
      */
-    public GridDhtPartitionTopology clientTopology(int grpId, GridDhtPartitionsExchangeFuture exchFut) {
+    public GridDhtPartitionTopology clientTopology(int grpId) {
         GridClientPartitionTopology top = clientTops.get(grpId);
 
         if (top != null)
@@ -670,7 +725,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         Object affKey = null;
 
-        CacheGroupDescriptor grpDesc = cctx.cache().cacheGroupDescriptors().get(grpId);
+        CacheGroupDescriptor grpDesc = cctx.affinity().cacheGroups().get(grpId);
 
         if (grpDesc != null) {
             CacheConfiguration<?, ?> ccfg = grpDesc.config();
@@ -684,7 +739,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
 
         GridClientPartitionTopology old = clientTops.putIfAbsent(grpId,
-            top = new GridClientPartitionTopology(cctx, grpId, exchFut, affKey));
+            top = new GridClientPartitionTopology(cctx, grpId, affKey));
 
         return old != null ? old : top;
     }
@@ -705,19 +760,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * Gets topology version of last partition exchange, it is possible that last partition exchange
-     * is not completed yet.
-     *
-     * @return Topology version.
-     */
-    public AffinityTopologyVersion topologyVersion() {
-        GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut;
-
-        return lastInitializedFut0 != null
-            ? lastInitializedFut0.exchangeId().topologyVersion() : AffinityTopologyVersion.NONE;
-    }
-
-    /**
      * @return Topology version of latest completed partition exchange.
      */
     public AffinityTopologyVersion readyAffinityVersion() {
@@ -727,7 +769,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * @return Last initialized topology future.
      */
-    public GridDhtTopologyFuture lastTopologyFuture() {
+    public GridDhtPartitionsExchangeFuture lastTopologyFuture() {
         return lastInitializedFut;
     }
 
@@ -763,7 +805,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     @Nullable public IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) {
         GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut;
 
-        if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) == 0) {
+        if (lastInitializedFut0 != null && lastInitializedFut0.initialVersion().compareTo(ver) == 0) {
             if (log.isDebugEnabled())
                 log.debug("Return lastInitializedFut for topology ready future " +
                     "[ver=" + ver + ", fut=" + lastInitializedFut0 + ']');
@@ -909,7 +951,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             // No need to send to nodes which did not finish their first exchange.
             AffinityTopologyVersion rmtTopVer =
-                lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE;
+                lastFut != null ? (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion()) : AffinityTopologyVersion.NONE;
 
             Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer);
 
@@ -1076,8 +1118,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param id Exchange ID.
      */
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
-        GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
-            id,
+        GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
             cctx.kernalContext().clientNode(),
             false);
 
@@ -1098,14 +1139,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param targetNode Target node.
      * @param exchangeId Exchange ID.
      * @param clientOnlyExchange Client exchange flag.
      * @param sndCounters {@code True} if need send partition update counters.
      * @return Message.
      */
-    public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode,
-        @Nullable GridDhtPartitionExchangeId exchangeId,
+    public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(@Nullable GridDhtPartitionExchangeId exchangeId,
         boolean clientOnlyExchange,
         boolean sndCounters)
     {
@@ -1231,14 +1270,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param exchFut Exchange.
+     * @param topVer Exchange result topology version.
+     * @param initTopVer Exchange initial version.
      * @param err Error.
      */
-    public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable err) {
-        AffinityTopologyVersion topVer = exchFut.topologyVersion();
+    public void onExchangeDone(AffinityTopologyVersion topVer, AffinityTopologyVersion initTopVer, @Nullable Throwable err) {
+        assert topVer != null || err != null;
+        assert initTopVer != null;
 
         if (log.isDebugEnabled())
-            log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
+            log.debug("Exchange done [topVer=" + topVer + ", err=" + err + ']');
 
         if (err == null) {
             while (true) {
@@ -1263,10 +1304,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
         else {
             for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
-                if (entry.getKey().compareTo(topVer) <= 0) {
+                if (entry.getKey().compareTo(initTopVer) <= 0) {
                     if (log.isDebugEnabled())
                         log.debug("Completing created topology ready future with error " +
-                            "[ver=" + topVer + ", fut=" + entry.getValue() + ']');
+                            "[ver=" + entry.getKey() + ", fut=" + entry.getValue() + ']');
 
                     entry.getValue().onDone(err);
                 }
@@ -1279,7 +1320,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             int skipped = 0;
 
             for (GridDhtPartitionsExchangeFuture fut : exchFuts0.values()) {
-                if (exchFut.exchangeId().topologyVersion().compareTo(fut.exchangeId().topologyVersion()) < 0)
+                if (initTopVer.compareTo(fut.exchangeId().topologyVersion()) < 0)
                     continue;
 
                 skipped++;
@@ -1357,7 +1398,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     cctx.database().releaseHistoryForPreloading();
             }
             else
-                exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg);
+                exchangeFuture(msg.exchangeId(), null, null, null, null).onReceiveFullMessage(node, msg);
         }
         finally {
             leaveBusy();
@@ -1397,7 +1438,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         top = grp.topology();
 
                     if (top != null) {
-                        updated |= top.update(null, entry.getValue());
+                        updated |= top.update(null, entry.getValue(), false);
 
                         cctx.affinity().checkRebalanceState(top, grpId);
                     }
@@ -1406,24 +1447,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 if (updated)
                     scheduleResendPartitions();
             }
-            else {
-                if (msg.client()) {
-                    final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
-                        null,
-                        null,
-                        null,
-                        null);
-
-                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                            // Finished future should reply only to sender client node.
-                            exchFut.onReceive(node, msg);
-                        }
-                    });
-                }
-                else
-                    exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg);
-            }
+            else
+                exchangeFuture(msg.exchangeId(), null, null, null, null).onReceiveSingleMessage(node, msg);
         }
         finally {
             leaveBusy();
@@ -1439,7 +1464,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             return;
 
         try {
-            sendLocalPartitions(node, msg.exchangeId());
+            final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+                null,
+                null,
+                null,
+                null);
+
+            exchFut.onReceivePartitionRequest(node, msg);
         }
         finally {
             leaveBusy();
@@ -1451,7 +1482,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @throws Exception If failed.
      */
     public void dumpDebugInfo(@Nullable GridDhtPartitionsExchangeFuture exchFut) throws Exception {
-        AffinityTopologyVersion exchTopVer = exchFut != null ? exchFut.topologyVersion() : null;
+        AffinityTopologyVersion exchTopVer = exchFut != null ? exchFut.initialVersion() : null;
 
         U.warn(diagnosticLog, "Ready affinity version: " + readyTopVer.get());
 
@@ -1734,6 +1765,181 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * For testing only.
+     *
+     * @param exchMergeTestWaitVer Version to wait for.
+     */
+    public void mergeExchangesTestWaitVersion(AffinityTopologyVersion exchMergeTestWaitVer) {
+        this.exchMergeTestWaitVer = exchMergeTestWaitVer;
+    }
+
+    /**
+     * @param curFut Current exchange future.
+     * @param msg Message.
+     * @return {@code True} if node is stopping.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    public boolean mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, GridDhtPartitionsFullMessage msg)
+        throws IgniteInterruptedCheckedException {
+        AffinityTopologyVersion resVer = msg.resultTopologyVersion();
+
+        if (exchWorker.waitForExchangeFuture(resVer))
+            return true;
+
+        for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+            if (task instanceof GridDhtPartitionsExchangeFuture) {
+                GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture) task;
+
+                if (fut.initialVersion().compareTo(resVer) > 0) {
+                    log.info("Merge exchange future on finish stop [curFut=" + curFut.initialVersion() +
+                        ", resVer=" + resVer +
+                        ", nextFutVer=" + fut.initialVersion() + ']');
+
+                    break;
+                }
+
+                log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() +
+                    ", mergedFut=" + fut.initialVersion() +
+                    ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+                    ", evtNode=" + fut.firstEvent().eventNode().id()+
+                    ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+
+                DiscoveryEvent evt = fut.firstEvent();
+
+                curFut.context().events().addEvent(fut.initialVersion(),
+                    fut.firstEvent(),
+                    fut.firstEventCache());
+
+                if (evt.type() == EVT_NODE_JOINED) {
+                    final GridDhtPartitionsSingleMessage pendingMsg = fut.mergeJoinExchangeOnDone(curFut);
+
+                    if (pendingMsg != null)
+                        curFut.waitAndReplyToNode(evt.eventNode().id(), pendingMsg);
+                }
+            }
+        }
+
+        ExchangeDiscoveryEvents evts = curFut.context().events();
+
+        assert evts.topologyVersion().equals(resVer) : "Invalid exchange merge result [ver=" + evts.topologyVersion()
+            + ", expVer=" + resVer + ']';
+
+        return false;
+    }
+
+    /**
+     * @param curFut Current active exchange future.
+     * @return {@code False} if need wait messages for merged exchanges.
+     */
+    public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFut) {
+        if (IGNITE_EXCHANGE_MERGE_DELAY > 0) {
+            try {
+                U.sleep(IGNITE_EXCHANGE_MERGE_DELAY);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                U.warn(log, "Failed to wait for exchange merge, thread interrupted: " + e);
+
+                return true;
+            }
+        }
+
+        AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer;
+
+        if (exchMergeTestWaitVer != null) {
+            log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() +
+                ", waitVer=" + exchMergeTestWaitVer + ']');
+
+            long end = U.currentTimeMillis() + 10_000;
+
+            while (U.currentTimeMillis() < end) {
+                boolean found = false;
+
+                for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+                    if (task instanceof GridDhtPartitionsExchangeFuture) {
+                        GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
+
+                        if (exchMergeTestWaitVer.equals(fut.initialVersion())) {
+                            log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);
+
+                            found = true;
+
+                            break;
+                        }
+                    }
+                }
+
+                if (found)
+                    break;
+                else {
+                    try {
+                        U.sleep(100);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        break;
+                    }
+                }
+            }
+
+            this.exchMergeTestWaitVer = null;
+        }
+
+        synchronized (curFut.mutex()) {
+            int awaited = 0;
+
+            for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+                if (task instanceof GridDhtPartitionsExchangeFuture) {
+                    GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
+
+                    DiscoveryEvent evt = fut.firstEvent();
+
+                    if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+                        log.info("Stop merge, custom event found: " + evt);
+
+                        break;
+                    }
+
+                    ClusterNode node = evt.eventNode();
+
+                    if (!curFut.context().supportsMergeExchanges(node)) {
+                        log.info("Stop merge, node does not support merge: " + node);
+
+                        break;
+                    }
+                    if (evt.type() == EVT_NODE_JOINED && cctx.cache().hasCachesReceivedFromJoin(node)) {
+                        log.info("Stop merge, received caches from node: " + node);
+
+                        break;
+                    }
+
+                    log.info("Merge exchange future [curFut=" + curFut.initialVersion() +
+                        ", mergedFut=" + fut.initialVersion() +
+                        ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+                        ", evtNode=" + fut.firstEvent().eventNode().id() +
+                        ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+
+                    curFut.context().events().addEvent(fut.initialVersion(),
+                        fut.firstEvent(),
+                        fut.firstEventCache());
+
+                    if (evt.type() == EVT_NODE_JOINED) {
+                        if (fut.mergeJoinExchange(curFut))
+                            awaited++;
+                    }
+                }
+                else {
+                    if (!task.skipForExchangeMerge()) {
+                        log.info("Stop merge, custom task found: " + task);
+
+                        break;
+                    }
+                }
+            }
+
+            return awaited == 0;
+        }
+    }
+
+    /**
      * Exchange future thread. All exchanges happen only by one thread and next
      * exchange will not start until previous one completes.
      */
@@ -1742,12 +1948,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> futQ =
             new LinkedBlockingDeque<>();
 
+        /** */
+        private AffinityTopologyVersion lastFutVer;
+
         /** Busy flag used as performance optimization to stop current preloading. */
         private volatile boolean busy;
 
         /** */
         private boolean crd;
 
+        /** */
+        private boolean stop;
+
         /**
          * Constructor.
          */
@@ -1783,10 +1995,67 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             futQ.offer(exchFut);
 
+            synchronized (this) {
+                lastFutVer = exchFut.initialVersion();
+
+                notifyAll();
+            }
+
             if (log.isDebugEnabled())
                 log.debug("Added exchange future to exchange worker: " + exchFut);
         }
 
+        /**
+         *
+         */
+        private void onKernalStop() {
+            synchronized (this) {
+                stop = true;
+
+                notifyAll();
+            }
+        }
+
+        /**
+         * @param resVer Version to wait for.
+         * @return {@code True} if node is stopping.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         */
+        private boolean waitForExchangeFuture(AffinityTopologyVersion resVer) throws IgniteInterruptedCheckedException {
+            synchronized (this) {
+                while (!stop && lastFutVer.compareTo(resVer) < 0)
+                    U.wait(this);
+
+                return stop;
+            }
+        }
+
+        /**
+         * @param resVer Exchange result version.
+         * @param exchFut Exchange future.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         */
+        private void removeMergedFutures(AffinityTopologyVersion resVer, GridDhtPartitionsExchangeFuture exchFut)
+            throws IgniteInterruptedCheckedException {
+            if (resVer.compareTo(exchFut.initialVersion()) != 0) {
+                waitForExchangeFuture(resVer);
+
+                for (CachePartitionExchangeWorkerTask task : futQ) {
+                    if (task instanceof GridDhtPartitionsExchangeFuture) {
+                        GridDhtPartitionsExchangeFuture fut0 = (GridDhtPartitionsExchangeFuture)task;
+
+                        if (resVer.compareTo(fut0.initialVersion()) >= 0) {
+                            fut0.finishMerged();
+
+                            futQ.remove(fut0);
+                        }
+                        else
+                            break;
+                    }
+                }
+            }
+        }
+
         /** {@inheritDoc} */
         @Override public void cancel() {
             synchronized (interruptLock) {
@@ -1920,6 +2189,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     GridDhtPartitionsExchangeFuture exchFut = null;
 
+                    AffinityTopologyVersion resVer = null;
+
                     try {
                         if (isCancelled())
                             break;
@@ -1946,7 +2217,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             boolean newCrd = false;
 
                             if (!crd) {
-                                List<ClusterNode> srvNodes = exchFut.discoCache().serverNodes();
+                                List<ClusterNode> srvNodes = exchFut.firstEventCache().serverNodes();
 
                                 crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal();
                             }
@@ -1961,13 +2232,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                             while (true) {
                                 try {
-                                    exchFut.get(futTimeout, TimeUnit.MILLISECONDS);
+                                    resVer = exchFut.get(futTimeout, TimeUnit.MILLISECONDS);
 
                                     break;
                                 }
                                 catch (IgniteFutureTimeoutCheckedException ignored) {
                                     U.warn(diagnosticLog, "Failed to wait for partition map exchange [" +
-                                        "topVer=" + exchFut.topologyVersion() +
+                                        "topVer=" + exchFut.initialVersion() +
                                         ", node=" + cctx.localNodeId() + "]. " +
                                         "Dumping pending objects that might be the cause: ");
 
@@ -1990,6 +2261,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 }
                             }
 
+                            removeMergedFutures(resVer, exchFut);
 
                             if (log.isDebugEnabled())
                                 log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
@@ -2098,7 +2370,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                         if (assignsCancelled) { // Pending exchange.
                             U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
-                                "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+                                "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
                                 ", node=" + exchId.nodeId() + ']');
                         }
                         else if (r != null) {
@@ -2108,19 +2380,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                             if (!hasPendingExchange()) {
                                 U.log(log, "Rebalancing started " +
-                                    "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+                                    "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
                                     ", node=" + exchId.nodeId() + ']');
 
                                 r.run(); // Starts rebalancing routine.
                             }
                             else
                                 U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
-                                    "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+                                    "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
                                     ", node=" + exchId.nodeId() + ']');
                         }
                         else
                             U.log(log, "Skipping rebalancing (nothing scheduled) " +
-                                "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+                                "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
                                 ", node=" + exchId.nodeId() + ']');
                     }
                 }
@@ -2205,10 +2477,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** */
+        private final int histSize;
+
         /**
          * Creates ordered, not strict list set.
+         *
+         * @param histSize Max history size.
          */
-        private ExchangeFutureSet() {
+        private ExchangeFutureSet(int histSize) {
             super(new Comparator<GridDhtPartitionsExchangeFuture>() {
                 @Override public int compare(
                     GridDhtPartitionsExchangeFuture f1,
@@ -2224,6 +2501,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     return t2.compareTo(t1);
                 }
             }, /*not strict*/false);
+
+            this.histSize = histSize;
         }
 
         /**
@@ -2234,7 +2513,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             GridDhtPartitionsExchangeFuture fut) {
             GridDhtPartitionsExchangeFuture cur = super.addx(fut);
 
-            while (size() > EXCHANGE_HISTORY_SIZE) {
+            while (size() > histSize) {
                 GridDhtPartitionsExchangeFuture last = last();
 
                 if (!last.isDone())

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 2e543c7..bf3ee0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1681,6 +1681,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param node Joined node.
+     * @return {@code True} if there are new caches received from joined node.
+     */
+    boolean hasCachesReceivedFromJoin(ClusterNode node) {
+        return cachesInfo.hasCachesReceivedFromJoin(node.id());
+    }
+
+    /**
      * Starts statically configured caches received from remote nodes during exchange.
      *
      * @param nodeId Joining node ID.
@@ -2016,17 +2024,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * Callback invoked when first exchange future for dynamic cache is completed.
      *
-     * @param topVer Completed topology version.
+     * @param cacheStartVer Started caches version to create proxy for.
      * @param exchActions Change requests.
      * @param err Error.
      */
     @SuppressWarnings("unchecked")
     public void onExchangeDone(
-        AffinityTopologyVersion topVer,
+        AffinityTopologyVersion cacheStartVer,
         @Nullable ExchangeActions exchActions,
         @Nullable Throwable err
     ) {
-        initCacheProxies(topVer, err);
+        initCacheProxies(cacheStartVer, err);
 
         if (exchActions == null)
             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
index 3166159..317037b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
@@ -67,12 +67,22 @@ public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter
     }
 
     /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion initialVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exchangeDone() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "ClientCacheDhtTopologyFuture [topVer=" + topologyVersion() + ']';
+        return "ClientCacheDhtTopologyFuture [topVer=" + topVer + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 4b9826e..745e7d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -115,33 +117,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /**
      * @param cctx Context.
      * @param grpId Group ID.
-     * @param exchFut Exchange ID.
      * @param similarAffKey Key to find caches with similar affinity.
      */
     public GridClientPartitionTopology(
-        GridCacheSharedContext cctx,
+        GridCacheSharedContext<?, ?> cctx,
         int grpId,
-        GridDhtPartitionsExchangeFuture exchFut,
         Object similarAffKey
     ) {
         this.cctx = cctx;
         this.grpId = grpId;
         this.similarAffKey = similarAffKey;
 
-        topVer = exchFut.topologyVersion();
-
-        discoCache = exchFut.discoCache();
+        topVer = AffinityTopologyVersion.NONE;
 
         log = cctx.logger(getClass());
 
-        lock.writeLock().lock();
-
-        try {
-            beforeExchange0(cctx.localNode(), exchFut);
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
+        node2part = new GridDhtPartitionFullMap(cctx.localNode().id(),
+            cctx.localNode().order(),
+            updateSeq.get());
     }
 
     /**
@@ -194,9 +187,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         U.writeLock(lock);
 
         try {
-            AffinityTopologyVersion exchTopVer = exchFut.topologyVersion();
+            AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
 
-            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
+            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [grp=" + grpId +
+                ", topVer=" + topVer +
                 ", exchVer=" + exchTopVer + ']';
 
             this.stopping = stopping;
@@ -214,7 +208,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
+    @Override public AffinityTopologyVersion readyTopologyVersion() {
         lock.readLock().lock();
 
         try {
@@ -228,6 +222,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion lastTopologyChangeVersion() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public GridDhtTopologyFuture topologyVersionFuture() {
         assert topReadyFut != null;
 
@@ -240,13 +239,17 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
+        GridDhtPartitionsExchangeFuture exchFut) {
         // No-op.
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean initParts)
-        throws IgniteCheckedException {
+    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut,
+        boolean initParts,
+        boolean updateMoving)
+        throws IgniteCheckedException
+    {
         ClusterNode loc = cctx.localNode();
 
         U.writeLock(lock);
@@ -255,7 +258,19 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (stopping)
                 return;
 
+            discoCache = exchFut.events().discoveryCache();
+
             beforeExchange0(loc, exchFut);
+
+            if (updateMoving) {
+                ExchangeDiscoveryEvents evts = exchFut.context().events();
+
+                GridAffinityAssignmentCache aff = cctx.affinity().affinity(grpId);
+
+                assert aff.lastVersion().equals(evts.topologyVersion());
+
+                createMovingPartitions(aff.readyAffinity(evts.topologyVersion()));
+            }
         }
         finally {
             lock.writeLock().unlock();
@@ -263,17 +278,50 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /**
+     * @param aff Affinity.
+     */
+    private void createMovingPartitions(AffinityAssignment aff) {
+        for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+            GridDhtPartitionMap map = e.getValue();
+
+            addMoving(map, aff.backupPartitions(e.getKey()));
+            addMoving(map, aff.primaryPartitions(e.getKey()));
+        }
+    }
+
+    /**
+     * @param map Node partition state map.
+     * @param parts Partitions assigned to node.
+     */
+    private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) {
+        if (F.isEmpty(parts))
+            return;
+
+        for (Integer p : parts) {
+            GridDhtPartitionState state = map.get(p);
+
+            if (state == null || state == EVICTED)
+                map.put(p, MOVING);
+        }
+    }
+
+    /**
      * @param loc Local node.
      * @param exchFut Exchange future.
      */
     private void beforeExchange0(ClusterNode loc, GridDhtPartitionsExchangeFuture exchFut) {
         GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
-        assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
-            topVer + ", exchId=" + exchId + ']';
+        if (exchFut.context().events().hasServerLeft()) {
+            List<DiscoveryEvent> evts0 = exchFut.context().events().events();
 
-        if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent())
-            removeNode(exchId.nodeId());
+            for (int i = 0; i < evts0.size(); i++) {
+                DiscoveryEvent evt = evts0.get(i);
+
+                if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
+                    removeNode(evt.eventNode().id());
+            }
+        }
 
         // In case if node joins, get topology at the time of joining node.
         ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
@@ -492,7 +540,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             List<ClusterNode> nodes = new ArrayList<>(size);
 
             for (UUID id : nodeIds) {
-                if (topVer.topologyVersion() > 0 && !allIds.contains(id))
+                if (topVer.topologyVersion() > 0 && !F.contains(allIds, id))
                     continue;
 
                 if (hasState(p, id, state, states)) {
@@ -747,7 +795,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Override public boolean update(
         @Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts
+        GridDhtPartitionMap parts,
+        boolean force
     ) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -766,12 +815,14 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (stopping)
                 return false;
 
-            if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExchVer=" +
-                        lastExchangeVer + ", exchId=" + exchId + ']');
+            if (!force) {
+                if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Stale exchange id for single partition map update (will ignore) [lastExchVer=" +
+                            lastExchangeVer + ", exchId=" + exchId + ']');
 
-                return false;
+                    return false;
+                }
             }
 
             if (exchId != null)
@@ -783,7 +834,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
             GridDhtPartitionMap cur = node2part.get(parts.nodeId());
 
-            if (isStaleUpdate(cur, parts)) {
+            if (force) {
+                if (cur != null && cur.topologyVersion().initialized())
+                    parts.updateSequence(cur.updateSequence(), cur.topologyVersion());
+            }
+            else if (isStaleUpdate(cur, parts)) {
                 if (log.isDebugEnabled())
                     log.debug("Stale update for single partition map update (will ignore) [exchId=" + exchId +
                             ", curMap=" + cur + ", newMap=" + parts + ']');
@@ -845,19 +900,20 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
+    @Override public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment,
+        boolean updateRebalanceVer) {
         // No-op.
     }
 
     /** {@inheritDoc} */
-    @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
+    @Override public boolean detectLostPartitions(AffinityTopologyVersion affVer, DiscoveryEvent discoEvt) {
         assert false : "detectLostPartitions should never be called on client topology";
 
         return false;
     }
 
     /** {@inheritDoc} */
-    @Override public void resetLostPartitions() {
+    @Override public void resetLostPartitions(AffinityTopologyVersion affVer) {
         assert false : "resetLostPartitions should never be called on client topology";
     }
 
@@ -931,12 +987,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         assert nodeId != null;
         assert lock.writeLock().isHeldByCurrentThread();
 
-        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
-
         ClusterNode loc = cctx.localNode();
 
         if (node2part != null) {
-            if (oldest.equals(loc) && !node2part.nodeId().equals(loc.id())) {
+            if (!node2part.nodeId().equals(loc.id())) {
                 updateSeq.setIfGreater(node2part.updateSequence());
 
                 node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 960b91a..1f67c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -433,7 +433,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             // While we are holding read lock, register lock future for partition release future.
             IgniteUuid lockId = IgniteUuid.fromUuid(ctx.localNodeId());
 
-            topVer = top.topologyVersion();
+            topVer = top.readyTopologyVersion();
 
             MultiUpdateFuture fut = new MultiUpdateFuture(topVer);
 
@@ -562,11 +562,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         }
 
         // Version for all loaded entries.
-        final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion());
+        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
-        final boolean replicate = ctx.isDrEnabled();
+        final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topVer);
 
-        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+        final boolean replicate = ctx.isDrEnabled();
 
         final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry();
 
@@ -587,13 +587,13 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             return;
         }
 
+        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+
         // Version for all loaded entries.
-        final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion());
+        final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topVer);
 
         final boolean replicate = ctx.isDrEnabled();
 
-        final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
-
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         ExpiryPolicy plc0 = opCtx != null ? opCtx.expiry() : null;
@@ -938,7 +938,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                             res.setContainsValue();
                     }
                     else {
-                        AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().topologyVersion();
+                        AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().initialVersion();
 
                         assert topVer.compareTo(req.topologyVersion()) > 0 : "Wrong ready topology version for " +
                             "invalid partitions response [topVer=" + topVer + ", req=" + req + ']';
@@ -1028,7 +1028,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 }
 
                 if (!F.isEmpty(fut.invalidPartitions()))
-                    res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().lastTopologyFuture().topologyVersion());
+                    res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().lastTopologyFuture().initialVersion());
 
                 try {
                     ctx.io().send(nodeId, res, ctx.ioPolicy());

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index dbfb426..4d1bb38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -47,9 +45,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridIterator;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 8688c4f..0dea5e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -69,11 +69,14 @@ public interface GridDhtPartitionTopology {
     ) throws IgniteInterruptedCheckedException;
 
     /**
-     * Topology version.
-     *
-     * @return Topology version.
+     * @return Result topology version of last finished exchange.
      */
-    public AffinityTopologyVersion topologyVersion();
+    public AffinityTopologyVersion readyTopologyVersion();
+
+    /**
+     * @return Start topology version of last exchange.
+     */
+    public AffinityTopologyVersion lastTopologyChangeVersion();
 
     /**
      * Gets a future that will be completed when partition exchange map for this
@@ -98,16 +101,21 @@ public interface GridDhtPartitionTopology {
      *
      * @param exchFut Exchange future.
      * @param affReady Affinity ready flag.
+     * @param updateMoving
      * @throws IgniteCheckedException If failed.
      */
-    public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady)
+    public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut,
+        boolean affReady,
+        boolean updateMoving)
         throws IgniteCheckedException;
 
     /**
+     * @param affVer Affinity version.
      * @param exchFut Exchange future.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException;
+    public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut)
+        throws IgniteInterruptedCheckedException;
 
     /**
      * Post-initializes this topology.
@@ -245,7 +253,7 @@ public interface GridDhtPartitionTopology {
     public void onRemoved(GridDhtCacheEntry e);
 
     /**
-     * @param exchangeVer Topology version from exchange. Value should be greater than previously passed. Null value
+     * @param exchangeResVer Result topology version for exchange. Value should be greater than previously passed. Null value
      *      means full map received is not related to exchange
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
@@ -255,7 +263,7 @@ public interface GridDhtPartitionTopology {
      * @return {@code True} if local state was changed.
      */
     public boolean update(
-        @Nullable AffinityTopologyVersion exchangeVer,
+        @Nullable AffinityTopologyVersion exchangeResVer,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, T2<Long, Long>> cntrMap,
         Set<Integer> partsToReload,
@@ -264,10 +272,12 @@ public interface GridDhtPartitionTopology {
     /**
      * @param exchId Exchange ID.
      * @param parts Partitions.
+     * @param force {@code True} to skip stale update check.
      * @return {@code True} if local state was changed.
      */
     public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts);
+        GridDhtPartitionMap parts,
+        boolean force);
 
     /**
      * @param cntrMap Counters map.
@@ -280,15 +290,18 @@ public interface GridDhtPartitionTopology {
      * <p>
      * This method should be called on topology coordinator after all partition messages are received.
      *
+     * @param resTopVer Exchange result version.
      * @param discoEvt Discovery event for which we detect lost partitions.
      * @return {@code True} if partitions state got updated.
      */
-    public boolean detectLostPartitions(DiscoveryEvent discoEvt);
+    public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt);
 
     /**
      * Resets the state of all LOST partitions to OWNING.
+     *
+     * @param resTopVer Exchange result version.
      */
-    public void resetLostPartitions();
+    public void resetLostPartitions(AffinityTopologyVersion resTopVer);
 
     /**
      * @return Collection of lost partitions, if any.
@@ -349,5 +362,5 @@ public interface GridDhtPartitionTopology {
      * @param assignment New affinity assignment.
      * @param updateRebalanceVer {@code True} if need check rebalance state.
      */
-    public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer);
+    public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment, boolean updateRebalanceVer);
 }


Mime
View raw message