ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [04/10] ignite git commit: ignite-6124 Merge exchanges for multiple discovery events
Date Mon, 21 Aug 2017 10:22:27 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 609021b..ca6ee5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -23,25 +23,26 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
@@ -51,16 +52,20 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
+import org.apache.ignite.internal.processors.cache.ExchangeContext;
+import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -75,7 +80,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -84,7 +89,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -118,10 +122,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /** */
     @GridToStringExclude
-    private volatile DiscoCache discoCache;
+    private final Object mux = new Object();
 
-    /** Discovery event. */
-    private volatile DiscoveryEvent discoEvt;
+    /** */
+    @GridToStringExclude
+    private volatile DiscoCache firstEvtDiscoCache;
+
+    /** Discovery event that triggered this exchange. */
+    private volatile DiscoveryEvent firstDiscoEvt;
 
     /** */
     @GridToStringExclude
@@ -168,10 +176,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
 
     /**
+     * Message received from node joining cluster (if this is 'node join' exchange),
+     * needed if this exchange is merged with another one.
+     */
+    @GridToStringExclude
+    private GridDhtPartitionsSingleMessage pendingJoinMsg;
+
+    /**
      * Messages received on non-coordinator are stored in case if this node
      * becomes coordinator.
      */
-    private final Map<ClusterNode, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>();
+    private final Map<UUID, GridDhtPartitionsSingleMessage> pendingSingleMsgs = new ConcurrentHashMap8<>();
 
     /** Messages received from new coordinator. */
     private final Map<ClusterNode, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>();
@@ -211,6 +226,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** */
     private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>();
 
+    /** Single messages from merged 'node join' exchanges. */
+    @GridToStringExclude
+    private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs;
+
+    /** Number of awaited messages for merged 'node join' exchanges. */
+    @GridToStringExclude
+    private int awaitMergedMsgs;
+
     /** */
     @GridToStringExclude
     private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
@@ -225,6 +248,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** */
     private final AtomicBoolean done = new AtomicBoolean();
 
+    /** */
+    private ExchangeLocalState state;
+
+    /** */
+    @GridToStringExclude
+    private ExchangeContext exchCtx;
+
+    /** */
+    @GridToStringExclude
+    private FinishState finishState;
+
+    /** Initialized when node becomes new coordinator. */
+    @GridToStringExclude
+    private InitNewCoordinatorFuture newCrdFut;
+
+    /** */
+    @GridToStringExclude
+    private GridDhtPartitionsExchangeFuture mergedWith;
+
     /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
@@ -253,13 +295,45 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         log = cctx.logger(getClass());
         exchLog = cctx.logger(EXCHANGE_LOG);
 
-        initFut = new GridFutureAdapter<>();
+        initFut = new GridFutureAdapter<Boolean>() {
+            @Override public IgniteLogger logger() {
+                return log;
+            }
+        };
 
         if (log.isDebugEnabled())
             log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']');
     }
 
     /**
+     * @return Future mutex.
+     */
+    public Object mutex() {
+        return mux;
+    }
+
+    /**
+     * @return Shared cache context.
+     */
+    public GridCacheSharedContext sharedContext() {
+        return cctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipForExchangeMerge() {
+        return false;
+    }
+
+    /**
+     * @return Exchange context.
+     */
+    public ExchangeContext context() {
+        assert exchCtx != null : this;
+
+        return exchCtx;
+    }
+
+    /**
      * @param exchActions Exchange actions.
      */
     public void exchangeActions(ExchangeActions exchActions) {
@@ -276,9 +350,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         this.affChangeMsg = affChangeMsg;
     }
 
+    /**
+     * @return Initial exchange version.
+     */
+    public AffinityTopologyVersion initialVersion() {
+        return exchId.topologyVersion();
+    }
+
     /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
-        return exchId.topologyVersion();
+        /*
+        Should not be called before exchange is finished since result version can change in
+        case of merged exchanges.
+         */
+        assert exchangeDone() : "Should not be called before exchange is finished";
+
+        return exchCtx.events().topologyVersion();
     }
 
     /**
@@ -291,19 +378,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @return Discovery cache.
-     */
-    public DiscoCache discoCache() {
-        return discoCache;
-    }
-
-    /**
      * @param cacheId Cache ID.
      * @param rcvdFrom Node ID cache was received from.
      * @return {@code True} if cache was added during this exchange.
      */
     public boolean cacheAddedOnExchange(int cacheId, UUID rcvdFrom) {
-        return dynamicCacheStarted(cacheId) || (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom));
+        return dynamicCacheStarted(cacheId) || exchCtx.events().nodeJoined(rcvdFrom);
     }
 
     /**
@@ -312,8 +392,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return {@code True} if cache group was added during this exchange.
      */
     public boolean cacheGroupAddedOnExchange(int grpId, UUID rcvdFrom) {
-        return dynamicCacheGroupStarted(grpId) ||
-            (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom));
+        return dynamicCacheGroupStarted(grpId) || exchCtx.events().nodeJoined(rcvdFrom);
     }
 
     /**
@@ -350,8 +429,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         assert exchId.equals(this.exchId);
 
         this.exchId.discoveryEvent(discoEvt);
-        this.discoEvt = discoEvt;
-        this.discoCache = discoCache;
+        this.firstDiscoEvt= discoEvt;
+        this.firstEvtDiscoCache = discoCache;
 
         evtLatch.countDown();
     }
@@ -378,10 +457,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @return Discovery event.
+     * @return First discovery event.
+     *
+     */
+    public DiscoveryEvent firstEvent() {
+        return firstDiscoEvt;
+    }
+
+    /**
+     * @return Discovery cache for first event.
+     */
+    public DiscoCache firstEventCache() {
+        return firstEvtDiscoCache;
+    }
+
+    /**
+     * @return Events processed in this exchange.
      */
