cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eev...@apache.org
Subject git commit: update TokenMetadata in support of many tokens per node
Date Mon, 09 Jul 2012 22:07:08 GMT
Updated Branches:
  refs/heads/trunk c22dd0821 -> e85afdc5b


update TokenMetadata in support of many tokens per node

Patch by Sam Overton; reviewed by jbellis for CASSANDRA-4121


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e85afdc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e85afdc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e85afdc5

Branch: refs/heads/trunk
Commit: e85afdc5b6a691591834bd32f766087560c60a39
Parents: c22dd08
Author: Eric Evans <eevans@apache.org>
Authored: Mon Jul 9 16:07:34 2012 -0600
Committer: Eric Evans <eevans@apache.org>
Committed: Mon Jul 9 16:07:34 2012 -0600

----------------------------------------------------------------------
 .../apache/cassandra/locator/SimpleStrategy.java   |    4 +-
 .../apache/cassandra/locator/TokenMetadata.java    |  157 +++++++++------
 .../apache/cassandra/service/StorageService.java   |  111 ++++++-----
 .../locator/NetworkTopologyStrategyTest.java       |    7 +-
 4 files changed, 160 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/src/java/org/apache/cassandra/locator/SimpleStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 11c2aa8..50d470f 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -54,7 +54,9 @@ public class SimpleStrategy extends AbstractReplicationStrategy
         Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false);
         while (endpoints.size() < replicas && iter.hasNext())
         {
-            endpoints.add(metadata.getEndpoint(iter.next()));
+            InetAddress ep = metadata.getEndpoint(iter.next());
+            if (!endpoints.contains(ep))
+                endpoints.add(ep);
         }
         return endpoints;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 3340b2b..8fb63d5 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -27,7 +28,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.*;
 
+import org.apache.cassandra.utils.BiMultiValMap;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.SortedBiMultiValMap;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +46,7 @@ public class TokenMetadata
     private static final Logger logger = LoggerFactory.getLogger(TokenMetadata.class);
 
     /* Maintains token to endpoint map of every node in the cluster. */
-    private final BiMap<Token, InetAddress> tokenToEndpointMap;
+    private final BiMultiValMap<Token, InetAddress> tokenToEndpointMap;
 
     /* Maintains endpoint to host ID map of every node in the cluster */
     private final BiMap<InetAddress, UUID> endpointToHostIdMap;
@@ -72,7 +75,7 @@ public class TokenMetadata
     // Finally, note that recording the tokens of joining nodes in bootstrapTokens also
     // means we can detect and reject the addition of multiple nodes at the same token
     // before one becomes part of the ring.
-    private final BiMap<Token, InetAddress> bootstrapTokens = HashBiMap.<Token, InetAddress>create();
+    private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<Token, InetAddress>();
     // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
     private final Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
     // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
@@ -90,12 +93,21 @@ public class TokenMetadata
     /* list of subscribers that are notified when the tokenToEndpointMap changed */
     private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
 
+    private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
+    {
+        @Override
+        public int compare(InetAddress o1, InetAddress o2)
+        {
+            return ByteBuffer.wrap(o1.getAddress()).compareTo(ByteBuffer.wrap(o2.getAddress()));
+        }
+    };
+    
     public TokenMetadata()
     {
-        this(HashBiMap.<Token, InetAddress>create(), new Topology());
+        this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp), new Topology());
     }
 
