Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5799010607 for ; Mon, 22 Apr 2013 14:43:06 +0000 (UTC) Received: (qmail 58079 invoked by uid 500); 22 Apr 2013 14:43:04 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 58009 invoked by uid 500); 22 Apr 2013 14:43:03 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 57662 invoked by uid 99); 22 Apr 2013 14:43:03 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Apr 2013 14:43:03 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2481081D718; Mon, 22 Apr 2013 14:43:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Mon, 22 Apr 2013 14:43:09 -0000 Message-Id: <8a112377478b47be876a4db0f27a5bd8@git.apache.org> In-Reply-To: <5e0833f3ab53442cadb417bbcedb3e0b@git.apache.org> References: <5e0833f3ab53442cadb417bbcedb3e0b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/14] git commit: Fix primary range ignores replication strategy patch by yukim; reviewed by jbellis for CASSANDRA-5424 Fix primary range ignores replication strategy patch by yukim; reviewed by jbellis for CASSANDRA-5424 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/debd8f01 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/debd8f01 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/debd8f01 Branch: refs/heads/trunk Commit: debd8f0176038f6933978eb9b4f8ee6adc07b541 Parents: 4b2923a Author: Yuki Morishita Authored: Mon Apr 15 22:01:37 2013 -0500 Committer: Yuki Morishita Committed: Fri Apr 19 16:31:22 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/locator/NetworkTopologyStrategy.java | 3 +- .../apache/cassandra/service/StorageService.java | 46 +++- .../service/AntiEntropyServiceTestAbstract.java | 2 +- .../service/StorageServiceServerTest.java | 187 ++++++++++++++- 5 files changed, 223 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/debd8f01/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8c4cced..eff7c49 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ * Fix preparing statements when current keyspace is not set (CASSANDRA-5468) * Fix SemanticVersion.isSupportedBy minor/patch handling (CASSANDRA-5496) * Don't provide oldCfId for post-1.1 system cfs (CASSANDRA-5490) + * Fix primary range ignores replication strategy (CASSANDRA-5424) Merged from 1.1 * Add retry mechanism to OTC for non-droppable_verbs (CASSANDRA-5393) * Use allocator information to improve memtable memory usage estimate http://git-wip-us.apache.org/repos/asf/cassandra/blob/debd8f01/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java index 91478e8..d354019 100644 --- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java +++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java @@ -78,7 +78,8 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy @SuppressWarnings("serial") public List calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata) { - Set replicas = new HashSet(); + // we want to preserve insertion order so that the first added endpoint becomes primary + Set replicas = new LinkedHashSet(); // replicas we have found in each DC Map> dcReplicas = new HashMap>(datacenters.size()) {{ http://git-wip-us.apache.org/repos/asf/cassandra/blob/debd8f01/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 658f583..b7bf6f4 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -145,9 +145,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return getRangesForEndpoint(table, FBUtilities.getBroadcastAddress()); } - public Collection> getLocalPrimaryRanges() + public Collection> getLocalPrimaryRanges(String keyspace) { - return getPrimaryRangesForEndpoint(FBUtilities.getBroadcastAddress()); + return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddress()); } @Deprecated @@ -2336,13 +2336,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies) { - final Collection> ranges = primaryRange ? getLocalPrimaryRanges() : getLocalRanges(keyspace); + final Collection> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace); return forceRepairAsync(keyspace, isSequential, isLocal, ranges, columnFamilies); } public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final Collection> ranges, final String... columnFamilies) { - if (Table.SYSTEM_KS.equals(keyspace) || Tracing.TRACE_KS.equals(keyspace)) + if (Table.SYSTEM_KS.equals(keyspace) || Tracing.TRACE_KS.equals(keyspace) || ranges.isEmpty()) return 0; final int cmd = nextRepairCommand.incrementAndGet(); @@ -2377,7 +2377,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void forceTableRepairPrimaryRange(final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException { - forceTableRepairRange(tableName, getLocalPrimaryRanges(), isSequential, isLocal, columnFamilies); + forceTableRepairRange(tableName, getLocalPrimaryRanges(tableName), isSequential, isLocal, columnFamilies); } public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException @@ -2505,17 +2505,36 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } /** - * Get the primary ranges for the specified endpoint. + * Get the "primary ranges" for the specified keyspace and endpoint. + * "Primary ranges" are the ranges that the node is responsible for storing replica primarily. + * The node that stores replica primarily is defined as the first node returned + * by {@link AbstractReplicationStrategy#calculateNaturalEndpoints}. + * + * @param keyspace * @param ep endpoint we are interested in. - * @return collection of ranges for the specified endpoint. + * @return primary ranges for the specified endpoint. */ - public Collection> getPrimaryRangesForEndpoint(InetAddress ep) + public Collection> getPrimaryRangesForEndpoint(String keyspace, InetAddress ep) { - return tokenMetadata.getPrimaryRangesFor(tokenMetadata.getTokens(ep)); + AbstractReplicationStrategy strategy = Table.open(keyspace).getReplicationStrategy(); + Collection> primaryRanges = new HashSet>(); + TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); + for (Token token : metadata.sortedTokens()) + { + List endpoints = strategy.calculateNaturalEndpoints(token, metadata); + if (endpoints.size() > 0 && endpoints.get(0).equals(ep)) + primaryRanges.add(new Range(metadata.getPredecessor(token), token)); + } + return primaryRanges; } /** - * Get the primary range for the specified endpoint. + * Previously, primary range is the range that the node is responsible for and calculated + * only from the token assigned to the node. + * But this does not take replication strategy into account, and therefore returns insufficient + * range especially using NTS with replication only to certain DC(see CASSANDRA-5424). + * + * @deprecated * @param ep endpoint we are interested in. * @return range for the specified endpoint. */ @@ -3834,8 +3853,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public List sampleKeyRange() // do not rename to getter - see CASSANDRA-4452 for details { List keys = new ArrayList(); - for (Range range : getLocalPrimaryRanges()) - keys.addAll(keySamples(ColumnFamilyStore.allUserDefined(), range)); + for (Table keyspace : Table.nonSystem()) + { + for (Range range : getPrimaryRangesForEndpoint(keyspace.name, FBUtilities.getBroadcastAddress())) + keys.addAll(keySamples(keyspace.getColumnFamilyStores(), range)); + } List sampledKeys = new ArrayList(keys.size()); for (DecoratedKey key : keys) http://git-wip-us.apache.org/repos/asf/cassandra/blob/debd8f01/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java index ef7a2ab..7124ecb 100644 --- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java +++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java @@ -102,7 +102,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader Gossiper.instance.initializeNodeUnsafe(REMOTE, UUID.randomUUID(), 1); - local_range = StorageService.instance.getLocalPrimaryRange(); + local_range = StorageService.instance.getPrimaryRangesForEndpoint(tablename, LOCAL).iterator().next(); // (we use REMOTE instead of LOCAL so that the reponses for the validator.complete() get lost) request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, local_range, new CFPair(tablename, cfname)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/debd8f01/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java index 39fbb4a..5ce9160 100644 --- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java +++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java @@ -21,18 +21,28 @@ package org.apache.cassandra.service; import java.io.File; import java.io.IOException; -import java.util.Collections; -import java.util.List; +import java.net.InetAddress; +import java.util.*; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.apache.cassandra.OrderedJUnit4ClassRunner; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.StringToken; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.db.Table; import org.apache.cassandra.dht.Token; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.PropertyFileSnitch; +import org.apache.cassandra.locator.TokenMetadata; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -40,6 +50,13 @@ import static org.junit.Assert.assertTrue; @RunWith(OrderedJUnit4ClassRunner.class) public class StorageServiceServerTest { + @BeforeClass + public static void setUp() throws ConfigurationException + { + IEndpointSnitch snitch = new PropertyFileSnitch(); + DatabaseDescriptor.setEndpointSnitch(snitch); + } + @Test public void testRegularMode() throws IOException, InterruptedException, ConfigurationException { @@ -79,4 +96,170 @@ public class StorageServiceServerTest StorageService.instance.takeColumnFamilySnapshot(Table.SYSTEM_KS, "Schema", "cf_snapshot"); } + @Test + public void testPrimaryRangesWithNetworkTopologyStrategy() throws Exception + { + TokenMetadata metadata = StorageService.instance.getTokenMetadata(); + metadata.clearUnsafe(); + // DC1 + metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2")); + // DC2 + metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4")); + metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5")); + + Map configOptions = new HashMap(); + configOptions.put("DC1", "1"); + configOptions.put("DC2", "1"); + + Table.clear("Keyspace1"); + KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false); + Schema.instance.setTableDefinition(meta); + + Collection> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1")); + assert primaryRanges.size() == 1; + assert primaryRanges.contains(new Range(new StringToken("D"), new StringToken("A"))); + + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2")); + assert primaryRanges.size() == 1; + assert primaryRanges.contains(new Range(new StringToken("B"), new StringToken("C"))); + + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.4")); + assert primaryRanges.size() == 1; + assert primaryRanges.contains(new Range(new StringToken("A"), new StringToken("B"))); + + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.5")); + assert primaryRanges.size() == 1; + assert primaryRanges.contains(new Range(new StringToken("C"), new StringToken("D"))); + } + + @Test + public void testPrimaryRangesWithNetworkTopologyStrategyOneDCOnly() throws Exception + { + TokenMetadata metadata = StorageService.instance.getTokenMetadata(); + metadata.clearUnsafe(); + // DC1 + metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2")); + // DC2 + metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4")); + metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5")); + + Map configOptions = new HashMap(); + configOptions.put("DC2", "2"); + + Table.clear("Keyspace1"); + KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false); + Schema.instance.setTableDefinition(meta); + + // endpoints in DC1 should not have primary range + Collection> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1")); + assert primaryRanges.isEmpty(); + + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2")); + assert primaryRanges.isEmpty(); + + // endpoints in DC2 should have primary ranges which also cover DC1 + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.4")); + assert primaryRanges.size() == 2; + assert primaryRanges.contains(new Range(new StringToken("D"), new StringToken("A"))); + assert primaryRanges.contains(new Range(new StringToken("A"), new StringToken("B"))); + + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.5")); + assert primaryRanges.size() == 2; + assert primaryRanges.contains(new Range(new StringToken("C"), new StringToken("D"))); + assert primaryRanges.contains(new Range(new StringToken("B"), new StringToken("C"))); + } + + @Test + public void testPrimaryRangesWithVnodes() throws Exception + { + TokenMetadata metadata = StorageService.instance.getTokenMetadata(); + metadata.clearUnsafe(); + // DC1 + Multimap dc1 = HashMultimap.create(); + dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("A")); + dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("E")); + dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("H")); + dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("C")); + dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("I")); + dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("J")); + metadata.updateNormalTokens(dc1); + // DC2 + Multimap dc2 = HashMultimap.create(); + dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("B")); + dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("G")); + dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("L")); + dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("D")); + dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("F")); + dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K")); + metadata.updateNormalTokens(dc2); + + Map configOptions = new HashMap(); + configOptions.put("DC2", "2"); + + Table.clear("Keyspace1"); + KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false); + Schema.instance.setTableDefinition(meta); + + // endpoints in DC1 should not have primary range + Collection> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1")); + assert primaryRanges.isEmpty(); + + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2")); + assert primaryRanges.isEmpty(); + + // endpoints in DC2 should have primary ranges which also cover DC1 + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.4")); + assert primaryRanges.size() == 4; + assert primaryRanges.contains(new Range(new StringToken("A"), new StringToken("B"))); + assert primaryRanges.contains(new Range(new StringToken("F"), new StringToken("G"))); + assert primaryRanges.contains(new Range(new StringToken("K"), new StringToken("L"))); + // because /127.0.0.4 holds token "B" which is the next to token "A" from /127.0.0.1, + // the node covers range (L, A] + assert primaryRanges.contains(new Range(new StringToken("L"), new StringToken("A"))); + + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.5")); + assert primaryRanges.size() == 8; + assert primaryRanges.contains(new Range(new StringToken("C"), new StringToken("D"))); + assert primaryRanges.contains(new Range(new StringToken("E"), new StringToken("F"))); + assert primaryRanges.contains(new Range(new StringToken("J"), new StringToken("K"))); + // ranges from /127.0.0.1 + assert primaryRanges.contains(new Range(new StringToken("D"), new StringToken("E"))); + // the next token to "H" in DC2 is "K" in /127.0.0.5, so (G, H] goes to /127.0.0.5 + assert primaryRanges.contains(new Range(new StringToken("G"), new StringToken("H"))); + // ranges from /127.0.0.2 + assert primaryRanges.contains(new Range(new StringToken("B"), new StringToken("C"))); + assert primaryRanges.contains(new Range(new StringToken("H"), new StringToken("I"))); + assert primaryRanges.contains(new Range(new StringToken("I"), new StringToken("J"))); + } + @Test + public void testPrimaryRangesWithSimpleStrategy() throws Exception + { + TokenMetadata metadata = StorageService.instance.getTokenMetadata(); + metadata.clearUnsafe(); + + metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2")); + metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3")); + + Map configOptions = new HashMap(); + configOptions.put("replication_factor", "2"); + + Table.clear("Keyspace1"); + KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "SimpleStrategy", configOptions, false); + Schema.instance.setTableDefinition(meta); + + Collection> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1")); + assert primaryRanges.size() == 1; + assert primaryRanges.contains(new Range(new StringToken("C"), new StringToken("A"))); + + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2")); + assert primaryRanges.size() == 1; + assert primaryRanges.contains(new Range(new StringToken("A"), new StringToken("B"))); + + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.3")); + assert primaryRanges.size() == 1; + assert primaryRanges.contains(new Range(new StringToken("B"), new StringToken("C"))); + } }