Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B6AFD18804 for ; Fri, 27 Nov 2015 12:58:40 +0000 (UTC) Received: (qmail 39129 invoked by uid 500); 27 Nov 2015 12:58:40 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 39086 invoked by uid 500); 27 Nov 2015 12:58:40 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 39063 invoked by uid 99); 27 Nov 2015 12:58:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Nov 2015 12:58:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 48DB5E03C8; Fri, 27 Nov 2015 12:58:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: snazy@apache.org To: commits@cassandra.apache.org Date: Fri, 27 Nov 2015 12:58:40 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/5] cassandra git commit: Warn or fail when changing cluster topology live Repository: cassandra Updated Branches: refs/heads/trunk d00d091c6 -> 33e7bba12 Warn or fail when changing cluster topology live patch by Stefania; reviewed by Paulo Motta for CASSANDRA-10243 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7650fc19 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7650fc19 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7650fc19 Branch: refs/heads/trunk Commit: 7650fc196341bd673626054593f2ce6e895d7783 Parents: 8cd13f1 Author: Stefania Authored: Fri Nov 27 13:49:15 2015 +0100 Committer: Robert Stupp Committed: Fri Nov 27 13:49:15 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 6 + build.xml | 1 + src/java/org/apache/cassandra/gms/Gossiper.java | 28 +- .../locator/GossipingPropertyFileSnitch.java | 107 ++----- .../cassandra/locator/PropertyFileSnitch.java | 74 ++++- .../cassandra/locator/SnitchProperties.java | 5 + .../locator/YamlFileNetworkTopologySnitch.java | 111 +++++-- .../cassandra/service/MigrationManager.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 10 +- .../cassandra/service/StorageService.java | 32 +- .../GossipingPropertyFileSnitchTest.java | 35 +- .../locator/PropertyFileSnitchTest.java | 321 +++++++++++++++++++ .../YamlFileNetworkTopologySnitchTest.java | 293 +++++++++++++++-- 14 files changed, 823 insertions(+), 205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 111852c..a2f7b6e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.12 + * Warn or fail when changing cluster topology live (CASSANDRA-10243) * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213) * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658) * DeletionTime.compareTo wrong in rare cases (CASSANDRA-10749) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 54a6b79..cae8dfb 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -32,6 +32,12 @@ New features - a new validate(key, cf) method is added to PerRowSecondaryIndex. A default implementation is provided, so no changes are required to custom implementations. +Operations +------------ + - Changing rack or dc of live nodes is no longer possible for PropertyFileSnitch + and YamlFileNetworkTopologySnitch. Reloading the configuration file of + GossipingPropertyFileSnitch has been disabled, CASSANDRA-10243. + 2.1.11 ===== http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 28a8f74..45bb13f 100644 --- a/build.xml +++ b/build.xml @@ -1528,6 +1528,7 @@ + http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 2f69d66..09851b2 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -256,24 +256,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean subscribers.remove(subscriber); } - public Set getLiveMembers() + public Set getLiveEndpoints() { - Set liveMembers = new HashSet(liveEndpoints); - if (!liveMembers.contains(FBUtilities.getBroadcastAddress())) - liveMembers.add(FBUtilities.getBroadcastAddress()); - return liveMembers; - } - - public Set getLiveTokenOwners() - { - Set tokenOwners = new HashSet(); - for (InetAddress member : getLiveMembers()) - { - EndpointState epState = endpointStateMap.get(member); - if (epState != null && !isDeadState(epState) && StorageService.instance.getTokenMetadata().isMember(member)) - tokenOwners.add(member); - } - return tokenOwners; + Set liveEndpoints = new HashSet(this.liveEndpoints); + if (!liveEndpoints.contains(FBUtilities.getBroadcastAddress())) + liveEndpoints.add(FBUtilities.getBroadcastAddress()); + return liveEndpoints; } /** @@ -981,7 +969,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean MessagingService.instance().sendRR(echoMessage, addr, echoHandler); } - private void realMarkAlive(final InetAddress addr, final EndpointState localState) + @VisibleForTesting + public void realMarkAlive(final InetAddress addr, final EndpointState localState) { if (logger.isTraceEnabled()) logger.trace("marking as alive {}", addr); @@ -998,7 +987,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean logger.trace("Notified " + subscribers); } - private void markDead(InetAddress addr, EndpointState localState) + @VisibleForTesting + public void markDead(InetAddress addr, EndpointState localState) { if (logger.isTraceEnabled()) logger.trace("marking as down {}", addr); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java index f3f38a0..e2449ae 100644 --- a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java +++ b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -32,8 +32,6 @@ import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.ResourceWatcher; -import org.apache.cassandra.utils.WrappedRunnable; public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// implements IEndpointStateChangeSubscriber @@ -42,28 +40,23 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// private PropertyFileSnitch psnitch; - private volatile String myDC; - private volatile String myRack; - private volatile boolean preferLocal; - private AtomicReference snitchHelperReference; - private volatile boolean gossipStarted; + private final String myDC; + private final String myRack; + private final boolean preferLocal; + private final AtomicReference snitchHelperReference; private Map> savedEndpoints; private static final String DEFAULT_DC = "UNKNOWN_DC"; private static final String DEFAULT_RACK = "UNKNOWN_RACK"; - private static final int DEFAULT_REFRESH_PERIOD_IN_SECONDS = 60; - public GossipingPropertyFileSnitch() throws ConfigurationException { - this(DEFAULT_REFRESH_PERIOD_IN_SECONDS); - } - - public GossipingPropertyFileSnitch(int refreshPeriodInSeconds) throws ConfigurationException - { - snitchHelperReference = new AtomicReference(); + SnitchProperties properties = loadConfiguration(); - reloadConfiguration(false); + myDC = properties.get("dc", DEFAULT_DC).trim(); + myRack = properties.get("rack", DEFAULT_RACK).trim(); + preferLocal = Boolean.parseBoolean(properties.get("prefer_local", "false")); + snitchHelperReference = new AtomicReference<>(); try { @@ -74,23 +67,15 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// { logger.info("Unable to load {}; compatibility mode disabled", PropertyFileSnitch.SNITCH_PROPERTIES_FILENAME); } + } - try - { - FBUtilities.resourceToFile(SnitchProperties.RACKDC_PROPERTY_FILENAME); - Runnable runnable = new WrappedRunnable() - { - protected void runMayThrow() throws ConfigurationException - { - reloadConfiguration(true); - } - }; - ResourceWatcher.watch(SnitchProperties.RACKDC_PROPERTY_FILENAME, runnable, refreshPeriodInSeconds * 1000); - } - catch (ConfigurationException ex) - { - logger.error("{} found, but does not look like a plain file. Will not watch it for changes", SnitchProperties.RACKDC_PROPERTY_FILENAME); - } + private static SnitchProperties loadConfiguration() throws ConfigurationException + { + final SnitchProperties properties = new SnitchProperties(); + if (!properties.contains("dc") || !properties.contains("rack")) + throw new ConfigurationException("DC or rack not found in snitch properties, check your configuration in: " + SnitchProperties.RACKDC_PROPERTY_FILENAME); + + return properties; } /** @@ -156,56 +141,18 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress())); - reloadGossiperState(); - - gossipStarted = true; + loadGossiperState(); } - - private void reloadConfiguration(boolean isUpdate) throws ConfigurationException - { - final SnitchProperties properties = new SnitchProperties(); - - String newDc = properties.get("dc", null); - String newRack = properties.get("rack", null); - if (newDc == null || newRack == null) - throw new ConfigurationException("DC or rack not found in snitch properties, check your configuration in: " + SnitchProperties.RACKDC_PROPERTY_FILENAME); - - newDc = newDc.trim(); - newRack = newRack.trim(); - final boolean newPreferLocal = Boolean.parseBoolean(properties.get("prefer_local", "false")); - if (!newDc.equals(myDC) || !newRack.equals(myRack) || (preferLocal != newPreferLocal)) - { - myDC = newDc; - myRack = newRack; - preferLocal = newPreferLocal; - - reloadGossiperState(); - - if (StorageService.instance != null) - { - if (isUpdate) - StorageService.instance.updateTopology(FBUtilities.getBroadcastAddress()); - else - StorageService.instance.getTokenMetadata().invalidateCachedRings(); - } + private void loadGossiperState() + { + assert Gossiper.instance != null; - if (gossipStarted) - StorageService.instance.gossipSnitchInfo(); - } - } + ReconnectableSnitchHelper pendingHelper = new ReconnectableSnitchHelper(this, myDC, preferLocal); + Gossiper.instance.register(pendingHelper); - private void reloadGossiperState() - { - if (Gossiper.instance != null) - { - ReconnectableSnitchHelper pendingHelper = new ReconnectableSnitchHelper(this, myDC, preferLocal); - Gossiper.instance.register(pendingHelper); - - pendingHelper = snitchHelperReference.getAndSet(pendingHelper); - if (pendingHelper != null) - Gossiper.instance.unregister(pendingHelper); - } - // else this will eventually rerun at gossiperStarting() + pendingHelper = snitchHelperReference.getAndSet(pendingHelper); + if (pendingHelper != null) + Gossiper.instance.unregister(pendingHelper); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java index f293081..6115572 100644 --- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java +++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java @@ -23,12 +23,16 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Properties; +import java.util.Set; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; @@ -51,6 +55,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch private static final Logger logger = LoggerFactory.getLogger(PropertyFileSnitch.class); public static final String SNITCH_PROPERTIES_FILENAME = "cassandra-topology.properties"; + private static final int DEFAULT_REFRESH_PERIOD_IN_SECONDS = 5; private static volatile Map endpointMap; private static volatile String[] defaultDCRack; @@ -59,6 +64,11 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch public PropertyFileSnitch() throws ConfigurationException { + this(DEFAULT_REFRESH_PERIOD_IN_SECONDS); + } + + public PropertyFileSnitch(int refreshPeriodInSeconds) throws ConfigurationException + { reloadConfiguration(false); try @@ -71,7 +81,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch reloadConfiguration(true); } }; - ResourceWatcher.watch(SNITCH_PROPERTIES_FILENAME, runnable, 60 * 1000); + ResourceWatcher.watch(SNITCH_PROPERTIES_FILENAME, runnable, refreshPeriodInSeconds * 1000); } catch (ConfigurationException ex) { @@ -85,7 +95,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch * @param endpoint endpoint to process * @return a array of string with the first index being the data center and the second being the rack */ - public String[] getEndpointInfo(InetAddress endpoint) + public static String[] getEndpointInfo(InetAddress endpoint) { String[] rawEndpointInfo = getRawEndpointInfo(endpoint); if (rawEndpointInfo == null) @@ -93,7 +103,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch return rawEndpointInfo; } - private String[] getRawEndpointInfo(InetAddress endpoint) + private static String[] getRawEndpointInfo(InetAddress endpoint) { String[] value = endpointMap.get(endpoint); if (value == null) @@ -132,7 +142,8 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch public void reloadConfiguration(boolean isUpdate) throws ConfigurationException { - HashMap reloadedMap = new HashMap(); + HashMap reloadedMap = new HashMap<>(); + String[] reloadedDefaultDCRack = null; Properties properties = new Properties(); InputStream stream = null; @@ -155,18 +166,18 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch String key = (String) entry.getKey(); String value = (String) entry.getValue(); - if (key.equals("default")) + if ("default".equals(key)) { String[] newDefault = value.split(":"); if (newDefault.length < 2) - defaultDCRack = new String[] { "default", "default" }; + reloadedDefaultDCRack = new String[] { "default", "default" }; else - defaultDCRack = new String[] { newDefault[0].trim(), newDefault[1].trim() }; + reloadedDefaultDCRack = new String[] { newDefault[0].trim(), newDefault[1].trim() }; } else { InetAddress host; - String hostString = key.replace("/", ""); + String hostString = StringUtils.remove(key, '/'); try { host = InetAddress.getByName(hostString); @@ -183,18 +194,24 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch reloadedMap.put(host, token); } } - if (defaultDCRack == null && !reloadedMap.containsKey(FBUtilities.getBroadcastAddress())) - throw new ConfigurationException(String.format("Snitch definitions at %s do not define a location for this node's broadcast address %s, nor does it provides a default", + if (reloadedDefaultDCRack == null && !reloadedMap.containsKey(FBUtilities.getBroadcastAddress())) + throw new ConfigurationException(String.format("Snitch definitions at %s do not define a location for " + + "this node's broadcast address %s, nor does it provides a default", SNITCH_PROPERTIES_FILENAME, FBUtilities.getBroadcastAddress())); + if (isUpdate && !livenessCheck(reloadedMap, reloadedDefaultDCRack)) + return; + if (logger.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); for (Map.Entry entry : reloadedMap.entrySet()) - sb.append(entry.getKey()).append(":").append(Arrays.toString(entry.getValue())).append(", "); + sb.append(entry.getKey()).append(':').append(Arrays.toString(entry.getValue())).append(", "); logger.debug("Loaded network topology from property file: {}", StringUtils.removeEnd(sb.toString(), ", ")); } + + defaultDCRack = reloadedDefaultDCRack; endpointMap = reloadedMap; if (StorageService.instance != null) // null check tolerates circular dependency; see CASSANDRA-4145 { @@ -208,6 +225,41 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch StorageService.instance.gossipSnitchInfo(); } + /** + * We cannot update rack or data-center for a live node, see CASSANDRA-10243. + * + * @param reloadedMap - the new map of hosts to dc:rack properties + * @param reloadedDefaultDCRack - the default dc:rack or null if no default + * @return true if we can continue updating (no live host had dc or rack updated) + */ + private static boolean livenessCheck(HashMap reloadedMap, String[] reloadedDefaultDCRack) + { + // If the default has changed we must check all live hosts but hopefully we will find a live + // host quickly and interrupt the loop. Otherwise we only check the live hosts that were either + // in the old set or in the new set + Set hosts = Arrays.equals(defaultDCRack, reloadedDefaultDCRack) + ? Sets.intersection(StorageService.instance.getLiveMembers(), // same default + Sets.union(endpointMap.keySet(), reloadedMap.keySet())) + : StorageService.instance.getLiveMembers(); // default updated + + for (InetAddress host : hosts) + { + String[] origValue = endpointMap.containsKey(host) ? endpointMap.get(host) : defaultDCRack; + String[] updateValue = reloadedMap.containsKey(host) ? reloadedMap.get(host) : reloadedDefaultDCRack; + + if (!Arrays.equals(origValue, updateValue)) + { + logger.error("Cannot update data center or rack from {} to {} for live host {}, property file NOT RELOADED", + origValue, + updateValue, + host); + return false; + } + } + + return true; + } + @Override public void gossiperStarting() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/locator/SnitchProperties.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SnitchProperties.java b/src/java/org/apache/cassandra/locator/SnitchProperties.java index be89fcf..8fdda7a 100644 --- a/src/java/org/apache/cassandra/locator/SnitchProperties.java +++ b/src/java/org/apache/cassandra/locator/SnitchProperties.java @@ -66,4 +66,9 @@ public class SnitchProperties { return properties.getProperty(propertyName, defaultValue); } + + public boolean contains(String propertyName) + { + return properties.containsKey(propertyName); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java index 4139662..870eea8 100644 --- a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java +++ b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.ApplicationState; @@ -40,7 +41,9 @@ import org.yaml.snakeyaml.TypeDescription; import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.Constructor; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.collect.Sets; import com.google.common.net.InetAddresses; /** @@ -60,10 +63,10 @@ public class YamlFileNetworkTopologySnitch /** * How often to check the topology configuration file, in milliseconds; defaults to one minute. */ - private static final int CHECK_PERIOD_IN_MS = 60 * 1000; + private static final int CHECK_PERIOD_IN_MS = 5 * 1000; /** Default name for the topology configuration file. */ - private static final String DEFAULT_TOPOLOGY_CONFIG_FILENAME = "cassandra-topology.yaml"; + static final String DEFAULT_TOPOLOGY_CONFIG_FILENAME = "cassandra-topology.yaml"; /** Node data map, keyed by broadcast address. */ private volatile Map nodeDataMap; @@ -196,44 +199,29 @@ public class YamlFileNetworkTopologySnitch public String dc_local_address; } - /** - * Loads the topology configuration file. - * - * @throws ConfigurationException - * on failure - */ - private synchronized void loadTopologyConfiguration(boolean isUpdate) - throws ConfigurationException + public TopologyConfig readConfig() throws ConfigurationException { - logger.debug("Loading topology configuration from {}", - topologyConfigFilename); + final TypeDescription topologyConfigTypeDescription = new TypeDescription(TopologyConfig.class); + topologyConfigTypeDescription.putListPropertyType("topology", Datacenter.class); - final TypeDescription topologyConfigTypeDescription = new TypeDescription( - TopologyConfig.class); - topologyConfigTypeDescription.putListPropertyType("topology", - Datacenter.class); - - final TypeDescription topologyTypeDescription = new TypeDescription( - Datacenter.class); + final TypeDescription topologyTypeDescription = new TypeDescription(Datacenter.class); topologyTypeDescription.putListPropertyType("racks", Rack.class); - final TypeDescription rackTypeDescription = new TypeDescription( - Rack.class); + final TypeDescription rackTypeDescription = new TypeDescription(Rack.class); rackTypeDescription.putListPropertyType("nodes", Node.class); - final Constructor configConstructor = new Constructor( - TopologyConfig.class); + final Constructor configConstructor = new Constructor(TopologyConfig.class); configConstructor.addTypeDescription(topologyConfigTypeDescription); configConstructor.addTypeDescription(topologyTypeDescription); configConstructor.addTypeDescription(rackTypeDescription); final InputStream configFileInputStream = getClass().getClassLoader() - .getResourceAsStream(topologyConfigFilename); + .getResourceAsStream(topologyConfigFilename); if (configFileInputStream == null) { throw new ConfigurationException( - "Could not read topology config file " - + topologyConfigFilename); + "Could not read topology config file " + + topologyConfigFilename); } Yaml yaml; TopologyConfig topologyConfig; @@ -246,7 +234,30 @@ public class YamlFileNetworkTopologySnitch { FileUtils.closeQuietly(configFileInputStream); } - final Map nodeDataMap = new HashMap(); + + return topologyConfig; + } + + /** + * Loads the topology configuration file. + * + * @throws ConfigurationException + * on failure + */ + private synchronized void loadTopologyConfiguration(boolean isUpdate) + throws ConfigurationException + { + logger.debug("Loading topology configuration from {}", + topologyConfigFilename); + + loadTopologyConfiguration(isUpdate, readConfig()); + } + + @VisibleForTesting + synchronized void loadTopologyConfiguration(boolean isUpdate, TopologyConfig topologyConfig) + throws ConfigurationException + { + final Map nodeDataMap = new HashMap<>(); if (topologyConfig.topology == null) { @@ -336,6 +347,9 @@ public class YamlFileNetworkTopologySnitch defaultNodeData.datacenter = topologyConfig.default_dc_name; defaultNodeData.rack = topologyConfig.default_rack_name; + if (isUpdate && !livenessCheck(nodeDataMap, defaultNodeData)) + return; + // YAML configuration looks good; now make the changes this.nodeDataMap = nodeDataMap; @@ -360,6 +374,41 @@ public class YamlFileNetworkTopologySnitch } /** + * We cannot update rack or data-center for a live node, see CASSANDRA-10243. + * + * @param reloadedMap - the new map of hosts to NodeData + * @param reloadedDefaultData - the default NodeData + * @return true if we can continue updating (no live host had dc or rack updated) + */ + private boolean livenessCheck(Map reloadedMap, NodeData reloadedDefaultData) + { + // If the default has changed we must check all live hosts but hopefully we will find a live + // host quickly and interrupt the loop. Otherwise we only check the live hosts that were either + // in the old set or in the new set + Set hosts = NodeData.isSameDcRack(defaultNodeData, reloadedDefaultData) + ? Sets.intersection(StorageService.instance.getLiveMembers(), // same default + Sets.union(nodeDataMap.keySet(), reloadedMap.keySet())) + : StorageService.instance.getLiveMembers(); // default updated + + for (InetAddress host : hosts) + { + NodeData origValue = nodeDataMap.containsKey(host) ? nodeDataMap.get(host) : defaultNodeData; + NodeData updateValue = reloadedMap.containsKey(host) ? reloadedMap.get(host) : reloadedDefaultData; + + if (!NodeData.isSameDcRack(origValue, updateValue)) + { + logger.error("Cannot update data center or rack from {} to {} for live host {}, property file NOT RELOADED", + new String[] { origValue.datacenter, origValue.rack }, // same format as error in PropertyFileSnitch, + new String[] { updateValue.datacenter, updateValue.rack }, + host); + return false; + } + } + + return true; + } + + /** * be careful about just blindly updating ApplicationState.INTERNAL_IP everytime we read the yaml file, * as that can cause connections to get unnecessarily reset (via IESCS.onChange()). */ @@ -382,7 +431,7 @@ public class YamlFileNetworkTopologySnitch /** * Topology data for a node. */ - private class NodeData + private static final class NodeData { /** Data center name. */ public String datacenter; @@ -402,6 +451,12 @@ public class YamlFileNetworkTopologySnitch .add("rack", rack).add("dcLocalAddress", dcLocalAddress) .toString(); } + + public static boolean isSameDcRack(NodeData a, NodeData b) + { + return a == b || + (a != null && Objects.equal(a.datacenter, b.datacenter) && Objects.equal(a.rack, b.rack)); + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index bebfa43..3539602 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -395,7 +395,7 @@ public class MigrationManager } }); - for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) + for (InetAddress endpoint : Gossiper.instance.getLiveEndpoints()) { // only push schema to nodes with known and equal versions if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && @@ -439,7 +439,7 @@ public class MigrationManager Schema.instance.clear(); - Set liveEndpoints = Gossiper.instance.getLiveMembers(); + Set liveEndpoints = Gossiper.instance.getLiveEndpoints(); liveEndpoints.remove(FBUtilities.getBroadcastAddress()); // force migration if there are nodes around http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index b701015..0e90ea6 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1863,7 +1863,7 @@ public class StorageProxy implements StorageProxyMBean { final String myVersion = Schema.instance.getVersion().toString(); final Map versions = new ConcurrentHashMap(); - final Set liveHosts = Gossiper.instance.getLiveMembers(); + final Set liveHosts = Gossiper.instance.getLiveEndpoints(); final CountDownLatch latch = new CountDownLatch(liveHosts.size()); IAsyncCallback cb = new IAsyncCallback() @@ -1897,7 +1897,7 @@ public class StorageProxy implements StorageProxyMBean // maps versions to hosts that are on that version. Map> results = new HashMap>(); - Iterable allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers()); + Iterable allHosts = Iterables.concat(Gossiper.instance.getLiveEndpoints(), Gossiper.instance.getUnreachableMembers()); for (InetAddress host : allHosts) { UUID version = versions.get(host); @@ -2125,12 +2125,12 @@ public class StorageProxy implements StorageProxyMBean // Since the truncate operation is so aggressive and is typically only // invoked by an admin, for simplicity we require that all nodes are up // to perform the operation. - int liveMembers = Gossiper.instance.getLiveMembers().size(); + int liveMembers = Gossiper.instance.getLiveEndpoints().size(); throw new UnavailableException(ConsistencyLevel.ALL, liveMembers + Gossiper.instance.getUnreachableMembers().size(), liveMembers); } - Set allEndpoints = Gossiper.instance.getLiveTokenOwners(); - + Set allEndpoints = StorageService.instance.getLiveMembers(true); + int blockFor = allEndpoints.size(); final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 03c1960..5503123 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.service; -import static java.nio.charset.StandardCharsets.ISO_8859_1; - import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.File; @@ -574,7 +572,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE while (true) { Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - for (InetAddress address : Gossiper.instance.getLiveMembers()) + for (InetAddress address : Gossiper.instance.getLiveEndpoints()) { if (!Gossiper.instance.isFatClient(address)) break outer; @@ -2303,9 +2301,33 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public List getLiveNodes() { - return stringify(Gossiper.instance.getLiveMembers()); + return stringify(Gossiper.instance.getLiveEndpoints()); + } + + public Set getLiveMembers() + { + return getLiveMembers(false); + } + + public Set getLiveMembers(boolean excludeDeadStates) + { + Set ret = new HashSet<>(); + for (InetAddress ep : Gossiper.instance.getLiveEndpoints()) + { + if (excludeDeadStates) + { + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(ep); + if (epState == null || Gossiper.instance.isDeadState(epState)) + continue; + } + + if (tokenMetadata.isMember(ep)) + ret.add(ep); + } + return ret; } + public List getUnreachableNodes() { return stringify(Gossiper.instance.getUnreachableMembers()); @@ -3746,7 +3768,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (endpoint.equals(myAddress)) throw new UnsupportedOperationException("Cannot remove self"); - if (Gossiper.instance.getLiveMembers().contains(endpoint)) + if (Gossiper.instance.getLiveEndpoints().contains(endpoint)) throw new UnsupportedOperationException("Node " + endpoint + " is alive and owns this ID. Use decommission command to remove it from the ring"); // A leaving endpoint that is dead is already being removed. http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/test/unit/org/apache/cassandra/locator/GossipingPropertyFileSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/GossipingPropertyFileSnitchTest.java b/test/unit/org/apache/cassandra/locator/GossipingPropertyFileSnitchTest.java index 9026ebf..80d4559 100644 --- a/test/unit/org/apache/cassandra/locator/GossipingPropertyFileSnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/GossipingPropertyFileSnitchTest.java @@ -17,12 +17,9 @@ */ package org.apache.cassandra.locator; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; +import org.junit.Test; import org.apache.cassandra.utils.FBUtilities; -import org.junit.Test; /** * Unit tests for {@link GossipingPropertyFileSnitch}. @@ -30,30 +27,12 @@ import org.junit.Test; public class GossipingPropertyFileSnitchTest { @Test - public void testAutoReloadConfig() throws Exception + public void testLoadConfig() throws Exception { - String confFile = FBUtilities.resourceToFile(SnitchProperties.RACKDC_PROPERTY_FILENAME); - - final GossipingPropertyFileSnitch snitch = new GossipingPropertyFileSnitch(/*refreshPeriodInSeconds*/1); - YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, FBUtilities.getBroadcastAddress().getHostAddress(), "DC1", "RAC1"); - - final Path effectiveFile = Paths.get(confFile); - final Path backupFile = Paths.get(confFile + ".bak"); - final Path modifiedFile = Paths.get(confFile + ".mod"); - - try - { - Files.copy(effectiveFile, backupFile); - Files.copy(modifiedFile, effectiveFile, java.nio.file.StandardCopyOption.REPLACE_EXISTING); - - Thread.sleep(1500); - - YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, FBUtilities.getBroadcastAddress().getHostAddress(), "DC2", "RAC2"); - } - finally - { - Files.copy(backupFile, effectiveFile, java.nio.file.StandardCopyOption.REPLACE_EXISTING); - Files.delete(backupFile); - } + final GossipingPropertyFileSnitch snitch = new GossipingPropertyFileSnitch(); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, + FBUtilities.getBroadcastAddress().getHostAddress(), + "DC1", + "RAC1"); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java b/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java new file mode 100644 index 0000000..24b8c77 --- /dev/null +++ b/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.locator; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.RandomPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for {@link PropertyFileSnitch}. + */ +public class PropertyFileSnitchTest +{ + private Path effectiveFile; + private Path backupFile; + + private VersionedValue.VersionedValueFactory valueFactory; + private Map> tokenMap; + + @Before + public void setup() throws ConfigurationException, IOException + { + String confFile = FBUtilities.resourceToFile(PropertyFileSnitch.SNITCH_PROPERTIES_FILENAME); + effectiveFile = Paths.get(confFile); + backupFile = Paths.get(confFile + ".bak"); + + restoreOrigConfigFile(); + + InetAddress[] hosts = { + InetAddress.getByName("127.0.0.1"), // this exists in the config file + InetAddress.getByName("127.0.0.2"), // this exists in the config file + InetAddress.getByName("127.0.0.9"), // this does not exist in the config file + }; + + IPartitioner partitioner = new RandomPartitioner(); + valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + tokenMap = new HashMap<>(); + + for (InetAddress host : hosts) + { + Set tokens = Collections.singleton(partitioner.getRandomToken()); + Gossiper.instance.initializeNodeUnsafe(host, UUID.randomUUID(), 1); + Gossiper.instance.injectApplicationState(host, ApplicationState.TOKENS, valueFactory.tokens(tokens)); + + setNodeShutdown(host); + tokenMap.put(host, tokens); + } + } + + private void restoreOrigConfigFile() throws IOException + { + if (Files.exists(backupFile)) + { + Files.copy(backupFile, effectiveFile, java.nio.file.StandardCopyOption.REPLACE_EXISTING); + Files.delete(backupFile); + } + } + + private void replaceConfigFile(Map replacements) throws IOException + { + List lines = Files.readAllLines(effectiveFile, StandardCharsets.UTF_8); + List newLines = new ArrayList<>(lines.size()); + Set replaced = new HashSet<>(); + + for (String line : lines) + { + String[] info = line.split("="); + if (info.length == 2 && replacements.containsKey(info[0])) + { + String replacement = replacements.get(info[0]); + if (!replacement.isEmpty()) // empty means remove this line + newLines.add(info[0] + '=' + replacement); + + replaced.add(info[0]); + } + else + { + newLines.add(line); + } + } + + // add any new lines that were not replaced + for (Map.Entry replacement : replacements.entrySet()) + { + if (replaced.contains(replacement.getKey())) + continue; + + if (!replacement.getValue().isEmpty()) // empty means remove this line so do nothing here + newLines.add(replacement.getKey() + '=' + replacement.getValue()); + } + + Files.write(effectiveFile, newLines, StandardCharsets.UTF_8, StandardOpenOption.TRUNCATE_EXISTING); + } + + private void setNodeShutdown(InetAddress host) + { + StorageService.instance.getTokenMetadata().removeEndpoint(host); + Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.shutdown(true)); + Gossiper.instance.markDead(host, Gossiper.instance.getEndpointStateForEndpoint(host)); + } + + private void setNodeLive(InetAddress host) + { + Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.normal(tokenMap.get(host))); + Gossiper.instance.realMarkAlive(host, Gossiper.instance.getEndpointStateForEndpoint(host)); + StorageService.instance.getTokenMetadata().updateNormalTokens(tokenMap.get(host), host); + } + + /** + * Test that changing rack for a host in the configuration file is only effective if the host is not live. + * The original configuration file contains: 127.0.0.1=DC1:RAC1 + */ + @Test + public void testChangeHostRack() throws Exception + { + final InetAddress host = InetAddress.getByName("127.0.0.1"); + final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + + try + { + setNodeLive(host); + + Files.copy(effectiveFile, backupFile); + replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC1:RAC2")); + + Thread.sleep(1500); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + + setNodeShutdown(host); + replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC1:RAC2")); + + Thread.sleep(1500); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); + } + finally + { + restoreOrigConfigFile(); + setNodeShutdown(host); + } + } + + /** + * Test that changing dc for a host in the configuration file is only effective if the host is not live. + * The original configuration file contains: 127.0.0.1=DC1:RAC1 + */ + @Test + public void testChangeHostDc() throws Exception + { + final InetAddress host = InetAddress.getByName("127.0.0.1"); + final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + + try + { + setNodeLive(host); + + Files.copy(effectiveFile, backupFile); + replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC1")); + + Thread.sleep(1500); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + + setNodeShutdown(host); + replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC1")); + + Thread.sleep(1500); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC1"); + } + finally + { + restoreOrigConfigFile(); + setNodeShutdown(host); + } + } + + /** + * Test that adding a host to the configuration file changes the host dc and rack only if the host + * is not live. The original configuration file does not contain 127.0.0.9 and so it should use + * the default default=DC1:r1. + */ + @Test + public void testAddHost() throws Exception + { + final InetAddress host = InetAddress.getByName("127.0.0.9"); + final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default + + try + { + setNodeLive(host); + + Files.copy(effectiveFile, backupFile); + replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC2")); // add this line if not yet there + + Thread.sleep(1500); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged + + setNodeShutdown(host); + replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC2")); // add this line if not yet there + + Thread.sleep(1500); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC2"); // changed + } + finally + { + restoreOrigConfigFile(); + setNodeShutdown(host); + } + } + + /** + * Test that removing a host from the configuration file changes the host rack only if the host + * is not live. The original configuration file contains 127.0.0.2=DC1:RAC2 and default=DC1:r1 so removing + * this host should result in a different rack if the host is not live. + */ + @Test + public void testRemoveHost() throws Exception + { + final InetAddress host = InetAddress.getByName("127.0.0.2"); + final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); + + try + { + setNodeLive(host); + + Files.copy(effectiveFile, backupFile); + replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "")); // removes line if found + + Thread.sleep(1500); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); // unchanged + + setNodeShutdown(host); + replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "")); // removes line if found + + Thread.sleep(1500); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default + } + finally + { + restoreOrigConfigFile(); + setNodeShutdown(host); + } + } + + /** + * Test that we can change the default only if this does not result in any live node changing dc or rack. + * The configuration file contains default=DC1:r1 and we change it to default=DC2:r2. Let's use host 127.0.0.9 + * since it is not in the configuration file. + */ + @Test + public void testChangeDefault() throws Exception + { + final InetAddress host = InetAddress.getByName("127.0.0.9"); + final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default + + try + { + setNodeLive(host); + + Files.copy(effectiveFile, backupFile); + replaceConfigFile(Collections.singletonMap("default", "DC2:r2")); // change default + + Thread.sleep(1500); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged + + setNodeShutdown(host); + replaceConfigFile(Collections.singletonMap("default", "DC2:r2")); // change default again (refresh file update) + + Thread.sleep(1500); + YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC2", "r2"); // default updated + } + finally + { + restoreOrigConfigFile(); + setNodeShutdown(host); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/test/unit/org/apache/cassandra/locator/YamlFileNetworkTopologySnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/YamlFileNetworkTopologySnitchTest.java b/test/unit/org/apache/cassandra/locator/YamlFileNetworkTopologySnitchTest.java index af1a7e9..9507f25 100644 --- a/test/unit/org/apache/cassandra/locator/YamlFileNetworkTopologySnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/YamlFileNetworkTopologySnitchTest.java @@ -17,11 +17,28 @@ */ package org.apache.cassandra.locator; +import java.io.IOException; import java.net.InetAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.RandomPartitioner; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.locator.YamlFileNetworkTopologySnitch.*; + import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import com.google.common.net.InetAddresses; @@ -31,54 +48,58 @@ import com.google.common.net.InetAddresses; */ public class YamlFileNetworkTopologySnitchTest { + private String confFile; - /** - * Testing variant of {@link YamlFileNetworkTopologySnitch}. - * - */ - private class TestYamlFileNetworkTopologySnitch - extends YamlFileNetworkTopologySnitch + private VersionedValue.VersionedValueFactory valueFactory; + private Map> tokenMap; + + @Before + public void setup() throws ConfigurationException, IOException { + confFile = YamlFileNetworkTopologySnitch.DEFAULT_TOPOLOGY_CONFIG_FILENAME; + + InetAddress[] hosts = { + InetAddress.getByName("127.0.0.1"), // this exists in the config file + InetAddress.getByName("127.0.0.2"), // this exists in the config file + InetAddress.getByName("127.0.0.9"), // this does not exist in the config file + }; - /** - * Constructor. - * - * @throws ConfigurationException - * on configuration error - */ - public TestYamlFileNetworkTopologySnitch( - final String topologyConfigFilename) - throws ConfigurationException + IPartitioner partitioner = new RandomPartitioner(); + valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + tokenMap = new HashMap<>(); + + for (InetAddress host : hosts) { - super(topologyConfigFilename); + Set tokens = Collections.singleton(partitioner.getRandomToken()); + Gossiper.instance.initializeNodeUnsafe(host, UUID.randomUUID(), 1); + Gossiper.instance.injectApplicationState(host, ApplicationState.TOKENS, valueFactory.tokens(tokens)); + + setNodeShutdown(host); + tokenMap.put(host, tokens); } } /** * A basic test case. - * - * @throws Exception - * on failure + * + * @throws Exception on failure */ @Test public void testBasic() throws Exception { - final TestYamlFileNetworkTopologySnitch snitch = new TestYamlFileNetworkTopologySnitch( - "cassandra-topology.yaml"); - checkEndpoint(snitch, FBUtilities.getBroadcastAddress() - .getHostAddress(), "DC1", "RAC1"); + final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile); + checkEndpoint(snitch, FBUtilities.getBroadcastAddress().getHostAddress(), "DC1", "RAC1"); checkEndpoint(snitch, "192.168.1.100", "DC1", "RAC1"); checkEndpoint(snitch, "10.0.0.12", "DC1", "RAC2"); checkEndpoint(snitch, "127.0.0.3", "DC1", "RAC3"); checkEndpoint(snitch, "10.20.114.10", "DC2", "RAC1"); checkEndpoint(snitch, "127.0.0.8", "DC3", "RAC8"); checkEndpoint(snitch, "6.6.6.6", "DC1", "r1"); - } /** * Asserts that a snitch's determination of data center and rack for an endpoint match what we expect. - * + * * @param snitch * snitch * @param endpointString @@ -89,12 +110,230 @@ public class YamlFileNetworkTopologySnitchTest * expected rack */ public static void checkEndpoint(final AbstractNetworkTopologySnitch snitch, - final String endpointString, final String expectedDatacenter, - final String expectedRack) + final String endpointString, final String expectedDatacenter, + final String expectedRack) { final InetAddress endpoint = InetAddresses.forString(endpointString); Assert.assertEquals(expectedDatacenter, snitch.getDatacenter(endpoint)); Assert.assertEquals(expectedRack, snitch.getRack(endpoint)); } + + private static TopologyConfig moveNode(TopologyConfig topologyConfig, + String broadcastAddress, String dcLocalAddress, + String oldDC, String newDC, + String oldRack, String newRack) + { + + for (Datacenter dc : topologyConfig.topology) + { + if (oldDC != null && oldRack != null) + { + if (dc.dc_name.equals(oldDC)) + { + for (Rack rack : dc.racks) + { + if (rack.rack_name.equals(oldRack)) + { + for (Node node : rack.nodes) + { + if (node.broadcast_address.equals(broadcastAddress)) + { + rack.nodes.remove(node); + break; + } + } + } + } + } + } + + if (newDC != null && newRack != null) + { + if (dc.dc_name.equals(newDC)) + { + for (Rack rack : dc.racks) + { + if (rack.rack_name.equals(newRack)) + { + Node node = new Node(); + node.broadcast_address = broadcastAddress; + node.dc_local_address = dcLocalAddress; + rack.nodes.add(node); + } + } + } + } + } + + return topologyConfig; + } + + private void setNodeShutdown(InetAddress host) + { + StorageService.instance.getTokenMetadata().removeEndpoint(host); + Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.shutdown(true)); + Gossiper.instance.markDead(host, Gossiper.instance.getEndpointStateForEndpoint(host)); + } + + private void setNodeLive(InetAddress host) + { + Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.normal(tokenMap.get(host))); + Gossiper.instance.realMarkAlive(host, Gossiper.instance.getEndpointStateForEndpoint(host)); + StorageService.instance.getTokenMetadata().updateNormalTokens(tokenMap.get(host), host); + } + + /** + * Test that changing rack for a host in the configuration file is only effective if the host is not live. + * The original configuration file contains DC1, RAC1 for broadcast address 127.0.0.1 and dc_local_address 9.0.0.1. + */ + @Test + public void testChangeHostRack() throws Exception + { + final InetAddress host = InetAddress.getByName("127.0.0.1"); + final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + + try + { + final TopologyConfig topologyConfig = snitch.readConfig(); + moveNode(topologyConfig, host.getHostAddress(), "9.0.0.1", "DC1", "DC1", "RAC1", "RAC2"); + + setNodeLive(host); + snitch.loadTopologyConfiguration(true, topologyConfig); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + + setNodeShutdown(host); + snitch.loadTopologyConfiguration(true, topologyConfig); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); + } + finally + { + setNodeShutdown(host); + } + } + + /** + * Test that changing dc for a host in the configuration file is only effective if the host is not live. + * The original configuration file contains DC1, RAC1 for broadcast address 127.0.0.1 and dc_local_address 9.0.0.1. + */ + @Test + public void testChangeHostDc() throws Exception + { + final InetAddress host = InetAddress.getByName("127.0.0.1"); + final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + + try + { + final TopologyConfig topologyConfig = snitch.readConfig(); + moveNode(topologyConfig, host.getHostAddress(), "9.0.0.1", "DC1", "DC2", "RAC1", "RAC1"); + + setNodeLive(host); + snitch.loadTopologyConfiguration(true, topologyConfig); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + + setNodeShutdown(host); + snitch.loadTopologyConfiguration(true, topologyConfig); + checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC1"); + } + finally + { + setNodeShutdown(host); + } + } + + /** + * Test that adding a host to the configuration file changes the host dc and rack only if the host + * is not live. The original configuration file does not contain 127.0.0.9 and so it should use + * the default data center DC1 and rack r1. + */ + @Test + public void testAddHost() throws Exception + { + final InetAddress host = InetAddress.getByName("127.0.0.9"); + final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default + + try + { + final TopologyConfig topologyConfig = snitch.readConfig(); + moveNode(topologyConfig, host.getHostAddress(), "9.0.0.9", null, "DC2", null, "RAC2"); + + setNodeLive(host); + snitch.loadTopologyConfiguration(true, topologyConfig); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged + + setNodeShutdown(host); + snitch.loadTopologyConfiguration(true, topologyConfig); + checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC2"); // changed + } + finally + { + setNodeShutdown(host); + } + } + + /** + * Test that removing a host from the configuration file changes the host rack only if the host + * is not live. The original configuration file contains 127.0.0.2 in DC1, RAC2 and default DC1, r1 so removing + * this host should result in a different rack if the host is not live. + */ + @Test + public void testRemoveHost() throws Exception + { + final InetAddress host = InetAddress.getByName("127.0.0.2"); + final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); + + try + { + final TopologyConfig topologyConfig = snitch.readConfig(); + moveNode(topologyConfig, host.getHostAddress(), "9.0.0.2", "DC1", null, "RAC2", null); + + setNodeLive(host); + snitch.loadTopologyConfiguration(true, topologyConfig); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); // unchanged + + setNodeShutdown(host); + snitch.loadTopologyConfiguration(true, topologyConfig); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default + } + finally + { + setNodeShutdown(host); + } + } + + /** + * Test that we can change the default only if this does not result in any live node changing dc or rack. + * The configuration file contains default DC1 and r1 and we change it to DC2 and r2. Let's use host 127.0.0.9 + * since it is not in the configuration file. + */ + @Test + public void testChangeDefault() throws Exception + { + final InetAddress host = InetAddress.getByName("127.0.0.9"); + final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default + + try + { + final TopologyConfig topologyConfig = snitch.readConfig(); + topologyConfig.default_dc_name = "DC2"; + topologyConfig.default_rack_name = "r2"; + + setNodeLive(host); + snitch.loadTopologyConfiguration(true, topologyConfig); + checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged + + setNodeShutdown(host); + snitch.loadTopologyConfiguration(true, topologyConfig); + checkEndpoint(snitch, host.getHostAddress(), "DC2", "r2"); // default updated + } + finally + { + setNodeShutdown(host); + } + } }