cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/2] cassandra git commit: Fix consolidating racks violating the RF contract
Date Wed, 16 Sep 2015 10:44:46 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 b8b4eb76c -> 4b1d59e13


Fix consolidating racks violating the RF contract

patch by Stefania Alborghetti; reviewed by Blake Eggleston for
CASSANDRA-10238


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/257cdaa0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/257cdaa0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/257cdaa0

Branch: refs/heads/cassandra-2.1
Commit: 257cdaa08dc12f747a25c03b9b0ad3ffc76ace9b
Parents: 0bb32f0
Author: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Authored: Fri Sep 11 16:31:40 2015 +0800
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Wed Sep 16 11:40:31 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/locator/PropertyFileSnitch.java   |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 142 +++++++++----
 .../locator/YamlFileNetworkTopologySnitch.java  |   1 +
 .../cassandra/service/StorageService.java       |  16 ++
 .../cassandra/locator/TokenMetadataTest.java    | 209 ++++++++++++++++++-
 6 files changed, 332 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d4cc15f..3c47427 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.17
+ * Fix consolidating racks violating the RF contract (CASSANDRA-10238)
  * Disallow decommission when node is in drained state (CASSANDRA-8741)
  * Backport CASSANDRA-8013 to 2.0 (CASSANDRA-10144)
  * Make getFullyExpiredSSTables less expensive (CASSANDRA-9882)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
index 4f822c6..745eeb8 100644
--- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
@@ -69,6 +69,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
                 protected void runMayThrow() throws ConfigurationException
                 {
                     reloadConfiguration();
+                    StorageService.instance.updateTopology();
                 }
             };
             ResourceWatcher.watch(SNITCH_PROPERTIES_FILENAME, runnable, 60 * 1000);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index a673c94..b1b25e8 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -78,14 +78,14 @@ public class TokenMetadata
     // Finally, note that recording the tokens of joining nodes in bootstrapTokens also
     // means we can detect and reject the addition of multiple nodes at the same token
     // before one becomes part of the ring.
-    private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<Token, InetAddress>();
+    private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
     // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
-    private final Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
+    private final Set<InetAddress> leavingEndpoints = new HashSet<>();
     // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
-    private final ConcurrentMap<String, Multimap<Range<Token>, InetAddress>> pendingRanges = new ConcurrentHashMap<String, Multimap<Range<Token>, InetAddress>>();
+    private final ConcurrentMap<String, Multimap<Range<Token>, InetAddress>> pendingRanges = new ConcurrentHashMap<>();
 
     // nodes which are migrating to the new tokens in the ring
-    private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>();
+    private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
 
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -121,7 +121,7 @@ public class TokenMetadata
 
     private ArrayList<Token> sortTokens()
     {
-        return new ArrayList<Token>(tokenToEndpointMap.keySet());
+        return new ArrayList<>(tokenToEndpointMap.keySet());
     }
 
     /** @return the number of nodes bootstrapping into source's primary range */
@@ -165,8 +165,6 @@ public class TokenMetadata
      *
      * Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple)
      * is expensive (CASSANDRA-3831).
