From: sboikov@apache.org
To: commits@ignite.apache.org
Date: Mon, 21 Aug 2017 10:22:27 -0000
Subject: [04/10] ignite git commit: ignite-6124 Merge exchanges for multiple discovery events

http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 609021b..ca6ee5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -23,25 +23,26 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.events.CacheEvent;
 import 
org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; @@ -51,16 +52,20 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; +import org.apache.ignite.internal.processors.cache.ExchangeContext; +import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -75,7 +80,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -84,7 +89,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; @@ -118,10 +122,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ @GridToStringExclude - private volatile DiscoCache discoCache; + private final Object mux = new Object(); - /** Discovery event. */ - private volatile DiscoveryEvent discoEvt; + /** */ + @GridToStringExclude + private volatile DiscoCache firstEvtDiscoCache; + + /** Discovery event triggered this exchange. 
*/ + private volatile DiscoveryEvent firstDiscoEvt; /** */ @GridToStringExclude @@ -168,10 +176,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private AtomicReference lastVer = new AtomicReference<>(); /** + * Message received from node joining cluster (if this is 'node join' exchange), + * needed if this exchange is merged with another one. + */ + @GridToStringExclude + private GridDhtPartitionsSingleMessage pendingJoinMsg; + + /** * Messages received on non-coordinator are stored in case if this node * becomes coordinator. */ - private final Map singleMsgs = new ConcurrentHashMap8<>(); + private final Map pendingSingleMsgs = new ConcurrentHashMap8<>(); /** Messages received from new coordinator. */ private final Map fullMsgs = new ConcurrentHashMap8<>(); @@ -211,6 +226,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private ConcurrentMap msgs = new ConcurrentHashMap8<>(); + /** Single messages from merged 'node join' exchanges. */ + @GridToStringExclude + private Map mergedJoinExchMsgs; + + /** Number of awaited messages for merged 'node join' exchanges. */ + @GridToStringExclude + private int awaitMergedMsgs; + /** */ @GridToStringExclude private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); @@ -225,6 +248,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private final AtomicBoolean done = new AtomicBoolean(); + /** */ + private ExchangeLocalState state; + + /** */ + @GridToStringExclude + private ExchangeContext exchCtx; + + /** */ + @GridToStringExclude + private FinishState finishState; + + /** Initialized when node becomes new coordinator. */ + @GridToStringExclude + private InitNewCoordinatorFuture newCrdFut; + + /** */ + @GridToStringExclude + private GridDhtPartitionsExchangeFuture mergedWith; + /** * @param cctx Cache context. * @param busyLock Busy lock. @@ -253,13 +295,45 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte log = cctx.logger(getClass()); exchLog = cctx.logger(EXCHANGE_LOG); - initFut = new GridFutureAdapter<>(); + initFut = new GridFutureAdapter() { + @Override public IgniteLogger logger() { + return log; + } + }; if (log.isDebugEnabled()) log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']'); } /** + * @return Future mutex. + */ + public Object mutex() { + return mux; + } + + /** + * @return Shared cache context. + */ + public GridCacheSharedContext sharedContext() { + return cctx; + } + + /** {@inheritDoc} */ + @Override public boolean skipForExchangeMerge() { + return false; + } + + /** + * @return Exchange context. + */ + public ExchangeContext context() { + assert exchCtx != null : this; + + return exchCtx; + } + + /** * @param exchActions Exchange actions. */ public void exchangeActions(ExchangeActions exchActions) { @@ -276,9 +350,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte this.affChangeMsg = affChangeMsg; } + /** + * @return Initial exchange version. + */ + public AffinityTopologyVersion initialVersion() { + return exchId.topologyVersion(); + } + /** {@inheritDoc} */ @Override public AffinityTopologyVersion topologyVersion() { - return exchId.topologyVersion(); + /* + Should not be called before exchange is finished since result version can change in + case of merged exchanges. 
+ */ + assert exchangeDone() : "Should not be called before exchange is finished"; + + return exchCtx.events().topologyVersion(); } /** @@ -291,19 +378,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * @return Discovery cache. - */ - public DiscoCache discoCache() { - return discoCache; - } - - /** * @param cacheId Cache ID. * @param rcvdFrom Node ID cache was received from. * @return {@code True} if cache was added during this exchange. */ public boolean cacheAddedOnExchange(int cacheId, UUID rcvdFrom) { - return dynamicCacheStarted(cacheId) || (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom)); + return dynamicCacheStarted(cacheId) || exchCtx.events().nodeJoined(rcvdFrom); } /** @@ -312,8 +392,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @return {@code True} if cache group was added during this exchange. */ public boolean cacheGroupAddedOnExchange(int grpId, UUID rcvdFrom) { - return dynamicCacheGroupStarted(grpId) || - (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom)); + return dynamicCacheGroupStarted(grpId) || exchCtx.events().nodeJoined(rcvdFrom); } /** @@ -350,8 +429,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert exchId.equals(this.exchId); this.exchId.discoveryEvent(discoEvt); - this.discoEvt = discoEvt; - this.discoCache = discoCache; + this.firstDiscoEvt= discoEvt; + this.firstEvtDiscoCache = discoCache; evtLatch.countDown(); } @@ -378,10 +457,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * @return Discovery event. + * @return First event discovery event. + * + */ + public DiscoveryEvent firstEvent() { + return firstDiscoEvt; + } + + /** + * @return Discovery cache for first event. + */ + public DiscoCache firstEventCache() { + return firstEvtDiscoCache; + } + + /** + * @return Events processed in this exchange. */ - public DiscoveryEvent discoveryEvent() { - return discoEvt; + public ExchangeDiscoveryEvents events() { + return exchCtx.events(); } /** @@ -412,6 +506,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @param newCrd {@code True} if node become coordinator on this exchange. + * @throws IgniteCheckedException If failed. + */ + private void initCoordinatorCaches(boolean newCrd) throws IgniteCheckedException { + if (newCrd) { + IgniteInternalFuture fut = cctx.affinity().initCoordinatorCaches(this, false); + + if (fut != null) + fut.get(); + + cctx.exchange().onCoordinatorInitialized(); + } + } + + /** * Starts activity. * * @param newCrd {@code True} if node become coordinator on this exchange. 
@@ -427,15 +536,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte U.await(evtLatch); - assert discoEvt != null : this; - assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this; + assert firstDiscoEvt != null : this; + assert exchId.nodeId().equals(firstDiscoEvt.eventNode().id()) : this; try { - discoCache.updateAlives(cctx.discovery()); - - AffinityTopologyVersion topVer = topologyVersion(); + AffinityTopologyVersion topVer = initialVersion(); - srvNodes = new ArrayList<>(discoCache.serverNodes()); + srvNodes = new ArrayList<>(firstEvtDiscoCache.serverNodes()); remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId())))); @@ -443,19 +550,30 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean crdNode = crd != null && crd.isLocal(); - if (exchLog.isInfoEnabled()) + exchCtx = new ExchangeContext(crdNode, this); + + assert state == null : state; + + if (crdNode) + state = ExchangeLocalState.CRD; + else + state = cctx.kernalContext().clientNode() ? ExchangeLocalState.CLIENT : ExchangeLocalState.SRV; + + if (exchLog.isInfoEnabled()) { exchLog.info("Started exchange init [topVer=" + topVer + ", crd=" + crdNode + - ", evt=" + discoEvt.type() + - ", node=" + discoEvt.node() + - ", evtNode=" + discoEvt.node() + - ", customEvt=" + (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent)discoEvt).customMessage() : null) + - ']'); + ", evt=" + IgniteUtils.gridEventName(firstDiscoEvt.type()) + + ", evtNode=" + firstDiscoEvt.eventNode().id() + + ", customEvt=" + (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent)firstDiscoEvt).customMessage() : null) + + ", allowMerge=" + exchCtx.mergeExchanges() + ']'); + } ExchangeType exchange; - if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { - DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage(); + if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { + assert !exchCtx.mergeExchanges(); + + DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)firstDiscoEvt).customMessage(); if (msg instanceof ChangeGlobalStateMessage) { assert exchActions != null && !exchActions.empty(); @@ -468,7 +586,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte exchange = onCacheChangeRequest(crdNode); } else if (msg instanceof SnapshotDiscoveryMessage) { - exchange = CU.clientNode(discoEvt.eventNode()) ? + exchange = CU.clientNode(firstDiscoEvt.eventNode()) ? 
onClientNodeEvent(crdNode) : onServerNodeEvent(crdNode); } @@ -477,12 +595,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte exchange = onAffinityChangeRequest(crdNode); } + + initCoordinatorCaches(newCrd); } else { - if (discoEvt.type() == EVT_NODE_JOINED) { - if (!discoEvt.eventNode().isLocal()) { + if (firstDiscoEvt.type() == EVT_NODE_JOINED) { + if (!firstDiscoEvt.eventNode().isLocal()) { Collection receivedCaches = cctx.cache().startReceivedCaches( - discoEvt.eventNode().id(), + firstDiscoEvt.eventNode().id(), topVer); cctx.affinity().initStartedCaches(crdNode, this, receivedCaches); @@ -491,16 +611,35 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte initCachesOnLocalJoin(); } - if (newCrd) { - IgniteInternalFuture fut = cctx.affinity().initCoordinatorCaches(this); + initCoordinatorCaches(newCrd); - if (fut != null) - fut.get(); - } + if (exchCtx.mergeExchanges()) { + if (localJoinExchange()) { + if (cctx.kernalContext().clientNode()) { + onClientNodeEvent(crdNode); + + exchange = ExchangeType.CLIENT; + } + else { + onServerNodeEvent(crdNode); + + exchange = ExchangeType.ALL; + } + } + else { + if (CU.clientNode(firstDiscoEvt.eventNode())) + exchange = onClientNodeEvent(crdNode); + else + exchange = cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL; + } - exchange = CU.clientNode(discoEvt.eventNode()) ? - onClientNodeEvent(crdNode) : - onServerNodeEvent(crdNode); + if (exchId.isLeft()) + onLeft(); + } + else { + exchange = CU.clientNode(firstDiscoEvt.eventNode()) ? onClientNodeEvent(crdNode) : + onServerNodeEvent(crdNode); + } } updateTopologies(crdNode); @@ -513,7 +652,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } case CLIENT: { - initTopologies(); + if (!exchCtx.mergeExchanges() && exchCtx.fetchAffinityOnJoin()) + initTopologies(); clientOnlyExchange(); @@ -579,7 +719,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.database().readCheckpointAndRestoreMemory(startDescs); } - cctx.cache().startCachesOnLocalJoin(caches, topologyVersion()); + cctx.cache().startCachesOnLocalJoin(caches, initialVersion()); } /** @@ -594,7 +734,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (grp.isLocal()) continue; - grp.topology().beforeExchange(this, !centralizedAff); + grp.topology().beforeExchange(this, !centralizedAff, false); } } } @@ -622,7 +762,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion()); if (updateTop && clientTop != null) { - top.update(topologyVersion(), + top.update(null, clientTop.partitionMap(true), clientTop.updateCounters(false), Collections.emptySet(), @@ -632,13 +772,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte top.updateTopologyVersion( this, - discoCache(), + events().discoveryCache(), updSeq, cacheGroupStopping(grp.groupId())); } for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) - top.updateTopologyVersion(this, discoCache(), -1, cacheGroupStopping(top.groupId())); + top.updateTopologyVersion(this, events().discoveryCache(), -1, cacheGroupStopping(top.groupId())); } /** @@ -656,7 +796,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (log.isInfoEnabled()) { log.info("Start activation process [nodeId=" + cctx.localNodeId() + ", client=" + 
cctx.kernalContext().clientNode() + - ", topVer=" + topologyVersion() + "]"); + ", topVer=" + initialVersion() + "]"); } try { @@ -676,18 +816,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (log.isInfoEnabled()) { log.info("Successfully activated caches [nodeId=" + cctx.localNodeId() + ", client=" + cctx.kernalContext().clientNode() + - ", topVer=" + topologyVersion() + "]"); + ", topVer=" + initialVersion() + "]"); } } catch (Exception e) { U.error(log, "Failed to activate node components [nodeId=" + cctx.localNodeId() + ", client=" + cctx.kernalContext().clientNode() + - ", topVer=" + topologyVersion() + "]", e); + ", topVer=" + initialVersion() + "]", e); changeGlobalStateE = e; if (crd) { - synchronized (this) { + synchronized (mux) { changeGlobalStateExceptions.put(cctx.localNodeId(), e); } } @@ -697,7 +837,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (log.isInfoEnabled()) { log.info("Start deactivation process [nodeId=" + cctx.localNodeId() + ", client=" + cctx.kernalContext().clientNode() + - ", topVer=" + topologyVersion() + "]"); + ", topVer=" + initialVersion() + "]"); } try { @@ -711,13 +851,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte log.info("Successfully deactivated data structures, services and caches [" + "nodeId=" + cctx.localNodeId() + ", client=" + cctx.kernalContext().clientNode() + - ", topVer=" + topologyVersion() + "]"); + ", topVer=" + initialVersion() + "]"); } } catch (Exception e) { U.error(log, "Failed to deactivate node components [nodeId=" + cctx.localNodeId() + ", client=" + cctx.kernalContext().clientNode() + - ", topVer=" + topologyVersion() + "]", e); + ", topVer=" + initialVersion() + "]", e); changeGlobalStateE = e; } @@ -763,19 +903,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @return Exchange type. */ private ExchangeType onClientNodeEvent(boolean crd) throws IgniteCheckedException { - assert CU.clientNode(discoEvt.eventNode()) : this; + assert CU.clientNode(firstDiscoEvt.eventNode()) : this; - if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) { + if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) { onLeft(); - assert !discoEvt.eventNode().isLocal() : discoEvt; + assert !firstDiscoEvt.eventNode().isLocal() : firstDiscoEvt; } else - assert discoEvt.type() == EVT_NODE_JOINED || discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt; + assert firstDiscoEvt.type() == EVT_NODE_JOINED || firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : firstDiscoEvt; cctx.affinity().onClientEvent(this, crd); - return discoEvt.eventNode().isLocal() ? ExchangeType.CLIENT : ExchangeType.NONE; + return firstDiscoEvt.eventNode().isLocal() ? ExchangeType.CLIENT : ExchangeType.NONE; } /** @@ -784,12 +924,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @return Exchange type. 
*/ private ExchangeType onServerNodeEvent(boolean crd) throws IgniteCheckedException { - assert !CU.clientNode(discoEvt.eventNode()) : this; + assert !CU.clientNode(firstDiscoEvt.eventNode()) : this; - if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) { + if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) { onLeft(); - warnNoAffinityNodes(); + exchCtx.events().warnNoAffinityNodes(cctx); centralizedAff = cctx.affinity().onServerLeft(this, crd); } @@ -818,12 +958,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (CacheGroupContext grp : cctx.cache().cacheGroups()) { GridAffinityAssignmentCache aff = grp.affinity(); - aff.initialize(topologyVersion(), aff.idealAssignment()); + aff.initialize(initialVersion(), aff.idealAssignment()); } } + else + onAllServersLeft(); } - onDone(topologyVersion()); + onDone(initialVersion()); } /** @@ -848,7 +990,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte waitPartitionRelease(); - boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null; + boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null; for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal() || cacheStopping(cacheCtx.cacheId())) @@ -860,18 +1002,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal() || cacheGroupStopping(grp.groupId())) - continue; + if (!exchCtx.mergeExchanges()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal() || cacheGroupStopping(grp.groupId())) + continue; - grp.topology().beforeExchange(this, !centralizedAff); + // It is possible affinity is not initialized yet if node joins to cluster. + if (grp.affinity().lastVersion().topologyVersion() > 0) + grp.topology().beforeExchange(this, !centralizedAff, false); + } } cctx.database().beforeExchange(this); if (crd.isLocal()) { if (remaining.isEmpty()) - onAllReceived(); + onAllReceived(null); } else sendPartitions(crd); @@ -886,7 +1032,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { long start = U.currentTimeMillis(); - IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt); + IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(firstDiscoEvt); if (fut != null) { fut.get(); @@ -913,7 +1059,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @throws IgniteCheckedException If failed. */ private void waitPartitionRelease() throws IgniteCheckedException { - IgniteInternalFuture partReleaseFut = cctx.partitionReleaseFuture(topologyVersion()); + IgniteInternalFuture partReleaseFut = cctx.partitionReleaseFuture(initialVersion()); // Assign to class variable so it will be included into toString() method. this.partReleaseFut = partReleaseFut; @@ -941,7 +1087,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte catch (IgniteFutureTimeoutCheckedException ignored) { // Print pending transactions and locks that might have led to hang. 
if (nextDumpTime <= U.currentTimeMillis()) { - dumpPendingObjects(); + dumpPendingObjects(partReleaseFut); nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout); } @@ -1015,71 +1161,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * + * @param partReleaseFut Partition release future. */ - private void warnNoAffinityNodes() { - List cachesWithoutNodes = null; - - for (DynamicCacheDescriptor cacheDesc : cctx.cache().cacheDescriptors().values()) { - if (discoCache.cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) { - if (cachesWithoutNodes == null) - cachesWithoutNodes = new ArrayList<>(); - - cachesWithoutNodes.add(cacheDesc.cacheName()); - - // Fire event even if there is no client cache started. - if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) { - Event evt = new CacheEvent( - cacheDesc.cacheName(), - cctx.localNode(), - cctx.localNode(), - "All server nodes have left the cluster.", - EventType.EVT_CACHE_NODES_LEFT, - 0, - false, - null, - null, - null, - null, - false, - null, - false, - null, - null, - null - ); - - cctx.gridEvents().record(evt); - } - } - } - - if (cachesWithoutNodes != null) { - StringBuilder sb = - new StringBuilder("All server nodes for the following caches have left the cluster: "); - - for (int i = 0; i < cachesWithoutNodes.size(); i++) { - String cache = cachesWithoutNodes.get(i); - - sb.append('\'').append(cache).append('\''); - - if (i != cachesWithoutNodes.size() - 1) - sb.append(", "); - } - - U.quietAndWarn(log, sb.toString()); + private void dumpPendingObjects(IgniteInternalFuture partReleaseFut) { + U.warn(cctx.kernalContext().cluster().diagnosticLog(), + "Failed to wait for partition release future [topVer=" + initialVersion() + + ", node=" + cctx.localNodeId() + "]"); - U.quietAndWarn(log, "Must have server nodes for caches to operate."); - } - } + U.warn(log, "Partition release future: " + partReleaseFut); - /** - * - */ - private void dumpPendingObjects() { U.warn(cctx.kernalContext().cluster().diagnosticLog(), - "Failed to wait for partition release future [topVer=" + topologyVersion() + - ", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: "); + "Dumping pending objects that might be the cause: "); try { cctx.exchange().dumpDebugInfo(this); @@ -1106,6 +1198,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @return {@code True} if exchange for local node join. + */ + public boolean localJoinExchange() { + return firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isLocal(); + } + + /** * @param node Target Node. * @throws IgniteCheckedException If failed. 
*/ @@ -1129,19 +1228,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte true); } else { - msg = cctx.exchange().createPartitionsSingleMessage(node, - exchangeId(), + msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(), false, true); - } - Map> partHistReserved0 = partHistReserved; + Map> partHistReserved0 = partHistReserved; - if (partHistReserved0 != null) - msg.partitionHistoryCounters(partHistReserved0); + if (partHistReserved0 != null) + msg.partitionHistoryCounters(partHistReserved0); + } if (stateChangeExchange() && changeGlobalStateE != null) msg.setError(changeGlobalStateE); + else if (localJoinExchange()) + msg.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin()); if (log.isDebugEnabled()) log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']'); @@ -1176,30 +1276,72 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @param msg Message to send. * @param nodes Nodes. - * @throws IgniteCheckedException If failed. + * @param mergedJoinExchMsgs Messages received from merged 'join node' exchanges. + * @param joinedNodeAff Affinity if was requested by some nodes. */ - private void sendAllPartitions(Collection nodes) throws IgniteCheckedException { - GridDhtPartitionsFullMessage m = createPartitionsMessage(true); + private void sendAllPartitions( + GridDhtPartitionsFullMessage msg, + Collection nodes, + Map mergedJoinExchMsgs, + Map joinedNodeAff) { + boolean singleNode = nodes.size() == 1; + + GridDhtPartitionsFullMessage joinedNodeMsg = null; assert !nodes.contains(cctx.localNode()); if (log.isDebugEnabled()) { log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + - ", exchId=" + exchId + ", msg=" + m + ']'); + ", exchId=" + exchId + ", msg=" + msg + ']'); } for (ClusterNode node : nodes) { + GridDhtPartitionsFullMessage sndMsg = msg; + + if (joinedNodeAff != null) { + if (singleNode) + msg.joinedNodeAffinity(joinedNodeAff); + else { + GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id()); + + if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) { + if (joinedNodeMsg == null) { + joinedNodeMsg = msg.copy(); + + joinedNodeMsg.joinedNodeAffinity(joinedNodeAff); + } + + sndMsg = joinedNodeMsg; + } + } + } + try { - cctx.io().send(node, m, SYSTEM_POOL); + GridDhtPartitionExchangeId sndExchId = exchangeId(); + + if (mergedJoinExchMsgs != null) { + GridDhtPartitionsSingleMessage mergedMsg = mergedJoinExchMsgs.get(node.id()); + + if (mergedMsg != null) + sndExchId = mergedMsg.exchangeId(); + } + + if (sndExchId != null && !sndExchId.equals(exchangeId())) { + sndMsg = sndMsg.copy(); + + sndMsg.exchangeId(sndExchId); + } + + cctx.io().send(node, sndMsg, SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send partitions, node failed: " + node); } catch (IgniteCheckedException e) { - if (cctx.io().checkNodeLeft(node.id(), e, false)) { - if (log.isDebugEnabled()) - log.debug("Failed to send partitions, node failed: " + node); - } - else - U.error(log, "Failed to send partitions [node=" + node + ']', e); + U.error(log, "Failed to send partitions [node=" + node + ']', e); } } } @@ -1226,16 +1368,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @return {@code True} if exchange triggered by server node join or fail. 
*/ public boolean serverNodeDiscoveryEvent() { - assert discoEvt != null; + assert exchCtx != null; - return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode()); + return exchCtx.events().hasServerJoin() || exchCtx.events().hasServerLeft(); + } + + /** {@inheritDoc} */ + @Override public boolean exchangeDone() { + return done.get(); + } + + /** + * Finish merged future to allow GridCachePartitionExchangeManager.ExchangeFutureSet cleanup. + */ + public void finishMerged() { + super.onDone(null, null); } /** {@inheritDoc} */ @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) { - if (!done.compareAndSet(false, true)) + if (isDone() || !done.compareAndSet(false, true)) return false; + log.info("Finish exchange future [startVer=" + initialVersion() + + ", resVer=" + res + + ", err=" + err + ']'); + + assert res != null || err != null; + if (err == null && !cctx.kernalContext().clientNode() && (serverNodeDiscoveryEvent() || affChangeMsg != null)) { @@ -1243,18 +1403,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (!cacheCtx.affinityNode() || cacheCtx.isLocal()) continue; - cacheCtx.continuousQueries().flushBackupQueue(exchId.topologyVersion()); + cacheCtx.continuousQueries().flushBackupQueue(res); } } if (err == null) { if (centralizedAff) { + assert !exchCtx.mergeExchanges(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; try { - grp.topology().initPartitions(this); + grp.topology().initPartitionsWhenAffinityReady(res, this); } catch (IgniteInterruptedCheckedException e) { U.error(log, "Failed to initialize partitions.", e); @@ -1267,7 +1429,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (drCacheCtx.isDrEnabled()) { try { - drCacheCtx.dr().onExchange(topologyVersion(), exchId.isLeft()); + drCacheCtx.dr().onExchange(res, exchId.isLeft()); } catch (IgniteCheckedException e) { U.error(log, "Failed to notify DR: " + e, e); @@ -1275,25 +1437,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - if (serverNodeDiscoveryEvent() && - (discoEvt.type() == EVT_NODE_LEFT || - discoEvt.type() == EVT_NODE_FAILED || - discoEvt.type() == EVT_NODE_JOINED)) - detectLostPartitions(); + if (serverNodeDiscoveryEvent()) + detectLostPartitions(res); Map m = U.newHashMap(cctx.cache().cacheGroups().size()); for (CacheGroupContext grp : cctx.cache().cacheGroups()) - m.put(grp.groupId(), validateCacheGroup(grp, discoEvt.topologyNodes())); + m.put(grp.groupId(), validateCacheGroup(grp, events().lastEvent().topologyNodes())); grpValidRes = m; } tryToPerformLocalSnapshotOperation(); - cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err); + cctx.cache().onExchangeDone(initialVersion(), exchActions, err); - cctx.exchange().onExchangeDone(this, err); + cctx.exchange().onExchangeDone(res, initialVersion(), err); if (exchActions != null && err == null) exchActions.completeRequestFutures(cctx); @@ -1320,7 +1479,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (err == null) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) - grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()), false); + grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false); } } @@ -1331,17 +1490,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte 
initFut.onDone(err == null); - if (exchId.isLeft()) { - for (CacheGroupContext grp : cctx.cache().cacheGroups()) - grp.affinityFunction().removeNode(exchId.nodeId()); + if (exchCtx != null && exchCtx.events().hasServerLeft()) { + ExchangeDiscoveryEvents evts = exchCtx.events(); + + for (DiscoveryEvent evt : exchCtx.events().events()) { + if (evts.serverLeftEvent(evt)) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) + grp.affinityFunction().removeNode(evt.eventNode().id()); + } + } } exchActions = null; - if (discoEvt instanceof DiscoveryCustomEvent) - ((DiscoveryCustomEvent)discoEvt).customMessage(null); + if (firstDiscoEvt instanceof DiscoveryCustomEvent) + ((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null); - cctx.exchange().lastFinishedFuture(this); + if (err == null) + cctx.exchange().lastFinishedFuture(this); return true; } @@ -1353,7 +1519,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * Cleans up resources to avoid excessive memory usage. */ public void cleanUp() { - singleMsgs.clear(); + pendingSingleMsgs.clear(); fullMsgs.clear(); msgs.clear(); changeGlobalStateExceptions.clear(); @@ -1361,6 +1527,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte partReleaseFut = null; changeGlobalStateE = null; exchActions = null; + mergedJoinExchMsgs = null; + pendingJoinMsg = null; + exchCtx = null; + newCrdFut = null; } /** @@ -1382,137 +1552,401 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * Processing of received single message. Actual processing in future may be delayed if init method was not - * completed, see {@link #initDone()} + * Records that this exchange if merged with another 'node join' exchange. * - * @param node Sender node. - * @param msg Single partition info. + * @param node Joined node. + * @param msg Joined node message if already received. + * @return {@code True} if need to wait for message from joined server node. 
*/ - public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { - assert msg != null; - assert msg.exchangeId().equals(exchId) : msg; + private boolean addMergedJoinExchange(ClusterNode node, @Nullable GridDhtPartitionsSingleMessage msg) { + assert Thread.holdsLock(mux); + assert node != null; + assert state == ExchangeLocalState.CRD : state; - if (!msg.client()) { - assert msg.lastVersion() != null : msg; + if (msg == null && newCrdFut != null) + msg = newCrdFut.joinExchangeMessage(node.id()); - updateLastVersion(msg.lastVersion()); - } + UUID nodeId = node.id(); - if (isDone()) { - if (log.isDebugEnabled()) - log.debug("Received message for finished future (will reply only to sender) [msg=" + msg + - ", fut=" + this + ']'); + boolean wait = false; - if (!centralizedAff) - sendAllPartitions(node.id(), cctx.gridConfig().getNetworkSendRetryCount()); + if (CU.clientNode(node)) { + if (msg != null) + waitAndReplyToNode(nodeId, msg); } else { - initFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture f) { - try { - if (!f.get()) - return; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to initialize exchange future: " + this, e); + if (mergedJoinExchMsgs == null) + mergedJoinExchMsgs = new LinkedHashMap<>(); - return; - } + if (msg != null) { + assert msg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(node.order())); + + log.info("Merge server join exchange, message received [curFut=" + initialVersion() + + ", node=" + nodeId + ']'); - processMessage(node, msg); + mergedJoinExchMsgs.put(nodeId, msg); + } + else { + if (cctx.discovery().alive(nodeId)) { + log.info("Merge server join exchange, wait for message [curFut=" + initialVersion() + + ", node=" + nodeId + ']'); + + wait = true; + + mergedJoinExchMsgs.put(nodeId, null); + + awaitMergedMsgs++; } - }); + else { + log.info("Merge server join exchange, awaited node left [curFut=" + initialVersion() + + ", node=" + nodeId + ']'); + } + } } + + return wait; } /** - * Note this method performs heavy updatePartitionSingleMap operation, this operation is moved out from the - * synchronized block. Only count of such updates {@link #pendingSingleUpdates} is managed under critical section. + * Merges this exchange with given one. * - * @param node Sender node. - * @param msg Partition single message. + * @param fut Current exchange to merge with. + * @return {@code True} if need wait for message from joined server node. */ - private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) { - boolean allReceived = false; // Received all expected messages. 
- boolean updateSingleMap = false; + public boolean mergeJoinExchange(GridDhtPartitionsExchangeFuture fut) { + boolean wait; - synchronized (this) { - assert crd != null; + synchronized (mux) { + assert (!isDone() && !initFut.isDone()) || cctx.kernalContext().isStopping() : this; + assert (mergedWith == null && state == null) || cctx.kernalContext().isStopping() : this; - if (crd.isLocal()) { - if (remaining.remove(node.id())) { - updateSingleMap = true; + state = ExchangeLocalState.MERGED; - pendingSingleUpdates++; + mergedWith = fut; - if (stateChangeExchange() && msg.getError() != null) - changeGlobalStateExceptions.put(node.id(), msg.getError()); + ClusterNode joinedNode = firstDiscoEvt.eventNode(); - allReceived = remaining.isEmpty(); - } - } - else - singleMsgs.put(node, msg); + wait = fut.addMergedJoinExchange(joinedNode, pendingJoinMsg); } - if (updateSingleMap) { - try { - // Do not update partition map, in case cluster transitioning to inactive state. - if (!deactivateCluster()) - updatePartitionSingleMap(node, msg); - } - finally { - synchronized (this) { - assert pendingSingleUpdates > 0; + return wait; + } - pendingSingleUpdates--; + /** + * @param fut Current future. + * @return Pending join request if any. + */ + @Nullable public GridDhtPartitionsSingleMessage mergeJoinExchangeOnDone(GridDhtPartitionsExchangeFuture fut) { + synchronized (mux) { + assert !isDone(); + assert !initFut.isDone(); + assert mergedWith == null; + assert state == null; - if (pendingSingleUpdates == 0) - notifyAll(); - } - } - } + state = ExchangeLocalState.MERGED; - if (allReceived) { - awaitSingleMapUpdates(); + mergedWith = fut; - onAllReceived(); + return pendingJoinMsg; } } /** - * + * @param node Sender node. + * @param msg Message. */ - private synchronized void awaitSingleMapUpdates() { - try { - while (pendingSingleUpdates > 0) - U.wait(this); - } - catch (IgniteInterruptedCheckedException e) { - U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e); + private void processMergedMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { + if (msg.client()) { + waitAndReplyToNode(node.id(), msg); + + return; } - } - /** - * @param fut Affinity future. 
- */ - private void onAffinityInitialized(IgniteInternalFuture>>> fut) { - try { - assert fut.isDone(); + boolean done = false; - Map>> assignmentChange = fut.get(); + FinishState finishState0 = null; - GridDhtPartitionsFullMessage m = createPartitionsMessage(false); + synchronized (mux) { + if (state == ExchangeLocalState.DONE) { + assert finishState != null; - CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange); + finishState0 = finishState; + } + else { + boolean process = mergedJoinExchMsgs != null && + mergedJoinExchMsgs.containsKey(node.id()) && + mergedJoinExchMsgs.get(node.id()) == null; - if (log.isDebugEnabled()) - log.debug("Centralized affinity exchange, send affinity change message: " + msg); + log.info("Merge server join exchange, received message [curFut=" + initialVersion() + + ", node=" + node.id() + + ", msgVer=" + msg.exchangeId().topologyVersion() + + ", process=" + process + + ", awaited=" + awaitMergedMsgs + ']'); - cctx.discovery().sendCustomEvent(msg); + if (process) { + mergedJoinExchMsgs.put(node.id(), msg); + + assert awaitMergedMsgs > 0 : awaitMergedMsgs; + + awaitMergedMsgs--; + + done = awaitMergedMsgs == 0; + } + } } - catch (IgniteCheckedException e) { - onDone(e); + + if (finishState0 != null) { + sendAllPartitionsToNode(finishState0, msg, node.id()); + + return; + } + + if (done) + finishExchangeOnCoordinator(null); + } + + /** + * Processing of received single message. Actual processing in future may be delayed if init method was not + * completed, see {@link #initDone()} + * + * @param node Sender node. + * @param msg Single partition info. + */ + public void onReceiveSingleMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { + assert !node.isDaemon() : node; + assert msg != null; + assert exchId.equals(msg.exchangeId()) : msg; + assert !cctx.kernalContext().clientNode(); + + if (msg.restoreState()) { + InitNewCoordinatorFuture newCrdFut0; + + synchronized (mux) { + assert newCrdFut != null; + + newCrdFut0 = newCrdFut; + } + + newCrdFut0.onMessage(node, msg); + + return; + } + + if (!msg.client()) { + assert msg.lastVersion() != null : msg; + + updateLastVersion(msg.lastVersion()); + } + + GridDhtPartitionsExchangeFuture mergedWith0 = null; + + synchronized (mux) { + if (state == ExchangeLocalState.MERGED) { + assert mergedWith != null; + + mergedWith0 = mergedWith; + } + else { + assert state != ExchangeLocalState.CLIENT; + + if (exchangeId().isJoined() && node.id().equals(exchId.nodeId())) + pendingJoinMsg = msg; + } + } + + if (mergedWith0 != null) { + mergedWith0.processMergedMessage(node, msg); + + return; + } + + initFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture f) { + try { + if (!f.get()) + return; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize exchange future: " + this, e); + + return; + } + + processSingleMessage(node.id(), msg); + } + }); + } + + /** + * @param nodeId Node ID. + * @param msg Client's message. 
+ */ + public void waitAndReplyToNode(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) { + listen(new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + if (cctx.kernalContext().isStopping()) + return; + + FinishState finishState0; + + synchronized (mux) { + finishState0 = finishState; + } + + if (finishState0 == null) { + assert firstDiscoEvt.type() == EVT_NODE_JOINED && CU.clientNode(firstDiscoEvt.eventNode()) : this; + + finishState0 = new FinishState(cctx.localNodeId(), + initialVersion(), + createPartitionsMessage(true)); + } + + sendAllPartitionsToNode(finishState0, msg, nodeId); + } + }); + } + + /** + * Note this method performs heavy updatePartitionSingleMap operation, this operation is moved out from the + * synchronized block. Only count of such updates {@link #pendingSingleUpdates} is managed under critical section. + * + * @param nodeId Sender node. + * @param msg Partition single message. + */ + private void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) { + if (msg.client()) { + waitAndReplyToNode(nodeId, msg); + + return; + } + + boolean allReceived = false; // Received all expected messages. + boolean updateSingleMap = false; + + FinishState finishState0 = null; + + synchronized (mux) { + assert crd != null; + + switch (state) { + case DONE: { + log.info("Received single message, already done [ver=" + initialVersion() + + ", node=" + nodeId + ']'); + + assert finishState != null; + + finishState0 = finishState; + + break; + } + + case CRD: { + assert crd.isLocal() : crd; + + if (remaining.remove(nodeId)) { + updateSingleMap = true; + + pendingSingleUpdates++; + + if (stateChangeExchange() && msg.getError() != null) + changeGlobalStateExceptions.put(nodeId, msg.getError()); + + allReceived = remaining.isEmpty(); + + log.info("Coordinator received single message [ver=" + initialVersion() + + ", node=" + nodeId + + ", allReceived=" + allReceived + ']'); + } + + break; + } + + case SRV: + case BECOME_CRD: { + log.info("Non-coordinator received single message [ver=" + initialVersion() + + ", node=" + nodeId + ", state=" + state + ']'); + + pendingSingleMsgs.put(nodeId, msg); + + break; + } + + default: + assert false : state; + } + } + + if (finishState0 != null) { + sendAllPartitionsToNode(finishState0, msg, nodeId); + + return; + } + + if (updateSingleMap) { + try { + // Do not update partition map, in case cluster transitioning to inactive state. + if (!deactivateCluster()) + updatePartitionSingleMap(nodeId, msg); + } + finally { + synchronized (mux) { + assert pendingSingleUpdates > 0; + + pendingSingleUpdates--; + + if (pendingSingleUpdates == 0) + mux.notifyAll(); + } + } + } + + if (allReceived) { + if (!awaitSingleMapUpdates()) + return; + + onAllReceived(null); + } + } + + /** + * @return {@code False} if interrupted. + */ + private boolean awaitSingleMapUpdates() { + try { + synchronized (mux) { + while (pendingSingleUpdates > 0) + U.wait(mux); + } + + return true; + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e); + + return false; + } + } + + /** + * @param fut Affinity future. 
+ */ + private void onAffinityInitialized(IgniteInternalFuture>>> fut) { + try { + assert fut.isDone(); + + Map>> assignmentChange = fut.get(); + + GridDhtPartitionsFullMessage m = createPartitionsMessage(false); + + CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange); + + if (log.isDebugEnabled()) + log.debug("Centralized affinity exchange, send affinity change message: " + msg); + + cctx.discovery().sendCustomEvent(msg); + } + catch (IgniteCheckedException e) { + onDone(e); } } @@ -1657,8 +2091,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * Detect lost partitions. + * + * @param resTopVer Result topology version. */ - private void detectLostPartitions() { + private void detectLostPartitions(AffinityTopologyVersion resTopVer) { boolean detected = false; synchronized (cctx.exchange().interruptLock()) { @@ -1667,7 +2103,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) { - boolean detectedOnGrp = grp.topology().detectLostPartitions(discoEvt); + boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, events().lastEvent()); detected |= detectedOnGrp; } @@ -1682,6 +2118,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param cacheNames Cache names. */ private void resetLostPartitions(Collection cacheNames) { + assert !exchCtx.mergeExchanges(); + synchronized (cctx.exchange().interruptLock()) { if (Thread.currentThread().isInterrupted()) return; @@ -1692,7 +2130,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (String cacheName : cacheNames) { if (grp.hasCache(cacheName)) { - grp.topology().resetLostPartitions(); + grp.topology().resetLostPartitions(initialVersion()); break; } @@ -1702,49 +2140,122 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * + * @param sndResNodes Additional nodes to send finish message to. */ - private void onAllReceived() { + private void onAllReceived(@Nullable Collection sndResNodes) { try { assert crd.isLocal(); - assert partHistSuppliers.isEmpty(); + assert partHistSuppliers.isEmpty() : partHistSuppliers; - if (!crd.equals(discoCache.serverNodes().get(0))) { + if (!exchCtx.mergeExchanges() && !crd.equals(events().discoveryCache().serverNodes().get(0))) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) - grp.topology().beforeExchange(this, !centralizedAff); + grp.topology().beforeExchange(this, !centralizedAff, false); } } - for (GridDhtPartitionsAbstractMessage msg : msgs.values()) { - if (msg instanceof GridDhtPartitionsSingleMessage) { - GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg; + if (exchCtx.mergeExchanges()) { + log.info("Coordinator received all messages, try merge [ver=" + initialVersion() + ']'); + + boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this); + + if (!finish) + return; + } + + finishExchangeOnCoordinator(sndResNodes); + } + catch (IgniteCheckedException e) { + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else + onDone(e); + } + } + + /** + * @param sndResNodes Additional nodes to send finish message to. 
+ */ + private void finishExchangeOnCoordinator(@Nullable Collection sndResNodes) { + try { + AffinityTopologyVersion resTopVer = exchCtx.events().topologyVersion(); - for (Map.Entry entry : msg0.partitions().entrySet()) { - Integer grpId = entry.getKey(); - CacheGroupContext grp = cctx.cache().cacheGroup(grpId); + log.info("finishExchangeOnCoordinator [topVer=" + initialVersion() + + ", resVer=" + resTopVer + ']'); - GridDhtPartitionTopology top = grp != null ? grp.topology() : - cctx.exchange().clientTopology(grpId, this); + Map idealAffDiff = null; - Map> cntrs = msg0.partitionUpdateCounters(grpId); + if (exchCtx.mergeExchanges()) { + synchronized (mux) { + if (mergedJoinExchMsgs != null) { + for (Map.Entry e : mergedJoinExchMsgs.entrySet()) { + msgs.put(e.getKey(), e.getValue()); - if (cntrs != null) - top.applyUpdateCounters(cntrs); + updatePartitionSingleMap(e.getKey(), e.getValue()); + } } } + + assert exchCtx.events().hasServerJoin() || exchCtx.events().hasServerLeft(); + + exchCtx.events().processEvents(this); + + if (exchCtx.events().hasServerLeft()) + idealAffDiff = cctx.affinity().onServerLeftWithExchangeMergeProtocol(this); + else + cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true); + + for (CacheGroupDescriptor desc : cctx.affinity().cacheGroups().values()) { + if (desc.config().getCacheMode() == CacheMode.LOCAL) + continue; + + CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); + + GridDhtPartitionTopology top = grp != null ? grp.topology() : + cctx.exchange().clientTopology(desc.groupId()); + + top.beforeExchange(this, true, true); + } } - if (discoEvt.type() == EVT_NODE_JOINED) - assignPartitionsStates(); - else if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { - assert discoEvt instanceof DiscoveryCustomEvent; + Map joinedNodeAff = null; + + for (Map.Entry e : msgs.entrySet()) { + GridDhtPartitionsSingleMessage msg = e.getValue(); + + // Apply update counters after all single messages are received. + for (Map.Entry entry : msg.partitions().entrySet()) { + Integer grpId = entry.getKey(); + + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); + + GridDhtPartitionTopology top = grp != null ? 
grp.topology() : + cctx.exchange().clientTopology(grpId); + + Map> cntrs = msg.partitionUpdateCounters(grpId); + + if (cntrs != null) + top.applyUpdateCounters(cntrs); + } + + Collection affReq = msg.cacheGroupsAffinityRequest(); + + if (affReq != null) { + joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx, + resTopVer, + affReq, + joinedNodeAff); + } + } + + if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { + assert firstDiscoEvt instanceof DiscoveryCustomEvent; if (activateCluster()) assignPartitionsStates(); - if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) { + if (((DiscoveryCustomEvent)firstDiscoEvt).customMessage() instanceof DynamicCacheChangeBatch) { if (exchActions != null) { Set caches = exchActions.cachesToResetLostPartitions(); @@ -1753,14 +2264,40 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } } - else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) - detectLostPartitions(); + else { + if (exchCtx.events().hasServerJoin()) + assignPartitionsStates(); + + if (exchCtx.events().hasServerLeft()) + detectLostPartitions(resTopVer); + } updateLastVersion(cctx.versions().last()); cctx.versions().onExchange(lastVer.get().order()); + GridDhtPartitionsFullMessage msg = createPartitionsMessage(true); + + if (exchCtx.mergeExchanges()) { + assert !centralizedAff; + + msg.resultTopologyVersion(resTopVer); + + if (exchCtx.events().hasServerLeft()) + msg.idealAffinityDiff(idealAffDiff); + } + + msg.prepareMarshal(cctx); + + synchronized (mux) { + finishState = new FinishState(crd.id(), resTopVer, msg); + + state = ExchangeLocalState.DONE; + } + if (centralizedAff) { + assert !exchCtx.mergeExchanges(); + IgniteInternalFuture>>> fut = cctx.affinity().initAffinityOnNodeLeft(this); if (!fut.isDone()) { @@ -1774,12 +2311,32 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte onAffinityInitialized(fut); } else { - List nodes; + Set nodes; + + Map mergedJoinExchMsgs0; - synchronized (this) { + synchronized (mux) { srvNodes.remove(cctx.localNode()); - nodes = new ArrayList<>(srvNodes); + nodes = U.newHashSet(srvNodes.size()); + + nodes.addAll(srvNodes); + + mergedJoinExchMsgs0 = mergedJoinExchMsgs; + + if (mergedJoinExchMsgs != null) { + for (Map.Entry e : mergedJoinExchMsgs.entrySet()) { + if (e.getValue() != null) { + ClusterNode node = cctx.discovery().node(e.getKey()); + + if (node != null) + nodes.add(node); + } + } + } + + if (!F.isEmpty(sndResNodes)) + nodes.addAll(sndResNodes); } IgniteCheckedException err = null; @@ -1801,15 +2358,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean active = !stateChangeErr && req.activate(); - ChangeGlobalStateFinishMessage msg = new ChangeGlobalStateFinishMessage(req.requestId(), active); + ChangeGlobalStateFinishMessage stateFinishMsg = new ChangeGlobalStateFinishMessage( + req.requestId(), + active); - cctx.discovery().sendCustomEvent(msg); + cctx.discovery().sendCustomEvent(stateFinishMsg); } if (!nodes.isEmpty()) - sendAllPartitions(nodes); + sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, joinedNodeAff); + + onDone(exchCtx.events().topologyVersion(), err); + + for (Map.Entry e : pendingSingleMsgs.entrySet()) { + log.info("Process pending message on coordinator [node=" + e.getKey() + + ", ver=" + initialVersion() + + ", resVer=" + resTopVer + ']'); - onDone(exchangeId().topologyVersion(), err); + processSingleMessage(e.getKey(), e.getValue()); 
@@ -1835,127 +2402,332 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
    }

    /**
-     * @param nodeId Node ID.
-     * @param retryCnt Number of retries.
+     * @param finishState State.
+     * @param msg Request.
+     * @param nodeId Node ID.
+     */
+    private void sendAllPartitionsToNode(FinishState finishState, GridDhtPartitionsSingleMessage msg, UUID nodeId) {
+        ClusterNode node = cctx.node(nodeId);
+
+        if (node != null) {
+            GridDhtPartitionsFullMessage fullMsg = finishState.msg.copy();
+
+            Collection affReq = msg.cacheGroupsAffinityRequest();
+
+            if (affReq != null) {
+                Map aff = CacheGroupAffinityMessage.createAffinityMessages(
+                    cctx,
+                    finishState.resTopVer,
+                    affReq,
+                    null);
+
+                fullMsg.joinedNodeAffinity(aff);
+            }
+
+            if (!fullMsg.exchangeId().equals(msg.exchangeId())) {
+                fullMsg = fullMsg.copy();
+
+                fullMsg.exchangeId(msg.exchangeId());
+            }
+
+            try {
+                cctx.io().send(node, fullMsg, SYSTEM_POOL);
+            }
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send partitions, node failed: " + node);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send partitions [node=" + node + ']', e);
+            }
+        }
+        else if (log.isDebugEnabled())
+            log.debug("Failed to send partitions, node failed: " + nodeId);
+
+    }
+
+    /**
+     * @param node Sender node.
+     * @param msg Full partition info.
+     */
+    public void onReceiveFullMessage(final ClusterNode node, final GridDhtPartitionsFullMessage msg) {
+        assert msg != null;
+        assert msg.exchangeId() != null : msg;
+        assert !node.isDaemon() : node;
+
+        initFut.listen(new CI1>() {
+            @Override public void apply(IgniteInternalFuture f) {
+                try {
+                    if (!f.get())
+                        return;
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to initialize exchange future: " + this, e);
+
+                    return;
+                }
+
+                processFullMessage(true, node, msg);
+            }
+        });
+    }
+
+    /**
+     * @param node Sender node.
+     * @param msg Partitions single request.
+     */
+    public void onReceivePartitionRequest(final ClusterNode node, final GridDhtPartitionsSingleRequest msg) {
+        assert !cctx.kernalContext().clientNode() || msg.restoreState();
+        assert !node.isDaemon() && !CU.clientNode(node) : node;
+
+        initFut.listen(new CI1>() {
+            @Override public void apply(IgniteInternalFuture fut) {
+                processSinglePartitionRequest(node, msg);
+            }
+        });
+    }
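// Illustrative sketch (not Ignite code): sendAllPartitionsToNode() above never mutates the
// cached, already-marshalled full message; it copies it before stamping per-requester data
// (joined-node affinity, a different exchange id). A stand-alone model of that copy-on-customize
// pattern follows; ResponseTemplate and Response are hypothetical names, not Ignite classes.
import java.util.UUID;

final class ResponseTemplate {
    /** Minimal mutable "message" standing in for the shared full-partitions response. */
    static final class Response implements Cloneable {
        UUID exchangeId;
        String joinedNodeAffinity;

        Response(UUID exchangeId) {
            this.exchangeId = exchangeId;
        }

        Response copy() {
            try {
                return (Response)clone();
            }
            catch (CloneNotSupportedException e) {
                throw new AssertionError(e);
            }
        }
    }

    private final Response cached;

    ResponseTemplate(Response cached) {
        this.cached = cached;
    }

    /** Returns a per-requester response; the shared template is never modified. */
    Response forRequester(UUID requestedExchangeId, String affinityIfRequested) {
        Response res = cached.copy();

        if (affinityIfRequested != null)
            res.joinedNodeAffinity = affinityIfRequested;

        if (!cached.exchangeId.equals(requestedExchangeId))
            res.exchangeId = requestedExchangeId;

        return res;
    }
}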
+
+    /**
+     * @param node Sender node.
+     * @param msg Message.
+     */
+    private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) {
+        FinishState finishState0 = null;
+
+        synchronized (mux) {
+            if (crd == null) {
+                log.info("Ignore partitions request, no coordinator [node=" + node.id() + ']');
+
+                return;
+            }
+
+            switch (state) {
+                case DONE: {
+                    assert finishState != null;
+
+                    if (node.id().equals(finishState.crdId)) {
+                        log.info("Ignore partitions request, finished exchange with this coordinator: " + msg);
+
+                        return;
+                    }
+
+                    finishState0 = finishState;
+
+                    break;
+                }
+
+                case CRD:
+                case BECOME_CRD: {
+                    log.info("Ignore partitions request, node is coordinator: " + msg);
+
+                    return;
+                }
+
+                case CLIENT:
+                case SRV: {
+                    if (!cctx.discovery().alive(node)) {
+                        log.info("Ignore partitions request, node is not alive [node=" + node.id() + ']');
+
+                        return;
+                    }
+
+                    if (msg.restoreState()) {
+                        if (!node.equals(crd)) {
+                            if (node.order() > crd.order()) {
+                                log.info("Received partitions request, change coordinator [oldCrd=" + crd.id() +
+                                    ", newCrd=" + node.id() + ']');
+
+                                crd = node; // Do not allow to process FullMessage from old coordinator.
+                            }
+                            else {
+                                log.info("Ignore restore state request, coordinator changed [oldCrd=" + crd.id() +
+                                    ", newCrd=" + node.id() + ']');
+
+                                return;
+                            }
+                        }
+                    }
+
+                    break;
+                }
+
+                default:
+                    assert false : state;
+            }
+        }
+
+        if (msg.restoreState()) {
+            try {
+                assert msg.restoreExchangeId() != null : msg;
+
+                GridDhtPartitionsSingleMessage res = cctx.exchange().createPartitionsSingleMessage(
+                    msg.restoreExchangeId(),
+                    cctx.kernalContext().clientNode(),
+                    true);
+
+                if (localJoinExchange() && finishState0 == null)
+                    res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
+
+                res.restoreState(true);
+
+                log.info("Send restore state response [node=" + node.id() +
+                    ", exchVer=" + msg.restoreExchangeId().topologyVersion() +
+                    ", hasState=" + (finishState0 != null) +
+                    ", affReq=" + !F.isEmpty(res.cacheGroupsAffinityRequest()) + ']');
+
+                res.finishMessage(finishState0 != null ? finishState0.msg : null);
+
+                cctx.io().send(node, res, SYSTEM_POOL);
+            }
+            catch (ClusterTopologyCheckedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']');
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send partitions message [node=" + node + ", msg=" + msg + ']', e);
+            }
+
+            return;
+        }
+
+        try {
+            sendLocalPartitions(node);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send message to coordinator: " + e);
+        }
+    }
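// Illustrative sketch (not Ignite code): the restore-state branch above accepts a new
// coordinator only when the sender's discovery order is higher than the order of the
// coordinator currently tracked, so every node makes the same deterministic decision.
// A minimal JDK-only model of that rule; NodeInfo and CoordinatorTracker are hypothetical.
import java.util.UUID;

final class CoordinatorTracker {
    static final class NodeInfo {
        final UUID id;
        final long order; // Monotonic discovery order: nodes that join later get higher values.

        NodeInfo(UUID id, long order) {
            this.id = id;
            this.order = order;
        }
    }

    private NodeInfo crd;

    CoordinatorTracker(NodeInfo initialCrd) {
        crd = initialCrd;
    }

    /**
     * @return {@code true} if the sender became (or already was) the tracked coordinator and its
     *      request should be served, {@code false} if the request must be ignored as stale.
     */
    synchronized boolean onRestoreStateRequest(NodeInfo sender) {
        if (sender.id.equals(crd.id))
            return true;

        // A higher order means the sender joined later; the previously tracked coordinator must have left.
        if (sender.order > crd.order) {
            crd = sender;

            return true;
        }

        // Request from an older node: a newer coordinator was already accepted, ignore it.
        return false;
    }
}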
+
+    /**
+     * @param node Sender node.
+     * @param msg Message.
     */
-    private void sendAllPartitions(final UUID nodeId, final int retryCnt) {
-        ClusterNode n = cctx.node(nodeId);
-
+    private void processFullMessage(boolean checkCrd, ClusterNode node, GridDhtPartitionsFullMessage msg) {
        try {
-            if (n != null)
-                sendAllPartitions(F.asList(n));
-        }
-        catch (IgniteCheckedException e) {
-            if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send full partition map to node, node left grid " +
-                        "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
+            assert exchId.equals(msg.exchangeId()) : msg;
+            assert msg.lastVersion() != null : msg;
-                return;
-            }
+            if (checkCrd) {
+                assert node != null;
-            if (reconnectOnError(e)) {
-                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+                synchronized (mux) {
+                    if (crd == null) {
+                        log.info("Ignore full message, all server nodes left: " + msg);
-                return;
-            }
+                        return;
+                    }
-            if (retryCnt > 0) {
-                long timeout = cctx.gridConfig().getNetworkSendRetryDelay();
+                    switch (state) {
+                        case CRD:
+                        case BECOME_CRD: {
+                            log.info("Ignore full message, node is coordinator: " + msg);
-                LT.error(log, e, "Failed to send full partition map to node (will retry after timeout) " +
-                    "[node=" + nodeId + ", exchangeId=" + exchId + ", timeout=" + timeout + ']');
+                            return;
+                        }
-                cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(timeout) {
-                    @Override public void onTimeout() {
-                        sendAllPartitions(nodeId, retryCnt - 1);
-                    }
-                });
-            }
-            else
-                U.error(log, "Failed to send full partition map [node=" + n + ", exchangeId=" + exchId + ']', e);
-        }
-    }
+                        case DONE: {
+                            log.info("Ignore full message, future is done: " + msg);
-    /**
-     * @param node Sender node.
-     * @param msg Full partition info.
-     */
-    public void onReceive(final ClusterNode node, final GridDhtPartitionsFullMessage msg) {
-        assert msg != null;
+                            return;
+                        }
-        final UUID nodeId = node.id();
+                        case SRV:
+                        case CLIENT: {
+                            if (!crd.equals(node)) {
+                                log.info("Received full message from non-coordinator [node=" + node.id() +
+                                    ", nodeOrder=" + node.order() +
+                                    ", crd=" + crd.id() +
+                                    ", crdOrder=" + crd.order() + ']');
-        if (isDone()) {
-            if (log.isDebugEnabled())
-                log.debug("Received message for finished future [msg=" + msg + ", fut=" + this + ']');
+                                if (node.order() > crd.order())
+                                    fullMsgs.put(node, msg);
-            return;
-        }
+                                return;
+                            }
+                            else {
+                                AffinityTopologyVersion resVer = msg.resultTopologyVersion() != null ? msg.resultTopologyVersion() : initialVersion();
-        if (log.isDebugEnabled())
-            log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']');
+                                log.info("Received full message, will finish exchange [node=" + node.id() +
+                                    ", resVer=" + resVer + ']');
-        initFut.listen(new CI1>() {
-            @Override public void apply(IgniteInternalFuture f) {
-                try {
-                    if (!f.get())
-                        return;
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to initialize exchange future: " + this, e);
+                                finishState = new FinishState(crd.id(), resVer, msg);
-                    return;
-                }
+                                state = ExchangeLocalState.DONE;
-                    processMessage(node, msg);
+                                break;
+                            }
+                        }
+                    }
+                }
            }
-        });
-    }
+            else
+                assert node == null : node;
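// Illustrative sketch (not Ignite code): when a full message arrives from a node that is not the
// tracked coordinator, the code above keeps it (fullMsgs.put(node, msg)) only if the sender's
// discovery order is higher, so it can be re-examined once that node actually becomes coordinator;
// otherwise the message is dropped. A stand-alone model of that stash; FutureCoordinatorStash is
// a hypothetical name.
import java.util.HashMap;
import java.util.Map;

final class FutureCoordinatorStash<M> {
    private final Map<Long, M> byOrder = new HashMap<>();

    private long crdOrder;

    FutureCoordinatorStash(long crdOrder) {
        this.crdOrder = crdOrder;
    }

    /** @return {@code true} if the message was stashed for later, {@code false} if it was dropped. */
    synchronized boolean onUnexpectedSender(long senderOrder, M msg) {
        if (senderOrder > crdOrder) {
            byOrder.put(senderOrder, msg);

            return true;
        }

        return false; // Sender is older than the current coordinator and can never become one: drop.
    }

    /** Called when the tracked coordinator changes; returns a message stashed from it, if any. */
    synchronized M onCoordinatorChanged(long newCrdOrder) {
        crdOrder = newCrdOrder;

        return byOrder.remove(newCrdOrder);
    }
}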
-    /**
-     * @param node Sender node.
-     * @param msg Message with full partition info.
-     */
-    private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
-        assert exchId.equals(msg.exchangeId()) : msg;
-        assert msg.lastVersion() != null : msg;
+            AffinityTopologyVersion resTopVer = initialVersion();
-        synchronized (this) {
-            if (crd == null)
-                return;
+            if (exchCtx.mergeExchanges()) {
+                if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion())) {
+                    log.info("Received full message, need merge [curFut=" + initialVersion() +
+                        ", resVer=" + msg.resultTopologyVersion() + ']');
-            if (!crd.equals(node)) {
-                if (log.isDebugEnabled())
-                    log.debug("Received full partition map from unexpected node [oldest=" + crd.id() +
-                        ", nodeId=" + node.id() + ']');
+                    resTopVer = msg.resultTopologyVersion();
-                if (node.order() > crd.order())
-                    fullMsgs.put(node, msg);
+                    if (cctx.exchange().mergeExchanges(this, msg)) {
+                        assert cctx.kernalContext().isStopping();
-                return;
+                        return; // Node is stopping, no need to further process exchange.
+                    }
+
+                    assert resTopVer.equals(exchCtx.events().topologyVersion()) : "Unexpected result version [" +
+                        "msgVer=" + resTopVer +
+                        ", locVer=" + exchCtx.events().topologyVersion() + ']';
+                }
+
+                exchCtx.events().processEvents(this);
+
+                if (localJoinExchange())
+                    cctx.affinity().onLocalJoin(this, msg, resTopVer);
+                else {
+                    if (exchCtx.events().hasServerLeft())
+                        cctx.affinity().mergeExchangesOnServerLeft(this, msg);
+                    else
+                        cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, false);
+
+                    for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                        if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+                            continue;
+
+                        grp.topology().beforeExchange(this, true, false);
+                    }
+                }
            }
-        }
+            else if (localJoinExchange() && !exchCtx.fetchAffinityOnJoin())
+                cctx.affinity().onLocalJoin(this, msg, resTopVer);
-        updatePartitionFullMap(msg);
+            updatePartitionFullMap(resTopVer, msg);
-        IgniteCheckedException err = null;
+            IgniteCheckedException err = null;
-        if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
-            err = new IgniteCheckedException("Cluster state change failed");
+            if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
+                err = new IgniteCheckedException("Cluster state change failed");
-            cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
-