Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A0E0C200C28 for ; Mon, 13 Mar 2017 09:45:28 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 9FB32160B92; Mon, 13 Mar 2017 08:45:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AC907160BA2 for ; Mon, 13 Mar 2017 09:45:25 +0100 (CET) Received: (qmail 9186 invoked by uid 500); 13 Mar 2017 08:45:24 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 8134 invoked by uid 99); 13 Mar 2017 08:45:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Mar 2017 08:45:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8C364DFE61; Mon, 13 Mar 2017 08:45:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Mon, 13 Mar 2017 08:45:57 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [36/50] [abbrv] ignite git commit: ignite-4779 Missed discovery data snapshot during exchange processing (do not use discovery manager cache to handle exchange) archived-at: Mon, 13 Mar 2017 08:45:28 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index b5cb5cf..56acc26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -28,6 +28,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -71,17 +72,19 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter(cacheId, topVer); - Collection availableNodes = ctx.discovery().cacheAffinityNodes(cacheId, topVer); + Collection availableNodes = discoCache.cacheAffinityNodes(cacheId); LinkedList tmp = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 966a186..84ff96b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -36,6 +36,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -95,6 +96,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** */ private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; + /** Discovery cache. */ + private volatile DiscoCache discoCache; + /** */ private volatile boolean stopping; @@ -151,6 +155,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { rebalancedTopVer = AffinityTopologyVersion.NONE; topVer = AffinityTopologyVersion.NONE; + + discoCache = cctx.discovery().discoCache(); } finally { lock.writeLock().unlock(); @@ -293,6 +299,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { rebalancedTopVer = AffinityTopologyVersion.NONE; topVer = exchId.topologyVersion(); + + discoCache = exchFut.discoCache(); } finally { lock.writeLock().unlock(); @@ -349,7 +357,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { ClusterNode loc = cctx.localNode(); - ClusterNode oldest = currentCoordinator(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); @@ -474,7 +482,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (exchId.isLeft()) removeNode(exchId.nodeId()); - ClusterNode oldest = currentCoordinator(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -876,7 +884,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null; + Collection allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null; lock.readLock().lock(); @@ -973,7 +981,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, + @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, @Nullable Map cntrMap) { if (log.isDebugEnabled()) @@ -1106,7 +1114,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, + @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, @Nullable Map cntrMap, boolean checkEvictions) { @@ -1278,7 +1286,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { List affNodes = aff.get(p); if (!affNodes.contains(cctx.localNode())) { - Collection nodeIds = F.nodeIds(nodes(p, topVer, OWNING)); + List nodes = nodes(p, topVer, OWNING); + Collection nodeIds = F.nodeIds(nodes); // If all affinity nodes are owners, then evict partition from local node. if (nodeIds.containsAll(F.nodeIds(affNodes))) { @@ -1296,15 +1305,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { int affCnt = affNodes.size(); if (ownerCnt > affCnt) { - List sorted = new ArrayList<>(cctx.discovery().nodes(nodeIds)); - // Sort by node orders in ascending order. - Collections.sort(sorted, CU.nodeComparator(true)); + Collections.sort(nodes, CU.nodeComparator(true)); - int diff = sorted.size() - affCnt; + int diff = nodes.size() - affCnt; for (int i = 0; i < diff; i++) { - ClusterNode n = sorted.get(i); + ClusterNode n = nodes.get(i); if (locId.equals(n.id())) { part.rent(false); @@ -1330,17 +1337,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** - * @return Current coordinator node. - */ - @Nullable private ClusterNode currentCoordinator() { - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); - - assert oldest != null || cctx.kernalContext().clientNode(); - - return oldest; - } - - /** * Updates value for single partition. * * @param p Partition. @@ -1350,7 +1346,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) { - ClusterNode oldest = currentCoordinator(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); assert oldest != null || cctx.kernalContext().clientNode(); @@ -1415,7 +1411,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private void removeNode(UUID nodeId) { assert nodeId != null; - ClusterNode oldest = CU.oldest(cctx.discovery().serverNodes(topVer)); + ClusterNode oldest = discoCache.oldestAliveServerNode(); assert oldest != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a334fd5..46fb144 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -44,7 +44,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -101,6 +101,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter topSnapshot = new AtomicReference<>(); - /** Last committed cache version before next topology version use. */ private AtomicReference lastVer = new AtomicReference<>(); @@ -335,6 +336,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter(cctx.discovery().serverNodes(topologyVersion())); + discoCache.updateAlives(cctx.discovery()); + + srvNodes = new ArrayList<>(discoCache.serverNodes()); remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId())))); @@ -560,7 +572,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter cachesWithoutNodes = null; for (String name : cctx.cache().cacheNames()) { - if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) { + if (discoCache.cacheAffinityNodes(name).isEmpty()) { if (cachesWithoutNodes == null) cachesWithoutNodes = new ArrayList<>(); @@ -1106,7 +1118,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter cache; /** Topology listener. */ - private GridLocalEventListener topLsnr = new TopologyListener(); + private DiscoveryEventListener topLsnr = new TopologyListener(); static { Set versions = new TreeSet<>(new Comparator() { @@ -251,7 +251,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { cache = ctx.cache().utilityCache(); if (!ctx.clientNode()) - ctx.event().addLocalEventListener(topLsnr, EVTS); + ctx.event().addDiscoveryEventListener(topLsnr, EVTS); try { if (ctx.deploy().enabled()) @@ -314,7 +314,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { busyLock.block(); if (!ctx.clientNode()) - ctx.event().removeLocalEventListener(topLsnr); + ctx.event().removeDiscoveryEventListener(topLsnr); Collection ctxs = new ArrayList<>(); @@ -1568,9 +1568,9 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** * Topology listener. */ - private class TopologyListener implements GridLocalEventListener { + private class TopologyListener implements DiscoveryEventListener { /** {@inheritDoc} */ - @Override public void onEvent(Event evt) { + @Override public void onEvent(DiscoveryEvent evt, final DiscoCache discoCache) { if (!busyLock.enterBusy()) return; @@ -1588,11 +1588,14 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } else - topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0); + topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0); depExe.execute(new BusyRunnable() { @Override public void run0() { - ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer); + // In case the cache instance isn't tracked by DiscoveryManager anymore. + discoCache.updateAlives(ctx.discovery()); + + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (oldest != null && oldest.isLocal()) { final Collection retries = new ConcurrentLinkedQueue<>();