ignite-commits mailing list archives

From voze...@apache.org
Subject [15/50] [abbrv] ignite git commit: ignite-324 Partition exchange: node should be assigned as primary only after preloading is finished Implemented 'late affinity assignment', also fixes: - fixed BinaryObject/BinaryReaderExImpl to properly handle case whe
Date Mon, 11 Apr 2016 14:25:42 GMT
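The commit message above summarizes the change: with late affinity assignment, a joining node is made primary for a partition only after rebalancing (preloading) of that partition has finished, so requests keep going to the previous owners while the exchange is in progress. A minimal usage sketch follows, assuming the public IgniteConfiguration.setLateAffinityAssignment(boolean) flag that accompanies this feature; the flag and surrounding names are assumptions drawn from the public Ignite API, not from this diff.

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;

public class LateAffinityAssignmentExample {
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration();

        // Assumed flag: enables the 'late affinity assignment' mode this commit implements,
        // so a newly joined node becomes primary for a partition only after it has finished
        // preloading that partition; until then the previous owner keeps serving requests.
        cfg.setLateAffinityAssignment(true);

        try (Ignite ignite = Ignition.start(cfg)) {
            // Caches on this node switch primaries only after rebalancing completes.
        }
    }
}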
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d52ad72..c879016 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -22,11 +22,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,8 +34,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
-import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -49,30 +46,30 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -108,38 +105,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /** */
     @GridToStringInclude
-    private final Collection<UUID> rcvdIds = new GridConcurrentHashSet<>();
+    private final Set<UUID> remaining = new HashSet<>();
 
-    /** Remote nodes. */
-    private volatile Collection<ClusterNode> rmtNodes;
-
-    /** Remote nodes. */
-    @GridToStringInclude
-    private volatile Collection<UUID> rmtIds;
+    /** */
+    private List<ClusterNode> srvNodes;
 
-    /** Oldest node. */
-    @GridToStringExclude
-    private final AtomicReference<ClusterNode> oldestNode = new AtomicReference<>();
+    /** */
+    private ClusterNode crd;
 
     /** ExchangeFuture id. */
     private final GridDhtPartitionExchangeId exchId;
 
-    /** Init flag. */
-    @GridToStringInclude
-    private final AtomicBoolean init = new AtomicBoolean(false);
-
-    /** Ready for reply flag. */
-    @GridToStringInclude
-    private final AtomicBoolean ready = new AtomicBoolean(false);
-
-    /** Replied flag. */
-    @GridToStringInclude
-    private final AtomicBoolean replied = new AtomicBoolean(false);
-
-    /** Timeout object. */
-    @GridToStringExclude
-    private volatile GridTimeoutObject timeoutObj;
-
     /** Cache context. */
     private final GridCacheSharedContext<?, ?> cctx;
 
@@ -156,6 +132,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** */
     private GridFutureAdapter<Boolean> initFut;
 
+    /** */
+    private final List<IgniteRunnable> discoEvts = new ArrayList<>();
+
+    /** */
+    private boolean init;
+
     /** Topology snapshot. */
     private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>();
 
@@ -166,10 +148,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * Messages received on non-coordinator are stored in case if this node
      * becomes coordinator.
      */
-    private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>();
+    private final Map<ClusterNode, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>();
 
     /** Messages received from new coordinator. */
-    private final Map<UUID, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>();
+    private final Map<ClusterNode, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>();
 
     /** */
     @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
@@ -185,6 +167,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** Dynamic cache change requests. */
     private Collection<DynamicCacheChangeRequest> reqs;
 
+    /** */
+    private CacheAffinityChangeMessage affChangeMsg;
+
     /** Cache validation results. */
     private volatile Map<Integer, Boolean> cacheValidRes;
 
@@ -197,6 +182,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** Init timestamp. Used to track the amount of time spent to complete the future. */
     private long initTs;
 
+    /** */
+    private boolean centralizedAff;
+
     /**
      * Dummy future created to trigger reassignments if partition
      * topology changed while preloading.
@@ -250,12 +238,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param busyLock Busy lock.
      * @param exchId Exchange ID.
      * @param reqs Cache change requests.
+     * @param affChangeMsg Affinity change message.
      */
     public GridDhtPartitionsExchangeFuture(
         GridCacheSharedContext cctx,
         ReadWriteLock busyLock,
         GridDhtPartitionExchangeId exchId,
-        Collection<DynamicCacheChangeRequest> reqs
+        Collection<DynamicCacheChangeRequest> reqs,
+        CacheAffinityChangeMessage affChangeMsg
     ) {
         assert busyLock != null;
         assert exchId != null;
@@ -268,6 +258,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         this.busyLock = busyLock;
         this.exchId = exchId;
         this.reqs = reqs;
+        this.affChangeMsg = affChangeMsg;
 
         log = cctx.logger(getClass());
 
@@ -284,6 +275,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         this.reqs = reqs;
     }
 
+    /**
+     * @param affChangeMsg Affinity change message.
+     */
+    public void affinityChangeMessage(CacheAffinityChangeMessage affChangeMsg) {
+        this.affChangeMsg = affChangeMsg;
+    }
+
     /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
         return exchId.topologyVersion();
@@ -342,7 +340,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param cacheId Cache ID.
      * @return {@code True} if non-client cache was added during this exchange.
      */
-    private boolean cacheStarted(int cacheId) {
+    public boolean cacheStarted(int cacheId) {
         if (!F.isEmpty(reqs)) {
             for (DynamicCacheChangeRequest req : reqs) {
                 if (req.start() && !req.clientStartOnly()) {
@@ -356,87 +354,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @param cacheId Cache ID.
-     * @return {@code True} if local client has been added.
-     */
-    public boolean isLocalClientAdded(int cacheId) {
-        if (!F.isEmpty(reqs)) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.start() && F.eq(req.initiatingNodeId(), cctx.localNodeId())) {
-                    if (CU.cacheId(req.cacheName()) == cacheId)
-                        return true;
-                }
-            }
-        }
-
-        return false;
-    }
-
-    /**
-     * @param cacheCtx Cache context.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException {
-        if (stopping(cacheCtx.cacheId()))
-            return;
-
-        if (canCalculateAffinity(cacheCtx)) {
-            if (log.isDebugEnabled())
-                log.debug("Will recalculate affinity [locNodeId=" + cctx.localNodeId() + ", exchId=" + exchId + ']');
-
-            cacheCtx.affinity().calculateAffinity(exchId.topologyVersion(), discoEvt);
-        }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Will request affinity from remote node [locNodeId=" + cctx.localNodeId() + ", exchId=" +
-                    exchId + ']');
-
-            // Fetch affinity assignment from remote node.
-            GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cacheCtx,
-                exchId.topologyVersion(),
-                CU.affinityNodes(cacheCtx, exchId.topologyVersion()));
-
-            fetchFut.init();
-
-            List<List<ClusterNode>> affAssignment = fetchFut.get();
-
-            if (log.isDebugEnabled())
-                log.debug("Fetched affinity from remote node, initializing affinity assignment [locNodeId=" +
-                    cctx.localNodeId() + ", topVer=" + exchId.topologyVersion() + ']');
-
-            if (affAssignment == null) {
-                affAssignment = new ArrayList<>(cacheCtx.affinity().partitions());
-
-                List<ClusterNode> empty = Collections.emptyList();
-
-                for (int i = 0; i < cacheCtx.affinity().partitions(); i++)
-                    affAssignment.add(empty);
-            }
-
-            cacheCtx.affinity().initializeAffinity(exchId.topologyVersion(), affAssignment);
-        }
-    }
-
-    /**
-     * @param cacheCtx Cache context.
-     * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange.
-     */
-    private boolean canCalculateAffinity(GridCacheContext cacheCtx) {
-        AffinityFunction affFunc = cacheCtx.config().getAffinity();
-
-        // Do not request affinity from remote nodes if affinity function is not centralized.
-        if (!U.hasAnnotation(affFunc, AffinityCentralizedFunction.class))
-            return true;
-
-        // If local node did not initiate exchange or local node is the only cache node in grid.
-        Collection<ClusterNode> affNodes = CU.affinityNodes(cacheCtx, exchId.topologyVersion());
-
-        return cacheStarted(cacheCtx.cacheId()) ||
-            !exchId.nodeId().equals(cctx.localNodeId()) ||
-            (affNodes.size() == 1 && affNodes.contains(cctx.localNode()));
-    }
-
-    /**
      * @return {@code True}
      */
     public boolean onAdded() {
@@ -500,427 +417,469 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         if (isDone())
             return;
 
-        if (init.compareAndSet(false, true)) {
-            if (isDone())
-                return;
+        initTs = U.currentTimeMillis();
 
-            initTs = U.currentTimeMillis();
+        U.await(evtLatch);
 
-            try {
-                // Wait for event to occur to make sure that discovery
-                // will return corresponding nodes.
-                U.await(evtLatch);
+        assert discoEvt != null : this;
+        assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
+        assert !dummy && !forcePreload : this;
 
-                assert discoEvt != null : this;
-                assert !dummy && !forcePreload : this;
+        try {
+            srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion()));
 
-                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
+            remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
 
-                oldestNode.set(oldest);
+            crd = srvNodes.isEmpty() ? null : srvNodes.get(0);
 
-                if (!F.isEmpty(reqs))
-                    blockGateways();
+            boolean crdNode = crd != null && crd.isLocal();
 
-                startCaches();
+            skipPreload = cctx.kernalContext().clientNode();
 
-                // True if client node joined or failed.
-                boolean clientNodeEvt;
+            ExchangeType exchange;
 
-                if (F.isEmpty(reqs)) {
-                    int type = discoEvt.type();
+            Collection<DynamicCacheDescriptor> receivedCaches;
 
-                    assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt;
+            if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+                if (!F.isEmpty(reqs))
+                    exchange = onCacheChangeRequest(crdNode);
+                else {
+                    assert affChangeMsg != null : this;
 
-                    clientNodeEvt = CU.clientNode(discoEvt.eventNode());
+                    exchange = onAffinityChangeRequest(crdNode);
                 }
-                else {
-                    assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
+            }
+            else {
+                if (discoEvt.type() == EVT_NODE_JOINED) {
+                    receivedCaches = cctx.cache().startReceivedCaches(topologyVersion());
 
-                    boolean clientOnlyCacheEvt = true;
+                    if (!discoEvt.eventNode().isLocal())
+                        cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
+                }
 
-                    for (DynamicCacheChangeRequest req : reqs) {
-                        if (req.clientStartOnly() || req.close())
-                            continue;
+                if (CU.clientNode(discoEvt.eventNode()))
+                    exchange = onClientNodeEvent(crdNode);
+                else
+                    exchange = onServerNodeEvent(crdNode);
+            }
 
-                        clientOnlyCacheEvt = false;
+            updateTopologies(crdNode);
 
-                        break;
-                    }
+            switch (exchange) {
+                case ALL: {
+                    distributedExchange();
 
-                    clientNodeEvt = clientOnlyCacheEvt;
+                    break;
                 }
 
-                if (clientNodeEvt) {
-                    ClusterNode node = discoEvt.eventNode();
+                case CLIENT: {
+                    initTopologies();
 
-                    // Client need to initialize affinity for local join event or for stated client caches.
-                    if (!node.isLocal() || clientCacheClose()) {
-                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                            if (cacheCtx.isLocal())
-                                continue;
+                    clientOnlyExchange();
 
-                            GridDhtPartitionTopology top = cacheCtx.topology();
+                    break;
+                }
 
-                            top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+                case NONE: {
+                    initTopologies();
 
-                            if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) {
-                                initTopology(cacheCtx);
+                    onDone(topologyVersion());
 
-                                top.beforeExchange(this);
-                            }
-                            else
-                                cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
+                    break;
+                }
 
-                            if (!exchId.isJoined())
-                                cacheCtx.preloader().unwindUndeploys();
-                        }
+                default:
+                    assert false;
+            }
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            onDone(e);
 
-                        if (exchId.isLeft())
-                            cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
+            throw e;
+        }
+        catch (Throwable e) {
+            U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
 
-                        rmtIds = Collections.emptyList();
-                        rmtNodes = Collections.emptyList();
+            onDone(e);
 
-                        onDone(exchId.topologyVersion());
+            if (e instanceof Error)
+                throw (Error)e;
+        }
+    }
 
-                        skipPreload = cctx.kernalContext().clientNode();
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void initTopologies() throws IgniteCheckedException {
+        if (crd != null) {
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                if (cacheCtx.isLocal())
+                    continue;
 
-                        return;
-                    }
-                }
+                cacheCtx.topology().beforeExchange(this, !centralizedAff);
+            }
+        }
+    }
 
-                clientOnlyExchange = clientNodeEvt || cctx.kernalContext().clientNode();
+    /**
+     * @param crd Coordinator flag.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void updateTopologies(boolean crd) throws IgniteCheckedException {
+        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+            if (cacheCtx.isLocal())
+                continue;
 
-                if (clientOnlyExchange) {
-                    skipPreload = cctx.kernalContext().clientNode();
+            GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(cacheCtx.cacheId());
 
-                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                        if (cacheCtx.isLocal())
-                            continue;
+            long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence();
 
-                        GridDhtPartitionTopology top = cacheCtx.topology();
+            GridDhtPartitionTopology top = cacheCtx.topology();
 
-                        top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
-                    }
+            if (crd) {
+                boolean updateTop = !cacheCtx.isLocal() &&
+                    exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
-                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                        if (cacheCtx.isLocal())
-                            continue;
+                if (updateTop && clientTop != null)
+                    cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters());
+            }
 
-                        initTopology(cacheCtx);
-                    }
+            top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
+        }
 
-                    if (oldest != null) {
-                        rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
-                            exchId.topologyVersion()));
+        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
+            top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId()));
+    }
 
-                        rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
+    /**
+     * @param crd Coordinator flag.
+     * @throws IgniteCheckedException If failed.
+     * @return Exchange type.
+     */
+    private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException {
+        assert !F.isEmpty(reqs) : this;
 
-                        initFut.onDone(true);
+        boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, reqs);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Initialized future: " + this);
+        if (clientOnly) {
+            boolean clientCacheStarted = false;
 
-                        if (cctx.localNode().equals(oldest)) {
-                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                boolean updateTop = !cacheCtx.isLocal() &&
-                                    exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+            for (DynamicCacheChangeRequest req : reqs) {
+                if (req.start() && req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId())) {
+                    clientCacheStarted = true;
 
-                                if (updateTop) {
-                                    for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
-                                        if (top.cacheId() == cacheCtx.cacheId()) {
-                                            cacheCtx.topology().update(exchId,
-                                                top.partitionMap(true),
-                                                top.updateCounters());
+                    break;
+                }
+            }
 
-                                            break;
-                                        }
-                                    }
+            if (clientCacheStarted)
+                return ExchangeType.CLIENT;
+            else
+                return ExchangeType.NONE;
+        }
+        else
+            return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
+    }
 
-                                }
-                            }
+    /**
+     * @param crd Coordinator flag.
+     * @throws IgniteCheckedException If failed.
+     * @return Exchange type.
+     */
+    private ExchangeType onAffinityChangeRequest(boolean crd) throws IgniteCheckedException {
+        assert affChangeMsg != null : this;
 
-                            onDone(exchId.topologyVersion());
-                        }
-                        else
-                            sendPartitions(oldest);
-                    }
-                    else {
-                        rmtIds = Collections.emptyList();
-                        rmtNodes = Collections.emptyList();
+        cctx.affinity().onChangeAffinityMessage(this, crd, affChangeMsg);
 
-                        onDone(exchId.topologyVersion());
-                    }
+        if (cctx.kernalContext().clientNode())
+            return ExchangeType.CLIENT;
 
-                    return;
-                }
+        return ExchangeType.ALL;
+    }
 
-                assert oldestNode.get() != null;
+    /**
+     * @param crd Coordinator flag.
+     * @throws IgniteCheckedException If failed.
+     * @return Exchange type.
+     */
+    private ExchangeType onClientNodeEvent(boolean crd) throws IgniteCheckedException {
+        assert CU.clientNode(discoEvt.eventNode()) : this;
 
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) {
-                        if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty())
-                            U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex());
-                    }
+        if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) {
+            onLeft();
 
-                    cacheCtx.preloader().onExchangeFutureAdded();
-                }
+            assert !discoEvt.eventNode().isLocal() : discoEvt;
+        }
+        else
+            assert discoEvt.type() == EVT_NODE_JOINED : discoEvt;
 
-                List<String> cachesWithoutNodes = null;
-
-                if (exchId.isLeft()) {
-                    for (String name : cctx.cache().cacheNames()) {
-                        if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
-                            if (cachesWithoutNodes == null)
-                                cachesWithoutNodes = new ArrayList<>();
-
-                            cachesWithoutNodes.add(name);
-
-                            // Fire event even if there is no client cache started.
-                            if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
-                                Event evt = new CacheEvent(
-                                    name,
-                                    cctx.localNode(),
-                                    cctx.localNode(),
-                                    "All server nodes have left the cluster.",
-                                    EventType.EVT_CACHE_NODES_LEFT,
-                                    0,
-                                    false,
-                                    null,
-                                    null,
-                                    null,
-                                    null,
-                                    false,
-                                    null,
-                                    false,
-                                    null,
-                                    null,
-                                    null
-                                );
-
-                                cctx.gridEvents().record(evt);
-                            }
-                        }
-                    }
-                }
+        cctx.affinity().onClientEvent(this, crd);
 
-                if (cachesWithoutNodes != null) {
-                    StringBuilder sb =
-                        new StringBuilder("All server nodes for the following caches have left the cluster: ");
+        if (discoEvt.eventNode().isLocal())
+            return ExchangeType.CLIENT;
+        else
+            return ExchangeType.NONE;
+    }
 
-                    for (int i = 0; i < cachesWithoutNodes.size(); i++) {
-                        String cache = cachesWithoutNodes.get(i);
+    /**
+     * @param crd Coordinator flag.
+     * @throws IgniteCheckedException If failed.
+     * @return Exchange type.
+     */
+    private ExchangeType onServerNodeEvent(boolean crd) throws IgniteCheckedException {
+        assert !CU.clientNode(discoEvt.eventNode()) : this;
 
-                        sb.append('\'').append(cache).append('\'');
+        if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) {
+            onLeft();
 
-                        if (i != cachesWithoutNodes.size() - 1)
-                            sb.append(", ");
-                    }
+            warnNoAffinityNodes();
 
-                    U.quietAndWarn(log, sb.toString());
+            centralizedAff = cctx.affinity().onServerLeft(this);
+        }
+        else {
+            assert discoEvt.type() == EVT_NODE_JOINED : discoEvt;
 
-                    U.quietAndWarn(log, "Must have server nodes for caches to operate.");
-                }
+            cctx.affinity().onServerJoin(this, crd);
+        }
 
-                assert discoEvt != null;
+        if (cctx.kernalContext().clientNode())
+            return ExchangeType.CLIENT;
+        else
+            return ExchangeType.ALL;
+    }
 
-                assert exchId.nodeId().equals(discoEvt.eventNode().id());
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void clientOnlyExchange() throws IgniteCheckedException {
+        clientOnlyExchange = true;
 
+        if (crd != null) {
+            if (crd.isLocal()) {
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(
-                        cacheCtx.cacheId());
+                    boolean updateTop = !cacheCtx.isLocal() &&
+                        exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
-                    long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence();
+                    if (updateTop) {
+                        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
+                            if (top.cacheId() == cacheCtx.cacheId()) {
+                                cacheCtx.topology().update(exchId,
+                                    top.partitionMap(true),
+                                    top.updateCounters());
 
-                    // Update before waiting for locks.
-                    if (!cacheCtx.isLocal())
-                        cacheCtx.topology().updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
+                                break;
+                            }
+                        }
+                    }
                 }
+            }
+            else {
+                if (!centralizedAff)
+                    sendLocalPartitions(crd, exchId);
 
-                // Grab all alive remote nodes with order of equal or less than last joined node.
-                rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
-                    exchId.topologyVersion()));
-
-                rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
-
-                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
-                    // If received any messages, process them.
-                    onReceive(m.getKey(), m.getValue());
-
-                for (Map.Entry<UUID, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet())
-                    // If received any messages, process them.
-                    onReceive(m.getKey(), m.getValue());
-
-                AffinityTopologyVersion topVer = exchId.topologyVersion();
+                initDone();
 
+                return;
+            }
+        }
+        else {
+            if (centralizedAff) { // Last server node failed.
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (cacheCtx.isLocal())
-                        continue;
+                    GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
 
-                    // Must initialize topology after we get discovery event.
-                    initTopology(cacheCtx);
+                    aff.initialize(topologyVersion(), aff.idealAssignment());
+                }
+            }
+        }
 
-                    cacheCtx.preloader().onTopologyChanged(exchId.topologyVersion());
+        onDone(topologyVersion());
+    }
 
-                    cacheCtx.preloader().updateLastExchangeFuture(this);
-                }
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void distributedExchange() throws IgniteCheckedException {
+        assert crd != null;
 
-                IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(topVer);
+        assert !cctx.kernalContext().clientNode();
 
-                // Assign to class variable so it will be included into toString() method.
-                this.partReleaseFut = partReleaseFut;
+        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+            if (cacheCtx.isLocal())
+                continue;
 
-                if (exchId.isLeft())
-                    cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
+            cacheCtx.preloader().onTopologyChanged(this);
+        }
 
-                if (log.isDebugEnabled())
-                    log.debug("Before waiting for partition release future: " + this);
+        waitPartitionRelease();
 
-                int dumpedObjects = 0;
+        boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
 
-                while (true) {
-                    try {
-                        partReleaseFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+            if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
+                continue;
 
-                        break;
-                    }
-                    catch (IgniteFutureTimeoutCheckedException ignored) {
-                        // Print pending transactions and locks that might have led to hang.
-                        if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
-                            dumpPendingObjects();
+            if (topChanged) {
+                cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
 
-                            dumpedObjects++;
-                        }
-                    }
-                }
+                // Partition release future is done so we can flush the write-behind store.
+                cacheCtx.store().forceFlush();
+            }
 
-                if (log.isDebugEnabled())
-                    log.debug("After waiting for partition release future: " + this);
+            cacheCtx.topology().beforeExchange(this, !centralizedAff);
+        }
 
-                IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion());
+        if (crd.isLocal()) {
+            if (remaining.isEmpty())
+                onAllReceived(false);
+        }
+        else
+            sendPartitions(crd);
 
-                dumpedObjects = 0;
+        initDone();
+    }
 
-                while (true) {
-                    try {
-                        locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void waitPartitionRelease() throws IgniteCheckedException {
+        IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(topologyVersion());
 
-                        break;
-                    }
-                    catch (IgniteFutureTimeoutCheckedException ignored) {
-                        if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
-                            U.warn(log, "Failed to wait for locks release future. " +
-                                "Dumping pending objects that might be the cause: " + cctx.localNodeId());
+        // Assign to class variable so it will be included into toString() method.
+        this.partReleaseFut = partReleaseFut;
 
-                            U.warn(log, "Locked keys:");
+        if (exchId.isLeft())
+            cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
 
-                            for (IgniteTxKey key : cctx.mvcc().lockedKeys())
-                                U.warn(log, "Locked key: " + key);
+        if (log.isDebugEnabled())
+            log.debug("Before waiting for partition release future: " + this);
 
-                            for (IgniteTxKey key : cctx.mvcc().nearLockedKeys())
-                                U.warn(log, "Locked near key: " + key);
+        int dumpedObjects = 0;
 
-                            Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks =
-                                cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
+        while (true) {
+            try {
+                partReleaseFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
 
-                            for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
-                                U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
+                break;
+            }
+            catch (IgniteFutureTimeoutCheckedException ignored) {
+                // Print pending transactions and locks that might have led to hang.
+                if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
+                    dumpPendingObjects();
 
-                            dumpedObjects++;
-                        }
-                    }
+                    dumpedObjects++;
                 }
+            }
+        }
 
-                boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT;
-
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (cacheCtx.isLocal())
-                        continue;
+        if (log.isDebugEnabled())
+            log.debug("After waiting for partition release future: " + this);
 
-                    // Notify replication manager.
-                    GridCacheContext drCacheCtx = cacheCtx.isNear() ? cacheCtx.near().dht().context() : cacheCtx;
+        IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion());
 
-                    if (drCacheCtx.isDrEnabled())
-                        drCacheCtx.dr().beforeExchange(topVer, exchId.isLeft());
+        dumpedObjects = 0;
 
-                    if (topChanged)
-                        cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
+        while (true) {
+            try {
+                locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
 
-                    // Partition release future is done so we can flush the write-behind store.
-                    cacheCtx.store().forceFlush();
+                break;
+            }
+            catch (IgniteFutureTimeoutCheckedException ignored) {
+                if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
+                    U.warn(log, "Failed to wait for locks release future. " +
+                        "Dumping pending objects that might be the cause: " + cctx.localNodeId());
 
-                    if (!exchId.isJoined())
-                        // Process queued undeploys prior to sending/spreading map.
-                        cacheCtx.preloader().unwindUndeploys();
+                    U.warn(log, "Locked keys:");
 
-                    GridDhtPartitionTopology top = cacheCtx.topology();
+                    for (IgniteTxKey key : cctx.mvcc().lockedKeys())
+                        U.warn(log, "Locked key: " + key);
 
-                    assert topVer.equals(top.topologyVersion()) :
-                        "Topology version is updated only in this class instances inside single ExchangeWorker thread.";
+                    for (IgniteTxKey key : cctx.mvcc().nearLockedKeys())
+                        U.warn(log, "Locked near key: " + key);
 
-                    top.beforeExchange(this);
-                }
+                    Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks =
+                        cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
 
-                for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
-                    top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId()));
+                    for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
+                        U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
 
-                    top.beforeExchange(this);
+                    dumpedObjects++;
                 }
             }
-            catch (IgniteInterruptedCheckedException e) {
-                onDone(e);
-
-                throw e;
-            }
-            catch (Throwable e) {
-                U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
-
-                onDone(e);
+        }
+    }
 
-                if (e instanceof Error)
-                    throw (Error)e;
+    /**
+     *
+     */
+    private void onLeft() {
+        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+            if (cacheCtx.isLocal())
+                continue;
 
-                return;
-            }
+            cacheCtx.preloader().unwindUndeploys();
+        }
 
-            if (F.isEmpty(rmtIds)) {
-                onDone(exchId.topologyVersion());
+        cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
+    }
 
-                return;
+    /**
+     *
+     */
+    private void warnNoAffinityNodes() {
+        List<String> cachesWithoutNodes = null;
+
+        for (String name : cctx.cache().cacheNames()) {
+            if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
+                if (cachesWithoutNodes == null)
+                    cachesWithoutNodes = new ArrayList<>();
+
+                cachesWithoutNodes.add(name);
+
+                // Fire event even if there is no client cache started.
+                if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
+                    Event evt = new CacheEvent(
+                        name,
+                        cctx.localNode(),
+                        cctx.localNode(),
+                        "All server nodes have left the cluster.",
+                        EventType.EVT_CACHE_NODES_LEFT,
+                        0,
+                        false,
+                        null,
+                        null,
+                        null,
+                        null,
+                        false,
+                        null,
+                        false,
+                        null,
+                        null,
+                        null
+                    );
+
+                    cctx.gridEvents().record(evt);
+                }
             }
+        }
 
-            ready.set(true);
-
-            initFut.onDone(true);
-
-            if (log.isDebugEnabled())
-                log.debug("Initialized future: " + this);
+        if (cachesWithoutNodes != null) {
+            StringBuilder sb =
+                new StringBuilder("All server nodes for the following caches have left the cluster: ");
 
-            ClusterNode oldest = oldestNode.get();
+            for (int i = 0; i < cachesWithoutNodes.size(); i++) {
+                String cache = cachesWithoutNodes.get(i);
 
-            // If this node is not oldest.
-            if (!oldest.id().equals(cctx.localNodeId()))
-                sendPartitions(oldest);
-            else {
-                boolean allReceived = allReceived();
+                sb.append('\'').append(cache).append('\'');
 
-                if (allReceived && replied.compareAndSet(false, true)) {
-                    if (spreadPartitions())
-                        onDone(exchId.topologyVersion());
-                }
+                if (i != cachesWithoutNodes.size() - 1)
+                    sb.append(", ");
             }
 
-            scheduleRecheck();
-        }
-        else
-            assert false : "Skipped init future: " + this;
-    }
+            U.quietAndWarn(log, sb.toString());
 
-    /**
-     * @return {@code True} if exchange initiated for client cache close.
-     */
-    private boolean clientCacheClose() {
-        return reqs != null && reqs.size() == 1 && reqs.iterator().next().close();
+            U.quietAndWarn(log, "Must have server nodes for caches to operate.");
+        }
     }
 
     /**
@@ -930,14 +889,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         U.warn(log, "Failed to wait for partition release future [topVer=" + topologyVersion() +
             ", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: ");
 
-        cctx.exchange().dumpPendingObjects();
+        cctx.exchange().dumpPendingObjects(topologyVersion());
     }
 
     /**
      * @param cacheId Cache ID to check.
      * @return {@code True} if cache is stopping by this exchange.
      */
-    private boolean stopping(int cacheId) {
+    public boolean stopping(int cacheId) {
         boolean stopping = false;
 
         if (!F.isEmpty(reqs)) {
@@ -954,30 +913,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * Starts dynamic caches.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void startCaches() throws IgniteCheckedException {
-        cctx.cache().prepareCachesStart(F.view(reqs, new IgnitePredicate<DynamicCacheChangeRequest>() {
-            @Override public boolean apply(DynamicCacheChangeRequest req) {
-                return req.start();
-            }
-        }), exchId.topologyVersion());
-    }
-
-    /**
-     *
-     */
-    private void blockGateways() {
-        for (DynamicCacheChangeRequest req : reqs) {
-            if (req.stop() || req.close())
-                cctx.cache().blockGateway(req);
-        }
-    }
-
-    /**
-     * @param node Node.
-     * @param id ID.
+     * @param node Node.
+     * @param id ID.
      * @throws IgniteCheckedException If failed.
      */
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
@@ -1002,32 +939,40 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         if (log.isDebugEnabled())
             log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
 
-        cctx.io().send(node, m, SYSTEM_POOL);
+        try {
+            cctx.io().send(node, m, SYSTEM_POOL);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']');
+        }
     }
 
     /**
-     * @param nodes Nodes.
-     * @param id ID.
-     * @throws IgniteCheckedException If failed.
+     * @param nodes Target nodes.
+     * @return Message.
      */
-    private void sendAllPartitions(Collection<? extends ClusterNode> nodes, GridDhtPartitionExchangeId id)
-        throws IgniteCheckedException {
-        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(id,
-            lastVer.get(),
-            id.topologyVersion());
+    private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes) {
+        GridCacheVersion last = lastVer.get();
+
+        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchangeId(),
+            last != null ? last : cctx.versions().last(),
+            topologyVersion());
 
         boolean useOldApi = false;
 
-        for (ClusterNode node : nodes) {
-            if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
-                useOldApi = true;
+        if (nodes != null) {
+            for (ClusterNode node : nodes) {
+                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
+                    useOldApi = true;
+            }
         }
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {
                 AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
 
-                boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0;
+                boolean ready = startTopVer == null || startTopVer.compareTo(topologyVersion()) <= 0;
 
                 if (ready) {
                     GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
@@ -1049,6 +994,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
         }
 
+        return m;
+    }
+
+    /**
+     * @param nodes Nodes.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
+        GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes);
+
         if (log.isDebugEnabled())
             log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
                 ", exchId=" + exchId + ", msg=" + m + ']');
@@ -1069,65 +1024,65 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     ", exchId=" + exchId + ']');
         }
         catch (IgniteCheckedException e) {
-            scheduleRecheck();
-
             U.error(log, "Failed to send local partitions to oldest node (will retry after timeout) [oldestNodeId=" +
                 oldestNode.id() + ", exchId=" + exchId + ']', e);
         }
     }
 
-    /**
-     * @return {@code True} if succeeded.
-     */
-    private boolean spreadPartitions() {
-        try {
-            sendAllPartitions(rmtNodes, exchId);
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) {
+        boolean realExchange = !dummy && !forcePreload;
 
-            return true;
-        }
-        catch (IgniteCheckedException e) {
-            scheduleRecheck();
+        if (err == null && realExchange) {
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                if (cacheCtx.isLocal())
+                    continue;
 
-            if (!X.hasCause(e, InterruptedException.class))
-                U.error(log, "Failed to send full partition map to nodes (will retry after timeout) [nodes=" +
-                    F.nodeId8s(rmtNodes) + ", exchangeId=" + exchId + ']', e);
+                try {
+                    if (centralizedAff)
+                        cacheCtx.topology().initPartitions(this);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    U.error(log, "Failed to initialize partitions.", e);
+                }
 
-            return false;
-        }
-    }
+                GridCacheContext drCacheCtx = cacheCtx.isNear() ? cacheCtx.near().dht().context() : cacheCtx;
 
-    /** {@inheritDoc} */
-    @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) {
-        Map<Integer, Boolean> m = null;
+                if (drCacheCtx.isDrEnabled()) {
+                    try {
+                        drCacheCtx.dr().onExchange(topologyVersion(), exchId.isLeft());
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to notify DR: " + e, e);
+                    }
+                }
+            }
 
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) {
-                if (m == null)
-                    m = new HashMap<>();
+            Map<Integer, Boolean> m = null;
+
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) {
+                    if (m == null)
+                        m = new HashMap<>();
 
-                m.put(cacheCtx.cacheId(), cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes()));
+                    m.put(cacheCtx.cacheId(), cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes()));
+                }
             }
-        }
 
-        cacheValidRes = m != null ? m : Collections.<Integer, Boolean>emptyMap();
+            cacheValidRes = m != null ? m : Collections.<Integer, Boolean>emptyMap();
+        }
 
         cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
 
         cctx.exchange().onExchangeDone(this, err);
 
-        if (super.onDone(res, err) && !dummy && !forcePreload) {
+        if (super.onDone(res, err) && realExchange) {
             if (log.isDebugEnabled())
                 log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +
                     "duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
 
             initFut.onDone(err == null);
 
-            GridTimeoutObject timeoutObj = this.timeoutObj;
-
-            // Deschedule timeout object.
-            if (timeoutObj != null)
-                cctx.kernalContext().timeout().removeTimeoutObject(timeoutObj);
-
             if (exchId.isLeft()) {
                 for (GridCacheContext cacheCtx : cctx.cacheContexts())
                     cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
@@ -1170,56 +1125,47 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         topSnapshot.set(null);
         singleMsgs.clear();
         fullMsgs.clear();
-        rcvdIds.clear();
-        oldestNode.set(null);
+        crd = null;
         partReleaseFut = null;
-
-        Collection<ClusterNode> rmtNodes = this.rmtNodes;
-
-        if (rmtNodes != null)
-            rmtNodes.clear();
     }
 
     /**
-     * @return {@code True} if all replies are received.
+     * @param ver Version.
      */
-    private boolean allReceived() {
-        Collection<UUID> rmtIds = this.rmtIds;
+    private void updateLastVersion(GridCacheVersion ver) {
+        assert ver != null;
 
-        assert rmtIds != null : "Remote Ids can't be null: " + this;
+        while (true) {
+            GridCacheVersion old = lastVer.get();
 
-        synchronized (rcvdIds) {
-            return rcvdIds.containsAll(rmtIds);
+            if (old == null || Long.compare(old.order(), ver.order()) < 0) {
+                if (lastVer.compareAndSet(old, ver))
+                    break;
+            }
+            else
+                break;
         }
     }
 
     /**
-     * @param nodeId Sender node id.
+     * @param node Sender node.
      * @param msg Single partition info.
      */
-    public void onReceive(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) {
+    public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
         assert msg != null;
+        assert msg.exchangeId().equals(exchId) : msg;
+        assert msg.lastVersion() != null : msg;
 
-        assert msg.exchangeId().equals(exchId);
-
-        // Update last seen version.
-        while (true) {
-            GridCacheVersion old = lastVer.get();
-
-            if (old == null || old.compareTo(msg.lastVersion()) < 0) {
-                if (lastVer.compareAndSet(old, msg.lastVersion()))
-                    break;
-            }
-            else
-                break;
-        }
+        if (!msg.client())
+            updateLastVersion(msg.lastVersion());
 
         if (isDone()) {
             if (log.isDebugEnabled())
                 log.debug("Received message for finished future (will reply only to sender) [msg=" + msg +
                     ", fut=" + this + ']');
 
-            sendAllPartitions(nodeId, cctx.gridConfig().getNetworkSendRetryCount());
+            if (!centralizedAff)
+                sendAllPartitions(node.id(), cctx.gridConfig().getNetworkSendRetryCount());
         }
         else {
             initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@@ -1234,46 +1180,108 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                         return;
                     }
 
-                    ClusterNode loc = cctx.localNode();
+                    processMessage(node, msg);
+                }
+            });
+        }
+    }
 
-                    singleMsgs.put(nodeId, msg);
+    /**
+     * @param node Sender node.
+     * @param msg Message.
+     */
+    private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+        boolean allReceived = false;
 
-                    boolean match = true;
+        synchronized (mux) {
+            assert crd != null;
 
-                    // Check if oldest node has changed.
-                    if (!oldestNode.get().equals(loc)) {
-                        match = false;
+            if (crd.isLocal()) {
+                if (remaining.remove(node.id())) {
+                    updatePartitionSingleMap(msg);
 
-                        synchronized (mux) {
-                            // Double check.
-                            if (oldestNode.get().equals(loc))
-                                match = true;
-                        }
-                    }
+                    allReceived = remaining.isEmpty();
+                }
+            }
+            else
+                singleMsgs.put(node, msg);
+        }
+
+        if (allReceived)
+            onAllReceived(false);
+    }
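
In the reworked single-message path, only the coordinator tracks senders: each expected server is removed from the remaining set under the mutex, and onAllReceived fires when the set drains, while non-coordinator nodes just stash the message in singleMsgs. A self-contained sketch of that bookkeeping (field and class names are illustrative, not the actual ones):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.UUID;

    /** Illustrative sketch of the coordinator's "remaining senders" accounting. */
    class RemainingTracker {
        private final Object mux = new Object();
        private final Set<UUID> remaining = new HashSet<>();

        RemainingTracker(Set<UUID> expectedSenders) {
            remaining.addAll(expectedSenders);
        }

        /** @return {@code true} if this message was the last one expected. */
        boolean onMessage(UUID senderId) {
            synchronized (mux) {
                // Duplicates and unexpected senders do not count; only a first-time removal does.
                return remaining.remove(senderId) && remaining.isEmpty();
            }
        }
    }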
 
-                    if (match) {
-                        boolean allReceived;
+    /**
+     * @param fut Affinity future.
+     */
+    private void onAffinityInitialized(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) {
+        try {
+            assert fut.isDone();
 
-                        synchronized (rcvdIds) {
-                            if (rcvdIds.add(nodeId))
-                                updatePartitionSingleMap(msg);
+            Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
 
-                            allReceived = allReceived();
-                        }
+            GridDhtPartitionsFullMessage m = createPartitionsMessage(null);
 
-                        // If got all replies, and initialization finished, and reply has not been sent yet.
-                        if (allReceived && ready.get() && replied.compareAndSet(false, true)) {
-                            spreadPartitions();
+            CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
 
-                            onDone(exchId.topologyVersion());
+            if (log.isDebugEnabled())
+                log.debug("Centralized affinity exchange, send affinity change message: " + msg);
+
+            cctx.discovery().sendCustomEvent(msg);
+        }
+        catch (IgniteCheckedException e) {
+            onDone(e);
+        }
+    }
+
+    /**
+     * @param discoThread If {@code true}, completes the future from another thread (so as not to block the discovery thread).
+     */
+    private void onAllReceived(boolean discoThread) {
+        try {
+            assert crd.isLocal();
+
+            if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
+                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                    if (!cacheCtx.isLocal())
+                        cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this, !centralizedAff);
+                }
+            }
+
+            updateLastVersion(cctx.versions().last());
+
+            cctx.versions().onExchange(lastVer.get().order());
+
+            if (centralizedAff) {
+                IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut = cctx.affinity().initAffinityOnNodeLeft(this);
+
+                if (!fut.isDone()) {
+                    fut.listen(new IgniteInClosure<IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>>>() {
+                        @Override public void apply(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) {
+                            onAffinityInitialized(fut);
                         }
-                        else if (log.isDebugEnabled())
-                            log.debug("Exchange future full map is not sent [allReceived=" + allReceived() +
-                                ", ready=" + ready + ", replied=" + replied.get() + ", init=" + init.get() +
-                                ", fut=" + GridDhtPartitionsExchangeFuture.this + ']');
-                    }
+                    });
                 }
-            });
+                else
+                    onAffinityInitialized(fut);
+            }
+            else {
+                List<ClusterNode> nodes;
+
+                synchronized (mux) {
+                    srvNodes.remove(cctx.localNode());
+
+                    nodes = new ArrayList<>(srvNodes);
+                }
+
+                if (!nodes.isEmpty())
+                    sendAllPartitions(nodes);
+
+                onDone(exchangeId().topologyVersion());
+            }
+        }
+        catch (IgniteCheckedException e) {
+            onDone(e);
         }
     }
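
onAllReceived branches on centralizedAff: in the node-left case the coordinator first recomputes affinity (initAffinityOnNodeLeft) and broadcasts the result as an affinity change message, while in the ordinary case it sends the full partition map to the remaining servers and completes the future immediately. The "invoke now if the future is already done, otherwise listen" handling used for the affinity future is a common pattern; a JDK-only sketch of it (an assumed helper, not part of Ignite):

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;

    /** Illustrative sketch: run a callback inline if the future is done, otherwise on completion. */
    final class FutureUtil {
        static <T> void whenDone(CompletableFuture<T> fut, Consumer<CompletableFuture<T>> cb) {
            if (fut.isDone())
                cb.accept(fut); // Result already available: handle on the calling thread.
            else
                fut.whenComplete((res, err) -> cb.accept(fut)); // Otherwise handle from the completion thread.
        }
    }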
 
@@ -1286,7 +1294,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         try {
             if (n != null)
-                sendAllPartitions(F.asList(n), exchId);
+                sendAllPartitions(F.asList(n));
         }
         catch (IgniteCheckedException e) {
             if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) {
@@ -1314,12 +1322,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @param nodeId Sender node ID.
+     * @param node Sender node.
      * @param msg Full partition info.
      */
-    public void onReceive(final UUID nodeId, final GridDhtPartitionsFullMessage msg) {
+    public void onReceive(final ClusterNode node, final GridDhtPartitionsFullMessage msg) {
         assert msg != null;
 
+        final UUID nodeId = node.id();
+
         if (isDone()) {
             if (log.isDebugEnabled())
                 log.debug("Received message for finished future [msg=" + msg + ", fut=" + this + ']');
@@ -1330,8 +1340,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         if (log.isDebugEnabled())
             log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']');
 
-        assert exchId.topologyVersion().equals(msg.topologyVersion());
-
         initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
             @Override public void apply(IgniteInternalFuture<Boolean> f) {
                 try {
@@ -1344,41 +1352,38 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     return;
                 }
 
-                ClusterNode curOldest = oldestNode.get();
-
-                if (!nodeId.equals(curOldest.id())) {
-                    if (log.isDebugEnabled())
-                        log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() +
-                            ", unexpectedNodeId=" + nodeId + ']');
-
-                    ClusterNode snd = cctx.discovery().node(nodeId);
-
-                    if (snd == null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId +
-                                ", exchId=" + msg.exchangeId() + ']');
-
-                        return;
-                    }
+                processMessage(node, msg);
+            }
+        });
+    }
 
-                    // Will process message later if sender node becomes oldest node.
-                    if (snd.order() > curOldest.order())
-                        fullMsgs.put(nodeId, msg);
+    /**
+     * @param node Sender node.
+     * @param msg Message.
+     */
+    private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
+        assert msg.exchangeId().equals(exchId) : msg;
+        assert msg.lastVersion() != null : msg;
 
-                    return;
-                }
+        synchronized (mux) {
+            if (crd == null)
+                return;
 
-                assert msg.exchangeId().equals(exchId);
+            if (!crd.equals(node)) {
+                if (log.isDebugEnabled())
+                    log.debug("Received full partition map from unexpected node [oldest=" + crd.id() +
+                            ", nodeId=" + node.id() + ']');
 
-                assert msg.lastVersion() != null;
+                if (node.order() > crd.order())
+                    fullMsgs.put(node, msg);
 
-                cctx.versions().onReceived(nodeId, msg.lastVersion());
+                return;
+            }
+        }
 
-                updatePartitionFullMap(msg);
+        updatePartitionFullMap(msg);
 
-                onDone(exchId.topologyVersion());
-            }
-        });
+        onDone(exchId.topologyVersion());
     }
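
The full-map path now accepts the message only from the node currently believed to be the coordinator; a message from a node with a higher discovery order is stashed in fullMsgs and replayed from onNodeLeft if that node does take over as coordinator. A small sketch of the stash-and-replay idea (illustrative generic types, not the real message classes):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /** Illustrative sketch: keep messages from nodes that may become coordinator later. */
    class PendingMessages<N, M> {
        private final Map<N, M> pending = new ConcurrentHashMap<>();

        /** Called when a message arrives from a node other than the current coordinator. */
        void onUnexpectedSender(N node, long nodeOrder, long crdOrder, M msg) {
            // Only a node with a higher order than the current coordinator can take over
            // once older nodes leave, so only such messages are worth keeping.
            if (nodeOrder > crdOrder)
                pending.put(node, msg);
        }

        /** Replayed after a coordinator change (compare the replay loop in onNodeLeft below). */
        Map<N, M> pending() {
            return pending;
        }
    }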
 
     /**
@@ -1387,6 +1392,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param msg Partitions full messages.
      */
     private void updatePartitionFullMap(GridDhtPartitionsFullMessage msg) {
+        cctx.versions().onExchange(msg.lastVersion().order());
+
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
             Integer cacheId = entry.getKey();
 
@@ -1423,251 +1430,183 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @param nodeId Left node id.
+     * Affinity change message callback, processed from the same thread as {@link #onNodeLeft}.
+     *
+     * @param node Message sender node.
+     * @param msg Message.
      */
-    public void onNodeLeft(final UUID nodeId) {
-        if (isDone())
-            return;
-
-        if (!enterBusy())
-            return;
-
-        try {
-            // Wait for initialization part of this future to complete.
-            initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                @Override public void apply(IgniteInternalFuture<Boolean> f) {
-                    try {
-                        if (!f.get())
-                            return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to initialize exchange future: " + this, e);
+    public void onAffinityChangeMessage(final ClusterNode node, final CacheAffinityChangeMessage msg) {
+        assert exchId.equals(msg.exchangeId()) : msg;
 
-                        return;
-                    }
-
-                    if (isDone())
-                        return;
-
-                    if (!enterBusy())
-                        return;
+        onDiscoveryEvent(new IgniteRunnable() {
+            @Override public void run() {
+                if (isDone() || !enterBusy())
+                    return;
 
-                    try {
-                        // Pretend to have received message from this node.
-                        rcvdIds.add(nodeId);
-
-                        Collection<UUID> rmtIds = GridDhtPartitionsExchangeFuture.this.rmtIds;
-
-                        assert rmtIds != null;
-
-                        ClusterNode oldest = oldestNode.get();
-
-                        if (oldest.id().equals(nodeId)) {
-                            if (log.isDebugEnabled())
-                                log.debug("Oldest node left or failed on partition exchange " +
-                                    "(will restart exchange process)) [oldestNodeId=" + oldest.id() +
-                                    ", exchangeId=" + exchId + ']');
-
-                            boolean set = false;
-
-                            ClusterNode newOldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
-
-                            if (newOldest != null) {
-                                // If local node is now oldest.
-                                if (newOldest.id().equals(cctx.localNodeId())) {
-                                    synchronized (mux) {
-                                        if (oldestNode.compareAndSet(oldest, newOldest)) {
-                                            // If local node is just joining.
-                                            if (exchId.nodeId().equals(cctx.localNodeId())) {
-                                                try {
-                                                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                                        if (!cacheCtx.isLocal())
-                                                            cacheCtx.topology().beforeExchange(
-                                                                GridDhtPartitionsExchangeFuture.this);
-                                                    }
-                                                }
-                                                catch (IgniteCheckedException e) {
-                                                    onDone(e);
-
-                                                    return;
-                                                }
-                                            }
-
-                                            set = true;
-                                        }
-                                    }
-                                }
-                                else {
-                                    synchronized (mux) {
-                                        set = oldestNode.compareAndSet(oldest, newOldest);
-                                    }
-
-                                    if (set && log.isDebugEnabled())
-                                        log.debug("Reassigned oldest node [this=" + cctx.localNodeId() +
-                                            ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
-                                }
-                            }
-                            else {
-                                ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to " +
-                                    "wait for exchange future, all server nodes left.");
+                try {
+                    assert centralizedAff;
 
-                                onDone(err);
-                            }
+                    if (crd.equals(node)) {
+                        cctx.affinity().onExchangeChangeAffinityMessage(GridDhtPartitionsExchangeFuture.this,
+                            crd.isLocal(),
+                            msg);
 
-                            if (set) {
-                                // If received any messages, process them.
-                                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
-                                    onReceive(m.getKey(), m.getValue());
+                        if (!crd.isLocal()) {
+                            GridDhtPartitionsFullMessage partsMsg = msg.partitionsMessage();
 
-                                for (Map.Entry<UUID, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet())
-                                    onReceive(m.getKey(), m.getValue());
+                            assert partsMsg != null : msg;
+                            assert partsMsg.lastVersion() != null : partsMsg;
 
-                                // Reassign oldest node and resend.
-                                recheck();
-                            }
+                            updatePartitionFullMap(partsMsg);
                         }
-                        else if (rmtIds.contains(nodeId)) {
-                            if (log.isDebugEnabled())
-                                log.debug("Remote node left of failed during partition exchange (will ignore) " +
-                                    "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
-
-                            assert rmtNodes != null;
-
-                            for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) {
-                                if (it.next().id().equals(nodeId))
-                                    it.remove();
-                            }
 
-                            if (allReceived() && ready.get() && replied.compareAndSet(false, true))
-                                if (spreadPartitions())
-                                    onDone(exchId.topologyVersion());
-                        }
+                        onDone(topologyVersion());
                     }
-                    finally {
-                        leaveBusy();
+                    else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Ignore affinity change message, coordinator changed [node=" + node.id() +
+                                ", crd=" + crd.id() +
+                                ", msg=" + msg +
+                                ']');
+                        }
                     }
                 }
-            });
-        }
-        finally {
-            leaveBusy();
+                finally {
+                    leaveBusy();
+                }
+            }
+        });
+    }
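
onAffinityChangeMessage, like the other discovery-driven callbacks here, is wrapped in enterBusy()/leaveBusy() so the callback does not run concurrently with the future being completed or the node stopping. A simplified busy-guard of that kind (an assumed sketch; the actual guard used by this class may be implemented differently):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    /** Simplified sketch of an enterBusy()/leaveBusy() style guard. */
    class BusyGuard {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        private volatile boolean stopped;

        /** @return {@code false} if shutdown already started and the callback must bail out. */
        boolean enterBusy() {
            lock.readLock().lock();

            if (stopped) {
                lock.readLock().unlock();

                return false;
            }

            return true;
        }

        void leaveBusy() {
            lock.readLock().unlock();
        }

        /** Called on stop: new entrants are rejected once all in-flight callbacks have finished. */
        void stop() {
            lock.writeLock().lock();

            try {
                stopped = true;
            }
            finally {
                lock.writeLock().unlock();
            }
        }
    }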
+
+    /**
+     * @param c Closure.
+     */
+    private void onDiscoveryEvent(IgniteRunnable c) {
+        synchronized (discoEvts) {
+            if (!init) {
+                discoEvts.add(c);
+
+                return;
+            }
+
+            assert discoEvts.isEmpty() : discoEvts;
         }
+
+        c.run();
     }
 
     /**
      *
      */
-    private void recheck() {
-        ClusterNode oldest = oldestNode.get();
+    private void initDone() {
+        while (!isDone()) {
+            List<IgniteRunnable> evts;
 
-        // If this is the oldest node.
-        if (oldest.id().equals(cctx.localNodeId())) {
-            Collection<UUID> remaining = remaining();
+            synchronized (discoEvts) {
+                if (discoEvts.isEmpty()) {
+                    init = true;
 
-            if (!remaining.isEmpty()) {
-                try {
-                    cctx.io().safeSend(cctx.discovery().nodes(remaining),
-                        new GridDhtPartitionsSingleRequest(exchId), SYSTEM_POOL, null);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to request partitions from nodes [exchangeId=" + exchId +
-                        ", nodes=" + remaining + ']', e);
+                    break;
                 }
+
+                evts = new ArrayList<>(discoEvts);
+
+                discoEvts.clear();
             }
-            // Resend full partition map because last attempt failed.
-            else {
-                if (spreadPartitions())
-                    onDone(exchId.topologyVersion());
-            }
+
+            for (IgniteRunnable c : evts)
+                c.run();
         }
-        else
-            sendPartitions(oldest);
 
-        // Schedule another send.
-        scheduleRecheck();
+        initFut.onDone(true);
     }
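
onDiscoveryEvent and initDone together implement a small defer-until-initialized queue: callbacks that arrive before the exchange future is initialized are buffered and replayed in order once initialization finishes, and afterwards they run inline. A standalone sketch of the same pattern (illustrative names only, simplified against the real methods above):

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative sketch of the defer-until-initialized queue used by onDiscoveryEvent/initDone. */
    class DeferredRunner {
        private final List<Runnable> pending = new ArrayList<>();

        private boolean ready;

        /** Runs the task immediately if initialized, otherwise queues it for markReady(). */
        void submit(Runnable task) {
            synchronized (pending) {
                if (!ready) {
                    pending.add(task);

                    return;
                }
            }

            task.run();
        }

        /** Drains everything queued so far, then switches to inline execution. */
        void markReady() {
            while (true) {
                List<Runnable> drained;

                synchronized (pending) {
                    if (pending.isEmpty()) {
                        ready = true; // Nothing left: future submissions run inline.

                        return;
                    }

                    drained = new ArrayList<>(pending);

                    pending.clear();
                }

                for (Runnable r : drained)
                    r.run();
            }
        }
    }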
 
     /**
+     * Node left callback, processed from the same thread as {@link #onAffinityChangeMessage}.
      *
+     * @param node Left node.
      */
-    private void scheduleRecheck() {
-        if (!isDone()) {
-            GridTimeoutObject old = timeoutObj;
-
-            if (old != null)
-                cctx.kernalContext().timeout().removeTimeoutObject(old);
-
-            GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter(
-                cctx.gridConfig().getNetworkTimeout() * Math.max(1, cctx.gridConfig().getCacheConfiguration().length)) {
-                @Override public void onTimeout() {
-                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                        @Override public void run() {
-                            if (isDone())
-                                return;
+    public void onNodeLeft(final ClusterNode node) {
+        if (isDone() || !enterBusy())
+            return;
+
+        try {
+            onDiscoveryEvent(new IgniteRunnable() {
+                @Override public void run() {
+                    if (isDone() || !enterBusy())
+                        return;
 
-                            if (!enterBusy())
+                    try {
+                        boolean crdChanged = false;
+                        boolean allReceived = false;
+
+                        ClusterNode crd0;
+
+                        synchronized (mux) {
+                            if (!srvNodes.remove(node))
                                 return;
 
-                            try {
-                                U.warn(log,
-                                    "Retrying preload partition exchange due to timeout [done=" + isDone() +
-                                        ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + id8s(rcvdIds) +
-                                        ", rmtIds=" + id8s(rmtIds) + ", remaining=" + id8s(remaining()) +
-                                        ", init=" + init + ", initFut=" + initFut.isDone() +
-                                        ", ready=" + ready + ", replied=" + replied + ", added=" + added +
-                                        ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" +
-                                        oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() +
-                                        ", locNodeOrder=" + cctx.localNode().order() +
-                                        ", locNodeId=" + cctx.localNode().id() + ']',
-                                    "Retrying preload partition exchange due to timeout.");
-
-                                recheck();
-                            }
-                            finally {
-                                leaveBusy();
+                            boolean rmvd = remaining.remove(node.id());
+
+                            if (node.equals(crd)) {
+                                crdChanged = true;
+
+                                crd = srvNodes.size() > 0 ? srvNodes.get(0) : null;
                             }
+
+                            if (crd != null && crd.isLocal() && rmvd)
+                                allReceived = remaining.isEmpty();
+
+                            crd0 = crd;
                         }
-                    });
-                }
-            };
 
-            this.timeoutObj = timeoutObj;
+                        if (crd0 == null) {
+                            assert cctx.kernalContext().clientNode() || cctx.localNode().isDaemon() : cctx.localNode();
 
-            cctx.kernalContext().timeout().addTimeoutObject(timeoutObj);
-        }
-    }
+                            List<ClusterNode> empty = Collections.emptyList();
 
-    /**
-     * @return Remaining node IDs.
-     */
-    Collection<UUID> remaining() {
-        if (rmtIds == null)
-            return Collections.emptyList();
+                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                                List<List<ClusterNode>> affAssignment = new ArrayList<>(cacheCtx.affinity().partitions());
 
-        return F.lose(rmtIds, true, rcvdIds);
-    }
+                                for (int i = 0; i < cacheCtx.affinity().partitions(); i++)
+                                    affAssignment.add(empty);
 
-    /**
-     * Convenient utility method that returns collection of node ID8s for a given
-     * collection of node IDs. ID8 is a shorter string representation of node ID,
-     * mainly the first 8 characters.
-     * <p>
-     * Note that this method doesn't create a new collection but simply iterates
-     * over the input one.
-     *
-     * @param ids Collection of nodeIds.
-     * @return Collection of node IDs for given collection of grid nodes.
-     */
-    private static Collection<String> id8s(@Nullable Collection<UUID> ids) {
-        if (ids == null || ids.isEmpty())
-            return Collections.emptyList();
+                                cacheCtx.affinity().affinityCache().initialize(topologyVersion(), affAssignment);
+                            }
+
+                            onDone(topologyVersion());
+
+                            return;
+                        }
+
+                        if (crd0.isLocal()) {
+                            if (allReceived) {
+                                onAllReceived(true);
 
-        Collection<String> res = new ArrayList<>(ids.size());
+                                return;
+                            }
 
-        for (UUID id : ids)
-            res.add(U.id8(id));
+                            for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
+                                processMessage(m.getKey(), m.getValue());
+                        }
+                        else {
+                            if (crdChanged) {
+                                sendPartitions(crd0);
 
-        return res;
+                                for (Map.Entry<ClusterNode, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet())
+                                    processMessage(m.getKey(), m.getValue());
+                            }
+                        }
+                    }
+                    finally {
+                        leaveBusy();
+                    }
+                }
+            });
+        }
+        finally {
+            leaveBusy();
+        }
     }
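
onNodeLeft now also handles coordinator failover: the failed node is dropped from srvNodes, and the new coordinator is the remaining server with the lowest discovery order (srvNodes appears to be maintained in topology order here, so it is simply the first element). A tiny sketch of that selection rule, using a hypothetical Node holder rather than ClusterNode:

    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    /** Illustrative sketch: the oldest surviving server node becomes the new coordinator. */
    final class CoordinatorSelector {
        /** Hypothetical minimal node view; only the discovery order matters here. */
        static class Node {
            final long order;

            Node(long order) {
                this.order = order;
            }
        }

        /** @return Node with the smallest discovery order, or {@code null} if no servers remain. */
        static Node select(List<Node> aliveServers) {
            if (aliveServers.isEmpty())
                return null;

            return Collections.min(aliveServers, new Comparator<Node>() {
                @Override public int compare(Node a, Node b) {
                    return Long.compare(a.order, b.order);
                }
            });
        }
    }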
 
     /** {@inheritDoc} */
@@ -1690,15 +1629,33 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         return exchId.hashCode();
     }
 
+    /**
+     *
+     */
+    enum ExchangeType {
+        /** */
+        CLIENT,
+        /** */
+        ALL,
+        /** */
+        NONE
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
-        ClusterNode oldestNode = this.oldestNode.get();
+        ClusterNode oldestNode;
+        Set<UUID> remaining;
+
+        synchronized (mux) {
+            oldestNode = this.crd;
+            remaining = new HashSet<>(this.remaining);
+        }
 
         return S.toString(GridDhtPartitionsExchangeFuture.class, this,
             "oldest", oldestNode == null ? "null" : oldestNode.id(),
             "oldestOrder", oldestNode == null ? "null" : oldestNode.order(),
             "evtLatch", evtLatch == null ? "null" : evtLatch.getCount(),
-            "remaining", remaining(),
+            "remaining", remaining,
             "super", super.toString());
     }
 }

