Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 36DBA18CF6 for ; Tue, 21 Jul 2015 13:59:28 +0000 (UTC) Received: (qmail 1274 invoked by uid 500); 21 Jul 2015 13:59:28 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 1243 invoked by uid 500); 21 Jul 2015 13:59:28 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 1232 invoked by uid 99); 21 Jul 2015 13:59:28 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Jul 2015 13:59:28 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 96E8A18AB83 for ; Tue, 21 Jul 2015 13:59:27 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.79 X-Spam-Level: * X-Spam-Status: No, score=1.79 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id i4BHlAGPnmG7 for ; Tue, 21 Jul 2015 13:59:12 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 9728323117 for ; Tue, 21 Jul 2015 13:59:03 +0000 (UTC) Received: (qmail 91942 invoked by uid 99); 21 Jul 2015 13:57:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Jul 2015 13:57:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 09C31E0F7C; Tue, 21 Jul 2015 13:57:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 21 Jul 2015 13:57:48 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/6] incubator-ignite git commit: #ignite-1109: Add consistentId to IgniteConfiguration. Repository: incubator-ignite Updated Branches: refs/heads/ignite-1090 1617a99da -> 869ac6cf9 #ignite-1109: Add consistentId to IgniteConfiguration. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fa21d8c0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fa21d8c0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fa21d8c0 Branch: refs/heads/ignite-1090 Commit: fa21d8c0272887bf46a35b8ba333b90707e41d3c Parents: 83bba05 Author: ivasilinets Authored: Tue Jul 21 14:14:31 2015 +0300 Committer: ivasilinets Committed: Tue Jul 21 14:14:31 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 3 +- .../AffinityNodeAddressHashResolver.java | 8 +- .../affinity/AffinityNodeHashResolver.java | 5 + .../affinity/AffinityNodeIdHashResolver.java | 6 + .../rendezvous/RendezvousAffinityFunction.java | 22 +++- .../configuration/CacheConfiguration.java | 4 +- .../configuration/IgniteConfiguration.java | 26 +++++ .../apache/ignite/internal/IgniteKernal.java | 3 + .../ignite/internal/IgniteNodeAttributes.java | 3 + .../processors/cache/GridCacheAttributes.java | 8 +- .../processors/cache/GridCacheProcessor.java | 113 ++++++++++++------- .../processors/cache/IgniteInternalCache.java | 3 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 3 +- .../tcp/internal/TcpDiscoveryNode.java | 18 ++- ...DiscoveryNodeConfigConsistentIdSelfTest.java | 76 +++++++++++++ .../IgniteSpiDiscoverySelfTestSuite.java | 1 + 16 files changed, 244 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 4938ab1..fd0112c 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -34,6 +34,7 @@ import javax.cache.event.*; import javax.cache.expiry.*; import javax.cache.integration.*; import javax.cache.processor.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; @@ -571,7 +572,7 @@ public interface IgniteCache extends javax.cache.Cache, IgniteAsyncS * the left nodes, and that nodes are restarted before * {@link CacheConfiguration#getRebalanceDelay() rebalanceDelay} expires. To place nodes * on the same place in consistent hash ring, use - * {@link RendezvousAffinityFunction#setHashIdResolver(AffinityNodeHashResolver)} to make sure that + * {@link IgniteConfiguration#setConsistentId(Serializable)} to make sure that * a node maps to the same hash ID if re-started. *