-    public DiscoveryEvent discoveryEvent() {
-        return discoEvt;
+    public ExchangeDiscoveryEvents events() {
+        return exchCtx.events();
     }
 
     /**
@@ -412,6 +506,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @param newCrd {@code True} if node becomes coordinator on this exchange.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void initCoordinatorCaches(boolean newCrd) throws IgniteCheckedException {
+        if (newCrd) {
+            IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(this, false);
+
+            if (fut != null)
+                fut.get();
+
+            cctx.exchange().onCoordinatorInitialized();
+        }
+    }
+
+    /**
      * Starts activity.
      *
      * @param newCrd {@code True} if node become coordinator on this exchange.
@@ -427,15 +536,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         U.await(evtLatch);
 
-        assert discoEvt != null : this;
-        assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
+        assert firstDiscoEvt != null : this;
+        assert exchId.nodeId().equals(firstDiscoEvt.eventNode().id()) : this;
 
         try {
-            discoCache.updateAlives(cctx.discovery());
-
-            AffinityTopologyVersion topVer = topologyVersion();
+            AffinityTopologyVersion topVer = initialVersion();
 
-            srvNodes = new ArrayList<>(discoCache.serverNodes());
+            srvNodes = new ArrayList<>(firstEvtDiscoCache.serverNodes());
 
             remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
 
@@ -443,19 +550,30 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             boolean crdNode = crd != null && crd.isLocal();
 
-            if (exchLog.isInfoEnabled())
+            exchCtx = new ExchangeContext(crdNode, this);
+
+            assert state == null : state;
+
+            if (crdNode)
+                state = ExchangeLocalState.CRD;
+            else
+                state = cctx.kernalContext().clientNode() ? ExchangeLocalState.CLIENT : ExchangeLocalState.SRV;
+
+            if (exchLog.isInfoEnabled()) {
                 exchLog.info("Started exchange init [topVer=" + topVer +
                     ", crd=" + crdNode +
-                    ", evt=" + discoEvt.type() +
-                    ", node=" + discoEvt.node() +
-                    ", evtNode=" + discoEvt.node() +
-                    ", customEvt=" + (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent)discoEvt).customMessage() : null) +
-                    ']');
+                    ", evt=" + IgniteUtils.gridEventName(firstDiscoEvt.type()) +
+                    ", evtNode=" + firstDiscoEvt.eventNode().id() +
+                    ", customEvt=" + (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent)firstDiscoEvt).customMessage() : null) +
+                    ", allowMerge=" + exchCtx.mergeExchanges() + ']');
+            }
 
             ExchangeType exchange;
 
-            if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
-                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage();
+            if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+                assert !exchCtx.mergeExchanges();
+
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)firstDiscoEvt).customMessage();
 
                 if (msg instanceof ChangeGlobalStateMessage) {
                     assert exchActions != null && !exchActions.empty();
@@ -468,7 +586,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     exchange = onCacheChangeRequest(crdNode);
                 }
                 else if (msg instanceof SnapshotDiscoveryMessage) {
-                    exchange = CU.clientNode(discoEvt.eventNode()) ?
+                    exchange = CU.clientNode(firstDiscoEvt.eventNode()) ?
                         onClientNodeEvent(crdNode) :
                         onServerNodeEvent(crdNode);
                 }
@@ -477,12 +595,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                     exchange = onAffinityChangeRequest(crdNode);
                 }
+
+                initCoordinatorCaches(newCrd);
             }
             else {
-                if (discoEvt.type() == EVT_NODE_JOINED) {
-                    if (!discoEvt.eventNode().isLocal()) {
+                if (firstDiscoEvt.type() == EVT_NODE_JOINED) {
+                    if (!firstDiscoEvt.eventNode().isLocal()) {
                         Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
-                            discoEvt.eventNode().id(),
+                            firstDiscoEvt.eventNode().id(),
                             topVer);
 
                         cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
@@ -491,16 +611,35 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         initCachesOnLocalJoin();
                 }
 
-                if (newCrd) {
-                    IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(this);
+                initCoordinatorCaches(newCrd);
 
-                    if (fut != null)
-                        fut.get();
-                }
+                if (exchCtx.mergeExchanges()) {
+                    if (localJoinExchange()) {
+                        if (cctx.kernalContext().clientNode()) {
+                            onClientNodeEvent(crdNode);
+
+                            exchange = ExchangeType.CLIENT;
+                        }
+                        else {
+                            onServerNodeEvent(crdNode);
+
+                            exchange = ExchangeType.ALL;
+                        }
+                    }
+                    else {
+                        if (CU.clientNode(firstDiscoEvt.eventNode()))
+                            exchange = onClientNodeEvent(crdNode);
+                        else
+                            exchange = cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
+                    }
 
-                exchange = CU.clientNode(discoEvt.eventNode()) ?
-                    onClientNodeEvent(crdNode) :
-                    onServerNodeEvent(crdNode);
+                    if (exchId.isLeft())
+                        onLeft();
+                }
+                else {
+                    exchange = CU.clientNode(firstDiscoEvt.eventNode()) ? onClientNodeEvent(crdNode) :
+                        onServerNodeEvent(crdNode);
+                }
             }
 
             updateTopologies(crdNode);
@@ -513,7 +652,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
 
                 case CLIENT: {
-                    initTopologies();
+                    if (!exchCtx.mergeExchanges() && exchCtx.fetchAffinityOnJoin())
+                        initTopologies();
 
                     clientOnlyExchange();
 
@@ -579,7 +719,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             cctx.database().readCheckpointAndRestoreMemory(startDescs);
         }
 
-        cctx.cache().startCachesOnLocalJoin(caches, topologyVersion());
+        cctx.cache().startCachesOnLocalJoin(caches, initialVersion());
     }
 
     /**
@@ -594,7 +734,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     if (grp.isLocal())
                         continue;
 
-                    grp.topology().beforeExchange(this, !centralizedAff);
+                    grp.topology().beforeExchange(this, !centralizedAff, false);
                 }
             }
         }
@@ -622,7 +762,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion());
 
                 if (updateTop && clientTop != null) {
-                    top.update(topologyVersion(),
+                    top.update(null,
                         clientTop.partitionMap(true),
                         clientTop.updateCounters(false),
                         Collections.<Integer>emptySet(),
@@ -632,13 +772,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             top.updateTopologyVersion(
                 this,
-                discoCache(),
+                events().discoveryCache(),
                 updSeq,
                 cacheGroupStopping(grp.groupId()));
         }
 
         for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
-            top.updateTopologyVersion(this, discoCache(), -1, cacheGroupStopping(top.groupId()));
+            top.updateTopologyVersion(this, events().discoveryCache(), -1, cacheGroupStopping(top.groupId()));
     }
 
     /**
@@ -656,7 +796,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (log.isInfoEnabled()) {
                 log.info("Start activation process [nodeId=" + cctx.localNodeId() +
                     ", client=" + cctx.kernalContext().clientNode() +
-                    ", topVer=" + topologyVersion() + "]");
+                    ", topVer=" + initialVersion() + "]");
             }
 
             try {
@@ -676,18 +816,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (log.isInfoEnabled()) {
                     log.info("Successfully activated caches [nodeId=" + cctx.localNodeId() +
                         ", client=" + cctx.kernalContext().clientNode() +
-                        ", topVer=" + topologyVersion() + "]");
+                        ", topVer=" + initialVersion() + "]");
                 }
             }
             catch (Exception e) {
                 U.error(log, "Failed to activate node components [nodeId=" + cctx.localNodeId() +
                     ", client=" + cctx.kernalContext().clientNode() +
-                    ", topVer=" + topologyVersion() + "]", e);
+                    ", topVer=" + initialVersion() + "]", e);
 
                 changeGlobalStateE = e;
 
                 if (crd) {
-                    synchronized (this) {
+                    synchronized (mux) {
                         changeGlobalStateExceptions.put(cctx.localNodeId(), e);
                     }
                 }
@@ -697,7 +837,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (log.isInfoEnabled()) {
                 log.info("Start deactivation process [nodeId=" + cctx.localNodeId() +
                     ", client=" + cctx.kernalContext().clientNode() +
-                    ", topVer=" + topologyVersion() + "]");
+                    ", topVer=" + initialVersion() + "]");
             }
 
             try {
@@ -711,13 +851,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     log.info("Successfully deactivated data structures, services and caches [" +
                         "nodeId=" + cctx.localNodeId() +
                         ", client=" + cctx.kernalContext().clientNode() +
-                        ", topVer=" + topologyVersion() + "]");
+                        ", topVer=" + initialVersion() + "]");
                 }
             }
             catch (Exception e) {
                 U.error(log, "Failed to deactivate node components [nodeId=" + cctx.localNodeId() +
                     ", client=" + cctx.kernalContext().clientNode() +
-                    ", topVer=" + topologyVersion() + "]", e);
+                    ", topVer=" + initialVersion() + "]", e);
 
                 changeGlobalStateE = e;
             }
@@ -763,19 +903,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return Exchange type.
      */
     private ExchangeType onClientNodeEvent(boolean crd) throws IgniteCheckedException {
-        assert CU.clientNode(discoEvt.eventNode()) : this;
+        assert CU.clientNode(firstDiscoEvt.eventNode()) : this;
 
-        if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) {
+        if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) {
             onLeft();
 
-            assert !discoEvt.eventNode().isLocal() : discoEvt;
+            assert !firstDiscoEvt.eventNode().isLocal() : firstDiscoEvt;
         }
         else
-            assert discoEvt.type() == EVT_NODE_JOINED || discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
+            assert firstDiscoEvt.type() == EVT_NODE_JOINED || firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : firstDiscoEvt;
 
         cctx.affinity().onClientEvent(this, crd);
 
-        return discoEvt.eventNode().isLocal() ? ExchangeType.CLIENT : ExchangeType.NONE;
+        return firstDiscoEvt.eventNode().isLocal() ? ExchangeType.CLIENT : ExchangeType.NONE;
     }
 
     /**
@@ -784,12 +924,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return Exchange type.
      */
     private ExchangeType onServerNodeEvent(boolean crd) throws IgniteCheckedException {
-        assert !CU.clientNode(discoEvt.eventNode()) : this;
+        assert !CU.clientNode(firstDiscoEvt.eventNode()) : this;
 
-        if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) {
+        if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) {
             onLeft();
 
-            warnNoAffinityNodes();
+            exchCtx.events().warnNoAffinityNodes(cctx);
 
             centralizedAff = cctx.affinity().onServerLeft(this, crd);
         }
