Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6D949200C88 for ; Fri, 2 Jun 2017 19:13:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6AEBB160BDD; Fri, 2 Jun 2017 17:13:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 68CE5160BD2 for ; Fri, 2 Jun 2017 19:13:15 +0200 (CEST) Received: (qmail 78724 invoked by uid 500); 2 Jun 2017 17:13:14 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 78662 invoked by uid 99); 2 Jun 2017 17:13:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Jun 2017 17:13:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 62404DF97F; Fri, 2 Jun 2017 17:13:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Fri, 02 Jun 2017 17:13:15 -0000 Message-Id: <69ac510c0ef642f5aa18105cfac00d3c@git.apache.org> In-Reply-To: <4c530179ed0447069b35932ef372a970@git.apache.org> References: <4c530179ed0447069b35932ef372a970@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/49] ignite git commit: Multiple optimizations from ignite-gg-8.0.3.ea5-atomicbench. archived-at: Fri, 02 Jun 2017 17:13:19 -0000 Multiple optimizations from ignite-gg-8.0.3.ea5-atomicbench. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52556f4b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52556f4b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52556f4b Branch: refs/heads/ignite-5398 Commit: 52556f4bf6d544a44bfd49b02d84aa32f741813f Parents: a3ad6e0 Author: Ilya Lantukh Authored: Fri Apr 28 19:40:31 2017 +0300 Committer: Ilya Lantukh Committed: Fri Apr 28 19:40:31 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../rendezvous/RendezvousAffinityFunction.java | 263 +++++++++++-------- .../discovery/GridDiscoveryManager.java | 1 + .../cache/CacheAffinitySharedManager.java | 68 +++++ .../cache/DynamicCacheChangeBatch.java | 17 ++ .../GridCachePartitionExchangeManager.java | 22 +- .../processors/cache/GridCacheProcessor.java | 25 +- .../dht/GridClientPartitionTopology.java | 88 ++++--- .../dht/GridDhtPartitionTopology.java | 9 + .../dht/GridDhtPartitionTopologyImpl.java | 104 +++++--- .../dht/preloader/GridDhtPartitionMap2.java | 45 +--- .../GridDhtPartitionsExchangeFuture.java | 61 ++++- .../internal/util/GridPartitionStateMap.java | 174 ++++++++++++ .../communication/tcp/TcpCommunicationSpi.java | 17 +- .../resources/META-INF/classnames.properties | 1 + .../GridCachePartitionedAffinitySpreadTest.java | 8 +- 16 files changed, 669 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 0b2fe65..ce2666b 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -552,6 +552,9 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_MAX_INDEX_PAYLOAD_SIZE = "IGNITE_MAX_INDEX_PAYLOAD_SIZE"; + /** */ + public static final String IGNITE_START_CACHES_ON_JOIN = "IGNITE_START_CACHES_ON_JOIN"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java index 25fa3a1..021f4e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java @@ -22,18 +22,16 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.UUID; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityFunctionContext; @@ -48,7 +46,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.Nullable; @@ -84,20 +81,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** Comparator. */ private static final Comparator> COMPARATOR = new HashComparator(); - /** Thread local message digest. */ - private ThreadLocal digest = new ThreadLocal() { - @Override protected MessageDigest initialValue() { - try { - return MessageDigest.getInstance("MD5"); - } - catch (NoSuchAlgorithmException e) { - assert false : "Should have failed in constructor"; - - throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e); - } - } - }; - /** Number of partitions. */ private int parts; @@ -118,10 +101,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** Hash ID resolver. */ private AffinityNodeHashResolver hashIdRslvr = null; - /** Ignite instance. */ - @IgniteInstanceResource - private Ignite ignite; - /** Logger instance. */ @LoggerResource private transient IgniteLogger log; @@ -186,18 +165,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza private RendezvousAffinityFunction(boolean exclNeighbors, int parts, IgniteBiPredicate backupFilter) { A.ensure(parts > 0, "parts > 0"); - A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <=" + CacheConfiguration.MAX_PARTITIONS_COUNT); this.exclNeighbors = exclNeighbors; this.parts = parts; this.backupFilter = backupFilter; - - try { - MessageDigest.getInstance("MD5"); - } - catch (NoSuchAlgorithmException e) { - throw new IgniteException("Failed to obtain MD5 message digest instance.", e); - } } /** @@ -222,7 +193,8 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza * @param parts Total number of partitions. */ public void setPartitions(int parts) { - A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT); + A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, + "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT); this.parts = parts; } @@ -355,116 +327,90 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** * Returns collection of nodes (primary first) for specified partition. * - * @param d Message digest. * @param part Partition. * @param nodes Nodes. - * @param nodesHash Serialized nodes hashes. * @param backups Number of backups. * @param neighborhoodCache Neighborhood. * @return Assignment. */ - public List assignPartition(MessageDigest d, - int part, + public List assignPartition(int part, List nodes, - Map nodesHash, int backups, @Nullable Map> neighborhoodCache) { if (nodes.size() <= 1) return nodes; - if (d == null) - d = digest.get(); - - List> lst = new ArrayList<>(nodes.size()); - - try { - for (int i = 0; i < nodes.size(); i++) { - ClusterNode node = nodes.get(i); - - byte[] nodeHashBytes = nodesHash.get(node); - - if (nodeHashBytes == null) { - Object nodeHash = resolveNodeHash(node); - - byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash); - - // Add 4 bytes for partition bytes. - nodeHashBytes = new byte[nodeHashBytes0.length + 4]; - - System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length); - - nodesHash.put(node, nodeHashBytes); - } + IgniteBiTuple [] hashArr = + (IgniteBiTuple [])new IgniteBiTuple[nodes.size()]; - U.intToBytes(part, nodeHashBytes, 0); + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); - d.reset(); + Object nodeHash = resolveNodeHash(node); - byte[] bytes = d.digest(nodeHashBytes); + long hash = hash(nodeHash.hashCode(), part); - long hash = - (bytes[0] & 0xFFL) - | ((bytes[1] & 0xFFL) << 8) - | ((bytes[2] & 0xFFL) << 16) - | ((bytes[3] & 0xFFL) << 24) - | ((bytes[4] & 0xFFL) << 32) - | ((bytes[5] & 0xFFL) << 40) - | ((bytes[6] & 0xFFL) << 48) - | ((bytes[7] & 0xFFL) << 56); - - lst.add(F.t(hash, node)); - } - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); + hashArr[i] = F.t(hash, node); } - Collections.sort(lst, COMPARATOR); + final int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size()); - int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size()); + Iterable sortedNodes = new LazyLinearSortedContainer(hashArr, primaryAndBackups); + + Iterator it = sortedNodes.iterator(); List res = new ArrayList<>(primaryAndBackups); - ClusterNode primary = lst.get(0).get2(); + Collection allNeighbors = new HashSet<>(); + + ClusterNode primary = it.next(); res.add(primary); + if (exclNeighbors) + allNeighbors.addAll(neighborhoodCache.get(primary.id())); + // Select backups. if (backups > 0) { - for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) { - IgniteBiTuple next = lst.get(i); - - ClusterNode node = next.get2(); + while (it.hasNext() && res.size() < primaryAndBackups) { + ClusterNode node = it.next(); if (exclNeighbors) { - Collection allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res); - - if (!allNeighbors.contains(node)) + if (!allNeighbors.contains(node)) { res.add(node); + + allNeighbors.addAll(neighborhoodCache.get(node.id())); + } + } + else if ((backupFilter != null && backupFilter.apply(primary, node)) + || (affinityBackupFilter != null && affinityBackupFilter.apply(node, res)) + || (affinityBackupFilter == null && backupFilter == null) ) { + res.add(node); + + if (exclNeighbors) + allNeighbors.addAll(neighborhoodCache.get(node.id())); } - else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res)) - res.add(next.get2()); - else if (backupFilter != null && backupFilter.apply(primary, node)) - res.add(next.get2()); - else if (affinityBackupFilter == null && backupFilter == null) - res.add(next.get2()); } } if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) { // Need to iterate again in case if there are no nodes which pass exclude neighbors backups criteria. - for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) { - IgniteBiTuple next = lst.get(i); + it = sortedNodes.iterator(); - ClusterNode node = next.get2(); + it.next(); + + while (it.hasNext() && res.size() < primaryAndBackups) { + ClusterNode node = it.next(); if (!res.contains(node)) - res.add(next.get2()); + res.add(node); } if (!exclNeighborsWarn) { LT.warn(log, "Affinity function excludeNeighbors property is ignored " + - "because topology has no enough nodes to assign backups."); + "because topology has no enough nodes to assign backups.", + "Affinity function excludeNeighbors property is ignored " + + "because topology has no enough nodes to assign backups."); exclNeighborsWarn = true; } @@ -475,6 +421,31 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza return res; } + /** + * The pack partition number and nodeHash.hashCode to long and mix it by hash function based on the Wang/Jenkins + * hash. + * + * @param key0 Hash key. + * @param key1 Hash key. + * @see 64 bit mix functions + * @return Long hash key. + */ + private static long hash(int key0, int key1) { + long key = (key0 & 0xFFFFFFFFL) + | ((key1 & 0xFFFFFFFFL) << 32); + + key = (~key) + (key << 21); // key = (key << 21) - key - 1; + key ^= (key >>> 24); + key += (key << 3) + (key << 8); // key * 265 + key ^= (key >>> 14); + key += (key << 2) + (key << 4); // key * 21 + key ^= (key >>> 28); + key += (key << 31); + + return key; + } + + /** {@inheritDoc} */ @Override public void reset() { // No-op. @@ -501,19 +472,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza Map> neighborhoodCache = exclNeighbors ? GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null; - MessageDigest d = digest.get(); - List nodes = affCtx.currentTopologySnapshot(); - Map nodesHash = U.newHashMap(nodes.size()); - for (int i = 0; i < parts; i++) { - List partAssignment = assignPartition(d, - i, - nodes, - nodesHash, - affCtx.backups(), - neighborhoodCache); + List partAssignment = assignPartition(i, nodes, affCtx.backups(), neighborhoodCache); assignments.add(partAssignment); } @@ -556,4 +518,83 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza o1.get2().id().compareTo(o2.get2().id()); } } + + /** + * Sorts the initial array with linear sort algorithm array + */ + private static class LazyLinearSortedContainer implements Iterable { + /** Initial node-hash array. */ + private final IgniteBiTuple[] arr; + + /** Count of the sorted elements */ + private int sorted; + + /** + * @param arr Node / partition hash list. + * @param needFirstSortedCnt Estimate count of elements to return by iterator. + */ + LazyLinearSortedContainer(IgniteBiTuple[] arr, int needFirstSortedCnt) { + this.arr = arr; + + if (needFirstSortedCnt > (int)Math.log(arr.length)) { + Arrays.sort(arr, COMPARATOR); + + sorted = arr.length; + } + } + + /** {@inheritDoc} */ + @Override public Iterator iterator() { + return new SortIterator(); + } + + /** + * + */ + private class SortIterator implements Iterator { + /** Index of the first unsorted element. */ + private int cur; + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return cur < arr.length; + } + + /** {@inheritDoc} */ + @Override public ClusterNode next() { + if (!hasNext()) + throw new NoSuchElementException(); + + if (cur < sorted) + return arr[cur++].get2(); + + IgniteBiTuple min = arr[cur]; + + int minIdx = cur; + + for (int i = cur + 1; i < arr.length; i++) { + if (COMPARATOR.compare(arr[i], min) < 0) { + minIdx = i; + + min = arr[i]; + } + } + + if (minIdx != cur) { + arr[minIdx] = arr[cur]; + + arr[cur] = min; + } + + sorted = cur++; + + return min.get2(); + } + + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException("Remove doesn't supported"); + } + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 960a064..8703e29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -88,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index c1dde13..814d7e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -29,6 +29,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -91,6 +92,9 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap private WaitRebalanceInfo waitInfo; /** */ + private IgniteLogger exchLog; + + /** */ private final Object mux = new Object(); /** Pending affinity assignment futures. */ @@ -123,6 +127,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap lateAffAssign = cctx.kernalContext().config().isLateAffinityAssignment() || cctx.database().persistenceEnabled(); cctx.kernalContext().event().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); + + exchLog = cctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG); } /** @@ -342,6 +348,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap throws IgniteCheckedException { assert !F.isEmpty(reqs) : fut; + exchLog.info("onCacheChangeRequest start [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']'); + for (DynamicCacheChangeRequest req : reqs) { Integer cacheId = CU.cacheId(req.cacheName()); @@ -384,8 +392,12 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap Integer cacheId = CU.cacheId(req.cacheName()); if (req.start()) { + exchLog.info("prepareCacheStart start [topVer=" + fut.topologyVersion() + ", cache=" + req.cacheName() + ']'); + cctx.cache().prepareCacheStart(req, fut.topologyVersion()); + exchLog.info("prepareCacheStart end [topVer=" + fut.topologyVersion() + ", cache=" + req.cacheName() + ']'); + if (fut.isCacheAdded(cacheId, fut.topologyVersion())) { if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty()) U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName()); @@ -401,6 +413,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap if (clientCacheStarted) initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign); else if (!req.clientStartOnly()) { + exchLog.info("calculateAff start [topVer=" + fut.topologyVersion() + ", cache=" + req.cacheName() + ']'); + assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion()); GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache(); @@ -411,6 +425,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap fut.discoveryEvent(), fut.discoCache()); aff.initialize(fut.topologyVersion(), assignment); + + exchLog.info("calculateAff end [topVer=" + fut.topologyVersion() + ", cache=" + req.cacheName() + ']'); } } } @@ -487,6 +503,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap } } + exchLog.info("onCacheChangeRequest end [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']'); + return clientOnly; } @@ -564,6 +582,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap assert msg.topologyVersion() != null && msg.exchangeId() == null : msg; assert affCalcVer == null || affCalcVer.equals(msg.topologyVersion()); + exchLog.info("onChangeAffinityMessage start [topVer=" + exchFut.topologyVersion() + ", crd=" + crd + ']'); + final AffinityTopologyVersion topVer = exchFut.topologyVersion(); if (log.isDebugEnabled()) { @@ -631,6 +651,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap if (affCalcVer == null) affCalcVer = msg.topologyVersion(); } + + exchLog.info("onChangeAffinityMessage end [topVer=" + exchFut.topologyVersion() + ", crd=" + crd + ']'); } /** @@ -832,11 +854,17 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch) throws IgniteCheckedException { if (!fetch && canCalculateAffinity(aff, fut)) { + exchLog.info("initAffinity start [topVer=" + fut.topologyVersion() + ", cache=" + aff.cacheName() + ']'); + List> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); aff.initialize(fut.topologyVersion(), assignment); + + exchLog.info("initAffinity end [topVer=" + fut.topologyVersion() + ", cache=" + aff.cacheName() + ']'); } else { + exchLog.info("fetchAffinity start [topVer=" + fut.topologyVersion() + ", cache=" + aff.cacheName() + ']'); + GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, aff.cacheName(), fut.topologyVersion(), @@ -845,6 +873,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap fetchFut.init(); fetchAffinity(fut, aff, fetchFut); + + exchLog.info("fetchAffinity end [topVer=" + fut.topologyVersion() + ", cache=" + aff.cacheName() + ']'); } } @@ -879,6 +909,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. */ public void onServerJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException { + exchLog.info("onServerJoin start [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']'); + assert !fut.discoveryEvent().eventNode().isClient(); boolean locJoin = fut.discoveryEvent().eventNode().isLocal(); @@ -894,9 +926,13 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap CacheHolder cache = cache(fut, cacheDesc); + exchLog.info("onServerJoin calc aff start [topVer=" + fut.topologyVersion() + ", cache=" + cache.name() + ']'); + List> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache()); cache.affinity().initialize(topVer, newAff); + + exchLog.info("onServerJoin calc aff end [topVer=" + fut.topologyVersion() + ", cache=" + cache.name() + ']'); } }); } @@ -923,6 +959,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap } } } + + exchLog.info("onServerJoin end [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']'); } /** @@ -949,6 +987,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. */ private void fetchAffinityOnJoin(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { + exchLog.info("fetchAffinityOnJoin start [topVer=" + fut.topologyVersion() + ']'); + AffinityTopologyVersion topVer = fut.topologyVersion(); List fetchFuts = new ArrayList<>(); @@ -984,6 +1024,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut); } + + exchLog.info("fetchAffinityOnJoin end [topVer=" + fut.topologyVersion() + ']'); } /** @@ -1034,6 +1076,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap * @return {@code True} if affinity should be assigned by coordinator. */ public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { + exchLog.info("onServerLeft start [topVer=" + fut.topologyVersion() + ']'); + ClusterNode leftNode = fut.discoveryEvent().eventNode(); assert !leftNode.isClient() : leftNode; @@ -1045,7 +1089,11 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap if (cacheCtx.isLocal()) continue; + exchLog.info("onServerLeft calc aff start [topVer=" + fut.topologyVersion() + ", cache=" + cacheCtx.name() + ']'); + cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + + exchLog.info("onServerLeft calc aff end [topVer=" + fut.topologyVersion() + ", cache=" + cacheCtx.name() + ']'); } centralizedAff = true; @@ -1062,6 +1110,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap this.waitInfo = null; } + exchLog.info("onServerLeft end [topVer=" + fut.topologyVersion() + ']'); + return centralizedAff; } @@ -1231,6 +1281,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap final Map>> affCache = new HashMap<>(); if (!crd) { + exchLog.info("initAffinityOnNodeJoin start [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']'); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -1240,9 +1292,13 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache); } + exchLog.info("initAffinityOnNodeJoin end [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']'); + return null; } else { + exchLog.info("initAffinityOnNodeJoin start [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']'); + final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer); forAllRegisteredCaches(new IgniteInClosureX() { @@ -1255,6 +1311,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap } }); + exchLog.info("initAffinityOnNodeJoin end [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']'); + return waitRebalanceInfo; } } @@ -1273,6 +1331,9 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap boolean latePrimary, Map>> affCache) throws IgniteCheckedException { + exchLog.info("initAffinityOnNodeJoin start [topVer=" + fut.topologyVersion() + + ", cache=" + aff.cacheName() + ']'); + assert lateAffAssign; AffinityTopologyVersion topVer = fut.topologyVersion(); @@ -1318,6 +1379,9 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap newAssignment = idealAssignment; aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache)); + + exchLog.info("initAffinityOnNodeJoin end [topVer=" + fut.topologyVersion() + ", cache=" + + aff.cacheName() + ']'); } /** @@ -1413,6 +1477,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap */ private Map>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { + exchLog.info("initAffinityOnNodeLeft start [topVer=" + fut.topologyVersion() + ']'); + final AffinityTopologyVersion topVer = fut.topologyVersion(); final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer); @@ -1531,6 +1597,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap } } + exchLog.info("initAffinityOnNodeLeft end [topVer=" + fut.topologyVersion() + ']'); + return assignment; } http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index 4dcff9b..0e4373c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -47,6 +47,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { /** */ private boolean clientReconnect; + /** */ + private boolean startCaches; + /** * @param reqs Requests. */ @@ -113,6 +116,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { return clientReconnect; } + /** + * @return {@code True} if required to start all caches on client node. + */ + public boolean startCaches() { + return startCaches; + } + + /** + * @param startCaches {@code True} if required to start all caches on client node. + */ + public void startCaches(boolean startCaches) { + this.startCaches = startCaches; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeBatch.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index ff7feb8..95cb452 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage; import org.apache.ignite.internal.managers.discovery.DiscoCache; @@ -118,7 +119,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.preloa */ public class GridCachePartitionExchangeManager extends GridCacheSharedManagerAdapter { /** Exchange history size. */ - private static final int EXCHANGE_HISTORY_SIZE = 1000; + private static final int EXCHANGE_HISTORY_SIZE = + IgniteSystemProperties.getInteger("IGNITE_EXCHANGE_HISTORY_SIZE", 1000); /** Atomic reference for pending timeout object. */ private AtomicReference pendingResend = new AtomicReference<>(); @@ -221,7 +223,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana assert evt.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + - "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; exchId = exchangeId(n.id(), affinityTopologyVersion(evt), @@ -230,10 +232,10 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana exchFut = exchangeFuture(exchId, evt, cache,null, null); } else { - DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt; + DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage(); - if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) { - DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage(); + if (customMsg instanceof DynamicCacheChangeBatch) { + DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg; Collection valid = new ArrayList<>(batch.requests().size()); @@ -266,8 +268,8 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana exchFut = exchangeFuture(exchId, evt, cache, valid, null); } } - else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) { - CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage(); + else if (customMsg instanceof CacheAffinityChangeMessage) { + CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg; if (msg.exchangeId() == null) { if (msg.exchangeNeeded()) { @@ -277,10 +279,10 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } } else - exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg); + exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(evt.eventNode(), msg); } - else if (customEvt.customMessage() instanceof StartSnapshotOperationAckDiscoveryMessage - && !((StartSnapshotOperationAckDiscoveryMessage)customEvt.customMessage()).hasError()) { + else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage + && !((StartSnapshotOperationAckDiscoveryMessage)customMsg).hasError()) { exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); exchFut = exchangeFuture(exchId, evt, null, null, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 2bb1e42..8ad3c8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -43,6 +43,7 @@ import javax.management.JMException; import javax.management.MBeanServer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheExistsException; import org.apache.ignite.cache.CacheMode; @@ -147,6 +148,8 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.internal.IgniteComponentType.JTA; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.affinityNode; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.clientNode; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; /** @@ -154,6 +157,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearE */ @SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"}) public class GridCacheProcessor extends GridProcessorAdapter { + /** */ + private static final boolean startCaches = + IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, false); + /** Null cache name. */ private static final String NULL_NAME = U.id8(UUID.randomUUID()); @@ -816,7 +823,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean loc = desc.locallyConfigured(); - if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) { + if (loc || (desc.receivedOnDiscovery() && (CU.affinityNode(locNode, filter) || + startAllCachesOnClientStart()))) { boolean started = desc.onStart(); assert started : "Failed to change started flag for locally configured cache: " + desc; @@ -2165,6 +2173,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { batch.clientReconnect(reconnect); + if (!reconnect) + batch.startCaches(startAllCachesOnClientStart()); + //todo check // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same. batch.id(null); @@ -2172,6 +2183,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { return batch; } + /** + * @return {@code True} if need locally start all existing caches on client node start. + */ + private boolean startAllCachesOnClientStart() { + return startCaches && ctx.clientNode(); + } + /** {@inheritDoc} */ @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) { if (data instanceof DynamicCacheChangeBatch) { @@ -2276,6 +2294,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); } } + + if (batch.startCaches()) { + for (Map.Entry entry : registeredCaches.entrySet()) + ctx.discovery().addClientNode(entry.getKey(), joiningNodeId, false); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index a96cc43..5c5a3c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -34,6 +34,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -59,6 +60,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh */ @GridToStringExclude public class GridClientPartitionTopology implements GridDhtPartitionTopology { + /** */ + private static final GridDhtPartitionState[] MOVING_STATES = new GridDhtPartitionState[] {MOVING}; + /** If true, then check consistency. */ private static final boolean CONSISTENCY_CHECK = false; @@ -81,7 +85,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { private GridDhtPartitionFullMap node2part; /** Partition to node map. */ - private Map> part2node = new HashMap<>(); + private final Map> part2node = new HashMap<>(); /** */ private GridDhtPartitionExchangeId lastExchangeId; @@ -93,7 +97,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { private volatile boolean stopping; /** A future that will be completed when topology with version topVer will be ready to use. */ - private GridDhtTopologyFuture topReadyFut; + private volatile GridDhtTopologyFuture topReadyFut; /** */ private final GridAtomicLong updateSeq = new GridAtomicLong(1); @@ -225,16 +229,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public GridDhtTopologyFuture topologyVersionFuture() { - lock.readLock().lock(); - - try { - assert topReadyFut != null; + assert topReadyFut != null; - return topReadyFut; - } - finally { - lock.readLock().unlock(); - } + return topReadyFut; } /** {@inheritDoc} */ @@ -425,6 +422,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Nullable @Override public List nodes(int p, + AffinityAssignment affAssignment, + List affNodes) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public List nodes(int p, AffinityTopologyVersion topVer) { lock.readLock().lock(); @@ -503,7 +507,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List owners(int p, AffinityTopologyVersion topVer) { - return nodes(p, topVer, OWNING); + return nodes(p, topVer, OWNING, null); } /** {@inheritDoc} */ @@ -513,7 +517,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List moving(int p) { - return nodes(p, AffinityTopologyVersion.NONE, MOVING); + return nodes(p, AffinityTopologyVersion.NONE, MOVING, null); } /** @@ -522,7 +526,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { * @return List of nodes in state OWNING or MOVING. */ private List ownersAndMoving(int p, AffinityTopologyVersion topVer) { - return nodes(p, topVer, OWNING, MOVING); + return nodes(p, topVer, OWNING, MOVING_STATES); } /** {@inheritDoc} */ @@ -564,9 +568,12 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionsExchangeFuture exchFut, - GridDhtPartitionFullMap partMap, Map> cntrMap, Set partsToReload) { - + @Nullable @Override public GridDhtPartitionMap2 update( + @Nullable GridDhtPartitionsExchangeFuture exchFut, + GridDhtPartitionFullMap partMap, + Map> cntrMap, + Set partsToReload + ) { GridDhtPartitionExchangeId exchId = exchFut != null ? exchFut.exchangeId() : null; if (log.isDebugEnabled()) @@ -624,25 +631,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } } - node2part = partMap; - - Map> p2n = new HashMap<>(); + part2node.clear(); for (Map.Entry e : partMap.entrySet()) { - for (Integer p : e.getValue().keySet()) { - Set ids = p2n.get(p); + for (Map.Entry e0 : e.getValue().entrySet()) { + if (e0.getValue() != MOVING && e0.getValue() != OWNING) + continue; + + int p = e0.getKey(); + + Set ids = part2node.get(p); if (ids == null) // Initialize HashSet to size 3 in anticipation that there won't be // more than 3 nodes per partitions. - p2n.put(p, ids = U.newHashSet(3)); + part2node.put(p, ids = U.newHashSet(3)); ids.add(e.getKey()); } } - part2node = p2n; - if (cntrMap != null) this.cntrMap = new HashMap<>(cntrMap); @@ -660,9 +668,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public GridDhtPartitionMap2 update( + @Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, - Map> cntrMap) { + Map> cntrMap + ) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -716,18 +726,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { node2part.put(parts.nodeId(), parts); - part2node = new HashMap<>(part2node); - // Add new mappings. - for (Integer p : parts.keySet()) { + for (Map.Entry e : parts.entrySet()) { + int p = e.getKey(); + Set ids = part2node.get(p); - if (ids == null) - // Initialize HashSet to size 3 in anticipation that there won't be - // more than 3 nodes per partition. - part2node.put(p, ids = U.newHashSet(3)); + if (e.getValue() == MOVING || e.getValue() == OWNING) { + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partition. + part2node.put(p, ids = U.newHashSet(3)); - changed |= ids.add(parts.nodeId()); + changed |= ids.add(parts.nodeId()); + } + else { + if (ids != null) + changed |= ids.remove(parts.nodeId()); + } } // Remove obsolete mappings. @@ -862,8 +878,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { else node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence()); - part2node = new HashMap<>(part2node); - GridDhtPartitionMap2 parts = node2part.remove(nodeId); if (parts != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 37c3af9..2bef267 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; @@ -191,6 +192,14 @@ public interface GridDhtPartitionTopology { /** * @param p Partition ID. + * @param affAssignment Assignments. + * @param affNodes Node assigned for given partition by affinity. + * @return Collection of all nodes responsible for this partition with primary node being first. + */ + @Nullable public List nodes(int p, AffinityAssignment affAssignment, List affNodes); + + /** + * @param p Partition ID. * @return Collection of all nodes who {@code own} this partition. */ public List owners(int p); http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 36c1ae5..fb09b38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -69,6 +69,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * Partition topology. */ @GridToStringExclude class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { + /** */ + private static final GridDhtPartitionState[] MOVING_STATES = new GridDhtPartitionState[] {MOVING}; + /** If true, then check consistency. */ private static final boolean CONSISTENCY_CHECK = false; @@ -91,7 +94,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh private GridDhtPartitionFullMap node2part; /** Partition to node map. */ - private Map> part2node = new HashMap<>(); + private final Map> part2node; /** */ private GridDhtPartitionExchangeId lastExchangeId; @@ -106,7 +109,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh private volatile boolean stopping; /** A future that will be completed when topology with version topVer will be ready to use. */ - private GridDhtTopologyFuture topReadyFut; + private volatile GridDhtTopologyFuture topReadyFut; /** */ private final GridAtomicLong updateSeq = new GridAtomicLong(1); @@ -139,6 +142,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh log = cctx.logger(getClass()); locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions()); + + part2node = new HashMap<>(cctx.config().getAffinity().partitions(), 1.0f); } /** {@inheritDoc} */ @@ -155,7 +160,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh try { node2part = null; - part2node = new HashMap<>(); + part2node.clear(); lastExchangeId = null; @@ -245,16 +250,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** {@inheritDoc} */ @Override public GridDhtTopologyFuture topologyVersionFuture() { - lock.readLock().lock(); - - try { - assert topReadyFut != null; + assert topReadyFut != null; - return topReadyFut; - } - finally { - lock.readLock().unlock(); - } + return topReadyFut; } /** {@inheritDoc} */ @@ -753,6 +751,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (part != null && part.state().active()) list.add(part); } + return list; } @@ -827,11 +826,32 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } /** {@inheritDoc} */ + @Nullable @Override public List nodes(int p, + AffinityAssignment affAssignment, + List affNodes) { + return nodes0(p, affAssignment, affNodes); + } + + /** {@inheritDoc} */ @Override public List nodes(int p, AffinityTopologyVersion topVer) { AffinityAssignment affAssignment = cctx.affinity().assignment(topVer); List affNodes = affAssignment.get(p); + List nodes = nodes0(p, affAssignment, affNodes); + + return nodes != null ? nodes : affNodes; + } + + /** + * @param p Partition. + * @param affAssignment Assignments. + * @param affNodes Node assigned for given partition by affinity. + * @return Nodes responsible for given partition (primary is first). + */ + @Nullable private List nodes0(int p, AffinityAssignment affAssignment, List affNodes) { + AffinityTopologyVersion topVer = affAssignment.topologyVersion(); + lock.readLock().lock(); try { @@ -849,7 +869,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh for (UUID nodeId : nodeIds) { HashSet affIds = affAssignment.getIds(p); - if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) { + if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING)) { ClusterNode n = cctx.discovery().node(nodeId); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { @@ -865,7 +885,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } } - return nodes != null ? nodes : affNodes; + return nodes; } finally { lock.readLock().unlock(); @@ -927,7 +947,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (!cctx.rebalanceEnabled()) return ownersAndMoving(p, topVer); - return nodes(p, topVer, OWNING); + return nodes(p, topVer, OWNING, null); } /** {@inheritDoc} */ @@ -940,7 +960,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (!cctx.rebalanceEnabled()) return ownersAndMoving(p, AffinityTopologyVersion.NONE); - return nodes(p, AffinityTopologyVersion.NONE, MOVING); + return nodes(p, AffinityTopologyVersion.NONE, MOVING, null); } /** @@ -949,7 +969,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * @return List of nodes in state OWNING or MOVING. */ private List ownersAndMoving(int p, AffinityTopologyVersion topVer) { - return nodes(p, topVer, OWNING, MOVING); + return nodes(p, topVer, OWNING, MOVING_STATES); } /** {@inheritDoc} */ @@ -980,7 +1000,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update( + @Override public GridDhtPartitionMap2 update( @Nullable GridDhtPartitionsExchangeFuture exchFut, GridDhtPartitionFullMap partMap, @Nullable Map> cntrMap, @@ -1078,23 +1098,26 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh node2part = partMap; - Map> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f); + part2node.clear(); for (Map.Entry e : partMap.entrySet()) { - for (Integer p : e.getValue().keySet()) { - Set ids = p2n.get(p); + for (Map.Entry e0 : e.getValue().entrySet()) { + if (e0.getValue() != MOVING && e0.getValue() != OWNING) + continue; + + int p = e0.getKey(); + + Set ids = part2node.get(p); if (ids == null) // Initialize HashSet to size 3 in anticipation that there won't be // more than 3 nodes per partitions. - p2n.put(p, ids = U.newHashSet(3)); + part2node.put(p, ids = U.newHashSet(3)); ids.add(e.getKey()); } } - part2node = p2n; - boolean changed = false; AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); @@ -1273,18 +1296,24 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh node2part.put(parts.nodeId(), parts); - part2node = new HashMap<>(part2node); - // Add new mappings. - for (Integer p : parts.keySet()) { + for (Map.Entry e : parts.entrySet()) { + int p = e.getKey(); + Set ids = part2node.get(p); - if (ids == null) - // Initialize HashSet to size 3 in anticipation that there won't be - // more than 3 nodes per partition. - part2node.put(p, ids = U.newHashSet(3)); + if (e.getValue() == MOVING || e.getValue() == OWNING) { + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partition. + part2node.put(p, ids = U.newHashSet(3)); - changed |= ids.add(parts.nodeId()); + changed |= ids.add(parts.nodeId()); + } + else { + if (ids != null) + changed |= ids.remove(parts.nodeId()); + } } // Remove obsolete mappings. @@ -1607,7 +1636,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh List affNodes = aff.get(p); if (!affNodes.contains(cctx.localNode())) { - List nodes = nodes(p, topVer, OWNING); + List nodes = nodes(p, topVer, OWNING, null); Collection nodeIds = F.nodeIds(nodes); // If all affinity nodes are owners, then evict partition from local node. @@ -1740,12 +1769,12 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh ClusterNode oldest = discoCache.oldestAliveServerNode(); - assert oldest != null; + assert oldest != null || cctx.kernalContext().clientNode(); ClusterNode loc = cctx.localNode(); if (node2part != null) { - if (oldest.equals(loc) && !node2part.nodeId().equals(loc.id())) { + if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) { updateSeq.setIfGreater(node2part.updateSequence()); node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(), @@ -1910,7 +1939,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { - X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); + X.println(">>> Cache partition topology stats [igniteInstanceName=" + cctx.gridName() + + ", cache=" + cctx.name() + ']'); lock.readLock().lock(); @@ -1921,7 +1951,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (part == null) continue; - int size = part.size(); + int size = part.dataStore().size(); if (size >= threshold) X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java index 9837f69..534abb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java @@ -21,13 +21,12 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.UUID; -import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteProductVersion; @@ -54,7 +53,7 @@ public class GridDhtPartitionMap2 implements Comparable, E protected AffinityTopologyVersion top; /** */ - protected Map map; + protected GridPartitionStateMap map; /** */ private volatile int moving; @@ -85,7 +84,8 @@ public class GridDhtPartitionMap2 implements Comparable, E this.updateSeq = updateSeq; this.top = top; - map = U.newHashMap(m.size()); + map = new GridPartitionStateMap(m.size()); + //map = U.newHashMap(m.size()); for (Map.Entry e : m.entrySet()) { GridDhtPartitionState state = e.getValue(); @@ -105,7 +105,7 @@ public class GridDhtPartitionMap2 implements Comparable, E private GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top, - Map map, + GridPartitionStateMap map, int moving) { this.nodeId = nodeId; this.updateSeq = updateSeq; @@ -121,7 +121,7 @@ public class GridDhtPartitionMap2 implements Comparable, E return new GridDhtPartitionMap2(nodeId, updateSeq, top, - U.newHashMap(0), + new GridPartitionStateMap(0), 0); } @@ -243,25 +243,7 @@ public class GridDhtPartitionMap2 implements Comparable, E out.writeLong(updateSeq); - int size = map.size(); - - out.writeInt(size); - - int i = 0; - - for (Map.Entry entry : map.entrySet()) { - int ordinal = entry.getValue().ordinal(); - - assert ordinal == (ordinal & 0xFF); - assert entry.getKey() >= 0 && entry.getKey() <= CacheConfiguration.MAX_PARTITIONS_COUNT : entry.getKey(); - - out.write(ordinal); - out.writeShort((short)(int)entry.getKey()); - - i++; - } - - assert i == size; + out.writeObject(map); if (top != null) { out.writeLong(topologyVersion().topologyVersion()); @@ -279,16 +261,13 @@ public class GridDhtPartitionMap2 implements Comparable, E updateSeq = in.readLong(); - int size = in.readInt(); - - map = U.newHashMap(size); - - for (int i = 0; i < size; i++) { - int ordinal = in.readByte() & 0xFF; + map = (GridPartitionStateMap)in.readObject(); - int part = in.readShort() & 0xFFFF; + Set> entries = map.entrySet(); - put(part, GridDhtPartitionState.fromOrdinal(ordinal)); + for (Map.Entry entry : entries) { + if (entry.getValue() == MOVING) + moving++; } long ver = in.readLong(); http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index b0d776f..078e67b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; @@ -73,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -110,6 +112,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter reqs; @@ -258,6 +265,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter(); @@ -535,6 +545,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter + * Null values are prohibited. + */ +public class GridPartitionStateMap extends AbstractMap implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Required bits to hold all state. Additional zero state is required as well. */ + private static final int BITS = Integer.SIZE - + Integer.numberOfLeadingZeros(GridDhtPartitionState.values().length + 1); + + /** */ + private final BitSet states; + + /** */ + private int size; + + /** {@inheritDoc} */ + @Override public Set> entrySet() { + return new AbstractSet>() { + @Override public Iterator> iterator() { + final int size = states.length() == 0 ? 0 : (states.length() - 1)/ BITS + 1; + + return new Iterator>() { + private int next; + private int cur; + + @Override public boolean hasNext() { + while(state(next) == null && next < size) + next++; + + return next < size; + } + + @Override public Entry next() { + cur = next; + next++; + + return new Entry() { + int p = cur; + + @Override public Integer getKey() { + return p; + } + + @Override public GridDhtPartitionState getValue() { + return state(p); + } + + @Override public GridDhtPartitionState setValue(GridDhtPartitionState val) { + return setState(p, val); + } + }; + } + + @Override public void remove() { + setState(cur, null); + } + }; + } + + @Override public int size() { + return GridPartitionStateMap.this.size(); + } + }; + } + + /** + * Default constructor. + */ + public GridPartitionStateMap() { + states = new BitSet(); + } + + /** + * @param parts Partitions to hold. + */ + public GridPartitionStateMap(int parts) { + states = new BitSet(parts); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionState put(Integer key, GridDhtPartitionState val) { + assert val != null; + + return setState(key, val); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionState get(Object key) { + return state((Integer)key); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionState remove(Object key) { + return setState((Integer)key, null); + } + + /** {@inheritDoc} */ + @Override public boolean containsKey(Object key) { + return state((Integer)key) != null; + } + + /** {@inheritDoc} */ + @Override public int size() { + return size; + } + + /** */ + private GridDhtPartitionState setState(int part, GridDhtPartitionState st) { + GridDhtPartitionState old = state(part); + + if (old == st) + return old; + + int off = part * BITS; + + int ist = st == null ? 0 : st.ordinal() + 1; // Reserve all zero bits for empty value + + for (int i = 0; i < BITS; i++) { + states.set(off + i, (ist & 1) == 1); + + ist >>>= 1; + } + + size += (st == null ? -1 : old == null ? 1 : 0); + + return old; + } + + /** */ + private GridDhtPartitionState state(int part) { + int off = part * BITS; + + int st = 0; + + for (int i = 0; i < BITS; i++) + st |= ((states.get(off + i) ? 1 : 0) << i); + + return st == 0 ? null : GridDhtPartitionState.values()[st - 1]; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index d812c21..47498dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2784,10 +2784,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Try to connect first on bound addresses. if (isRmtAddrsExist) { - List addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort)); - boolean sameHost = U.sameMacs(getSpiContext().localNode(), node); + List addrs0; + + Collection socketAddrs = U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort); + + if (sameHost) + addrs0 = new ArrayList<>(socketAddrs); + else { + addrs0 = new ArrayList<>(socketAddrs.size()); + + for (InetSocketAddress addr0 : socketAddrs) { + if (!addr0.getAddress().isLoopbackAddress()) + addrs0.add(addr0); + } + } + Collections.sort(addrs0, U.inetAddressesComparator(sameHost)); addrs = new LinkedHashSet<>(addrs0);