-    public TokenMetadata(BiMap<Token, InetAddress> tokenToEndpointMap, Topology topology)
+    public TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, Topology topology)
     {
         this.tokenToEndpointMap = tokenToEndpointMap;
         this.topology = topology;
@@ -105,22 +117,21 @@ public class TokenMetadata
 
     private ArrayList<Token> sortTokens()
     {
-        ArrayList<Token> tokens = new ArrayList<Token>(tokenToEndpointMap.keySet());
-        Collections.sort(tokens);
-        return tokens;
+        return new ArrayList<Token>(tokenToEndpointMap.keySet());
     }
 
     /** @return the number of nodes bootstrapping into source's primary range */
     public int pendingRangeChanges(InetAddress source)
     {
         int n = 0;
-        Range<Token> sourceRange = getPrimaryRangeFor(getToken(source));
+        Collection<Range<Token>> sourceRanges = getPrimaryRangesFor(getTokens(source));
         lock.readLock().lock();
         try
         {
             for (Token token : bootstrapTokens.keySet())
-                if (sourceRange.contains(token))
-                    n++;
+                for (Range<Token> range : sourceRanges)
+                    if (range.contains(token))
+                        n++;
         }
         finally
         {
@@ -134,7 +145,15 @@ public class TokenMetadata
      */
     public void updateNormalToken(Token token, InetAddress endpoint)
     {
-        updateNormalTokens(Collections.singleton(Pair.create(token, endpoint)));
+        updateNormalTokens(Collections.singleton(token), endpoint);
+    }
+
+    public void updateNormalTokens(Collection<Token> tokens, InetAddress endpoint)
+    {
+        Multimap<InetAddress, Token> endpointTokens = HashMultimap.create();
+        for (Token token : tokens)
+            endpointTokens.put(endpoint, token);
+        updateNormalTokens(endpointTokens);
     }
 
     /**
@@ -143,40 +162,39 @@ public class TokenMetadata
      * Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple)
      * is expensive (CASSANDRA-3831).
      *
-     * @param tokenPairs
+     * @param endpointTokens
      */
-    public void updateNormalTokens(Set<Pair<Token, InetAddress>> tokenPairs)
+    public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens)
     {
-        if (tokenPairs.isEmpty())
+        if (endpointTokens.isEmpty())
             return;
 
         lock.writeLock().lock();
         try
         {
             boolean shouldSortTokens = false;
-            for (Pair<Token, InetAddress> tokenEndpointPair : tokenPairs)
+            for (InetAddress endpoint : endpointTokens.keySet())
             {
-                Token token = tokenEndpointPair.left;
-                InetAddress endpoint = tokenEndpointPair.right;
+                Collection<Token> tokens = endpointTokens.get(endpoint);
 
-                assert token != null;
-                assert endpoint != null;
+                assert tokens != null && !tokens.isEmpty();
 
-                bootstrapTokens.inverse().remove(endpoint);
-                tokenToEndpointMap.inverse().remove(endpoint);
-                InetAddress prev = tokenToEndpointMap.put(token, endpoint);
-                if (!endpoint.equals(prev))
+                bootstrapTokens.removeValue(endpoint);
+                tokenToEndpointMap.removeValue(endpoint);
+                topology.addEndpoint(endpoint);
+                leavingEndpoints.remove(endpoint);
+                removeFromMoving(endpoint); // also removing this endpoint from moving
+
+                for (Token token : tokens)
                 {
-                    if (prev != null)
+                    InetAddress prev = tokenToEndpointMap.put(token, endpoint);
+                    if (!endpoint.equals(prev))
                     {
-                        logger.warn("Token " + token + " changing ownership from " + prev + " to " + endpoint);
-                        topology.removeEndpoint(prev);
+                        if (prev != null)
+                            logger.warn("Token " + token + " changing ownership from " + prev + " to " + endpoint);
+                        shouldSortTokens = true;
                     }
-                    shouldSortTokens = true;
                 }
-                topology.addEndpoint(endpoint);
-                leavingEndpoints.remove(endpoint);
-                removeFromMoving(endpoint); // also removing this endpoint from moving
             }
 
             if (shouldSortTokens)
@@ -239,26 +257,38 @@ public class TokenMetadata
         return readMap;
     }
 
+    @Deprecated
     public void addBootstrapToken(Token token, InetAddress endpoint)
     {
-        assert token != null;
+        addBootstrapTokens(Collections.singleton(token), endpoint);
+    }
+
+    public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint)
+    {
+        assert tokens != null && !tokens.isEmpty();
         assert endpoint != null;
 
         lock.writeLock().lock();
         try
         {
+            
             InetAddress oldEndpoint;
 
-            oldEndpoint = bootstrapTokens.get(token);
-            if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
-                throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
+            for (Token token : tokens)
+            {
+                oldEndpoint = bootstrapTokens.get(token);
+                if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
+                    throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
+    
+                oldEndpoint = tokenToEndpointMap.get(token);
+                if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
+                    throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
+            }
 
-            oldEndpoint = tokenToEndpointMap.get(token);
-            if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
-                throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
+            bootstrapTokens.removeValue(endpoint);
 
-            bootstrapTokens.inverse().remove(endpoint);
-            bootstrapTokens.put(token, endpoint);
+            for (Token token : tokens)
+                bootstrapTokens.put(token, endpoint);
         }
         finally
         {
@@ -324,8 +354,8 @@ public class TokenMetadata
         lock.writeLock().lock();
         try
         {
-            bootstrapTokens.inverse().remove(endpoint);
-            tokenToEndpointMap.inverse().remove(endpoint);
+            bootstrapTokens.removeValue(endpoint);
+            tokenToEndpointMap.removeValue(endpoint);
             topology.removeEndpoint(endpoint);
             leavingEndpoints.remove(endpoint);
             endpointToHostIdMap.remove(endpoint);
@@ -366,7 +396,7 @@ public class TokenMetadata
         }
     }
 
-    public Token getToken(InetAddress endpoint)
+    public Collection<Token> getTokens(InetAddress endpoint)
     {
         assert endpoint != null;
         assert isMember(endpoint); // don't want to return nulls
@@ -374,7 +404,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            return tokenToEndpointMap.inverse().get(endpoint);
+            return new ArrayList<Token>(tokenToEndpointMap.inverse().get(endpoint));
         }
         finally
         {
@@ -382,6 +412,12 @@ public class TokenMetadata
         }
     }
 
+    @Deprecated
+    public Token getToken(InetAddress endpoint)
+    {
+        return getTokens(endpoint).iterator().next();
+    }
+
     public boolean isMember(InetAddress endpoint)
     {
         assert endpoint != null;
@@ -443,7 +479,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            return new TokenMetadata(HashBiMap.create(tokenToEndpointMap), new Topology(topology));
+            return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp), new Topology(topology));
         }
         finally
         {
@@ -517,9 +553,18 @@ public class TokenMetadata
         }
     }
 