-     *
-     * @param endpointTokens
      */
     public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens)
     {
@@ -213,9 +211,6 @@ public class TokenMetadata
     /**
      * Store an end-point to host ID mapping.  Each ID must be unique, and
      * cannot be changed after the fact.
-     *
-     * @param hostId
-     * @param endpoint
      */
     public void updateHostId(UUID hostId, InetAddress endpoint)
     {
@@ -284,7 +279,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            Map<InetAddress, UUID> readMap = new HashMap<InetAddress, UUID>();
+            Map<InetAddress, UUID> readMap = new HashMap<>();
             readMap.putAll(endpointToHostIdMap);
             return readMap;
         }
@@ -407,6 +402,43 @@ public class TokenMetadata
     }
 
     /**
+     * This is called when the snitch properties for this endpoint are updated, see CASSANDRA-10238.
+     */
+    public void updateTopology(InetAddress endpoint)
+    {
+        assert endpoint != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            logger.info("Updating topology for {}", endpoint);
+            topology.updateEndpoint(endpoint);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * This is called when the snitch properties for many endpoints are updated, it will update
+     * the topology mappings of any endpoints whose snitch has changed, see CASSANDRA-10238.
+     */
+    public void updateTopology()
+    {
+        lock.writeLock().lock();
+        try
+        {
+            logger.info("Updating topology for all endpoints that have changed");
+            topology.updateEndpoints();
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
      * Remove pair of token/address from moving endpoints
      * @param endpoint address of the moving node
      */
@@ -442,7 +474,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            return new ArrayList<Token>(tokenToEndpointMap.inverse().get(endpoint));
+            return new ArrayList<>(tokenToEndpointMap.inverse().get(endpoint));
         }
         finally
         {
@@ -508,7 +540,7 @@ public class TokenMetadata
         }
     }
 
-    private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>();
+    private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<>();
 
     /**
      * Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
@@ -519,7 +551,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
+            return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
                                      HashBiMap.create(endpointToHostIdMap),
                                      new Topology(topology));
         }
@@ -622,9 +654,9 @@ public class TokenMetadata
 
     public Collection<Range<Token>> getPrimaryRangesFor(Collection<Token> tokens)
     {
-        Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(tokens.size());
+        Collection<Range<Token>> ranges = new ArrayList<>(tokens.size());
         for (Token right : tokens)
-            ranges.add(new Range<Token>(getPredecessor(right), right));
+            ranges.add(new Range<>(getPredecessor(right), right));
         return ranges;
     }
 
@@ -660,7 +692,7 @@ public class TokenMetadata
 
     public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddress endpoint)
     {
-        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+        List<Range<Token>> ranges = new ArrayList<>();
         for (Map.Entry<Range<Token>, InetAddress> entry : getPendingRangesMM(keyspaceName).entries())
         {
             if (entry.getValue().equals(endpoint))
@@ -845,7 +877,7 @@ public class TokenMetadata
                 for (InetAddress ep : eps)
                 {
                     sb.append(ep);
-                    sb.append(":");
+                    sb.append(':');
                     sb.append(tokenToEndpointMap.inverse().get(ep));
                     sb.append(System.getProperty("line.separator"));
                 }
@@ -857,7 +889,7 @@ public class TokenMetadata
                 sb.append(System.getProperty("line.separator"));
                 for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
                 {
-                    sb.append(entry.getValue()).append(":").append(entry.getKey());
+                    sb.append(entry.getValue()).append(':').append(entry.getKey());
                     sb.append(System.getProperty("line.separator"));
                 }
             }
@@ -896,7 +928,7 @@ public class TokenMetadata
         {
             for (Map.Entry<Range<Token>, InetAddress> rmap : entry.getValue().entries())
             {
-                sb.append(rmap.getValue()).append(":").append(rmap.getKey());
+                sb.append(rmap.getValue()).append(':').append(rmap.getKey());
                 sb.append(System.getProperty("line.separator"));
             }
         }
@@ -910,7 +942,7 @@ public class TokenMetadata
         if (ranges.isEmpty())
             return Collections.emptyList();
 
-        Set<InetAddress> endpoints = new HashSet<InetAddress>();
+        Set<InetAddress> endpoints = new HashSet<>();
         for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : ranges.entrySet())
         {
             if (entry.getKey().contains(token))
@@ -954,7 +986,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            Map<Token, InetAddress> map = new HashMap<Token, InetAddress>(tokenToEndpointMap.size() + bootstrapTokens.size());
+            Map<Token, InetAddress> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size());
             map.putAll(tokenToEndpointMap);
             map.putAll(bootstrapTokens);
             return map;
@@ -1001,14 +1033,14 @@ public class TokenMetadata
         /** reverse-lookup map for endpoint to current known dc/rack assignment */
         private final Map<InetAddress, Pair<String, String>> currentLocations;
 
-        protected Topology()
+        Topology()
         {
             dcEndpoints = HashMultimap.create();
-            dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
-            currentLocations = new HashMap<InetAddress, Pair<String, String>>();
+            dcRacks = new HashMap<>();
+            currentLocations = new HashMap<>();
         }
 
-        protected void clear()
+        void clear()
         {
             dcEndpoints.clear();
             dcRacks.clear();
@@ -1018,19 +1050,19 @@ public class TokenMetadata
         /**
          * construct deep-copy of other
          */
-        protected Topology(Topology other)
+        Topology(Topology other)
         {
             dcEndpoints = HashMultimap.create(other.dcEndpoints);
-            dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
+            dcRacks = new HashMap<>();
             for (String dc : other.dcRacks.keySet())
                 dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc)));
-            currentLocations = new HashMap<InetAddress, Pair<String, String>>(other.currentLocations);
+            currentLocations = new HashMap<>(other.currentLocations);
         }
 
         /**
          * Stores current DC/rack assignment for ep
          */
