Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8F40C11576 for ; Sat, 3 May 2014 03:49:54 +0000 (UTC) Received: (qmail 68475 invoked by uid 500); 3 May 2014 03:49:54 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 68204 invoked by uid 500); 3 May 2014 03:49:53 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 68129 invoked by uid 99); 3 May 2014 03:49:50 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 03 May 2014 03:49:50 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9321A82AD21; Sat, 3 May 2014 03:49:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Sat, 03 May 2014 03:49:53 -0000 Message-Id: In-Reply-To: <94a83799902342f1b3dc3a105fe9dec5@git.apache.org> References: <94a83799902342f1b3dc3a105fe9dec5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/9] git commit: Make batchlog replica selection rack-aware patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-6551 Make batchlog replica selection rack-aware patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-6551 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/af96d405 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/af96d405 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/af96d405 Branch: refs/heads/cassandra-2.0 Commit: af96d405b42a9e4ae23cba841b7a5d83ee8f7ec8 Parents: fab4557 Author: Jonathan Ellis Authored: Fri May 2 22:47:27 2014 -0500 Committer: Jonathan Ellis Committed: Fri May 2 22:47:27 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../service/BatchlogEndpointSelector.java | 110 ++++++++++++++++++ .../apache/cassandra/service/StorageProxy.java | 21 +--- .../service/BatchlogEndpointSelectorTest.java | 116 +++++++++++++++++++ 4 files changed, 232 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9ab1a5f..5799659 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,5 @@ 2.0.8 -======= + * Make batchlog replica selection rack-aware (CASSANDRA-6551) * Add Google Compute Engine snitch (CASSANDRA-7132) * Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072) * Set JMX RMI port to 7199 (CASSANDRA-7087) http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java b/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java new file mode 100644 index 0000000..bf032f5 --- /dev/null +++ b/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.utils.FBUtilities; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; + +public class BatchlogEndpointSelector +{ + private final String localRack; + + public BatchlogEndpointSelector(String localRack) + { + this.localRack = localRack; + } + + /** + * @param endpoints nodes in the local datacenter, grouped by rack name + * @return list of candidates for batchlog hosting. if possible these will be two nodes from different racks. + */ + public Collection chooseEndpoints(Multimap endpoints) + { + // strip out dead endpoints and localhost + ListMultimap validated = ArrayListMultimap.create(); + for (Map.Entry entry : endpoints.entries()) + { + if (isValid(entry.getValue())) + validated.put(entry.getKey(), entry.getValue()); + } + if (validated.size() <= 2) + return validated.values(); + + if ((validated.size() - validated.get(localRack).size()) >= 2) + { + // we have enough endpoints in other racks + validated.removeAll(localRack); + } + + if (validated.keySet().size() == 1) + { + // we have only 1 `other` rack + Collection otherRack = Iterables.getOnlyElement(validated.asMap().values()); + return Lists.newArrayList(Iterables.limit(otherRack, 2)); + } + + // randomize which racks we pick from if more than 2 remaining + Collection racks; + if (validated.keySet().size() == 2) + { + racks = validated.keySet(); + } + else + { + racks = Lists.newArrayList(validated.keySet()); + Collections.shuffle((List) racks); + } + + // grab a random member of up to two racks + List result = new ArrayList<>(2); + for (String rack : Iterables.limit(racks, 2)) + { + List rackMembers = validated.get(rack); + result.add(rackMembers.get(getRandomInt(rackMembers.size()))); + } + + return result; + } + + @VisibleForTesting + protected boolean isValid(InetAddress input) + { + return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input); + } + + @VisibleForTesting + protected int getRandomInt(int bound) + { + return FBUtilities.threadLocalRandom().nextInt(bound); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 14d5ee2..2bf8e7f 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; -import java.util.Random; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; @@ -729,24 +728,14 @@ public class StorageProxy implements StorageProxyMBean throws UnavailableException { TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); - List localEndpoints = new ArrayList<>(topology.getDatacenterEndpoints().get(localDataCenter)); - + Multimap localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); + // special case for single-node datacenters if (localEndpoints.size() == 1) - return localEndpoints; - - List chosenEndpoints = new ArrayList<>(2); - int startOffset = new Random().nextInt(localEndpoints.size()); + return localEndpoints.values(); - // starts at some random point in the list, advances forward until the end, then loops - // around to the beginning, advancing again until it is back at the starting point again. - for (int i = 0; i < localEndpoints.size() && chosenEndpoints.size() < 2; i++) - { - InetAddress endpoint = localEndpoints.get((i + startOffset) % localEndpoints.size()); - // skip localhost and non-alive nodes - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(endpoint)) - chosenEndpoints.add(endpoint); - } + String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress()); + Collection chosenEndpoints = new BatchlogEndpointSelector(localRack).chooseEndpoints(localEndpoints); if (chosenEndpoints.isEmpty()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java b/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java new file mode 100644 index 0000000..293078d --- /dev/null +++ b/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collection; + +import org.junit.Test; +import org.junit.matchers.JUnitMatchers; + +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Multimap; + +public class BatchlogEndpointSelectorTest +{ + private final BatchlogEndpointSelector target; + private static final String LOCAL = "local"; + + + public BatchlogEndpointSelectorTest() throws UnknownHostException + { + target = new BatchlogEndpointSelector(LOCAL) + { + @Override + protected boolean isValid(InetAddress input) + { + //we will use always alive non-localhost endpoints + return true; + } + + @Override + protected int getRandomInt(int bound) + { + //we don't need a random behavior here + return bound - 1; + } + }; + } + + @Test + public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException + { + Multimap endpoints = ImmutableMultimap. builder() + .put(LOCAL, InetAddress.getByName("0")) + .put(LOCAL, InetAddress.getByName("00")) + .put("1", InetAddress.getByName("1")) + .put("1", InetAddress.getByName("11")) + .put("2", InetAddress.getByName("2")) + .put("2", InetAddress.getByName("22")) + .build(); + Collection result = target.chooseEndpoints(endpoints); + assertThat(result.size(), is(2)); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11"))); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22"))); + } + + @Test + public void shouldSelectHostFromLocal() throws UnknownHostException + { + Multimap endpoints = ImmutableMultimap. builder() + .put(LOCAL, InetAddress.getByName("0")) + .put(LOCAL, InetAddress.getByName("00")) + .put("1", InetAddress.getByName("1")) + .build(); + Collection result = target.chooseEndpoints(endpoints); + assertThat(result.size(), is(2)); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1"))); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0"))); + } + + @Test + public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException + { + Multimap endpoints = ImmutableMultimap. builder() + .put(LOCAL, InetAddress.getByName("0")) + .build(); + Collection result = target.chooseEndpoints(endpoints); + assertThat(result.size(), is(1)); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0"))); + } + + @Test + public void shouldSelectTwoFirstHostsFromSingleOtherRack() throws UnknownHostException + { + Multimap endpoints = ImmutableMultimap. builder() + .put(LOCAL, InetAddress.getByName("0")) + .put(LOCAL, InetAddress.getByName("00")) + .put("1", InetAddress.getByName("1")) + .put("1", InetAddress.getByName("11")) + .put("1", InetAddress.getByName("111")) + .build(); + Collection result = target.chooseEndpoints(endpoints); + assertThat(result.size(), is(2)); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1"))); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11"))); + } +}