+    public Collection<Range<Token>> getPrimaryRangesFor(Collection<Token> tokens)
+    {
+        Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(tokens.size());
+        for (Token right : tokens)
+            ranges.add(new Range<Token>(getPredecessor(right), right));
+        return ranges;
+    }
+
+    @Deprecated
     public Range<Token> getPrimaryRangeFor(Token right)
     {
-        return new Range<Token>(getPredecessor(right), right);
+        return getPrimaryRangesFor(Arrays.asList(right)).iterator().next();
     }
 
     public ArrayList<Token> sortedTokens()
@@ -581,12 +626,12 @@ public class TokenMetadata
     }
 
     /** @return a copy of the bootstrapping tokens map */
-    public Map<Token, InetAddress> getBootstrapTokens()
+    public BiMultiValMap<Token, InetAddress> getBootstrapTokens()
     {
         lock.readLock().lock();
         try
         {
-            return ImmutableMap.copyOf(bootstrapTokens);
+            return new BiMultiValMap<Token, InetAddress>(bootstrapTokens);
         }
         finally
         {
@@ -803,24 +848,6 @@ public class TokenMetadata
     }
 
     /**
-     * @return a token to endpoint map to consider for read operations on the cluster.
-     */
-    public Map<Token, InetAddress> getTokenToEndpointMapForReading()
-    {
-        lock.readLock().lock();
-        try
-        {
-            Map<Token, InetAddress> map = new HashMap<Token, InetAddress>(tokenToEndpointMap.size());
-            map.putAll(tokenToEndpointMap);
-            return map;
-        }
-        finally
-        {
-            lock.readLock().unlock();
-        }
-    }
-
-    /**
      * @return a (stable copy, won't be modified) Token to Endpoint map for all the normal and bootstrapping nodes
      *         in the cluster.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index ce14352..583b3a7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.google.common.base.Function;
 import com.google.common.collect.*;
 import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
@@ -56,6 +55,7 @@ import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.ResponseVerbHandler;
+import org.apache.cassandra.service.AntiEntropyService.RepairFuture;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.*;
@@ -120,6 +120,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         return getRangesForEndpoint(table, FBUtilities.getBroadcastAddress());
     }
 
+    public Collection<Range<Token>> getLocalPrimaryRanges()
+    {
+        return getPrimaryRangesForEndpoint(FBUtilities.getBroadcastAddress());
+    }
+
+    @Deprecated
     public Range<Token> getLocalPrimaryRange()
     {
         return getPrimaryRangeForEndpoint(FBUtilities.getBroadcastAddress());
@@ -1168,7 +1174,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             logger.info("Node " + endpoint + " state jump to leaving");
             tokenMetadata.updateNormalToken(token, endpoint);
         }
-        else if (!tokenMetadata.getToken(endpoint).equals(token))
+        else if (!tokenMetadata.getTokens(endpoint).contains(token))
         {
             logger.warn("Node " + endpoint + " 'leaving' token mismatch. Long network partition?");
             tokenMetadata.updateNormalToken(token, endpoint);
@@ -1338,7 +1344,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     {
         TokenMetadata tm = StorageService.instance.getTokenMetadata();
         Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create();
-        Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
+        BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
         Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
 
         if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty())
@@ -1373,11 +1379,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
         // For each of the bootstrapping nodes, simply add and remove them one by one to
         // allLeftMetadata and check in between what their ranges would be.
-        for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+        for (InetAddress endpoint : bootstrapTokens.inverse().keySet())
         {
-            InetAddress endpoint = entry.getValue();
-
-            allLeftMetadata.updateNormalToken(entry.getKey(), endpoint);
+            Collection<Token> tokens = bootstrapTokens.inverse().get(endpoint);
+            
+            allLeftMetadata.updateNormalTokens(tokens, endpoint);
             for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
                 pendingRanges.put(range, endpoint);
             allLeftMetadata.removeEndpoint(endpoint);
@@ -1977,18 +1983,17 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         if (Table.SYSTEM_TABLE.equals(tableName))
             return;
 
-        AntiEntropyService.RepairFuture future = forceTableRepair(getLocalPrimaryRange(), tableName, isSequential, columnFamilies);
-        if (future == null)
-            return;
-        try
-        {
-            future.get();
-        }
-        catch (Exception e)
+        List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>();
+        for (Range<Token> range : getLocalPrimaryRanges())
         {
-            logger.error("Repair session " + future.session.getName() + " failed.", e);
-            throw new IOException("Some repair session(s) failed (see log for details).");
+            RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+            if (future != null)
+                futures.add(future);
         }
+        if (futures.isEmpty())
+            return;
+        for (AntiEntropyService.RepairFuture future : futures)
+            FBUtilities.waitOnFuture(future);
     }
 
     public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
@@ -2041,9 +2046,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
      * This method returns the predecessor of the endpoint ep on the identifier
      * space.
      */
-    InetAddress getPredecessor(InetAddress ep)
+    InetAddress getPredecessor(Token token)
     {
-        Token token = tokenMetadata.getToken(ep);
         return tokenMetadata.getEndpoint(tokenMetadata.getPredecessor(token));
     }
 
@@ -2051,17 +2055,27 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
      * This method returns the successor of the endpoint ep on the identifier
      * space.
      */
-    public InetAddress getSuccessor(InetAddress ep)
+    public InetAddress getSuccessor(Token token)
     {
-        Token token = tokenMetadata.getToken(ep);
         return tokenMetadata.getEndpoint(tokenMetadata.getSuccessor(token));
     }
 
     /**
+     * Get the primary ranges for the specified endpoint.
+     * @param ep endpoint we are interested in.
+     * @return collection of ranges for the specified endpoint.
+     */
+    public Collection<Range<Token>> getPrimaryRangesForEndpoint(InetAddress ep)
+    {
+        return tokenMetadata.getPrimaryRangesFor(tokenMetadata.getTokens(ep));
+    }
+
+    /**
      * Get the primary range for the specified endpoint.
      * @param ep endpoint we are interested in.
      * @return range for the specified endpoint.
      */
+    @Deprecated
     public Range<Token> getPrimaryRangeForEndpoint(InetAddress ep)
     {
         return tokenMetadata.getPrimaryRangeFor(tokenMetadata.getToken(ep));
@@ -2741,14 +2755,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
     public Map<InetAddress, Float> getOwnership()
     {
-        Map<Token, InetAddress> tokensToEndpoints = tokenMetadata.getTokenToEndpointMapForReading();
-        List<Token> sortedTokens = new ArrayList<Token>(tokensToEndpoints.keySet());
-        Collections.sort(sortedTokens);
+        List<Token> sortedTokens = tokenMetadata.sortedTokens();
         // describeOwnership returns tokens in an unspecified order, let's re-order them
         Map<Token, Float> tokenMap = new TreeMap<Token, Float>(getPartitioner().describeOwnership(sortedTokens));
         Map<InetAddress, Float> stringMap = new LinkedHashMap<InetAddress, Float>();
         for (Map.Entry<Token, Float> entry : tokenMap.entrySet())
-            stringMap.put(tokensToEndpoints.get(entry.getKey()), entry.getValue());
+            stringMap.put(tokenMetadata.getEndpoint(entry.getKey()), entry.getValue());
         return stringMap;
     }
 
@@ -2773,22 +2785,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         if (keyspace == null)
             keyspace = Schema.instance.getNonSystemTables().get(0);
 
-        final BiMap<InetAddress, Token> endpointsToTokens = ImmutableBiMap.copyOf(metadata.getTokenToEndpointMapForReading()).inverse();
-
         Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<Collection<InetAddress>>();
-        if (isDcAwareReplicationStrategy(keyspace))
-        {
-            // mapping of dc's to nodes, use sorted map so that we get dcs sorted
-            SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<String, Collection<InetAddress>>();
-            sortedDcsToEndpoints.putAll(metadata.getTopology().getDatacenterEndpoints().asMap());
-            for (Collection<InetAddress> endpoints : sortedDcsToEndpoints.values())
-                endpointsGroupedByDc.add(endpoints);
-        }
-        else
-        {
-            endpointsGroupedByDc.add(endpointsToTokens.keySet());
-        }
+        // mapping of dc's to nodes, use sorted map so that we get dcs sorted
+        SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<String, Collection<InetAddress>>();
+        sortedDcsToEndpoints.putAll(metadata.getTopology().getDatacenterEndpoints().asMap());
+        for (Collection<InetAddress> endpoints : sortedDcsToEndpoints.values())
+            endpointsGroupedByDc.add(endpoints);
 
+        Map<Token, Float> tokenOwnership = getPartitioner().describeOwnership(tokenMetadata.sortedTokens());
         LinkedHashMap<InetAddress, Float> finalOwnership = Maps.newLinkedHashMap();
 
         // calculate ownership per dc
@@ -2802,19 +2806,22 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             {
                 public int compare(InetAddress o1, InetAddress o2)
                 {
-                    return endpointsToTokens.get(o1).compareTo(endpointsToTokens.get(o2));
-                }
-            });
+                    byte[] b1 = o1.getAddress();
+                    byte[] b2 = o2.getAddress();
 
-            // calculate the ownership without replication
-            Function<InetAddress, Token> f = new Function<InetAddress, Token>()
-            {
-                public Token apply(InetAddress arg0)
-                {
-                    return endpointsToTokens.get(arg0);
+                    if(b1.length < b2.length) return -1;
+                    if(b1.length > b2.length) return 1;
+
+                    for(int i = 0; i < b1.length; i++)
+                    {
+                        int left = (int)b1[i] & 0xFF;
+                        int right = (int)b2[i] & 0xFF;
+                        if (left < right)       return -1;
+                        else if (left > right)  return 1;
+                    }
+                    return 0;
                 }
-            };
-            Map<Token, Float> tokenOwnership = getPartitioner().describeOwnership(Lists.transform(sortedEndpoints, f));
+            });
 
             // calculate the ownership with replication and add the endpoint to the final ownership map
             for (InetAddress endpoint : endpoints)
