ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [08/10] ignite git commit: ignite-6124 Merge exchanges for multiple discovery events
Date Mon, 21 Aug 2017 10:22:31 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index d476091..5d77c9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.ClientCacheDhtTopologyFuture;
@@ -51,10 +52,15 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -63,6 +69,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -83,12 +90,23 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private final long clientCacheMsgTimeout =
         IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT, 10_000);
 
+    /** */
+    private static final IgniteClosure<ClusterNode, UUID> NODE_TO_ID = new IgniteClosure<ClusterNode, UUID>() {
+        @Override public UUID apply(ClusterNode node) {
+            return node.id();
+        }
+    };
+
+    /** */
+    private static final IgniteClosure<ClusterNode, Long> NODE_TO_ORDER = new IgniteClosure<ClusterNode, Long>() {
+        @Override public Long apply(ClusterNode node) {
+            return node.order();
+        }
+    };
+
     /** Affinity information for all started caches (initialized on coordinator). */
     private ConcurrentMap<Integer, CacheGroupHolder> grpHolders = new ConcurrentHashMap<>();
 
-    /** Last topology version when affinity was calculated (updated from exchange thread). */
-    private AffinityTopologyVersion affCalcVer;
-
     /** Topology version which requires affinity re-calculation (set from discovery thread). */
     private AffinityTopologyVersion lastAffVer;
 
@@ -153,8 +171,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             // Clean-up in case of client reconnect.
             caches.clear();
 
-            affCalcVer = null;
-
             lastAffVer = null;
 
             caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors());
@@ -170,9 +186,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) {
-            assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0;
+            synchronized (mux) {
+                assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0;
 
-            lastAffVer = topVer;
+                lastAffVer = topVer;
+            }
         }
     }
 
@@ -250,13 +268,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         CacheAffinityChangeMessage msg = null;
 
         synchronized (mux) {
-            if (waitInfo == null)
+            if (waitInfo == null || !waitInfo.topVer.equals(lastAffVer) )
                 return;
 
-            assert affCalcVer != null;
-            assert affCalcVer.equals(waitInfo.topVer) : "Invalid affinity version [calcVer=" + affCalcVer +
-                ", waitVer=" + waitInfo.topVer + ']';
-
             Map<Integer, UUID> partWait = waitInfo.waitGrps.get(checkGrpId);
 
             boolean rebalanced = true;
@@ -293,14 +307,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     }
                 }
             }
-        }
 
-        try {
-            if (msg != null)
-                cctx.discovery().sendCustomEvent(msg);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send affinity change message.", e);
+            try {
+                if (msg != null)
+                    cctx.discovery().sendCustomEvent(msg);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send affinity change message.", e);
+            }
         }
     }
 
@@ -436,20 +450,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                             grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
 
+                            grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity());
+
+                            grpHolders.put(grp.groupId(), grpHolder);
+
                             GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId());
 
                             if (clientTop != null) {
-                                grp.topology().update(topVer,
+                                grp.topology().update(grpHolder.affinity().lastVersion(),
                                     clientTop.partitionMap(true),
                                     clientTop.updateCounters(false),
                                     Collections.<Integer>emptySet(),
                                     null);
                             }
 
-                            grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity());
-
-                            grpHolders.put(grp.groupId(), grpHolder);
-
                             assert grpHolder.affinity().lastVersion().equals(grp.affinity().lastVersion());
                         }
                     }
@@ -524,7 +538,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 assert grp != null;
 
-                grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true);
+                grp.topology().onExchangeDone(null, grp.affinity().cachedAffinity(topVer), true);
             }
         }
 
@@ -677,12 +691,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      */
     public void onCacheChangeRequest(
-        final GridDhtPartitionsExchangeFuture fut,
+        GridDhtPartitionsExchangeFuture fut,
         boolean crd,
         final ExchangeActions exchActions
     ) throws IgniteCheckedException {
         assert exchActions != null && !exchActions.empty() : exchActions;
 
+        final ExchangeDiscoveryEvents evts = fut.context().events();
+
         caches.updateCachesInfo(exchActions);
 
         // Affinity did not change for existing caches.
@@ -691,7 +707,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 if (exchActions.cacheGroupStopping(aff.groupId()))
                     return;
 
-                aff.clientEventTopologyChange(fut.discoveryEvent(), fut.topologyVersion());
+                aff.clientEventTopologyChange(evts.lastEvent(), evts.topologyVersion());
             }
         });
 
@@ -735,10 +751,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     cctx.cache().prepareCacheStart(req.startCacheConfiguration(),
                         cacheDesc,
                         nearCfg,
-                        fut.topologyVersion());
+                        evts.topologyVersion());
 
                     if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
-                        if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
+                        if (fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
                             U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
                     }
                 }
@@ -764,7 +780,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 else {
                     CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                    if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) {
+                    if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.initialVersion())) {
                         assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion();
 
                         initAffinity(caches.group(grp.groupId()), grp.affinity(), fut);
@@ -799,7 +815,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         if (stoppedGrps != null) {
-            boolean notify = false;
+            AffinityTopologyVersion notifyTopVer = null;
 
             synchronized (mux) {
                 if (waitInfo != null) {
@@ -807,7 +823,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         boolean rmv = waitInfo.waitGrps.remove(grpId) != null;
 
                         if (rmv) {
-                            notify = true;
+                            notifyTopVer = waitInfo.topVer;
 
                             waitInfo.assignments.remove(grpId);
                         }
@@ -815,8 +831,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 }
             }
 
-            if (notify) {
-                final AffinityTopologyVersion topVer = affCalcVer;
+            if (notifyTopVer != null) {
+                final AffinityTopologyVersion topVer = notifyTopVer;
 
                 cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                     @Override public void run() {
@@ -856,13 +872,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         boolean crd,
         CacheAffinityChangeMessage msg) {
         if (log.isDebugEnabled()) {
-            log.debug("Process exchange affinity change message [exchVer=" + exchFut.topologyVersion() +
+            log.debug("Process exchange affinity change message [exchVer=" + exchFut.initialVersion() +
                 ", msg=" + msg + ']');
         }
 
         assert exchFut.exchangeId().equals(msg.exchangeId()) : msg;
 
-        final AffinityTopologyVersion topVer = exchFut.topologyVersion();
+        final AffinityTopologyVersion topVer = exchFut.initialVersion();
 
         final Map<Integer, Map<Integer, List<UUID>>> assignment = msg.assignmentChange();
 
@@ -906,16 +922,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         boolean crd,
         final CacheAffinityChangeMessage msg)
         throws IgniteCheckedException {
-        assert affCalcVer != null || cctx.kernalContext().clientNode();
         assert msg.topologyVersion() != null && msg.exchangeId() == null : msg;
-        assert affCalcVer == null || affCalcVer.equals(msg.topologyVersion()) :
-            "Invalid version [affCalcVer=" + affCalcVer + ", msg=" + msg + ']';
 
-        final AffinityTopologyVersion topVer = exchFut.topologyVersion();
+        final AffinityTopologyVersion topVer = exchFut.initialVersion();
 
         if (log.isDebugEnabled()) {
-            log.debug("Process affinity change message [exchVer=" + exchFut.topologyVersion() +
-                ", affCalcVer=" + affCalcVer +
+            log.debug("Process affinity change message [exchVer=" + topVer +
                 ", msgVer=" + msg.topologyVersion() + ']');
         }
 
@@ -940,7 +952,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 IgniteUuid deploymentId = desc.deploymentId();
 
                 if (!deploymentId.equals(deploymentIds.get(aff.groupId()))) {
-                    aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
+                    aff.clientEventTopologyChange(exchFut.firstEvent(), topVer);
 
                     return;
                 }
@@ -964,7 +976,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             ", part=" + part +
                             ", cur=" + F.nodeIds(assignment.get(part)) +
                             ", new=" + F.nodeIds(nodes) +
-                            ", exchVer=" + exchFut.topologyVersion() +
+                            ", exchVer=" + exchFut.initialVersion() +
                             ", msgVer=" + msg.topologyVersion() +
                             ']';
 
@@ -974,14 +986,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     aff.initialize(topVer, cachedAssignment(aff, assignment, affCache));
                 }
                 else
-                    aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
+                    aff.clientEventTopologyChange(exchFut.firstEvent(), topVer);
             }
         });
-
-        synchronized (mux) {
-            if (affCalcVer == null)
-                affCalcVer = msg.topologyVersion();
-        }
     }
 
     /**
@@ -992,14 +999,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      */
     public void onClientEvent(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
-        boolean locJoin = fut.discoveryEvent().eventNode().isLocal();
+        boolean locJoin = fut.firstEvent().eventNode().isLocal();
 
         if (!locJoin) {
             forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
-                    AffinityTopologyVersion topVer = fut.topologyVersion();
+                    AffinityTopologyVersion topVer = fut.initialVersion();
 
-                    aff.clientEventTopologyChange(fut.discoveryEvent(), topVer);
+                    aff.clientEventTopologyChange(fut.firstEvent(), topVer);
                 }
             });
         }
@@ -1093,17 +1100,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (grpHolder == null) {
             grpHolder = grp != null ?
                 new CacheGroupHolder1(grp, null) :
-                CacheGroupHolder2.create(cctx, grpDesc, fut.topologyVersion(), null);
+                CacheGroupHolder2.create(cctx, grpDesc, fut.initialVersion(), null);
 
             CacheGroupHolder old = grpHolders.put(grpId, grpHolder);
 
             assert old == null : old;
 
-            List<List<ClusterNode>> newAff = grpHolder.affinity().calculate(fut.topologyVersion(),
-                fut.discoveryEvent(),
-                fut.discoCache());
-
-            grpHolder.affinity().initialize(fut.topologyVersion(), newAff);
+            calculateAndInit(fut.events(), grpHolder.affinity(), fut.initialVersion());
         }
         else if (grpHolder.client() && grp != null) {
             assert grpHolder.affinity().idealAssignment() != null;
@@ -1129,17 +1132,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     ) throws IgniteCheckedException {
         caches.initStartedCaches(descs);
 
+        if (fut.context().mergeExchanges())
+            return;
+
         if (crd) {
             forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                 @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                    CacheGroupHolder cache = groupHolder(fut.topologyVersion(), desc);
+                    CacheGroupHolder cache = groupHolder(fut.initialVersion(), desc);
 
-                    if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
-                        List<List<ClusterNode>> assignment =
-                            cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
-
-                        cache.affinity().initialize(fut.topologyVersion(), assignment);
-                    }
+                    if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE))
+                        calculateAndInit(fut.events(), cache.affinity(), fut.initialVersion());
                 }
             });
         }
@@ -1165,22 +1167,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         assert desc != null : aff.cacheOrGroupName();
 
-        if (canCalculateAffinity(desc, aff, fut)) {
-            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+        ExchangeDiscoveryEvents evts = fut.context().events();
 
-            aff.initialize(fut.topologyVersion(), assignment);
-        }
+        if (canCalculateAffinity(desc, aff, fut))
+            calculateAndInit(evts, aff, evts.topologyVersion());
         else {
             GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
                 desc.groupId(),
-                fut.topologyVersion(),
-                fut.discoCache());
+                evts.topologyVersion(),
+                evts.discoveryCache());
 
             fetchFut.init(false);
 
-            fetchAffinity(fut.topologyVersion(),
-                fut.discoveryEvent(),
-                fut.discoCache(),
+            fetchAffinity(evts.topologyVersion(),
+                evts.lastEvent(),
+                evts.discoveryCache(),
                 aff, fetchFut);
         }
     }
@@ -1201,7 +1202,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             return true;
 
         // If local node did not initiate exchange or local node is the only cache node in grid.
-        Collection<ClusterNode> affNodes = fut.discoCache().cacheGroupAffinityNodes(aff.groupId());
+        Collection<ClusterNode> affNodes = fut.events().discoveryCache().cacheGroupAffinityNodes(aff.groupId());
 
         return fut.cacheGroupAddedOnExchange(aff.groupId(), desc.receivedFrom()) ||
             !fut.exchangeId().nodeId().equals(cctx.localNodeId()) ||
@@ -1209,6 +1210,185 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param grpId Cache group ID.
+     * @return Affinity assignments.
+     */
+    public GridAffinityAssignmentCache affinity(Integer grpId) {
+        CacheGroupHolder grpHolder = grpHolders.get(grpId);
+
+        assert grpHolder != null : debugGroupName(grpId);
+
+        return grpHolder.affinity();
+    }
+
+    /**
+     * @param fut Current exchange future.
+     * @param msg Finish exchange message.
+     */
+    public void mergeExchangesOnServerLeft(final GridDhtPartitionsExchangeFuture fut,
+        final GridDhtPartitionsFullMessage msg) {
+        final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+
+        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
+        forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+            @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+                ExchangeDiscoveryEvents evts = fut.context().events();
+
+                Map<Integer, CacheGroupAffinityMessage> idealAffDiff = msg.idealAffinityDiff();
+
+                List<List<ClusterNode>> idealAssignment =
+                    aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache());
+
+                CacheGroupAffinityMessage affMsg = idealAffDiff != null ? idealAffDiff.get(aff.groupId()) : null;
+
+                List<List<ClusterNode>> newAssignment;
+
+                if (affMsg != null) {
+                    Map<Integer, GridLongList> diff = affMsg.assignmentsDiff();
+
+                    assert !F.isEmpty(diff);
+
+                    newAssignment = new ArrayList<>(idealAssignment);
+
+                    for (Map.Entry<Integer, GridLongList> e : diff.entrySet()) {
+                        GridLongList assign = e.getValue();
+
+                        newAssignment.set(e.getKey(), CacheGroupAffinityMessage.toNodes(assign,
+                            nodesByOrder,
+                            evts.discoveryCache()));
+                    }
+                }
+                else
+                    newAssignment = idealAssignment;
+
+                aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+            }
+        });
+    }
+
+    /**
+     * @param fut Current exchange future.
+     * @param msg Finish exchange message.
+     * @param resTopVer Result topology version.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut,
+        GridDhtPartitionsFullMessage msg,
+        final AffinityTopologyVersion resTopVer)
+        throws IgniteCheckedException {
+        final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
+
+        final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+
+        final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
+
+        assert !F.isEmpty(joinedNodeAff) : msg;
+        assert joinedNodeAff.size() >= affReq.size();
+
+        forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+            @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+                ExchangeDiscoveryEvents evts = fut.context().events();
+
+                CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
+
+                assert grp != null;
+
+                if (affReq.contains(aff.groupId())) {
+                    assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
+
+                    CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
+
+                    assert affMsg != null;
+
+                    List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, evts.discoveryCache());
+
+                    assert resTopVer.equals(evts.topologyVersion());
+
+                    List<List<ClusterNode>> idealAssign =
+                        affMsg.createIdealAssignments(nodesByOrder, evts.discoveryCache());
+
+                    if (idealAssign != null)
+                        aff.idealAssignment(idealAssign);
+                    else {
+                        assert !aff.centralizedAffinityFunction();
+
+                        // Calculate ideal assignments.
+                        aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache());
+                    }
+
+                    aff.initialize(evts.topologyVersion(), assignments);
+                }
+                else if (fut.cacheGroupAddedOnExchange(aff.groupId(), grp.receivedFrom()))
+                    calculateAndInit(evts, aff, evts.topologyVersion());
+
+                grp.topology().initPartitionsWhenAffinityReady(resTopVer, fut);
+            }
+        });
+    }
+
+    /**
+     * @param fut Current exchange future.
+     * @param crd Coordinator flag.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onServerJoinWithExchangeMergeProtocol(GridDhtPartitionsExchangeFuture fut, boolean crd)
+        throws IgniteCheckedException {
+        final ExchangeDiscoveryEvents evts = fut.context().events();
+
+        assert fut.context().mergeExchanges();
+        assert evts.hasServerJoin() && !evts.hasServerLeft();
+
+        WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd);
+
+        this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null;
+
+        WaitRebalanceInfo info = this.waitInfo;
+
+        if (crd) {
+            if (log.isDebugEnabled()) {
+                log.debug("Computed new affinity after node join [topVer=" + evts.topologyVersion() +
+                    ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
+            }
+        }
+    }
+
+    /**
+     * @param fut Current exchange future.
+     * @return Computed difference with ideal affinity.
+     * @throws IgniteCheckedException If failed.
+     */
+    public  Map<Integer, CacheGroupAffinityMessage> onServerLeftWithExchangeMergeProtocol(
+        final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException
+    {
+        final ExchangeDiscoveryEvents evts = fut.context().events();
+
+        assert fut.context().mergeExchanges();
+        assert evts.hasServerLeft();
+
+        forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+            @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+                AffinityTopologyVersion topVer = evts.topologyVersion();
+
+                CacheGroupHolder cache = groupHolder(topVer, desc);
+
+                List<List<ClusterNode>> assign =
+                    cache.affinity().calculate(topVer, evts.lastEvent(), evts.discoveryCache());
+
+                if (!cache.rebalanceEnabled || fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
+                    cache.affinity().initialize(topVer, assign);
+            }
+        });
+
+        Map<Integer, Map<Integer, List<Long>>> diff = initAffinityOnNodeLeft0(evts.topologyVersion(),
+            fut,
+            NODE_TO_ORDER,
+            true);
+
+        return CacheGroupAffinityMessage.createAffinityDiffMessages(diff);
+    }
+
+    /**
      * Called on exchange initiated by server node join.
      *
      * @param fut Exchange future.
@@ -1216,9 +1396,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      */
     public void onServerJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
-        assert !fut.discoveryEvent().eventNode().isClient();
+        assert !fut.firstEvent().eventNode().isClient();
 
-        boolean locJoin = fut.discoveryEvent().eventNode().isLocal();
+        boolean locJoin = fut.firstEvent().eventNode().isLocal();
 
         WaitRebalanceInfo waitRebalanceInfo = null;
 
@@ -1226,40 +1406,28 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (crd) {
                 forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                     @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                        AffinityTopologyVersion topVer = fut.topologyVersion();
-
-                        CacheGroupHolder cache = groupHolder(topVer, desc);
+                        AffinityTopologyVersion topVer = fut.initialVersion();
 
-                        List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
-                            fut.discoveryEvent(),
-                            fut.discoCache());
+                        CacheGroupHolder grpHolder = groupHolder(topVer, desc);
 
-                        cache.affinity().initialize(topVer, newAff);
+                        calculateAndInit(fut.events(), grpHolder.affinity(), topVer);
                     }
                 });
             }
             else
                 fetchAffinityOnJoin(fut);
         }
-        else {
-            waitRebalanceInfo = initAffinityOnNodeJoin(fut.topologyVersion(),
-                fut.discoveryEvent(),
-                fut.discoCache(),
-                crd);
-        }
-
-        synchronized (mux) {
-            affCalcVer = fut.topologyVersion();
+        else
+            waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd);
 
-            this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null;
+        this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null;
 
-            WaitRebalanceInfo info = this.waitInfo;
+        WaitRebalanceInfo info = this.waitInfo;
 
-            if (crd) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Computed new affinity after node join [topVer=" + fut.topologyVersion() +
-                        ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
-                }
+        if (crd) {
+            if (log.isDebugEnabled()) {
+                log.debug("Computed new affinity after node join [topVer=" + fut.initialVersion() +
+                    ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
             }
         }
     }
@@ -1297,11 +1465,27 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param evts Discovery events.
+     * @param aff Affinity.
+     * @param topVer Topology version.
+     */
+    private void calculateAndInit(ExchangeDiscoveryEvents evts,
+        GridAffinityAssignmentCache aff,
+        AffinityTopologyVersion topVer)
+    {
+        List<List<ClusterNode>> assignment = aff.calculate(topVer,
+            evts.lastEvent(),
+            evts.discoveryCache());
+
+        aff.initialize(topVer, assignment);
+    }
+
+    /**
      * @param fut Exchange future.
      * @throws IgniteCheckedException If failed.
      */
     private void fetchAffinityOnJoin(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
-        AffinityTopologyVersion topVer = fut.topologyVersion();
+        AffinityTopologyVersion topVer = fut.initialVersion();
 
         List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>();
 
@@ -1310,25 +1494,31 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 continue;
 
             if (fut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom())) {
-                List<List<ClusterNode>> assignment = grp.affinity().calculate(fut.topologyVersion(),
-                    fut.discoveryEvent(),
-                    fut.discoCache());
-
-                grp.affinity().initialize(fut.topologyVersion(), assignment);
+                // If merge is allowed, do not calculate affinity here since it can change on exchange end.
+                if (!fut.context().mergeExchanges())
+                    calculateAndInit(fut.events(), grp.affinity(), topVer);
             }
             else {
-                CacheGroupDescriptor grpDesc = caches.group(grp.groupId());
+                if (fut.context().fetchAffinityOnJoin()) {
+                    CacheGroupDescriptor grpDesc = caches.group(grp.groupId());
 
-                assert grpDesc != null : grp.cacheOrGroupName();
+                    assert grpDesc != null : grp.cacheOrGroupName();
 
-                GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                    grpDesc.groupId(),
-                    topVer,
-                    fut.discoCache());
+                    GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
+                        grpDesc.groupId(),
+                        topVer,
+                        fut.events().discoveryCache());
 
-                fetchFut.init(false);
+                    fetchFut.init(false);
 
-                fetchFuts.add(fetchFut);
+                    fetchFuts.add(fetchFut);
+                }
+                else {
+                    if (fut.events().discoveryCache().serverNodes().size() > 0)
+                        fut.context().addGroupAffinityRequestOnJoin(grp.groupId());
+                    else
+                        calculateAndInit(fut.events(), grp.affinity(), topVer);
+                }
             }
         }
 
@@ -1337,9 +1527,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             int grpId = fetchFut.groupId();
 
-            fetchAffinity(fut.topologyVersion(),
-                fut.discoveryEvent(),
-                fut.discoCache(),
+            fetchAffinity(topVer,
+                fut.events().lastEvent(),
+                fut.events().discoveryCache(),
                 cctx.cache().cacheGroup(grpId).affinity(),
                 fetchFut);
         }
@@ -1399,7 +1589,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @return {@code True} if affinity should be assigned by coordinator.
      */
     public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
-        ClusterNode leftNode = fut.discoveryEvent().eventNode();
+        ClusterNode leftNode = fut.firstEvent().eventNode();
 
         assert !leftNode.isClient() : leftNode;
 
@@ -1407,23 +1597,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             // Need initialize CacheGroupHolders if this node become coordinator on this exchange.
             forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                 @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                    CacheGroupHolder cache = groupHolder(fut.topologyVersion(), desc);
+                    CacheGroupHolder cache = groupHolder(fut.initialVersion(), desc);
 
-                    cache.aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                    cache.aff.calculate(fut.initialVersion(), fut.firstEvent(), fut.firstEventCache());
                 }
             });
         }
         else {
             forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
-                    aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                    aff.calculate(fut.initialVersion(), fut.firstEvent(), fut.firstEventCache());
                 }
             });
         }
 
         synchronized (mux) {
-            affCalcVer = fut.topologyVersion();
-
             this.waitInfo = null;
         }
 
@@ -1432,13 +1620,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
     /**
      * @param fut Exchange future.
+     * @param newAff {@code True} if there are no older nodes with affinity info available.
      * @throws IgniteCheckedException If failed.
      * @return Future completed when caches initialization is done.
      */
-    public IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut)
-        throws IgniteCheckedException {
+    public IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut,
+        final boolean newAff) throws IgniteCheckedException {
         final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>();
 
+        final AffinityTopologyVersion topVer = fut.initialVersion();
+
         forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
             @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                 CacheGroupHolder grpHolder = grpHolders.get(desc.groupId());
@@ -1460,55 +1651,74 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         }
                     );
 
-                    grpHolder = CacheGroupHolder2.create(cctx, desc, fut.topologyVersion(), null);
+                    grpHolder = CacheGroupHolder2.create(cctx, desc, topVer, null);
 
                     final GridAffinityAssignmentCache aff = grpHolder.affinity();
 
-                    List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures();
+                    if (newAff) {
+                        if (!aff.lastVersion().equals(topVer))
+                            calculateAndInit(fut.events(), aff, topVer);
 
-                    int idx = exchFuts.indexOf(fut);
+                        grpHolder.topology().beforeExchange(fut, true, false);
+                    }
+                    else {
+                        List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures();
 
-                    assert idx >= 0 && idx < exchFuts.size() - 1 : "Invalid exchange futures state [cur=" + idx +
-                        ", total=" + exchFuts.size() + ']';
+                        int idx = exchFuts.indexOf(fut);
 
-                    final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1);
+                        assert idx >= 0 && idx < exchFuts.size() - 1 : "Invalid exchange futures state [cur=" + idx +
+                            ", total=" + exchFuts.size() + ']';
 
-                    if (log.isDebugEnabled()) {
-                        log.debug("Need initialize affinity on coordinator [" +
-                            "cacheGrp=" + desc.cacheOrGroupName() +
-                            "prevAff=" + prev.topologyVersion() + ']');
-                    }
+                        final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1);
 
-                    assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
+                        assert prev.isDone() && prev.topologyVersion().compareTo(topVer) < 0 : prev;
 
-                    GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                        desc.groupId(),
-                        prev.topologyVersion(),
-                        prev.discoCache());
+                        if (log.isDebugEnabled()) {
+                            log.debug("Need initialize affinity on coordinator [" +
+                                "cacheGrp=" + desc.cacheOrGroupName() +
+                                "prevAff=" + prev.topologyVersion() + ']');
+                        }
 
-                    fetchFut.init(false);
+                        GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
+                            desc.groupId(),
+                            prev.topologyVersion(),
+                            prev.events().discoveryCache());
 
-                    final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>();
+                        fetchFut.init(false);
 
-                    fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
-                        @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
-                            throws IgniteCheckedException {
-                            fetchAffinity(prev.topologyVersion(),
-                                prev.discoveryEvent(),
-                                prev.discoCache(),
-                                aff, (GridDhtAssignmentFetchFuture)fetchFut);
+                        final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>();
 
-                            aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                        fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
+                            @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
+                                throws IgniteCheckedException {
+                                fetchAffinity(prev.topologyVersion(),
+                                    prev.events().lastEvent(),
+                                    prev.events().discoveryCache(),
+                                    aff,
+                                    (GridDhtAssignmentFetchFuture)fetchFut);
 
-                            affFut.onDone(fut.topologyVersion());
-                        }
-                    });
+                                aff.calculate(topVer, fut.events().lastEvent(), fut.events().discoveryCache());
+
+                                affFut.onDone(topVer);
+                            }
+                        });
 
-                    futs.add(affFut);
+                        futs.add(affFut);
+                    }
                 }
-                else
+                else {
                     grpHolder = new CacheGroupHolder1(grp, null);
 
+                    if (newAff) {
+                        GridAffinityAssignmentCache aff = grpHolder.affinity();
+
+                        if (!aff.lastVersion().equals(topVer))
+                            calculateAndInit(fut.events(), aff, topVer);
+
+                        grpHolder.topology().beforeExchange(fut, true, false);
+                    }
+                }
+
                 CacheGroupHolder old = grpHolders.put(grpHolder.groupId(), grpHolder);
 
                 assert old == null : old;
@@ -1566,18 +1776,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param topVer Topology version.
-     * @param evt Discovery event.
-     * @param discoCache Discovery data cache.
+     * @param fut Current exchange future.
      * @param crd Coordinator flag.
      * @throws IgniteCheckedException If failed.
      * @return Rabalance info.
      */
-    @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final AffinityTopologyVersion topVer,
-        final DiscoveryEvent evt,
-        final DiscoCache discoCache,
-        boolean crd)
+    @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd)
         throws IgniteCheckedException {
+        final ExchangeDiscoveryEvents evts = fut.context().events();
+
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
         if (!crd) {
@@ -1587,27 +1794,47 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 boolean latePrimary = grp.rebalanceEnabled();
 
-                initAffinityOnNodeJoin(topVer, evt, discoCache, grp.affinity(), null, latePrimary, affCache);
+                initAffinityOnNodeJoin(evts,
+                    evts.nodeJoined(grp.receivedFrom()),
+                    grp.affinity(),
+                    null,
+                    latePrimary,
+                    affCache);
             }
 
             return null;
         }
         else {
-            final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
+            final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(evts.lastServerEventVersion());
 
             forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                 @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                    CacheGroupHolder cache = groupHolder(topVer, desc);
+                    CacheGroupHolder cache = groupHolder(evts.topologyVersion(), desc);
 
                     boolean latePrimary = cache.rebalanceEnabled;
 
-                    initAffinityOnNodeJoin(topVer,
-                        evt,
-                        discoCache,
+                    boolean grpAdded = evts.nodeJoined(desc.receivedFrom());
+
+                    initAffinityOnNodeJoin(evts,
+                        grpAdded,
                         cache.affinity(),
                         waitRebalanceInfo,
                         latePrimary,
                         affCache);
+
+                    if (grpAdded) {
+                        AffinityAssignment aff = cache.aff.cachedAffinity(cache.aff.lastVersion());
+
+                        assert evts.topologyVersion().equals(aff.topologyVersion()) : "Unexpected version [" +
+                            "grp=" + cache.aff.cacheOrGroupName() +
+                            ", evts=" + evts.topologyVersion() +
+                            ", aff=" + cache.aff.lastVersion() + ']';
+
+                        Map<UUID, GridDhtPartitionMap> map = affinityFullMap(aff);
+
+                        for (GridDhtPartitionMap map0 : map.values())
+                            cache.topology().update(fut.exchangeId(), map0, true);
+                    }
                 }
             });
 
@@ -1615,25 +1842,57 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
     }
 
+    private Map<UUID, GridDhtPartitionMap> affinityFullMap(AffinityAssignment aff) {
+        Map<UUID, GridDhtPartitionMap> map = new HashMap<>();
+
+        for (int p = 0; p < aff.assignment().size(); p++) {
+            HashSet<UUID> ids = aff.getIds(p);
+
+            for (UUID nodeId : ids) {
+                GridDhtPartitionMap partMap = map.get(nodeId);
+
+                if (partMap == null) {
+                    partMap = new GridDhtPartitionMap(nodeId,
+                        1L,
+                        aff.topologyVersion(),
+                        new GridPartitionStateMap(),
+                        false);
+
+                    map.put(nodeId, partMap);
+                }
+
+                partMap.put(p, GridDhtPartitionState.OWNING);
+            }
+        }
+
+        return map;
+    }
+
     /**
-     * @param topVer Topology version.
-     * @param evt Discovery event.
-     * @param discoCache Discovery data cache.
+     * @param evts Discovery events processed during exchange.
+     * @param addedOnExchnage {@code True} if cache group was added during this exchange.
      * @param aff Affinity.
      * @param rebalanceInfo Rebalance information.
      * @param latePrimary If {@code true} delays primary assignment if it is not owner.
      * @param affCache Already calculated assignments (to reduce data stored in history).
      * @throws IgniteCheckedException If failed.
      */
-    private void initAffinityOnNodeJoin(AffinityTopologyVersion topVer,
-        DiscoveryEvent evt,
-        DiscoCache discoCache,
+    private void initAffinityOnNodeJoin(
+        ExchangeDiscoveryEvents evts,
+        boolean addedOnExchnage,
         GridAffinityAssignmentCache aff,
         WaitRebalanceInfo rebalanceInfo,
         boolean latePrimary,
         Map<Object, List<List<ClusterNode>>> affCache)
         throws IgniteCheckedException
     {
+        if (addedOnExchnage) {
+            if (!aff.lastVersion().equals(evts.topologyVersion()))
+                calculateAndInit(evts, aff, evts.topologyVersion());
+
+            return;
+        }
+
         AffinityTopologyVersion affTopVer = aff.lastVersion();
 
         assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() +
@@ -1643,7 +1902,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         assert aff.idealAssignment() != null : "Previous assignment is not available.";
 
-        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, evt, discoCache);
+        List<List<ClusterNode>> idealAssignment = aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache());
         List<List<ClusterNode>> newAssignment = null;
 
         if (latePrimary) {
@@ -1655,7 +1914,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 ClusterNode newPrimary = newNodes.size() > 0 ? newNodes.get(0) : null;
 
                 if (curPrimary != null && newPrimary != null && !curPrimary.equals(newPrimary)) {
-                    assert cctx.discovery().node(topVer, curPrimary.id()) != null : curPrimary;
+                    assert cctx.discovery().node(evts.topologyVersion(), curPrimary.id()) != null : curPrimary;
 
                     List<ClusterNode> nodes0 = latePrimaryAssignment(aff,
                         p,
@@ -1674,7 +1933,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (newAssignment == null)
             newAssignment = idealAssignment;
 
-        aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache));
+        aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
     }
 
     /**
@@ -1738,7 +1997,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     public IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> initAffinityOnNodeLeft(
         final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
-        IgniteInternalFuture<?> initFut = initCoordinatorCaches(fut);
+        assert !fut.context().mergeExchanges();
+
+        IgniteInternalFuture<?> initFut = initCoordinatorCaches(fut, false);
 
         if (initFut != null && !initFut.isDone()) {
             final GridFutureAdapter<Map<Integer, Map<Integer, List<UUID>>>> resFut = new GridFutureAdapter<>();
@@ -1746,7 +2007,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             initFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> initFut) {
                     try {
-                        resFut.onDone(initAffinityOnNodeLeft0(fut));
+                        resFut.onDone(initAffinityOnNodeLeft0(fut.initialVersion(), fut, NODE_TO_ID, false));
                     }
                     catch (IgniteCheckedException e) {
                         resFut.onDone(e);
@@ -1757,29 +2018,33 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             return resFut;
         }
         else
-            return new GridFinishedFuture<>(initAffinityOnNodeLeft0(fut));
+            return new GridFinishedFuture<>(initAffinityOnNodeLeft0(fut.initialVersion(), fut, NODE_TO_ID, false));
     }
 
     /**
+     * @param topVer Topology version.
      * @param fut Exchange future.
+     * @param c Closure converting affinity diff.
+     * @param initAff {@code True} if need initialize affinity.
      * @return Affinity assignment.
      * @throws IgniteCheckedException If failed.
      */
-    private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
+    private <T> Map<Integer, Map<Integer, List<T>>> initAffinityOnNodeLeft0(final AffinityTopologyVersion topVer,
+        final GridDhtPartitionsExchangeFuture fut,
+        final IgniteClosure<ClusterNode, T> c,
+        final boolean initAff)
         throws IgniteCheckedException {
-        final AffinityTopologyVersion topVer = fut.topologyVersion();
-
-        final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
+        final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(fut.context().events().lastServerEventVersion());
 
-        final Collection<ClusterNode> aliveNodes = cctx.discovery().nodes(topVer);
+        final Collection<ClusterNode> aliveNodes = fut.context().events().discoveryCache().serverNodes();
 
-        final Map<Integer, Map<Integer, List<UUID>>> assignment = new HashMap<>();
+        final Map<Integer, Map<Integer, List<T>>> assignment = new HashMap<>();
 
         forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
             @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                CacheGroupHolder grpHolder = groupHolder(fut.topologyVersion(), desc);
+                CacheGroupHolder grpHolder = groupHolder(topVer, desc);
 
-                if (!grpHolder.rebalanceEnabled)
+                if (!grpHolder.rebalanceEnabled || fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
                     return;
 
                 AffinityTopologyVersion affTopVer = grpHolder.affinity().lastVersion();
@@ -1792,14 +2057,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 assert newAssignment != null;
 
-                GridDhtPartitionTopology top = grpHolder.topology(fut);
+                List<List<ClusterNode>> newAssignment0 = initAff ? new ArrayList<>(newAssignment) : null;
 
-                Map<Integer, List<UUID>> cacheAssignment = null;
+                GridDhtPartitionTopology top = grpHolder.topology();
+
+                Map<Integer, List<T>> cacheAssignment = null;
 
                 for (int p = 0; p < newAssignment.size(); p++) {
                     List<ClusterNode> newNodes = newAssignment.get(p);
                     List<ClusterNode> curNodes = curAssignment.get(p);
 
+                    assert aliveNodes.containsAll(newNodes) : "Invalid new assignment [grp=" + grpHolder.aff.cacheOrGroupName() +
+                        ", nodes=" + newNodes +
+                        ", topVer=" + fut.context().events().discoveryCache().version() +
+                        ", evts=" + fut.context().events().events() + "]";
+
                     ClusterNode curPrimary = curNodes.size() > 0 ? curNodes.get(0) : null;
                     ClusterNode newPrimary = newNodes.size() > 0 ? newNodes.get(0) : null;
 
@@ -1829,7 +2101,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                                 for (int i = 1; i < curNodes.size(); i++) {
                                     ClusterNode curNode = curNodes.get(i);
 
-                                    if (top.partitionState(curNode.id(), p) == GridDhtPartitionState.OWNING) {
+                                    if (top.partitionState(curNode.id(), p) == GridDhtPartitionState.OWNING &&
+                                        aliveNodes.contains(curNode)) {
                                         newNodes0 = latePrimaryAssignment(grpHolder.affinity(),
                                             p,
                                             curNode,
@@ -1860,21 +2133,35 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     }
 
                     if (newNodes0 != null) {
+                        assert aliveNodes.containsAll(newNodes0) : "Invalid late assignment [grp=" + grpHolder.aff.cacheOrGroupName() +
+                            ", nodes=" + newNodes +
+                            ", topVer=" + fut.context().events().discoveryCache().version() +
+                            ", evts=" + fut.context().events().events() + "]";
+
+                        if (newAssignment0 != null)
+                            newAssignment0.set(p, newNodes0);
+
                         if (cacheAssignment == null)
                             cacheAssignment = new HashMap<>();
 
-                        cacheAssignment.put(p, toIds0(newNodes0));
+                        List<T> n = new ArrayList<>(newNodes0.size());
+
+                        for (int i = 0; i < newNodes0.size(); i++)
+                            n.add(c.apply(newNodes0.get(i)));
+
+                        cacheAssignment.put(p, n);
                     }
                 }
 
                 if (cacheAssignment != null)
                     assignment.put(grpHolder.groupId(), cacheAssignment);
+
+                if (initAff)
+                    grpHolder.affinity().initialize(topVer, newAssignment0);
             }
         });
 
         synchronized (mux) {
-            assert affCalcVer.equals(topVer);
-
             this.waitInfo = !waitRebalanceInfo.empty() ? waitRebalanceInfo : null;
 
             WaitRebalanceInfo info = this.waitInfo;
@@ -1889,6 +2176,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @return All registered cache groups.
+     */
+    public Map<Integer, CacheGroupDescriptor> cacheGroups() {
+        return caches.registeredGrps;
+    }
+
+    /**
      *
      */
     public void dumpDebugInfo() {
@@ -1983,10 +2277,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /**
-         * @param fut Exchange future.
          * @return Cache topology.
          */
-        abstract GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut);
+        abstract GridDhtPartitionTopology topology();
 
         /**
          * @return Affinity.
@@ -2021,7 +2314,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /** {@inheritDoc} */
-        @Override public GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut) {
+        @Override public GridDhtPartitionTopology topology() {
             return grp.topology();
         }
     }
@@ -2097,8 +2390,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /** {@inheritDoc} */
-        @Override public GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut) {
-            return cctx.exchange().clientTopology(groupId(), fut);
+        @Override public GridDhtPartitionTopology topology() {
+            return cctx.exchange().clientTopology(groupId());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 14eb362..5e5e02e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -37,15 +37,15 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
-import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
-import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
-import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
+import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -59,6 +59,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
@@ -341,7 +342,7 @@ public class CacheGroupContext {
     public GridCacheContext singleCacheContext() {
         List<GridCacheContext> caches = this.caches;
 
-        assert !sharedGroup() && caches.size() == 1;
+        assert !sharedGroup() && caches.size() == 1 : ctx.kernalContext().isStopping();
 
         return caches.get(0);
     }
@@ -581,6 +582,13 @@ public class CacheGroupContext {
     }
 
     /**
+     * @return {@code True} if cache is replicated.
+     */
+    public boolean isReplicated() {
+        return ccfg.getCacheMode() == REPLICATED;
+    }
+
+    /**
      * @return Cache configuration.
      */
     public CacheConfiguration config() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
index ad0dcc9..f4c1392 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
@@ -21,5 +21,8 @@ package org.apache.ignite.internal.processors.cache;
  * Cache partition exchange worker task marker interface.
  */
 public interface CachePartitionExchangeWorkerTask {
-    // No-op.
+    /**
+     * @return {@code False} if exchange merge should stop when this task is found in the exchange worker queue.
+     */
+    boolean skipForExchangeMerge();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
index 68bca27..44f6002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
@@ -60,6 +60,11 @@ public class ClientCacheChangeDummyDiscoveryMessage implements DiscoveryCustomMe
         this.cachesToClose = cachesToClose;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean skipForExchangeMerge() {
+        return true;
+    }
+
     /**
      * @return Start request ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java
index aab3a3e..73cc69a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java
@@ -37,6 +37,11 @@ class ClientCacheUpdateTimeout extends GridTimeoutObjectAdapter implements Cache
     }
 
     /** {@inheritDoc} */
+    @Override public boolean skipForExchangeMerge() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public void onTimeout() {
         if (!cctx.kernalContext().isStopping())
             cctx.exchange().addCustomTask(this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index bb51a3b..5e2c8db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -811,9 +811,26 @@ class ClusterCachesInfo {
 
     /**
      * @param joinedNodeId Joined node ID.
+     * @return {@code True} if there are new caches received from joined node.
+     */
+    boolean hasCachesReceivedFromJoin(UUID joinedNodeId) {
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            if (desc.staticallyConfigured()) {
+                assert desc.receivedFrom() != null : desc;
+
+                if (joinedNodeId.equals(desc.receivedFrom()))
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param joinedNodeId Joined node ID.
      * @return New caches received from joined node.
      */
-    @NotNull public List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
+    List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
         assert joinedNodeId != null;
 
         List<DynamicCacheDescriptor> started = null;
@@ -1732,8 +1749,7 @@ class ClusterCachesInfo {
          * DIRECT comparator for cache descriptors (first system caches).
          */
         static Comparator<DynamicCacheDescriptor> DIRECT = new Comparator<DynamicCacheDescriptor>() {
-            @Override
-            public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) {
+            @Override public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) {
                 if (!o1.cacheType().userCache())
                     return -1;
                 if (!o2.cacheType().userCache())

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
new file mode 100644
index 0000000..4046c98
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.exchangeProtocolVersion;
+
+/**
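+ * Exchange context: merge-allowed flag, affinity fetch mode on join and discovery events of the exchange.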
+ */
+public class ExchangeContext {
+    /** */
+    public static final String IGNITE_EXCHANGE_COMPATIBILITY_VER_1 = "IGNITE_EXCHANGE_COMPATIBILITY_VER_1";
+
+    /** Cache groups to request affinity for during local join exchange. */
+    private Set<Integer> requestGrpsAffOnJoin;
+
+    /** Per-group affinity fetch on join (old protocol). */
+    private boolean fetchAffOnJoin;
+
+    /** Merges allowed flag. */
+    private final boolean merge;
+
+    /** */
+    private final ExchangeDiscoveryEvents evts;
+
+    /** */
+    private final boolean compatibilityNode = getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false);
+
+    /**
+     * @param crd Coordinator flag.
+     * @param fut Exchange future.
+     */
+    public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) {
+        int protocolVer = exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion());
+
+        if (compatibilityNode || (crd && fut.localJoinExchange())) {
+            fetchAffOnJoin = true;
+
+            merge = false;
+        }
+        else {
+            boolean startCaches = fut.exchangeId().isJoined() &&
+                fut.sharedContext().cache().hasCachesReceivedFromJoin(fut.exchangeId().eventNode());
+
+            fetchAffOnJoin = protocolVer == 1;
+
+            merge = !startCaches &&
+                protocolVer > 1 &&
+                fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT;
+        }
+
+        evts = new ExchangeDiscoveryEvents(fut);
+    }
+
+    /**
+     * @param node Node.
+     * @return {@code True} if node supports exchange merge protocol.
+     */
+    boolean supportsMergeExchanges(ClusterNode node) {
+        return !compatibilityNode && exchangeProtocolVersion(node.version()) > 1;
+    }
+
+    /**
+     * @return Discovery events.
+     */
+    public ExchangeDiscoveryEvents events() {
+        return evts;
+    }
+
+    /**
+     * @return {@code True} if affinity must be fetched per-group on local join (old protocol),
+     *      otherwise affinity is sent in {@link GridDhtPartitionsFullMessage}.
+     */
+    public boolean fetchAffinityOnJoin() {
+        return fetchAffOnJoin;
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     */
+    void addGroupAffinityRequestOnJoin(Integer grpId) {
+        if (requestGrpsAffOnJoin == null)
+            requestGrpsAffOnJoin = new HashSet<>();
+
+        requestGrpsAffOnJoin.add(grpId);
+    }
+
+    /**
+     * @return Groups to request affinity for.
+     */
+    @Nullable public Set<Integer> groupsAffinityRequestOnJoin() {
+        return requestGrpsAffOnJoin;
+    }
+
+    /**
+     * @return {@code True} if exchanges merge is allowed during current exchange.
+     */
+    public boolean mergeExchanges() {
+        return merge;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ExchangeContext.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
new file mode 100644
index 0000000..d4fbe60
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+
+/**
+ * Discovery events processed in single exchange (contain multiple events if exchanges for multiple
+ * discovery events are merged into single exchange).
+ */
+public class ExchangeDiscoveryEvents {
+    /** Last event version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Last server join/fail event version. */
+    private AffinityTopologyVersion srvEvtTopVer;
+
+    /** Discovery data cache for last event. */
+    private DiscoCache discoCache;
+
+    /** Last event. */
+    private DiscoveryEvent lastEvt;
+
+    /** Last server join/fail event. */
+    private DiscoveryEvent lastSrvEvt;
+
+    /** All events. */
+    private List<DiscoveryEvent> evts = new ArrayList<>();
+
+    /** Server join flag. */
+    private boolean srvJoin;
+
+    /** Server left flag. */
+    private boolean srvLeft;
+
+    /**
+     * @param fut Current exchange future.
+     */
+    ExchangeDiscoveryEvents(GridDhtPartitionsExchangeFuture fut) {
+        addEvent(fut.initialVersion(), fut.firstEvent(), fut.firstEventCache());
+    }
+
+    /**
+     * @param fut Current exchange future.
+     */
+    public void processEvents(GridDhtPartitionsExchangeFuture fut) {
+        for (DiscoveryEvent evt : evts) {
+            if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
+                fut.sharedContext().mvcc().removeExplicitNodeLocks(evt.eventNode().id(), fut.initialVersion());
+        }
+
+        if (hasServerLeft())
+            warnNoAffinityNodes(fut.sharedContext());
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if has join event for given node.
+     */
+    public boolean nodeJoined(UUID nodeId) {
+        for (int i = 0; i < evts.size(); i++) {
+            DiscoveryEvent evt = evts.get(i);
+
+            if (evt.type() == EVT_NODE_JOINED && nodeId.equals(evt.eventNode().id()))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @return Last server join/fail event version.
+     */
+    AffinityTopologyVersion lastServerEventVersion() {
+        assert srvEvtTopVer != null : this;
+
+        return srvEvtTopVer;
+    }
+
+    /**
+     * @param topVer Event version, must be greater than version of previously added event.
+     * @param evt Event.
+     * @param cache Discovery data cache for given topology version.
+     */
+    void addEvent(AffinityTopologyVersion topVer, DiscoveryEvent evt, DiscoCache cache) {
+        assert evts.isEmpty() || topVer.compareTo(this.topVer) > 0 : topVer;
+
+        evts.add(evt);
+
+        this.topVer = topVer;
+        this.lastEvt = evt;
+        this.discoCache = cache;
+
+        if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
+            ClusterNode node = evt.eventNode();
+
+            if (!CU.clientNode(node)) {
+                lastSrvEvt = evt;
+
+                srvEvtTopVer = new AffinityTopologyVersion(evt.topologyVersion(), 0);
+
+                if (evt.type() == EVT_NODE_JOINED)
+                    srvJoin = true;
+                else {
+                    assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED : evt;
+
+                    srvLeft = true;
+                }
+            }
+        }
+    }
+
+    /**
+     * @return All events.
+     */
+    public List<DiscoveryEvent> events() {
+        return evts;
+    }
+
+    /**
+     * @param evt Event.
+     * @return {@code True} if given event is {@link EventType#EVT_NODE_FAILED} or {@link EventType#EVT_NODE_LEFT} of a non-client node.
+     */
+    public static boolean serverLeftEvent(DiscoveryEvent evt) {
+        return  ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && !CU.clientNode(evt.eventNode()));
+    }
+
+    /**
+     * @return Discovery data cache for last event.
+     */
+    public DiscoCache discoveryCache() {
+        return discoCache;
+    }
+
+    /**
+     * @return Last server node event if there is one, otherwise last event.
+     */
+    public DiscoveryEvent lastEvent() {
+        return lastSrvEvt != null ? lastSrvEvt : lastEvt;
+    }
+
+    /**
+     * @return Last event version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return {@code True} if has event for server join.
+     */
+    public boolean hasServerJoin() {
+        return srvJoin;
+    }
+
+    /**
+     * @return {@code True} if has event for server leave.
+     */
+    public boolean hasServerLeft() {
+        return srvLeft;
+    }
+
+    /**
+     * @param cctx Context.
+     */
+    public void warnNoAffinityNodes(GridCacheSharedContext<?, ?> cctx) {
+        List<String> cachesWithoutNodes = null;
+
+        for (DynamicCacheDescriptor cacheDesc : cctx.cache().cacheDescriptors().values()) {
+            if (discoCache.cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) {
+                if (cachesWithoutNodes == null)
+                    cachesWithoutNodes = new ArrayList<>();
+
+                cachesWithoutNodes.add(cacheDesc.cacheName());
+
+                // Fire event even if there is no client cache started.
+                if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
+                    Event evt = new CacheEvent(
+                        cacheDesc.cacheName(),
+                        cctx.localNode(),
+                        cctx.localNode(),
+                        "All server nodes have left the cluster.",
+                        EventType.EVT_CACHE_NODES_LEFT,
+                        0,
+                        false,
+                        null,
+                        null,
+                        null,
+                        null,
+                        false,
+                        null,
+                        false,
+                        null,
+                        null,
+                        null
+                    );
+
+                    cctx.gridEvents().record(evt);
+                }
+            }
+        }
+
+        if (cachesWithoutNodes != null) {
+            StringBuilder sb =
+                new StringBuilder("All server nodes for the following caches have left the cluster: ");
+
+            for (int i = 0; i < cachesWithoutNodes.size(); i++) {
+                String cache = cachesWithoutNodes.get(i);
+
+                sb.append('\'').append(cache).append('\'');
+
+                if (i != cachesWithoutNodes.size() - 1)
+                    sb.append(", ");
+            }
+
+            IgniteLogger log = cctx.logger(getClass());
+
+            U.quietAndWarn(log, sb.toString());
+
+            U.quietAndWarn(log, "Must have server nodes for caches to operate.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ExchangeDiscoveryEvents.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 4ba4e48..fed716c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3328,8 +3328,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         try {
             KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
 
-            GridCacheEntryEx e = entry0(cacheKey, ctx.discovery().topologyVersionEx(),
-                false, false);
+            GridCacheEntryEx e = entry0(cacheKey,
+                ctx.shared().exchange().readyAffinityVersion(),
+                false,
+                false);
 
             if (e == null)
                 return false;
@@ -5078,7 +5080,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 return op(tx, (AffinityTopologyVersion)null);
 
             // Tx needs affinity for entry creation, wait when affinity is ready to avoid blocking inside async operation.
-            final AffinityTopologyVersion topVer = ctx.shared().exchange().topologyVersion();
+            final AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion();
 
             IgniteInternalFuture<?> topFut = ctx.shared().exchange().affinityReadyFuture(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 33db2ff..b6faf47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -59,7 +59,6 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
 import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -72,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTran
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager;
 import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 981c6e2..6529795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -480,6 +480,24 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Lock or {@code null} if node is stopping.
+     */
+    @Nullable public Lock readLock() {
+        Lock lock = rw.readLock();
+
+        if (!lock.tryLock())
+            return null;
+
+        if (stopping) {
+            lock.unlock();
+
+            return null;
+        }
+
+        return lock;
+    }
+
+    /**
      *
      */
     public void writeLock() {
@@ -1051,27 +1069,34 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 throw e;
         }
         finally {
-            // Reset thread local context.
-            cctx.tm().resetContext();
+            onMessageProcessed(msg);
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    public void onMessageProcessed(GridCacheMessage msg) {
+        // Reset thread local context.
+        cctx.tm().resetContext();
 
-            GridCacheMvccManager mvcc = cctx.mvcc();
+        GridCacheMvccManager mvcc = cctx.mvcc();
 
-            if (mvcc != null)
-                mvcc.contextReset();
+        if (mvcc != null)
+            mvcc.contextReset();
 
-            // Unwind eviction notifications.
-            if (msg instanceof IgniteTxStateAware) {
-                IgniteTxState txState = ((IgniteTxStateAware)msg).txState();
+        // Unwind eviction notifications.
+        if (msg instanceof IgniteTxStateAware) {
+            IgniteTxState txState = ((IgniteTxStateAware)msg).txState();
 
-                if (txState != null)
-                    txState.unwindEvicts(cctx);
-            }
-            else if (msg instanceof GridCacheIdMessage) {
-                GridCacheContext ctx = cctx.cacheContext(((GridCacheIdMessage)msg).cacheId());
+            if (txState != null)
+                txState.unwindEvicts(cctx);
+        }
+        else if (msg instanceof GridCacheIdMessage) {
+            GridCacheContext ctx = cctx.cacheContext(((GridCacheIdMessage)msg).cacheId());
 
-                if (ctx != null)
-                    CU.unwindEvicts(ctx);
-            }
+            if (ctx != null)
+                CU.unwindEvicts(ctx);
         }
     }
 


Mime
View raw message