@@ -818,12 +958,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     GridAffinityAssignmentCache aff = grp.affinity();
 
-                    aff.initialize(topologyVersion(), aff.idealAssignment());
+                    aff.initialize(initialVersion(), aff.idealAssignment());
                 }
             }
+            else
+                onAllServersLeft();
         }
 
-        onDone(topologyVersion());
+        onDone(initialVersion());
     }
 
     /**
@@ -848,7 +990,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         waitPartitionRelease();
 
-        boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
+        boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal() || cacheStopping(cacheCtx.cacheId()))
@@ -860,18 +1002,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             }
         }
 
-        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-            if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
-                continue;
+        if (!exchCtx.mergeExchanges()) {
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+                    continue;
 
-            grp.topology().beforeExchange(this, !centralizedAff);
+                // It is possible affinity is not initialized yet if node joins to cluster.
+                if (grp.affinity().lastVersion().topologyVersion() > 0)
+                    grp.topology().beforeExchange(this, !centralizedAff, false);
+            }
         }
 
         cctx.database().beforeExchange(this);
 
         if (crd.isLocal()) {
             if (remaining.isEmpty())
-                onAllReceived();
+                onAllReceived(null);
         }
         else
             sendPartitions(crd);
@@ -886,7 +1032,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         try {
             long start = U.currentTimeMillis();
 
-            IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt);
+            IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(firstDiscoEvt);
 
             if (fut != null) {
                 fut.get();
@@ -913,7 +1059,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @throws IgniteCheckedException If failed.
      */
     private void waitPartitionRelease() throws IgniteCheckedException {
-        IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(topologyVersion());
+        IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(initialVersion());
 
         // Assign to class variable so it will be included into toString() method.
         this.partReleaseFut = partReleaseFut;
@@ -941,7 +1087,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             catch (IgniteFutureTimeoutCheckedException ignored) {
                 // Print pending transactions and locks that might have led to hang.
                 if (nextDumpTime <= U.currentTimeMillis()) {
-                    dumpPendingObjects();
+                    dumpPendingObjects(partReleaseFut);
 
                     nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout);
                 }
@@ -1015,71 +1161,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     *
+     * @param partReleaseFut Partition release future.
      */
-    private void warnNoAffinityNodes() {
-        List<String> cachesWithoutNodes = null;
-
-        for (DynamicCacheDescriptor cacheDesc : cctx.cache().cacheDescriptors().values()) {
-            if (discoCache.cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) {
-                if (cachesWithoutNodes == null)
-                    cachesWithoutNodes = new ArrayList<>();
-
-                cachesWithoutNodes.add(cacheDesc.cacheName());
-
-                // Fire event even if there is no client cache started.
-                if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
-                    Event evt = new CacheEvent(
-                        cacheDesc.cacheName(),
-                        cctx.localNode(),
-                        cctx.localNode(),
-                        "All server nodes have left the cluster.",
-                        EventType.EVT_CACHE_NODES_LEFT,
-                        0,
-                        false,
-                        null,
-                        null,
-                        null,
-                        null,
-                        false,
-                        null,
-                        false,
-                        null,
-                        null,
-                        null
-                    );
-
-                    cctx.gridEvents().record(evt);
-                }
-            }
-        }
-
-        if (cachesWithoutNodes != null) {
-            StringBuilder sb =
-                new StringBuilder("All server nodes for the following caches have left the cluster: ");
-
-            for (int i = 0; i < cachesWithoutNodes.size(); i++) {
-                String cache = cachesWithoutNodes.get(i);
-
-                sb.append('\'').append(cache).append('\'');
-
-                if (i != cachesWithoutNodes.size() - 1)
-                    sb.append(", ");
-            }
-
-            U.quietAndWarn(log, sb.toString());
+    private void dumpPendingObjects(IgniteInternalFuture<?> partReleaseFut) {
+        U.warn(cctx.kernalContext().cluster().diagnosticLog(),
+            "Failed to wait for partition release future [topVer=" + initialVersion() +
+            ", node=" + cctx.localNodeId() + "]");
 
-            U.quietAndWarn(log, "Must have server nodes for caches to operate.");
-        }
-    }
+        U.warn(log, "Partition release future: " + partReleaseFut);
 
-    /**
-     *
-     */
-    private void dumpPendingObjects() {
         U.warn(cctx.kernalContext().cluster().diagnosticLog(),
-            "Failed to wait for partition release future [topVer=" + topologyVersion() +
-            ", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: ");
+            "Dumping pending objects that might be the cause: ");
 
         try {
             cctx.exchange().dumpDebugInfo(this);
@@ -1106,6 +1198,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @return {@code True} if exchange for local node join.
+     */
+    public boolean localJoinExchange() {
+        return firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isLocal();
+    }
+
+    /**
      * @param node Target Node.
      * @throws IgniteCheckedException If failed.
      */
@@ -1129,19 +1228,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 true);
         }
         else {
-            msg = cctx.exchange().createPartitionsSingleMessage(node,
-                exchangeId(),
+            msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
                 false,
                 true);
-        }
 
-        Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
+            Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
 
-        if (partHistReserved0 != null)
-            msg.partitionHistoryCounters(partHistReserved0);
+            if (partHistReserved0 != null)
+                msg.partitionHistoryCounters(partHistReserved0);
+        }
 
         if (stateChangeExchange() && changeGlobalStateE != null)
             msg.setError(changeGlobalStateE);
+        else if (localJoinExchange())
+            msg.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
 
         if (log.isDebugEnabled())
             log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']');
@@ -1176,30 +1276,72 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @param msg Message to send.
      * @param nodes Nodes.
-     * @throws IgniteCheckedException If failed.
+     * @param mergedJoinExchMsgs Messages received from merged 'join node' exchanges.
+     * @param joinedNodeAff Affinity, if it was requested by some nodes.
      */
-    private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
-        GridDhtPartitionsFullMessage m = createPartitionsMessage(true);
+    private void sendAllPartitions(
+        GridDhtPartitionsFullMessage msg,
+        Collection<ClusterNode> nodes,
+        Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs,
+        Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) {
+        boolean singleNode = nodes.size() == 1;
+
+        GridDhtPartitionsFullMessage joinedNodeMsg = null;
 
         assert !nodes.contains(cctx.localNode());
 
         if (log.isDebugEnabled()) {
             log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
-                ", exchId=" + exchId + ", msg=" + m + ']');
+                ", exchId=" + exchId + ", msg=" + msg + ']');
         }
 
         for (ClusterNode node : nodes) {
+            GridDhtPartitionsFullMessage sndMsg = msg;
+
+            if (joinedNodeAff != null) {
+                if (singleNode)
+                    msg.joinedNodeAffinity(joinedNodeAff);
+                else {
+                    GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id());
+
+                    if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) {
+                        if (joinedNodeMsg == null) {
+                            joinedNodeMsg = msg.copy();
+
+                            joinedNodeMsg.joinedNodeAffinity(joinedNodeAff);
+                        }
+
+                        sndMsg = joinedNodeMsg;
+                    }
+                }
+            }
+
             try {
-                cctx.io().send(node, m, SYSTEM_POOL);
+                GridDhtPartitionExchangeId sndExchId = exchangeId();
+
+                if (mergedJoinExchMsgs != null) {
+                    GridDhtPartitionsSingleMessage mergedMsg = mergedJoinExchMsgs.get(node.id());
+
+                    if (mergedMsg != null)
+                        sndExchId = mergedMsg.exchangeId();
+                }
+
+                if (sndExchId != null && !sndExchId.equals(exchangeId())) {
+                    sndMsg = sndMsg.copy();
+
+                    sndMsg.exchangeId(sndExchId);
+                }
+
+                cctx.io().send(node, sndMsg, SYSTEM_POOL);
+            }
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send partitions, node failed: " + node);
             }
             catch (IgniteCheckedException e) {
-                if (cctx.io().checkNodeLeft(node.id(), e, false)) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to send partitions, node failed: " + node);
-                }
-                else
-                    U.error(log, "Failed to send partitions [node=" + node + ']', e);
+                U.error(log, "Failed to send partitions [node=" + node + ']', e);
             }
         }
     }