@@ -3144,7 +3151,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
      */
     public List<String> getRangeKeySample()
     {
-        List<DecoratedKey> keys = keySamples(ColumnFamilyStore.allUserDefined(), getLocalPrimaryRange());
+        List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+        for (Range<Token> range : getLocalPrimaryRanges())
+            keys.addAll(keySamples(ColumnFamilyStore.allUserDefined(), range));
 
         List<String> sampledKeys = new ArrayList<String>(keys.size());
         for (DecoratedKey key : keys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index 9e3d684..bd03766 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -41,6 +41,9 @@ import org.apache.cassandra.dht.StringToken;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.utils.Pair;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
 public class NetworkTopologyStrategyTest
 {
     private String table = "Keyspace1";
@@ -105,7 +108,7 @@ public class NetworkTopologyStrategyTest
         DatabaseDescriptor.setEndpointSnitch(snitch);
         TokenMetadata metadata = new TokenMetadata();
         Map<String, String> configOptions = new HashMap<String, String>();
-        Set<Pair<Token, InetAddress>> tokens = new HashSet<Pair<Token, InetAddress>>();
+        Multimap<InetAddress, Token> tokens = HashMultimap.create();
 
         int totalRF = 0;
         for (int dc = 0; dc < dcRacks.length; ++dc)
@@ -120,7 +123,7 @@ public class NetworkTopologyStrategyTest
                     InetAddress address = InetAddress.getByAddress(ipBytes);
                     StringToken token = new StringToken(String.format("%02x%02x%02x", ep, rack, dc));
                     logger.debug("adding node " + address + " at " + token);
-                    tokens.add(new Pair<Token, InetAddress>(token, address));
+                    tokens.put(address, token);
                 }
             }
         }


Mime
View raw message