* See {@link CacheConfiguration#getRebalanceDelay()} for more information on how to configure http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeAddressHashResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeAddressHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeAddressHashResolver.java index 7ce49ec..533174f 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeAddressHashResolver.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeAddressHashResolver.java @@ -18,11 +18,17 @@ package org.apache.ignite.cache.affinity; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.internal.*; +import java.io.*; + /** - * Node hash resolver which uses {@link org.apache.ignite.cluster.ClusterNode#consistentId()} as alternate hash value. + * Node hash resolver which uses {@link ClusterNode#consistentId()} as alternate hash value. + * + * @deprecated Use {@link IgniteConfiguration#setConsistentId(Serializable)} instead. */ +@Deprecated public class AffinityNodeAddressHashResolver implements AffinityNodeHashResolver { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeHashResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeHashResolver.java index d00b043..f990951 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeHashResolver.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeHashResolver.java @@ -18,6 +18,7 @@ package org.apache.ignite.cache.affinity; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import java.io.*; @@ -31,7 +32,10 @@ import java.io.*; * Note that on case clients exist they will query this object from the server and use it for affinity calculation. * Therefore you must ensure that server and clients can marshal and unmarshal this object in portable format, * i.e. all parties have object class(es) configured as portable. + * + * @deprecated Use {@link IgniteConfiguration#setConsistentId(Serializable)} instead. */ +@Deprecated public interface AffinityNodeHashResolver extends Serializable { /** * Resolve alternate hash value for the given Grid node. @@ -39,5 +43,6 @@ public interface AffinityNodeHashResolver extends Serializable { * @param node Grid node. * @return Resolved hash ID. */ + @Deprecated public Object resolve(ClusterNode node); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeIdHashResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeIdHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeIdHashResolver.java index 65c6f13..2580c69 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeIdHashResolver.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityNodeIdHashResolver.java @@ -18,12 +18,18 @@ package org.apache.ignite.cache.affinity; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.internal.*; +import java.io.*; + /** * Node hash resolver which uses generated node ID as node hash value. As new node ID is generated * on each node start, this resolver do not provide ability to map keys to the same nodes after restart. + * + * @deprecated Use {@link IgniteConfiguration#setConsistentId(Serializable)} instead. */ +@Deprecated public class AffinityNodeIdHashResolver implements AffinityNodeHashResolver { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java index 2b26630..6736c4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java @@ -20,6 +20,7 @@ package org.apache.ignite.cache.affinity.rendezvous; import org.apache.ignite.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -88,7 +89,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza private IgniteBiPredicate backupFilter; /** Hash ID resolver. */ - private AffinityNodeHashResolver hashIdRslvr = new AffinityNodeAddressHashResolver(); + private AffinityNodeHashResolver hashIdRslvr = null; /** Ignite instance. */ @IgniteInstanceResource @@ -204,6 +205,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza * * @return Hash ID resolver. */ + @Deprecated public AffinityNodeHashResolver getHashIdResolver() { return hashIdRslvr; } @@ -219,7 +221,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza * repartitioning. * * @param hashIdRslvr Hash ID resolver. + * + * @deprecated Use {@link IgniteConfiguration#setConsistentId(Serializable)} instead. */ + @Deprecated public void setHashIdResolver(AffinityNodeHashResolver hashIdRslvr) { this.hashIdRslvr = hashIdRslvr; } @@ -273,6 +278,19 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza } /** + * Resolves node hash. + * + * @param node Cluster node; + * @return Node hash. + */ + public Object resolveNodeHash(ClusterNode node) { + if (hashIdRslvr != null) + return hashIdRslvr.resolve(node); + else + return node.consistentId(); + } + + /** * Returns collection of nodes (primary first) for specified partition. */ public List assignPartition(int part, List nodes, int backups, @@ -285,7 +303,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza MessageDigest d = digest.get(); for (ClusterNode node : nodes) { - Object nodeHash = hashIdRslvr.resolve(node); + Object nodeHash = resolveNodeHash(node); try { ByteArrayOutputStream out = new ByteArrayOutputStream(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 57f1e9d..3ad0f01 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -20,7 +20,6 @@ package org.apache.ignite.configuration; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.cache.eviction.*; import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.cache.store.*; @@ -33,6 +32,7 @@ import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.configuration.*; import javax.cache.expiry.*; +import java.io.*; import java.util.*; /** @@ -1310,7 +1310,7 @@ public class CacheConfiguration extends MutableConfiguration { * For better efficiency user should usually make sure that new nodes get placed on * the same place of consistent hash ring as the left nodes, and that nodes are * restarted before this delay expires. To place nodes on the same place in consistent hash ring, - * use {@link RendezvousAffinityFunction#setHashIdResolver(AffinityNodeHashResolver)} + * use {@link IgniteConfiguration#setConsistentId(Serializable)} * to make sure that a node maps to the same hash ID event if restarted. As an example, * node IP address and port combination may be used in this case. *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 2d36c7a..823ddcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -59,6 +59,7 @@ import javax.cache.expiry.*; import javax.cache.integration.*; import javax.cache.processor.*; import javax.management.*; +import java.io.*; import java.lang.management.*; import java.util.*; @@ -400,6 +401,9 @@ public class IgniteConfiguration { /** Cache store session listeners. */ private Factory[] storeSesLsnrs; + /** Consistent globally unique node ID which survives node restarts. */ + private Serializable consistentId; + /** * Creates valid grid configuration with all default values. */ @@ -442,6 +446,7 @@ public class IgniteConfiguration { clientMode = cfg.isClientMode(); clockSyncFreq = cfg.getClockSyncFrequency(); clockSyncSamples = cfg.getClockSyncSamples(); + consistentId = cfg.getConsistentId(); deployMode = cfg.getDeploymentMode(); discoStartupDelay = cfg.getDiscoveryStartupDelay(); pubPoolSize = cfg.getPublicThreadPoolSize(); @@ -558,6 +563,27 @@ public class IgniteConfiguration { } /** + * Sets consistent globally unique node ID which survives node restarts. + * + * @param consistentId Node consistent ID. + * @return {@code this} for chaining. + */ + public IgniteConfiguration setConsistentId(Serializable consistentId) { + this.consistentId = consistentId; + + return this; + } + + /** + * Gets consistent globally unique node ID which survives node restarts. + * + * @return Node consistent ID. + */ + public Serializable getConsistentId() { + return consistentId; + } + + /** * Should return any user-defined attributes to be added to this node. These attributes can * then be accessed on nodes by calling {@link ClusterNode#attribute(String)} or * {@link ClusterNode#attributes()} methods. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index c411f2e..d2f018a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1173,6 +1173,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { add(ATTR_CONSISTENCY_CHECK_SKIPPED, getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)); + if (cfg.getConsistentId() != null) + add(ATTR_NODE_CONSISTENT_ID, cfg.getConsistentId()); + // Build a string from JVM arguments, because parameters with spaces are split. SB jvmArgs = new SB(512); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index 928db5e..10b8df0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -132,6 +132,9 @@ public final class IgniteNodeAttributes { /** Configuration consistency check disabled flag. */ public static final String ATTR_CONSISTENCY_CHECK_SKIPPED = ATTR_PREFIX + ".consistency.check.skipped"; + /** Node consistent id. */ + public static final String ATTR_NODE_CONSISTENT_ID = ATTR_PREFIX + ".consistent.id"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java index 78c4722..389c0fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java @@ -145,8 +145,12 @@ public class GridCacheAttributes implements Serializable { public String affinityHashIdResolverClassName() { AffinityFunction aff = ccfg.getAffinity(); - if (aff instanceof RendezvousAffinityFunction) - return className(((RendezvousAffinityFunction)aff).getHashIdResolver()); + if (aff instanceof RendezvousAffinityFunction) { + if (((RendezvousAffinityFunction) aff).getHashIdResolver() == null) + return null; + + return className(((RendezvousAffinityFunction) aff).getHashIdResolver()); + } return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index bda0485..f5ccaec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -144,11 +144,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param internalCache Internal cache flag. * @param cfg Initializes cache configuration with proper defaults. * @param cacheObjCtx Cache object context. * @throws IgniteCheckedException If configuration is not valid. */ - private void initialize(CacheConfiguration cfg, CacheObjectContext cacheObjCtx) throws IgniteCheckedException { + private void initialize(boolean internalCache, CacheConfiguration cfg, CacheObjectContext cacheObjCtx) + throws IgniteCheckedException { if (cfg.getCacheMode() == null) cfg.setCacheMode(DFLT_CACHE_MODE); @@ -162,14 +164,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cfg.getCacheMode() == PARTITIONED) { RendezvousAffinityFunction aff = new RendezvousAffinityFunction(); - aff.setHashIdResolver(new AffinityNodeAddressHashResolver()); + if (internalCache) + aff.setHashIdResolver(new AffinityNodeAddressHashResolver()); cfg.setAffinity(aff); } else if (cfg.getCacheMode() == REPLICATED) { RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 512); - aff.setHashIdResolver(new AffinityNodeAddressHashResolver()); + if (internalCache) + aff.setHashIdResolver(new AffinityNodeAddressHashResolver()); cfg.setAffinity(aff); @@ -183,7 +187,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cfg.getAffinity() instanceof RendezvousAffinityFunction) { RendezvousAffinityFunction aff = (RendezvousAffinityFunction)cfg.getAffinity(); - if (aff.getHashIdResolver() == null) + if (internalCache && aff.getHashIdResolver() == null) aff.setHashIdResolver(new AffinityNodeAddressHashResolver()); } } @@ -551,27 +555,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class, new CustomEventListener() { - @Override public void onCustomEvent(ClusterNode snd, DynamicCacheChangeBatch msg) { - onCacheChangeRequested(msg); - } - }); - - // Internal caches which should not be returned to user. - Set internalCaches = new HashSet<>(); - - FileSystemConfiguration[] igfsCfgs = ctx.grid().configuration().getFileSystemConfiguration(); - - if (igfsCfgs != null) { - for (FileSystemConfiguration igfsCfg : igfsCfgs) { - internalCaches.add(maskNull(igfsCfg.getMetaCacheName())); - internalCaches.add(maskNull(igfsCfg.getDataCacheName())); - } - } + @Override public void onCustomEvent(ClusterNode snd, DynamicCacheChangeBatch msg) { + onCacheChangeRequested(msg); + } + }); - if (IgniteComponentType.HADOOP.inClassPath()) - internalCaches.add(CU.SYS_CACHE_HADOOP_MR); - - internalCaches.add(CU.ATOMICS_CACHE_NAME); + Set internalCaches = internalCachesNames(); CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); @@ -589,7 +578,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg); // Initialize defaults. - initialize(cfg, cacheObjCtx); + initialize(internalCaches.contains(maskNull(cfg.getName())), cfg, cacheObjCtx); cfgs[i] = cfg; // Replace original configuration value. @@ -671,6 +660,30 @@ public class GridCacheProcessor extends GridProcessorAdapter { log.debug("Started cache processor."); } + /** + * @return Internal caches names. + */ + private Set internalCachesNames() { + // Internal caches which should not be returned to user. + Set internalCaches = new HashSet<>(); + + FileSystemConfiguration[] igfsCfgs = ctx.grid().configuration().getFileSystemConfiguration(); + + if (igfsCfgs != null) { + for (FileSystemConfiguration igfsCfg : igfsCfgs) { + internalCaches.add(maskNull(igfsCfg.getMetaCacheName())); + internalCaches.add(maskNull(igfsCfg.getDataCacheName())); + } + } + + if (IgniteComponentType.HADOOP.inClassPath()) + internalCaches.add(CU.SYS_CACHE_HADOOP_MR); + + internalCaches.add(CU.ATOMICS_CACHE_NAME); + + return internalCaches; + } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStart() throws IgniteCheckedException { @@ -680,7 +693,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (CU.isMarshallerCache(ccfg.getName())) { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - initialize(ccfg, cacheObjCtx); + initialize(internalCachesNames().contains(maskNull(ccfg.getName())), ccfg, cacheObjCtx); GridCacheContext ctx = createCache(ccfg, null, CacheType.MARSHALLER, cacheObjCtx, true); @@ -2133,7 +2146,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg); - initialize(cfg, cacheObjCtx); + initialize(false, cfg, cacheObjCtx); req.startCacheConfiguration(cfg); } @@ -2475,25 +2488,25 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cfg.getAffinity() instanceof RendezvousAffinityFunction) { RendezvousAffinityFunction aff = (RendezvousAffinityFunction)cfg.getAffinity(); - AffinityNodeHashResolver hashIdRslvr = aff.getHashIdResolver(); - - assert hashIdRslvr != null; - - Object nodeHashObj = hashIdRslvr.resolve(node); + Object nodeHashObj = aff.resolveNodeHash(node); for (ClusterNode topNode : ctx.discovery().allNodes()) { - Object topNodeHashObj = hashIdRslvr.resolve(topNode); + Object topNodeHashObj = aff.resolveNodeHash(topNode); if (nodeHashObj.hashCode() == topNodeHashObj.hashCode()) { + String hashIdRslvrName = ""; + + if (aff.getHashIdResolver() != null) + hashIdRslvrName = ", hashIdResolverClass=" + + aff.getHashIdResolver().getClass().getName(); + String errMsg = "Failed to add node to topology because it has the same hash code for " + - "partitioned affinity as one of existing nodes [cacheName=" + U.maskName(cfg.getName()) + - ", hashIdResolverClass=" + hashIdRslvr.getClass().getName() + - ", existingNodeId=" + topNode.id() + ']'; + "partitioned affinity as one of existing nodes [cacheName=" + + U.maskName(cfg.getName()) + hashIdRslvrName + ", existingNodeId=" + topNode.id() + ']'; String sndMsg = "Failed to add node to topology because it has the same hash code for " + - "partitioned affinity as one of existing nodes [cacheName=" + U.maskName(cfg.getName()) + - ", hashIdResolverClass=" + hashIdRslvr.getClass().getName() + ", existingNodeId=" + - topNode.id() + ']'; + "partitioned affinity as one of existing nodes [cacheName=" + + U.maskName(cfg.getName()) + hashIdRslvrName + ", existingNodeId=" + topNode.id() + ']'; return new IgniteNodeValidationResult(topNode.id(), errMsg, sndMsg); } @@ -2619,10 +2632,24 @@ public class GridCacheProcessor extends GridProcessorAdapter { "Affinity key backups", locAttr.affinityKeyBackups(), rmtAttr.affinityKeyBackups(), true); - CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity.hashIdResolver", - "Partitioned cache affinity hash ID resolver class", - locAttr.affinityHashIdResolverClassName(), rmtAttr.affinityHashIdResolverClassName(), - true); + String locHashIdResolver = locAttr.affinityHashIdResolverClassName(); + String rmtHashIdResolver = rmtAttr.affinityHashIdResolverClassName(); + String defHashIdResolver = AffinityNodeAddressHashResolver.class.getName(); + + if (!((locHashIdResolver == null && rmtHashIdResolver == null) || + (locHashIdResolver == null && rmtHashIdResolver.equals(defHashIdResolver)) || + (rmtHashIdResolver == null && locHashIdResolver.equals(defHashIdResolver)))) { + + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity.hashIdResolver", + "Partitioned cache affinity hash ID resolver class", + locHashIdResolver, rmtHashIdResolver, true); + } + + if (locHashIdResolver == null && + (rmtHashIdResolver != null && rmtHashIdResolver.equals(defHashIdResolver))) { + U.warn(log, "Set " + RendezvousAffinityFunction.class + " with " + defHashIdResolver + + " to CacheConfiguration to start node [cacheName=" + rmtAttr.cacheName() + "]"); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 7396c84..42c648b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -34,6 +34,7 @@ import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.expiry.*; import javax.cache.processor.*; +import java.io.*; import java.sql.*; import java.util.*; import java.util.Date; @@ -1456,7 +1457,7 @@ public interface IgniteInternalCache extends Iterable> { * the left nodes, and that nodes are restarted before * {@link CacheConfiguration#getRebalanceDelay() rebalanceDelay} expires. To place nodes * on the same place in consistent hash ring, use - * {@link org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction#setHashIdResolver(AffinityNodeHashResolver)} to make sure that + * {@link IgniteConfiguration#setConsistentId(Serializable)} to make sure that * a node maps to the same hash ID if re-started. *

* See {@link org.apache.ignite.configuration.CacheConfiguration#getRebalanceDelay()} for more information on how to configure http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 431d198..b7d6e3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -878,7 +878,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T addrs.get2(), srvPort, metricsProvider, - locNodeVer); + locNodeVer, + ignite.configuration().getConsistentId()); if (addExtAddrAttr) { Collection extAddrs = addrRslvr == null ? null : http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 032cf01..142dbea 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -131,11 +131,15 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste * @param discPort Port. * @param metricsProvider Metrics provider. * @param ver Version. + * @param consistentId Node consistent ID. */ public TcpDiscoveryNode(UUID id, Collection addrs, - Collection hostNames, int discPort, - DiscoveryMetricsProvider metricsProvider, IgniteProductVersion ver) + Collection hostNames, + int discPort, + DiscoveryMetricsProvider metricsProvider, + IgniteProductVersion ver, + Serializable consistentId) { assert id != null; assert !F.isEmpty(addrs); @@ -145,6 +149,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste this.id = id; List sortedAddrs = new ArrayList<>(addrs); + Collections.sort(sortedAddrs); this.addrs = sortedAddrs; @@ -153,7 +158,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste this.metricsProvider = metricsProvider; this.ver = ver; - consistentId = U.consistentId(sortedAddrs, discPort); + this.consistentId = consistentId != null ? consistentId : U.consistentId(sortedAddrs, discPort); metrics = metricsProvider.metrics(); cacheMetrics = metricsProvider.cacheMetrics(); @@ -452,7 +457,8 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste * @return Copy of local node for client reconnect request. */ public TcpDiscoveryNode clientReconnectNode() { - TcpDiscoveryNode node = new TcpDiscoveryNode(id, addrs, hostNames, discPort, metricsProvider, ver); + TcpDiscoveryNode node = new TcpDiscoveryNode(id, addrs, hostNames, discPort, metricsProvider, ver, + null); node.attrs = attrs; node.clientRouterNodeId = clientRouterNodeId; @@ -522,7 +528,9 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste sockAddrs = U.toSocketAddresses(this, discPort); - consistentId = U.consistentId(addrs, discPort); + Object consistentIdAttr = attrs.get(ATTR_NODE_CONSISTENT_ID); + + consistentId = consistentIdAttr != null ? consistentIdAttr : U.consistentId(addrs, discPort); // Cluster metrics byte[] mtr = U.readByteArray(in); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeConfigConsistentIdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeConfigConsistentIdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeConfigConsistentIdSelfTest.java new file mode 100644 index 0000000..6abe1d6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeConfigConsistentIdSelfTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; + +/** + * Test for {@link IgniteConfiguration#consistentId}. + */ +public class TcpDiscoveryNodeConfigConsistentIdSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost("0.0.0.0"); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder)); + + cfg.setConsistentId(gridName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testConsistentId() throws Exception { + Object id0 = grid(0).localNode().consistentId(); + Serializable id1 = grid(0).configuration().getConsistentId(); + + assertEquals(id0, id1); + assertEquals(grid(0).name(), id0); + assertEquals(id0, grid(1).cluster().forRemotes().node().consistentId()); + + for (int i = 0; i < 4; ++i) { + stopAllGrids(); + + startGrids(2); + + assertEquals(id0, grid(0).localNode().consistentId()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa21d8c0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 6f59f14..b7014ad 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -56,6 +56,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class)); suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class)); + suite.addTest(new TestSuite(TcpDiscoveryNodeConfigConsistentIdSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoveryRestartTest.class)); suite.addTest(new TestSuite(TcpDiscoveryMultiThreadedTest.class));