@@ -1226,16 +1368,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return {@code True} if exchange triggered by server node join or fail.
      */
     public boolean serverNodeDiscoveryEvent() {
-        assert discoEvt != null;
+        assert exchCtx != null;
 
-        return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode());
+        return exchCtx.events().hasServerJoin() || exchCtx.events().hasServerLeft();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exchangeDone() {
+        return done.get();
+    }
+
+    /**
+     * Finish merged future to allow GridCachePartitionExchangeManager.ExchangeFutureSet cleanup.
+     */
+    public void finishMerged() {
+        super.onDone(null, null);
     }
 
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) {
-        if (!done.compareAndSet(false, true))
+        if (isDone() || !done.compareAndSet(false, true))
             return false;
 
+        log.info("Finish exchange future [startVer=" + initialVersion() +
+            ", resVer=" + res +
+            ", err=" + err + ']');
+
+        assert res != null || err != null;
+
         if (err == null &&
             !cctx.kernalContext().clientNode() &&
             (serverNodeDiscoveryEvent() || affChangeMsg != null)) {
@@ -1243,18 +1403,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (!cacheCtx.affinityNode() || cacheCtx.isLocal())
                     continue;
 
-                cacheCtx.continuousQueries().flushBackupQueue(exchId.topologyVersion());
+                cacheCtx.continuousQueries().flushBackupQueue(res);
             }
        }
 
         if (err == null) {
             if (centralizedAff) {
+                assert !exchCtx.mergeExchanges();
+
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (grp.isLocal())
                         continue;
 
                     try {
-                        grp.topology().initPartitions(this);
+                        grp.topology().initPartitionsWhenAffinityReady(res, this);
                     }
                     catch (IgniteInterruptedCheckedException e) {
                         U.error(log, "Failed to initialize partitions.", e);
@@ -1267,7 +1429,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 if (drCacheCtx.isDrEnabled()) {
                     try {
-                        drCacheCtx.dr().onExchange(topologyVersion(), exchId.isLeft());
+                        drCacheCtx.dr().onExchange(res, exchId.isLeft());
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to notify DR: " + e, e);
@@ -1275,25 +1437,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            if (serverNodeDiscoveryEvent() &&
-                (discoEvt.type() == EVT_NODE_LEFT ||
-                discoEvt.type() == EVT_NODE_FAILED ||
-                discoEvt.type() == EVT_NODE_JOINED))
-                detectLostPartitions();
+            if (serverNodeDiscoveryEvent())
+                detectLostPartitions(res);
 
             Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
 
             for (CacheGroupContext grp : cctx.cache().cacheGroups())
-                m.put(grp.groupId(), validateCacheGroup(grp, discoEvt.topologyNodes()));
+                m.put(grp.groupId(), validateCacheGroup(grp, events().lastEvent().topologyNodes()));
 
             grpValidRes = m;
         }
 
         tryToPerformLocalSnapshotOperation();
 
-        cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err);
+        cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
 
-        cctx.exchange().onExchangeDone(this, err);
+        cctx.exchange().onExchangeDone(res, initialVersion(), err);
 
         if (exchActions != null && err == null)
             exchActions.completeRequestFutures(cctx);
@@ -1320,7 +1479,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (err == null) {
             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                 if (!grp.isLocal())
-                    grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()), false);
+                    grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false);
             }
         }
 
@@ -1331,17 +1490,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             initFut.onDone(err == null);
 
-            if (exchId.isLeft()) {
-                for (CacheGroupContext grp : cctx.cache().cacheGroups())
-                    grp.affinityFunction().removeNode(exchId.nodeId());
+            if (exchCtx != null && exchCtx.events().hasServerLeft()) {
+                ExchangeDiscoveryEvents evts = exchCtx.events();
+
+                for (DiscoveryEvent evt : exchCtx.events().events()) {
+                    if (evts.serverLeftEvent(evt)) {
+                        for (CacheGroupContext grp : cctx.cache().cacheGroups())
+                            grp.affinityFunction().removeNode(evt.eventNode().id());
+                    }
+                }
             }
 
             exchActions = null;
 
-            if (discoEvt instanceof DiscoveryCustomEvent)
-                ((DiscoveryCustomEvent)discoEvt).customMessage(null);
+            if (firstDiscoEvt instanceof DiscoveryCustomEvent)
+                ((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null);
 
-            cctx.exchange().lastFinishedFuture(this);
+            if (err == null)
+                cctx.exchange().lastFinishedFuture(this);
 
             return true;
         }
@@ -1353,7 +1519,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * Cleans up resources to avoid excessive memory usage.
      */
     public void cleanUp() {
-        singleMsgs.clear();
+        pendingSingleMsgs.clear();
         fullMsgs.clear();
         msgs.clear();
         changeGlobalStateExceptions.clear();
@@ -1361,6 +1527,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         partReleaseFut = null;
         changeGlobalStateE = null;
         exchActions = null;
+        mergedJoinExchMsgs = null;
+        pendingJoinMsg = null;
+        exchCtx = null;
+        newCrdFut = null;
     }
 
     /**
@@ -1382,137 +1552,401 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * Processing of received single message. Actual processing in future may be delayed if init method was not
-     * completed, see {@link #initDone()}
+     * Records that this exchange is merged with another 'node join' exchange.
      *
-     * @param node Sender node.
-     * @param msg Single partition info.
+     * @param node Joined node.
+     * @param msg Joined node message if already received.
+     * @return {@code True} if need to wait for message from joined server node.
      */
-    public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
-        assert msg != null;
-        assert msg.exchangeId().equals(exchId) : msg;
+    private boolean addMergedJoinExchange(ClusterNode node, @Nullable GridDhtPartitionsSingleMessage msg) {
+        assert Thread.holdsLock(mux);
+        assert node != null;
+        assert state == ExchangeLocalState.CRD : state;
 
-        if (!msg.client()) {
-            assert msg.lastVersion() != null : msg;
+        if (msg == null && newCrdFut != null)
+            msg = newCrdFut.joinExchangeMessage(node.id());
 
-            updateLastVersion(msg.lastVersion());
-        }
+        UUID nodeId = node.id();
 
-        if (isDone()) {
-            if (log.isDebugEnabled())
-                log.debug("Received message for finished future (will reply only to sender) [msg=" + msg +
-                    ", fut=" + this + ']');
+        boolean wait = false;
 
-            if (!centralizedAff)
-                sendAllPartitions(node.id(), cctx.gridConfig().getNetworkSendRetryCount());
+        if (CU.clientNode(node)) {
+            if (msg != null)
+                waitAndReplyToNode(nodeId, msg);
         }
         else {
-            initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                @Override public void apply(IgniteInternalFuture<Boolean> f) {
-                    try {
-                        if (!f.get())
-                            return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to initialize exchange future: " + this, e);
+            if (mergedJoinExchMsgs == null)
+                mergedJoinExchMsgs = new LinkedHashMap<>();
 
-                        return;
-                    }
+            if (msg != null) {
+                assert msg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(node.order()));
+
+                log.info("Merge server join exchange, message received [curFut=" + initialVersion() +
+                    ", node=" + nodeId + ']');
 
-                    processMessage(node, msg);
+                mergedJoinExchMsgs.put(nodeId, msg);
+            }
+            else {
+                if (cctx.discovery().alive(nodeId)) {
+                    log.info("Merge server join exchange, wait for message [curFut=" + initialVersion() +
+                        ", node=" + nodeId + ']');
+
+                    wait = true;
+
+                    mergedJoinExchMsgs.put(nodeId, null);
+
+                    awaitMergedMsgs++;
                 }
-            });
+                else {
+                    log.info("Merge server join exchange, awaited node left [curFut=" + initialVersion() +
+                        ", node=" + nodeId + ']');
+                }
+            }
         }
+
+        return wait;
     }
 
     /**
-     * Note this method performs heavy updatePartitionSingleMap operation, this operation is moved out from the
-     * synchronized block. Only count of such updates {@link #pendingSingleUpdates} is managed under critical section.
+     * Merges this exchange with given one.
      *
-     * @param node Sender node.
-     * @param msg Partition single message.
+     * @param fut Current exchange to merge with.
+     * @return {@code True} if need to wait for message from joined server node.
      */