-        protected void addEndpoint(InetAddress ep)
+        void addEndpoint(InetAddress ep)
         {
             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
             String dc = snitch.getDatacenter(ep);
@@ -1040,10 +1072,14 @@ public class TokenMetadata
             {
                 if (current.left.equals(dc) && current.right.equals(rack))
                     return;
-                dcRacks.get(current.left).remove(current.right, ep);
-                dcEndpoints.remove(current.left, ep);
+                doRemoveEndpoint(ep, current);
             }
 
+            doAddEndpoint(ep, dc, rack);
+        }
+
+        private void doAddEndpoint(InetAddress ep, String dc, String rack)
+        {
             dcEndpoints.put(dc, ep);
 
             if (!dcRacks.containsKey(dc))
@@ -1056,13 +1092,49 @@ public class TokenMetadata
         /**
          * Removes current DC/rack assignment for ep
          */
-        protected void removeEndpoint(InetAddress ep)
+        void removeEndpoint(InetAddress ep)
         {
             if (!currentLocations.containsKey(ep))
                 return;
-            Pair<String, String> current = currentLocations.remove(ep);
-            dcEndpoints.remove(current.left, ep);
+
+            doRemoveEndpoint(ep, currentLocations.remove(ep));
+        }
+
+        private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+        {
             dcRacks.get(current.left).remove(current.right, ep);
+            dcEndpoints.remove(current.left, ep);
+        }
+
+        void updateEndpoint(InetAddress ep)
+        {
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            if (snitch == null || !currentLocations.containsKey(ep))
+                return;
+
+           updateEndpoint(ep, snitch);
+        }
+
+        void updateEndpoints()
+        {
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            if (snitch == null)
+                return;
+
+            for (InetAddress ep : currentLocations.keySet())
+                updateEndpoint(ep, snitch);
+        }
+
+        private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
+        {
+            Pair<String, String> current = currentLocations.get(ep);
+            String dc = snitch.getDatacenter(ep);
+            String rack = snitch.getRack(ep);
+            if (dc.equals(current.left) && rack.equals(current.right))
+                return;
+
+            doRemoveEndpoint(ep, current);
+            doAddEndpoint(ep, dc, rack);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
index 3237979..e6691c4 100644
--- a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
+++ b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
@@ -120,6 +120,7 @@ public class YamlFileNetworkTopologySnitch
                 protected void runMayThrow() throws ConfigurationException
                 {
                     loadTopologyConfiguration();
+                    StorageService.instance.updateTopology();
                 }
             };
             ResourceWatcher.watch(topologyConfigFilename, runnable,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5ac4980..c5f159e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1378,9 +1378,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                         SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(value.value));
                         break;
                     case DC:
+                        updateTopology(endpoint);
                         SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(value.value));
                         break;
                     case RACK:
+                        updateTopology(endpoint);
                         SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(value.value));
                         break;
                     case RPC_ADDRESS:
@@ -1398,6 +1400,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    public void updateTopology(InetAddress endpoint)
+    {
+        if (getTokenMetadata().isMember(endpoint))
+        {
+            getTokenMetadata().updateTopology(endpoint);
+        }
+    }
+
+    public void updateTopology()
+    {
+        getTokenMetadata().updateTopology();
+
+    }
+
     private void updatePeerInfo(InetAddress endpoint)
     {
         EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index 95118dc..fc8095d 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -19,19 +19,27 @@
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Map;
+
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Multimap;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
+import static junit.framework.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 
 import static org.apache.cassandra.Util.token;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
@@ -50,9 +58,9 @@ public class TokenMetadataTest
         tmd.updateNormalToken(token(SIX), InetAddress.getByName("127.0.0.6"));
     }
 
-    private void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... expected)
+    private static void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... expected)
     {
-        ArrayList<Token> actual = new ArrayList<Token>();
+        ArrayList<Token> actual = new ArrayList<>();
         Iterators.addAll(actual, TokenMetadata.ringIterator(ring, token(start), includeMin));
         assertEquals(actual.toString(), expected.length, actual.size());
         for (int i = 0; i < expected.length; i++)
@@ -84,4 +92,199 @@ public class TokenMetadataTest
     {
         testRingIterator(new ArrayList<Token>(), "2", false);
     }
+
+    @Test
+    public void testTopologyUpdate_RackConsolidation() throws UnknownHostException
+    {
+        final InetAddress first = InetAddress.getByName("127.0.0.1");
+        final InetAddress second = InetAddress.getByName("127.0.0.6");
+        final String DATA_CENTER = "datacenter1";
+        final String RACK1 = "rack1";
+        final String RACK2 = "rack2";
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return endpoint.equals(first) ? RACK1 : RACK2;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tmd.updateNormalToken(token(ONE), first);
+        tmd.updateNormalToken(token(SIX), second);
+
+        TokenMetadata tokenMetadata = tmd.cloneOnlyTokenMap();
+        assertNotNull(tokenMetadata);
+
+        TokenMetadata.Topology topology = tokenMetadata.getTopology();
+        assertNotNull(topology);
+
+        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK2).contains(second));
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return RACK1;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tokenMetadata.updateTopology(first);
+        tokenMetadata.updateTopology(second);
+
+        allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertFalse(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(second));
+    }
+
+    @Test
+    public void testTopologyUpdate_RackExpansion() throws UnknownHostException
+    {
+        final InetAddress first = InetAddress.getByName("127.0.0.1");
+        final InetAddress second = InetAddress.getByName("127.0.0.6");
+        final String DATA_CENTER = "datacenter1";
+        final String RACK1 = "rack1";
+        final String RACK2 = "rack2";
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return RACK1;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tmd.updateNormalToken(token(ONE), first);
+        tmd.updateNormalToken(token(SIX), second);
+
+        TokenMetadata tokenMetadata = tmd.cloneOnlyTokenMap();
+        assertNotNull(tokenMetadata);
+
+        TokenMetadata.Topology topology = tokenMetadata.getTopology();
+        assertNotNull(topology);
+
+        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertFalse(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(second));
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return endpoint.equals(first) ? RACK1 : RACK2;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tokenMetadata.updateTopology();
+
+        allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK2).contains(second));
+    }
 }


Mime
View raw message