-    private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
-        boolean allReceived = false; // Received all expected messages.
-        boolean updateSingleMap = false;
+    public boolean mergeJoinExchange(GridDhtPartitionsExchangeFuture fut) {
+        boolean wait;
 
-        synchronized (this) {
-            assert crd != null;
+        synchronized (mux) {
+            assert (!isDone() && !initFut.isDone()) || cctx.kernalContext().isStopping() : this;
+            assert (mergedWith == null && state == null) || cctx.kernalContext().isStopping()  : this;
 
-            if (crd.isLocal()) {
-                if (remaining.remove(node.id())) {
-                    updateSingleMap = true;
+            state = ExchangeLocalState.MERGED;
 
-                    pendingSingleUpdates++;
+            mergedWith = fut;
 
-                    if (stateChangeExchange() && msg.getError() != null)
-                        changeGlobalStateExceptions.put(node.id(), msg.getError());
+            ClusterNode joinedNode = firstDiscoEvt.eventNode();
 
-                    allReceived = remaining.isEmpty();
-                }
-            }
-            else
-                singleMsgs.put(node, msg);
+            wait = fut.addMergedJoinExchange(joinedNode, pendingJoinMsg);
         }
 
-        if (updateSingleMap) {
-            try {
-                // Do not update partition map, in case cluster transitioning to inactive state.
-                if (!deactivateCluster())
-                    updatePartitionSingleMap(node, msg);
-            }
-            finally {
-                synchronized (this) {
-                    assert pendingSingleUpdates > 0;
+        return wait;
+    }
 
-                    pendingSingleUpdates--;
+    /**
+     * @param fut Current future.
+     * @return Pending join request if any.
+     */
+    @Nullable public GridDhtPartitionsSingleMessage mergeJoinExchangeOnDone(GridDhtPartitionsExchangeFuture fut) {
+        synchronized (mux) {
+            assert !isDone();
+            assert !initFut.isDone();
+            assert mergedWith == null;
+            assert state == null;
 
-                    if (pendingSingleUpdates == 0)
-                        notifyAll();
-                }
-            }
-        }
+            state = ExchangeLocalState.MERGED;
 
-        if (allReceived) {
-            awaitSingleMapUpdates();
+            mergedWith = fut;
 
-            onAllReceived();
+            return pendingJoinMsg;
         }
     }
 
     /**
-     *
+     * @param node Sender node.
+     * @param msg Message.
      */
-    private synchronized void awaitSingleMapUpdates() {
-        try {
-            while (pendingSingleUpdates > 0)
-                U.wait(this);
-        }
-        catch (IgniteInterruptedCheckedException e) {
-            U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
+    private void processMergedMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
+        if (msg.client()) {
+            waitAndReplyToNode(node.id(), msg);
+
+            return;
         }
-    }
 
-    /**
-     * @param fut Affinity future.
-     */
-    private void onAffinityInitialized(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) {
-        try {
-            assert fut.isDone();
+        boolean done = false;
 
-            Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
+        FinishState finishState0 = null;
 
-            GridDhtPartitionsFullMessage m = createPartitionsMessage(false);
+        synchronized (mux) {
+            if (state == ExchangeLocalState.DONE) {
+                assert finishState != null;
 
-            CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
+                finishState0 = finishState;
+            }
+            else {
+                boolean process = mergedJoinExchMsgs != null &&
+                    mergedJoinExchMsgs.containsKey(node.id()) &&
+                    mergedJoinExchMsgs.get(node.id()) == null;
 
-            if (log.isDebugEnabled())
-                log.debug("Centralized affinity exchange, send affinity change message: " + msg);
+                log.info("Merge server join exchange, received message [curFut=" + initialVersion() +
+                    ", node=" + node.id() +
+                    ", msgVer=" + msg.exchangeId().topologyVersion() +
+                    ", process=" + process +
+                    ", awaited=" + awaitMergedMsgs + ']');
 
-            cctx.discovery().sendCustomEvent(msg);
+                if (process) {
+                    mergedJoinExchMsgs.put(node.id(), msg);
+
+                    assert awaitMergedMsgs > 0 : awaitMergedMsgs;
+
+                    awaitMergedMsgs--;
+
+                    done = awaitMergedMsgs == 0;
+                }
+            }
         }
-        catch (IgniteCheckedException e) {
-            onDone(e);
+
+        if (finishState0 != null) {
+            sendAllPartitionsToNode(finishState0, msg, node.id());
+
+            return;
+        }
+
+        if (done)
+            finishExchangeOnCoordinator(null);
+    }
+
+    /**
+     * Processing of received single message. Actual processing in future may be delayed if init method was not
+     * completed, see {@link #initDone()}
+     *
+     * @param node Sender node.
+     * @param msg Single partition info.
+     */
+    public void onReceiveSingleMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
+        assert !node.isDaemon() : node;
+        assert msg != null;
+        assert exchId.equals(msg.exchangeId()) : msg;
+        assert !cctx.kernalContext().clientNode();
+
+        if (msg.restoreState()) {
+            InitNewCoordinatorFuture newCrdFut0;
+
+            synchronized (mux) {
+                assert newCrdFut != null;
+
+                newCrdFut0 = newCrdFut;
+            }
+
+            newCrdFut0.onMessage(node, msg);
+
+            return;
+        }
+
+        if (!msg.client()) {
+            assert msg.lastVersion() != null : msg;
+
+            updateLastVersion(msg.lastVersion());
+        }
+
+        GridDhtPartitionsExchangeFuture mergedWith0 = null;
+
+        synchronized (mux) {
+            if (state == ExchangeLocalState.MERGED) {
+                assert mergedWith != null;
+
+                mergedWith0 = mergedWith;
+            }
+            else {
+                assert state != ExchangeLocalState.CLIENT;
+
+                if (exchangeId().isJoined() && node.id().equals(exchId.nodeId()))
+                    pendingJoinMsg = msg;
+            }
+        }
+
+        if (mergedWith0 != null) {
+            mergedWith0.processMergedMessage(node, msg);
+
+            return;
+        }
+
+        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+            @Override public void apply(IgniteInternalFuture<Boolean> f) {
+                try {
+                    if (!f.get())
+                        return;
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to initialize exchange future: " + this, e);
+
+                    return;
+                }
+
+                processSingleMessage(node.id(), msg);
+            }
+        });
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param msg Client's message.
+     */
+    public void waitAndReplyToNode(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) {
+        listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                if (cctx.kernalContext().isStopping())
+                    return;
+
+                FinishState finishState0;
+
+                synchronized (mux) {
+                    finishState0 = finishState;
+                }
+
+                if (finishState0 == null) {
+                    assert firstDiscoEvt.type() == EVT_NODE_JOINED && CU.clientNode(firstDiscoEvt.eventNode()) : this;
+
+                    finishState0 = new FinishState(cctx.localNodeId(),
+                        initialVersion(),
+                        createPartitionsMessage(true));
+                }
+
+                sendAllPartitionsToNode(finishState0, msg, nodeId);
+            }
+        });
+    }
+
+    /**
+     * Note this method performs heavy updatePartitionSingleMap operation, this operation is moved out from the
+     * synchronized block. Only count of such updates {@link #pendingSingleUpdates} is managed under critical section.
+     *
+     * @param nodeId Sender node.
+     * @param msg Partition single message.
+     */
+    private void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
+        if (msg.client()) {
+            waitAndReplyToNode(nodeId, msg);
+
+            return;
+        }
+
+        boolean allReceived = false; // Received all expected messages.
+        boolean updateSingleMap = false;
+
+        FinishState finishState0 = null;
+
+        synchronized (mux) {
+            assert crd != null;
+
+            switch (state) {
+                case DONE: {
+                    log.info("Received single message, already done [ver=" + initialVersion() +
+                        ", node=" + nodeId + ']');
+
+                    assert finishState != null;
+
+                    finishState0 = finishState;
+
+                    break;
+                }
+
+                case CRD: {
+                    assert crd.isLocal() : crd;
+
+                    if (remaining.remove(nodeId)) {
+                        updateSingleMap = true;
+
+                        pendingSingleUpdates++;
+
+                        if (stateChangeExchange() && msg.getError() != null)
+                            changeGlobalStateExceptions.put(nodeId, msg.getError());
+
+                        allReceived = remaining.isEmpty();
+
+                        log.info("Coordinator received single message [ver=" + initialVersion() +
+                            ", node=" + nodeId +
+                            ", allReceived=" + allReceived + ']');
+                    }
+
+                    break;
+                }
+
+                case SRV:
+                case BECOME_CRD: {
+                    log.info("Non-coordinator received single message [ver=" + initialVersion() +
+                        ", node=" + nodeId + ", state=" + state + ']');
+
+                    pendingSingleMsgs.put(nodeId, msg);
+
+                    break;
+                }
+
+                default:
+                    assert false : state;
+            }
+        }
+
+        if (finishState0 != null) {
+            sendAllPartitionsToNode(finishState0, msg, nodeId);
+
+            return;
+        }
+
+        if (updateSingleMap) {
+            try {
+                // Do not update partition map, in case cluster transitioning to inactive state.
+                if (!deactivateCluster())
+                    updatePartitionSingleMap(nodeId, msg);
+            }
+            finally {
+                synchronized (mux) {
+                    assert pendingSingleUpdates > 0;
+
+                    pendingSingleUpdates--;
+
+                    if (pendingSingleUpdates == 0)
+                        mux.notifyAll();
+                }
+            }
+        }
+
+        if (allReceived) {
+            if (!awaitSingleMapUpdates())
+                return;
+
+            onAllReceived(null);
+        }
+    }
+
+    /**
+     * @return {@code False} if interrupted.
+     */
+    private boolean awaitSingleMapUpdates() {
+        try {
+            synchronized (mux) {
+                while (pendingSingleUpdates > 0)
+                    U.wait(mux);
+            }
+
+            return true;
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
+
+            return false;
+        }
+    }
+
+    /**
+     * @param fut Affinity future.
+     */
+    private void onAffinityInitialized(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) {
+        try {
+            assert fut.isDone();
+
+            Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
+
+            GridDhtPartitionsFullMessage m = createPartitionsMessage(false);
+
+            CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
+
+            if (log.isDebugEnabled())
+                log.debug("Centralized affinity exchange, send affinity change message: " + msg);
+
+            cctx.discovery().sendCustomEvent(msg);
+        }
+        catch (IgniteCheckedException e) {
+            onDone(e);
         }
     }
 
@@ -1657,8 +2091,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /**
      * Detect lost partitions.
+     *
+     * @param resTopVer Result topology version.
      */
-    private void detectLostPartitions() {
+    private void detectLostPartitions(AffinityTopologyVersion resTopVer) {
         boolean detected = false;
 
         synchronized (cctx.exchange().interruptLock()) {
@@ -1667,7 +2103,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                 if (!grp.isLocal()) {
-                    boolean detectedOnGrp = grp.topology().detectLostPartitions(discoEvt);
+                    boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, events().lastEvent());
 
                     detected |= detectedOnGrp;
                 }
@@ -1682,6 +2118,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param cacheNames Cache names.
      */
     private void resetLostPartitions(Collection<String> cacheNames) {
+        assert !exchCtx.mergeExchanges();
+
         synchronized (cctx.exchange().interruptLock()) {
             if (Thread.currentThread().isInterrupted())
                 return;
@@ -1692,7 +2130,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 for (String cacheName : cacheNames) {
                     if (grp.hasCache(cacheName)) {
-                        grp.topology().resetLostPartitions();
+                        grp.topology().resetLostPartitions(initialVersion());
 
                         break;
                     }
@@ -1702,49 +2140,122 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     *
+     * @param sndResNodes Additional nodes to send finish message to.
      */
-    private void onAllReceived() {
+    private void onAllReceived(@Nullable Collection<ClusterNode> sndResNodes) {
         try {
             assert crd.isLocal();
 
-            assert partHistSuppliers.isEmpty();
+            assert partHistSuppliers.isEmpty() : partHistSuppliers;
 
-            if (!crd.equals(discoCache.serverNodes().get(0))) {
+            if (!exchCtx.mergeExchanges() && !crd.equals(events().discoveryCache().serverNodes().get(0))) {
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (!grp.isLocal())
-                        grp.topology().beforeExchange(this, !centralizedAff);
+                        grp.topology().beforeExchange(this, !centralizedAff, false);
                 }
             }
 
-            for (GridDhtPartitionsAbstractMessage msg : msgs.values()) {
-                if (msg instanceof GridDhtPartitionsSingleMessage) {
-                    GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg;
+            if (exchCtx.mergeExchanges()) {
+                log.info("Coordinator received all messages, try merge [ver=" + initialVersion() + ']');
+
+                boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this);
+
+                if (!finish)
+                    return;
+            }
+
+            finishExchangeOnCoordinator(sndResNodes);
+        }
+        catch (IgniteCheckedException e) {
+            if (reconnectOnError(e))
+                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+            else
+                onDone(e);
+        }
+    }
+
+    /**
+     * @param sndResNodes Additional nodes to send finish message to.
+     */
+    private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndResNodes) {
+        try {
+            AffinityTopologyVersion resTopVer = exchCtx.events().topologyVersion();
 
-                    for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) {
-                        Integer grpId = entry.getKey();
-                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+            log.info("finishExchangeOnCoordinator [topVer=" + initialVersion() +
+                ", resVer=" + resTopVer + ']');
 
-                        GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                            cctx.exchange().clientTopology(grpId, this);
+            Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null;
 
-                        Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(grpId);
+            if (exchCtx.mergeExchanges()) {
+                synchronized (mux) {
+                    if (mergedJoinExchMsgs != null) {
+                        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) {
+                            msgs.put(e.getKey(), e.getValue());
 
-                        if (cntrs != null)
-                            top.applyUpdateCounters(cntrs);
+                            updatePartitionSingleMap(e.getKey(), e.getValue());
+                        }
                     }
                 }
+
+                assert exchCtx.events().hasServerJoin() || exchCtx.events().hasServerLeft();
+
+                exchCtx.events().processEvents(this);
+
+                if (exchCtx.events().hasServerLeft())
+                    idealAffDiff = cctx.affinity().onServerLeftWithExchangeMergeProtocol(this);
+                else
+                    cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true);
+
+                for (CacheGroupDescriptor desc : cctx.affinity().cacheGroups().values()) {
+                    if (desc.config().getCacheMode() == CacheMode.LOCAL)
+                        continue;
+
+                    CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+
+                    GridDhtPartitionTopology top = grp != null ? grp.topology() :
+                        cctx.exchange().clientTopology(desc.groupId());
+
+                    top.beforeExchange(this, true, true);
+                }
             }
 
-            if (discoEvt.type() == EVT_NODE_JOINED)
-                assignPartitionsStates();
-            else if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
-                assert discoEvt instanceof DiscoveryCustomEvent;
+            Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
+
+            for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
+                GridDhtPartitionsSingleMessage msg = e.getValue();
+
+                // Apply update counters after all single messages are received.
+                for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
+                    Integer grpId = entry.getKey();
+
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+                    GridDhtPartitionTopology top = grp != null ? grp.topology() :
+                        cctx.exchange().clientTopology(grpId);
+
+                    Map<Integer, T2<Long, Long>> cntrs = msg.partitionUpdateCounters(grpId);
+
+                    if (cntrs != null)
+                        top.applyUpdateCounters(cntrs);
+                }
+
+                Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+
+                if (affReq != null) {
+                    joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
+                        resTopVer,
+                        affReq,
+                        joinedNodeAff);
+                }
+            }
+
+            if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+                assert firstDiscoEvt instanceof DiscoveryCustomEvent;
 
                 if (activateCluster())
                     assignPartitionsStates();
 
-                if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
+                if (((DiscoveryCustomEvent)firstDiscoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
                     if (exchActions != null) {
                         Set<String> caches = exchActions.cachesToResetLostPartitions();
 
@@ -1753,14 +2264,40 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     }
                 }
             }
-            else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED)
-                detectLostPartitions();
+            else {
+                if (exchCtx.events().hasServerJoin())
+                    assignPartitionsStates();
+
+                if (exchCtx.events().hasServerLeft())
+                    detectLostPartitions(resTopVer);
+            }
 
             updateLastVersion(cctx.versions().last());
 
             cctx.versions().onExchange(lastVer.get().order());
 
+            GridDhtPartitionsFullMessage msg = createPartitionsMessage(true);
+
+            if (exchCtx.mergeExchanges()) {
+                assert !centralizedAff;
+
+                msg.resultTopologyVersion(resTopVer);
+
+                if (exchCtx.events().hasServerLeft())
+                    msg.idealAffinityDiff(idealAffDiff);
+            }
+
+            msg.prepareMarshal(cctx);
+
+            synchronized (mux) {
+                finishState = new FinishState(crd.id(), resTopVer, msg);
+
+                state = ExchangeLocalState.DONE;
+            }
+
             if (centralizedAff) {
+                assert !exchCtx.mergeExchanges();
+
                 IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut = cctx.affinity().initAffinityOnNodeLeft(this);
 
                 if (!fut.isDone()) {
@@ -1774,12 +2311,32 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     onAffinityInitialized(fut);
             }
             else {
-                List<ClusterNode> nodes;
+                Set<ClusterNode> nodes;
+
+                Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs0;
 
-                synchronized (this) {
+                synchronized (mux) {
                     srvNodes.remove(cctx.localNode());
 
-                    nodes = new ArrayList<>(srvNodes);
+                    nodes = U.newHashSet(srvNodes.size());
+
+                    nodes.addAll(srvNodes);
+
+                    mergedJoinExchMsgs0 = mergedJoinExchMsgs;
+
+                    if (mergedJoinExchMsgs != null) {
+                        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) {
+                            if (e.getValue() != null) {
+                                ClusterNode node = cctx.discovery().node(e.getKey());
+
+                                if (node != null)
+                                    nodes.add(node);
+                            }
+                        }
+                    }
+
+                    if (!F.isEmpty(sndResNodes))
+                        nodes.addAll(sndResNodes);
                 }
 
                 IgniteCheckedException err = null;
@@ -1801,15 +2358,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                     boolean active = !stateChangeErr && req.activate();
 
-                    ChangeGlobalStateFinishMessage msg = new ChangeGlobalStateFinishMessage(req.requestId(), active);
+                    ChangeGlobalStateFinishMessage stateFinishMsg = new ChangeGlobalStateFinishMessage(
+                        req.requestId(),
+                        active);
 
-                    cctx.discovery().sendCustomEvent(msg);
+                    cctx.discovery().sendCustomEvent(stateFinishMsg);
                 }
 
                 if (!nodes.isEmpty())
-                    sendAllPartitions(nodes);
+                    sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, joinedNodeAff);
+
+                onDone(exchCtx.events().topologyVersion(), err);
+
+                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : pendingSingleMsgs.entrySet()) {
+                    log.info("Process pending message on coordinator [node=" + e.getKey() +
+                        ", ver=" + initialVersion() +
+                        ", resVer=" + resTopVer + ']');
 
-                onDone(exchangeId().topologyVersion(), err);
+                    processSingleMessage(e.getKey(), e.getValue());
+                }
             }
         }
         catch (IgniteCheckedException e) {
@@ -1835,127 +2402,332 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @param nodeId Node ID.
-     * @param retryCnt Number of retries.
+     * @param finishState State.
+     * @param msg Request.
+     * @param nodeId Node ID.
+     */
+    private void sendAllPartitionsToNode(FinishState finishState, GridDhtPartitionsSingleMessage msg, UUID nodeId) {
+        ClusterNode node = cctx.node(nodeId);
+
+        if (node != null) {
+            GridDhtPartitionsFullMessage fullMsg = finishState.msg.copy();
+
+            Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+
+            if (affReq != null) {
+                Map<Integer, CacheGroupAffinityMessage> aff = CacheGroupAffinityMessage.createAffinityMessages(
+                    cctx,
+                    finishState.resTopVer,
+                    affReq,
+                    null);
+
+                fullMsg.joinedNodeAffinity(aff);
+            }
+
+            if (!fullMsg.exchangeId().equals(msg.exchangeId())) {
+                fullMsg = fullMsg.copy();
+
+                fullMsg.exchangeId(msg.exchangeId());
+            }
+
+            try {
+                cctx.io().send(node, fullMsg, SYSTEM_POOL);
+            }
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send partitions, node failed: " + node);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send partitions [node=" + node + ']', e);
+            }
+        }
+        else if (log.isDebugEnabled())
+            log.debug("Failed to send partitions, node failed: " + nodeId);
+
+    }
+
+    /**
+     * @param node Sender node.
+     * @param msg Full partition info.
+     */
+    public void onReceiveFullMessage(final ClusterNode node, final GridDhtPartitionsFullMessage msg) {
+        assert msg != null;
+        assert msg.exchangeId() != null : msg;
+        assert !node.isDaemon() : node;
+
+        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+            @Override public void apply(IgniteInternalFuture<Boolean> f) {
+                try {
+                    if (!f.get())
+                        return;
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to initialize exchange future: " + this, e);
+
+                    return;
+                }
+
+                processFullMessage(true, node, msg);
+            }
+        });
+    }
+
+    /**
+     * @param node Sender node.
+     * @param msg Partitions request (may ask this node to restore exchange state).
+     */
+    public void onReceivePartitionRequest(final ClusterNode node, final GridDhtPartitionsSingleRequest msg) {
+        assert !cctx.kernalContext().clientNode() || msg.restoreState();
+        assert !node.isDaemon() && !CU.clientNode(node) : node;
+
+        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+            @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                processSinglePartitionRequest(node, msg);
+            }
+        });
+    }
+
+    /**
+     * @param node Sender node.
+     * @param msg Message.
+     */
+    private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) {
+        FinishState finishState0 = null;
+
+        synchronized (mux) {
+            if (crd == null) {
+                log.info("Ignore partitions request, no coordinator [node=" + node.id() + ']');
+
+                return;
+            }
+
+            switch (state) {
+                case DONE: {
+                    assert finishState != null;
+
+                    if (node.id().equals(finishState.crdId)) {
+                        log.info("Ignore partitions request, finished exchange with this coordinator: " + msg);
+
+                        return;
+                    }
+
+                    finishState0 = finishState;
+
+                    break;
+                }
+
+                case CRD:
+                case BECOME_CRD: {
+                    log.info("Ignore partitions request, node is coordinator: " + msg);
+
+                    return;
+                }
+
+                case CLIENT:
+                case SRV: {
+                    if (!cctx.discovery().alive(node)) {
+                        log.info("Ignore partitions request, node is not alive [node=" + node.id() + ']');
+
+                        return;
+                    }
+
+                    if (msg.restoreState()) {
+                        if (!node.equals(crd)) {
+                            if (node.order() > crd.order()) {
+                                log.info("Received partitions request, change coordinator [oldCrd=" + crd.id() +
+                                    ", newCrd=" + node.id() + ']');
+
+                                crd = node; // Do not allow to process FullMessage from old coordinator.
+                            }
+                            else {
+                                log.info("Ignore restore state request, coordinator changed [oldCrd=" + crd.id() +
+                                    ", newCrd=" + node.id() + ']');
+
+                                return;
+                            }
+                        }
+                    }
+
+                    break;
+                }
+
+                default:
+                    assert false : state;
+            }
+        }
+
+        if (msg.restoreState()) {
+            try {
+                assert msg.restoreExchangeId() != null : msg;
+
+                GridDhtPartitionsSingleMessage res = cctx.exchange().createPartitionsSingleMessage(
+                    msg.restoreExchangeId(),
+                    cctx.kernalContext().clientNode(),
+                    true);
+
+                if (localJoinExchange() && finishState0 == null)
+                    res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
+
+                res.restoreState(true);
+
+                log.info("Send restore state response [node=" + node.id() +
+                    ", exchVer=" + msg.restoreExchangeId().topologyVersion() +
+                    ", hasState=" + (finishState0 != null) +
+                    ", affReq=" + !F.isEmpty(res.cacheGroupsAffinityRequest()) + ']');
+
+                res.finishMessage(finishState0 != null ? finishState0.msg : null);
+
+                cctx.io().send(node, res, SYSTEM_POOL);
+            }
+            catch (ClusterTopologyCheckedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']');
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send partitions message [node=" + node + ", msg=" + msg + ']', e);
+            }
+
+            return;
+        }
+
+        try {
+            sendLocalPartitions(node);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send message to coordinator: " + e);
+        }
+    }
+
+    /**
+     * @param node Sender node.
+     * @param msg Message.
      */
-    private void sendAllPartitions(final UUID nodeId, final int retryCnt) {
-        ClusterNode n = cctx.node(nodeId);
-
+    private void processFullMessage(boolean checkCrd, ClusterNode node, GridDhtPartitionsFullMessage msg) {
         try {
-            if (n != null)
-                sendAllPartitions(F.asList(n));
-        }
-        catch (IgniteCheckedException e) {
-            if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send full partition map to node, node left grid " +
-                        "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
+            assert exchId.equals(msg.exchangeId()) : msg;
+            assert msg.lastVersion() != null : msg;
 
-                return;
-            }
+            if (checkCrd) {
+                assert node != null;
 
-            if (reconnectOnError(e)) {
-                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+                synchronized (mux) {
+                    if (crd == null) {
+                        log.info("Ignore full message, all server nodes left: " + msg);
 
-                return;
-            }
+                        return;
+                    }
 
-            if (retryCnt > 0) {
-                long timeout = cctx.gridConfig().getNetworkSendRetryDelay();
+                    switch (state) {
+                        case CRD:
+                        case BECOME_CRD: {
+                            log.info("Ignore full message, node is coordinator: " + msg);
 
-                LT.error(log, e, "Failed to send full partition map to node (will retry after timeout) " +
-                    "[node=" + nodeId + ", exchangeId=" + exchId + ", timeout=" + timeout + ']');
+                            return;
+                        }
 
-                cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(timeout) {
-                    @Override public void onTimeout() {
-                        sendAllPartitions(nodeId, retryCnt - 1);
-                    }
-                });
-            }
-            else
-                U.error(log, "Failed to send full partition map [node=" + n + ", exchangeId=" + exchId + ']', e);
-        }
-    }
+                        case DONE: {
+                            log.info("Ignore full message, future is done: " + msg);
 
-    /**
-     * @param node Sender node.
-     * @param msg Full partition info.
-     */
-    public void onReceive(final ClusterNode node, final GridDhtPartitionsFullMessage msg) {
-        assert msg != null;
+                            return;
+                        }
 
-        final UUID nodeId = node.id();
+                        case SRV:
+                        case CLIENT: {
+                            if (!crd.equals(node)) {
+                                log.info("Received full message from non-coordinator [node=" + node.id() +
+                                    ", nodeOrder=" + node.order() +
+                                    ", crd=" + crd.id() +
+                                    ", crdOrder=" + crd.order() + ']');
 
-        if (isDone()) {
-            if (log.isDebugEnabled())
-                log.debug("Received message for finished future [msg=" + msg + ", fut=" + this + ']');
+                                if (node.order() > crd.order())
+                                    fullMsgs.put(node, msg);
 
-            return;
-        }
+                                return;
+                            }
+                            else {
+                                AffinityTopologyVersion resVer = msg.resultTopologyVersion() != null ? msg.resultTopologyVersion() : initialVersion();
 
-        if (log.isDebugEnabled())
-            log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']');
+                                log.info("Received full message, will finish exchange [node=" + node.id() +
+                                    ", resVer=" + resVer + ']');
 
-        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-            @Override public void apply(IgniteInternalFuture<Boolean> f) {
-                try {
-                    if (!f.get())
-                        return;
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to initialize exchange future: " + this, e);
+                                finishState = new FinishState(crd.id(), resVer, msg);
 
-                    return;
-                }
+                                state = ExchangeLocalState.DONE;
 
-                processMessage(node, msg);
+                                break;
+                            }
+                        }
+                    }
+                }
             }
-        });
-    }
+            else
+                assert node == null : node;
 
-    /**
-     * @param node Sender node.
-     * @param msg Message with full partition info.
-     */
-    private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
-        assert exchId.equals(msg.exchangeId()) : msg;
-        assert msg.lastVersion() != null : msg;
+            AffinityTopologyVersion resTopVer = initialVersion();
 
-        synchronized (this) {
-            if (crd == null)
-                return;
+            if (exchCtx.mergeExchanges()) {
+                if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion())) {
+                    log.info("Received full message, need merge [curFut=" + initialVersion() +
+                        ", resVer=" + msg.resultTopologyVersion() + ']');
 
-            if (!crd.equals(node)) {
-                if (log.isDebugEnabled())
-                    log.debug("Received full partition map from unexpected node [oldest=" + crd.id() +
-                        ", nodeId=" + node.id() + ']');
+                    resTopVer = msg.resultTopologyVersion();
 
-                if (node.order() > crd.order())
-                    fullMsgs.put(node, msg);
+                    if (cctx.exchange().mergeExchanges(this, msg)) {
+                        assert cctx.kernalContext().isStopping();
 
-                return;
+                        return; // Node is stopping, no need to further process exchange.
+                    }
+
+                    assert resTopVer.equals(exchCtx.events().topologyVersion()) :  "Unexpected result version [" +
+                        "msgVer=" + resTopVer +
+                        ", locVer=" + exchCtx.events().topologyVersion() + ']';
+                }
+
+                exchCtx.events().processEvents(this);
+
+                if (localJoinExchange())
+                    cctx.affinity().onLocalJoin(this, msg, resTopVer);
+                else {
+                    if (exchCtx.events().hasServerLeft())
+                        cctx.affinity().mergeExchangesOnServerLeft(this, msg);
+                    else
+                        cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, false);
+
+                    for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                        if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+                            continue;
+
+                        grp.topology().beforeExchange(this, true, false);
+                    }
+                }
             }
-        }
+            else if (localJoinExchange() && !exchCtx.fetchAffinityOnJoin())
+                cctx.affinity().onLocalJoin(this, msg, resTopVer);
 
-        updatePartitionFullMap(msg);
+            updatePartitionFullMap(resTopVer, msg);
 
-        IgniteCheckedException err = null;
+            IgniteCheckedException err = null;
 
-        if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
-            err = new IgniteCheckedException("Cluster state change failed");
+            if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
+                err = new IgniteCheckedException("Cluster state change failed");
 
-            cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
- 

<TRUNCATED>

Mime
View raw message