cassandra-commits mailing list archives

From: bened...@apache.org
Subject: [2/2] cassandra git commit: Add algorithmic token allocation
Date: Tue, 23 Jun 2015 11:08:04 GMT
Add algorithmic token allocation

patch by branimir; reviewed by benedict for CASSANDRA-7032


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a3fa887
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a3fa887
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a3fa887

Branch: refs/heads/trunk
Commit: 9a3fa887cfa03c082f249d1d4003d87c14ba5d24
Parents: 115ed23
Author: branimir <branimir.lambov@datastax.com>
Authored: Tue Jun 23 12:03:45 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Tue Jun 23 12:03:45 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  11 +
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |   5 +
 .../cassandra/db/commitlog/CommitLog.java       |   2 +-
 .../db/commitlog/CompressedSegment.java         |   2 +-
 .../org/apache/cassandra/dht/BootStrapper.java  |  57 +-
 .../cassandra/dht/ByteOrderedPartitioner.java   |  14 +
 .../cassandra/dht/ComparableObjectToken.java    |  14 +
 .../cassandra/dht/Murmur3Partitioner.java       |  27 +-
 src/java/org/apache/cassandra/dht/Token.java    |  12 +
 .../ReplicationAwareTokenAllocator.java         | 805 +++++++++++++++++++
 .../dht/tokenallocator/ReplicationStrategy.java |  29 +
 .../dht/tokenallocator/TokenAllocation.java     | 259 ++++++
 .../dht/tokenallocator/TokenAllocator.java      |  27 +
 .../apache/cassandra/locator/TokenMetadata.java |   6 +-
 .../cassandra/service/StorageService.java       |   2 +-
 test/conf/logback-test.xml                      |   1 -
 .../db/commitlog/CommitLogStressTest.java       |   2 +-
 .../ReplicationAwareTokenAllocatorTest.java     | 700 ++++++++++++++++
 .../io/compress/CompressorPerformance.java      |   4 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   2 +-
 .../apache/cassandra/dht/BootStrapperTest.java  |  96 ++-
 23 files changed, 2044 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f15a263..08528f8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0:
+ * Add algorithmic token allocation (CASSANDRA-7032)
  * Add nodetool command to replay batchlog (CASSANDRA-9547)
  * Make file buffer cache independent of paths being read (CASSANDRA-8897)
  * Remove deprecated legacy Hadoop code (CASSANDRA-9353)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 5e65ae3..71e0a81 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -24,6 +24,17 @@ cluster_name: 'Test Cluster'
 # multiple tokens per node, see http://wiki.apache.org/cassandra/Operations
 num_tokens: 256
 
+# Triggers automatic allocation of num_tokens tokens for this node. The allocation
+# algorithm attempts to choose tokens in a way that optimizes replicated load over
+# the nodes in the datacenter for the replication strategy used by the specified
+# keyspace.
+#
+# The load assigned to each node will be close to proportional to its number of
+# vnodes.
+#
+# Only supported with the Murmur3Partitioner.
+# allocate_tokens_for_keyspace: KEYSPACE
+
 # initial_token allows you to specify tokens manually.  While you can use it with
 # vnodes (num_tokens > 1, above) -- in which case you should provide a
 # comma-separated list -- it's primarily used when adding nodes to legacy clusters

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 7cc28bf..54005e9 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -73,6 +73,8 @@ public class Config
     /* initial token in the ring */
     public String initial_token;
     public Integer num_tokens = 1;
+    /** Triggers automatic allocation of tokens if set, using the replication strategy of the referenced keyspace */
+    public String allocate_tokens_for_keyspace = null;
 
     public volatile Long request_timeout_in_ms = 10000L;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a40eac1..a2c4817 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -811,6 +811,11 @@ public class DatabaseDescriptor
         return tokensFromString(System.getProperty("cassandra.initial_token", conf.initial_token));
     }
 
+    public static String getAllocateTokensKeyspace()
+    {
+        return System.getProperty("cassandra.allocate_tokens_keyspace", conf.allocate_tokens_for_keyspace);
+    }
+
     public static Collection<String> tokensFromString(String tokenString)
     {
         List<String> tokens = new ArrayList<String>();

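The new getter resolves the allocation keyspace from two places: the cassandra.allocate_tokens_keyspace system property takes precedence, falling back to the allocate_tokens_for_keyspace value parsed from cassandra.yaml. A minimal standalone sketch of that resolution order (the class and field below are hypothetical stand-ins, not part of the patch):

    public class AllocationKeyspaceLookup
    {
        // Hypothetical stand-in for the value the yaml loader would set from allocate_tokens_for_keyspace.
        static String yamlValue = "my_keyspace";

        static String getAllocateTokensKeyspace()
        {
            // The JVM system property, if present, overrides the yaml value (mirroring the patch).
            return System.getProperty("cassandra.allocate_tokens_keyspace", yamlValue);
        }

        public static void main(String[] args)
        {
            // Prints "my_keyspace" unless run with -Dcassandra.allocate_tokens_keyspace=other_ks.
            System.out.println(getAllocateTokensKeyspace());
        }
    }
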
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index b3f944d..7332a08 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -74,7 +74,7 @@ public class CommitLog implements CommitLogMBean
     public ParameterizedClass compressorClass;
     final public String location;
 
-    static private CommitLog construct()
+    private static CommitLog construct()
     {
         CommitLog log = new CommitLog(DatabaseDescriptor.getCommitLogLocation(), new CommitLogArchiver());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 8c62536..aa12e1d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.utils.SyncUtil;
  */
 public class CompressedSegment extends CommitLogSegment
 {
-    static private final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
+    private static final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
         protected ByteBuffer initialValue()
         {
             return ByteBuffer.allocate(0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index aca05f0..42eb6bb 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.tokenallocator.TokenAllocation;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -150,37 +151,61 @@ public class BootStrapper extends ProgressEventNotifierSupport
 
     /**
      * if initialtoken was specified, use that (split on comma).
-     * otherwise, if num_tokens == 1, pick a token to assume half the load of the most-loaded node.
+     * otherwise, if allocationKeyspace is specified, use the token allocation algorithm to generate suitable tokens
      * else choose num_tokens tokens at random
      */
-    public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata) throws ConfigurationException
+    public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata, InetAddress address) throws ConfigurationException
     {
+        String allocationKeyspace = DatabaseDescriptor.getAllocateTokensKeyspace();
         Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
+        if (initialTokens.size() > 0 && allocationKeyspace != null)
+            logger.warn("manually specified tokens override automatic allocation");
+
         // if user specified tokens, use those
         if (initialTokens.size() > 0)
-        {
-            logger.debug("tokens manually specified as {}",  initialTokens);
-            List<Token> tokens = new ArrayList<>(initialTokens.size());
-            for (String tokenString : initialTokens)
-            {
-                Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString);
-                if (metadata.getEndpoint(token) != null)
-                    throw new ConfigurationException("Bootstrapping to existing token " + tokenString + " is not allowed (decommission/removenode the old node first).");
-                tokens.add(token);
-            }
-            return tokens;
-        }
-
+            return getSpecifiedTokens(metadata, initialTokens);
+        
         int numTokens = DatabaseDescriptor.getNumTokens();
         if (numTokens < 1)
             throw new ConfigurationException("num_tokens must be >= 1");
 
+        if (allocationKeyspace != null)
+            return allocateTokens(metadata, address, allocationKeyspace, numTokens);
+
         if (numTokens == 1)
-            logger.warn("Picking random token for a single vnode.  You should probably add more vnodes; failing that, you should probably specify the token manually");
+            logger.warn("Picking random token for a single vnode.  You should probably add more vnodes and/or use the automatic token allocation mechanism.");
 
         return getRandomTokens(metadata, numTokens);
     }
 
+    private static Collection<Token> getSpecifiedTokens(final TokenMetadata metadata,
+                                                                Collection<String> initialTokens)
+    {
+        logger.debug("tokens manually specified as {}",  initialTokens);
+        List<Token> tokens = new ArrayList<>(initialTokens.size());
+        for (String tokenString : initialTokens)
+        {
+            Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString);
+            if (metadata.getEndpoint(token) != null)
+                throw new ConfigurationException("Bootstrapping to existing token " + tokenString + " is not allowed (decommission/removenode the old node first).");
+            tokens.add(token);
+        }
+        return tokens;
+    }
+
+    static Collection<Token> allocateTokens(final TokenMetadata metadata,
+                                            InetAddress address,
+                                            String allocationKeyspace,
+                                            int numTokens)
+    {
+        Keyspace ks = Keyspace.open(allocationKeyspace);
+        if (ks == null)
+            throw new ConfigurationException("Problem opening token allocation keyspace " + allocationKeyspace);
+        AbstractReplicationStrategy rs = ks.getReplicationStrategy();
+        
+        return TokenAllocation.allocateTokens(metadata, rs, StorageService.getPartitioner(), address, numTokens);
+    }
+
     public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
     {
         Set<Token> tokens = new HashSet<>(numTokens);

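To summarise the precedence implemented above: explicitly configured initial_token values win, then keyspace-driven allocation, then random selection. A compact, self-contained sketch of that decision order (the class and helper names below are illustrative only, not the patch's API):

    import java.util.*;

    public class TokenChoiceOrder
    {
        static List<Long> chooseTokens(List<Long> initialTokens, String allocationKeyspace, int numTokens)
        {
            if (!initialTokens.isEmpty())
                return initialTokens;                  // manually specified tokens override everything
            if (allocationKeyspace != null)
                return allocateForKeyspace(numTokens); // stand-in for TokenAllocation.allocateTokens(...)
            return randomTokens(numTokens);            // stand-in for getRandomTokens(...)
        }

        static List<Long> allocateForKeyspace(int n) { return Collections.nCopies(n, 0L); } // placeholder
        static List<Long> randomTokens(int n)
        {
            Random r = new Random();
            List<Long> tokens = new ArrayList<>();
            for (int i = 0; i < n; i++)
                tokens.add(r.nextLong());
            return tokens;
        }

        public static void main(String[] args)
        {
            System.out.println(chooseTokens(Collections.<Long>emptyList(), "my_keyspace", 4)); // [0, 0, 0, 0]
        }
    }
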
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index 297e5a6..308aea1 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -116,6 +116,20 @@ public class ByteOrderedPartitioner implements IPartitioner
         {
             return token;
         }
+
+        @Override
+        public double size(Token next)
+        {
+            throw new UnsupportedOperationException(String.format("Token type %s does not support token allocation.",
+                                                                  getClass().getSimpleName()));
+        }
+
+        @Override
+        public Token increaseSlightly()
+        {
+            throw new UnsupportedOperationException(String.format("Token type %s does not support token allocation.",
+                                                                  getClass().getSimpleName()));
+        }
     }
 
     public BytesToken getToken(ByteBuffer key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/dht/ComparableObjectToken.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ComparableObjectToken.java b/src/java/org/apache/cassandra/dht/ComparableObjectToken.java
index 137f27c..97c0c52 100644
--- a/src/java/org/apache/cassandra/dht/ComparableObjectToken.java
+++ b/src/java/org/apache/cassandra/dht/ComparableObjectToken.java
@@ -66,4 +66,18 @@ abstract class ComparableObjectToken<C extends Comparable<C>> extends Token
 
         return token.compareTo(((ComparableObjectToken<C>) o).token);
     }
+
+    @Override
+    public double size(Token next)
+    {
+        throw new UnsupportedOperationException(String.format("Token type %s does not support token allocation.",
+                                                              getClass().getSimpleName()));
+    }
+
+    @Override
+    public Token increaseSlightly()
+    {
+        throw new UnsupportedOperationException(String.format("Token type %s does not support token allocation.",
+                                                              getClass().getSimpleName()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index 96c603e..003879c 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -20,10 +20,7 @@ package org.apache.cassandra.dht;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.db.DecoratedKey;
@@ -140,6 +137,21 @@ public class Murmur3Partitioner implements IPartitioner
         {
             return token;
         }
+
+        @Override
+        public double size(Token next)
+        {
+            LongToken n = (LongToken) next;
+            long v = n.token - token;  // Overflow acceptable and desired.
+            double d = Math.scalb((double) v, -Long.SIZE); // Scale so that the full range is 1.
+            return d > 0.0 ? d : (d + 1.0); // Adjust for signed long, also making sure t.size(t) == 1.
+        }
+
+        @Override
+        public Token increaseSlightly()
+        {
+            return new LongToken(token + 1);
+        }
     }
 
     /**
@@ -170,7 +182,12 @@ public class Murmur3Partitioner implements IPartitioner
 
     public LongToken getRandomToken()
     {
-        return new LongToken(normalize(ThreadLocalRandom.current().nextLong()));
+        return getRandomToken(ThreadLocalRandom.current());
+    }
+
+    public LongToken getRandomToken(Random r)
+    {
+        return new LongToken(normalize(r.nextLong()));
     }
 
     private long normalize(long v)

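The LongToken.size() arithmetic above relies on two's-complement wraparound: the signed difference between the tokens is scaled by 2^-64 so the full ring maps to 1.0, and negative results are folded back into (0, 1]. A small self-contained check of that formula (hypothetical class; same arithmetic as the patch, expressed over raw longs):

    public class LongTokenSizeCheck
    {
        static double size(long from, long to)
        {
            long v = to - from;                            // overflow wraps, which is desired on the ring
            double d = Math.scalb((double) v, -Long.SIZE); // scale so that the full range is 1.0
            return d > 0.0 ? d : d + 1.0;                  // fold negative values back, so size(t, t) == 1.0
        }

        public static void main(String[] args)
        {
            System.out.println(size(0L, 0L));              // 1.0 : a token spans the whole ring back to itself
            System.out.println(size(Long.MIN_VALUE, 0L));  // 0.5 : half the ring
            System.out.println(size(-1L, 1L));             // ~1.08e-19 : two nearby tokens
        }
    }
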
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/dht/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index 76918a7..0cc8a2d 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -73,6 +73,18 @@ public abstract class Token implements RingPosition<Token>, Serializable
     abstract public long getHeapSize();
     abstract public Object getTokenValue();
 
+    /**
+     * Returns a measure for the token space covered between this token and next.
+     * Used by the token allocation algorithm (see CASSANDRA-7032).
+     */
+    abstract public double size(Token next);
+    /**
+     * Returns a token that is slightly greater than this. Used to avoid clashes
+     * between nodes in separate datacentres trying to use the same token via
+     * the token allocation algorithm.
+     */
+    abstract public Token increaseSlightly();
+
     public Token getToken()
     {
         return this;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
new file mode 100644
index 0000000..054a90e
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht.tokenallocator;
+
+import java.util.*;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * A replication-aware allocator for tokens that attempts to ensure an even distribution of ownership across
+ * the known cluster for the provided replication strategy.
+ *
+ * A unit is shorthand for a "unit of ownership", which translates roughly to a node, a disk on the node,
+ * a CPU on the node, or some other relevant unit of ownership. These units should be the lowest rung over which
+ * ownership needs to be evenly distributed. At the moment only whole nodes are treated as units, but that
+ * will change with the introduction of token ranges per disk.
+ */
+class ReplicationAwareTokenAllocator<Unit> implements TokenAllocator<Unit>
+{
+    final NavigableMap<Token, Unit> sortedTokens;
+    final Multimap<Unit, Token> unitToTokens;
+    final ReplicationStrategy<Unit> strategy;
+    final IPartitioner partitioner;
+    final int replicas;
+
+    ReplicationAwareTokenAllocator(NavigableMap<Token, Unit> sortedTokens, ReplicationStrategy<Unit> strategy, IPartitioner partitioner)
+    {
+        this.sortedTokens = sortedTokens;
+        unitToTokens = HashMultimap.create();
+        for (Map.Entry<Token, Unit> en : sortedTokens.entrySet())
+            unitToTokens.put(en.getValue(), en.getKey());
+        this.strategy = strategy;
+        this.replicas = strategy.replicas();
+        this.partitioner = partitioner;
+    }
+
+    public Collection<Token> addUnit(Unit newUnit, int numTokens)
+    {
+        assert !unitToTokens.containsKey(newUnit);
+
+        if (unitCount() < replicas)
+            // Allocation does not matter; everything replicates everywhere.
+            return generateRandomTokens(newUnit, numTokens);
+        if (numTokens > sortedTokens.size())
+            // Some of the heuristics below can't deal with this case. Use random for now; later allocations can fix any problems this may cause.
+            return generateRandomTokens(newUnit, numTokens);
+
+        // ============= construct our initial token ring state =============
+
+        double optTokenOwnership = optimalTokenOwnership(numTokens);
+        Map<Object, GroupInfo> groups = Maps.newHashMap();
+        Map<Unit, UnitInfo<Unit>> unitInfos = createUnitInfos(groups);
+        if (groups.size() < replicas)
+        {
+            // We need at least as many groups as replicas to do allocation correctly. If there aren't enough,
+            // use random allocation.
+            // This part of the code should only be reached via the RATATest. StrategyAdapter should disallow
+            // token allocation in this case as the algorithm is not able to cover the behavior of NetworkTopologyStrategy.
+            return generateRandomTokens(newUnit, numTokens);
+        }
+
+        // initialise our new unit's state (with an idealised ownership)
+        // strategy must already know about this unit
+        UnitInfo<Unit> newUnitInfo = new UnitInfo<>(newUnit, numTokens * optTokenOwnership, groups, strategy);
+
+        // build the current token ring state
+        TokenInfo<Unit> tokens = createTokenInfos(unitInfos, newUnitInfo.group);
+        newUnitInfo.tokenCount = numTokens;
+
+        // ============= construct and rank our candidate token allocations =============
+
+        // walk the token ring, constructing the set of candidates in ring order
+        // as the midpoints between all existing tokens
+        CandidateInfo<Unit> candidates = createCandidates(tokens, newUnitInfo, optTokenOwnership);
+
+        // Evaluate the expected improvements from all candidates and form a priority queue.
+        PriorityQueue<Weighted<CandidateInfo<Unit>>> improvements = new PriorityQueue<>(sortedTokens.size());
+        CandidateInfo<Unit> candidate = candidates;
+        do
+        {
+            double impr = evaluateImprovement(candidate, optTokenOwnership, 1.0 / numTokens);
+            improvements.add(new Weighted<>(impr, candidate));
+            candidate = candidate.next;
+        } while (candidate != candidates);
+
+        // ============= iteratively take the best candidate, and re-rank =============
+
+        CandidateInfo<Unit> bestToken = improvements.remove().value;
+        for (int vn = 1; ; ++vn)
+        {
+            candidates = bestToken.removeFrom(candidates);
+            confirmCandidate(bestToken);
+
+            if (vn == numTokens)
+                break;
+
+            while (true)
+            {
+                // Get the next candidate in the queue. Its improvement may have changed (esp. if multiple tokens
+                // were good suggestions because they could improve the same problem) -- evaluate it again to check
+                // if it is still a good candidate.
+                bestToken = improvements.remove().value;
+                double impr = evaluateImprovement(bestToken, optTokenOwnership, (vn + 1.0) / numTokens);
+                Weighted<CandidateInfo<Unit>> next = improvements.peek();
+
+                // If it is better than the next in the queue, it is good enough. This is a heuristic that doesn't
+                // get the best results, but works well enough and on average cuts search time by a factor of O(vnodes).
+                if (next == null || impr >= next.weight)
+                    break;
+                improvements.add(new Weighted<>(impr, bestToken));
+            }
+        }
+
+        return ImmutableList.copyOf(unitToTokens.get(newUnit));
+    }
+
+    private Collection<Token> generateRandomTokens(Unit newUnit, int numTokens)
+    {
+        Set<Token> tokens = new HashSet<>(numTokens);
+        while (tokens.size() < numTokens)
+        {
+            Token token = partitioner.getRandomToken();
+            if (!sortedTokens.containsKey(token))
+            {
+                tokens.add(token);
+                sortedTokens.put(token, newUnit);
+                unitToTokens.put(newUnit, token);
+            }
+        }
+        return tokens;
+    }
+
+    private Map<Unit, UnitInfo<Unit>> createUnitInfos(Map<Object, GroupInfo> groups)
+    {
+        Map<Unit, UnitInfo<Unit>> map = Maps.newHashMap();
+        for (Unit n : sortedTokens.values())
+        {
+            UnitInfo<Unit> ni = map.get(n);
+            if (ni == null)
+                map.put(n, ni = new UnitInfo<>(n, 0, groups, strategy));
+            ni.tokenCount++;
+        }
+        return map;
+    }
+
+    /**
+     * Construct the token ring as a CircularList of TokenInfo,
+     * and populate the ownership of the UnitInfo's provided
+     */
+    private TokenInfo<Unit> createTokenInfos(Map<Unit, UnitInfo<Unit>> units, GroupInfo newUnitGroup)
+    {
+        // build the circular list
+        TokenInfo<Unit> prev = null;
+        TokenInfo<Unit> first = null;
+        for (Map.Entry<Token, Unit> en : sortedTokens.entrySet())
+        {
+            Token t = en.getKey();
+            UnitInfo<Unit> ni = units.get(en.getValue());
+            TokenInfo<Unit> ti = new TokenInfo<>(t, ni);
+            first = ti.insertAfter(first, prev);
+            prev = ti;
+        }
+
+        TokenInfo<Unit> curr = first;
+        do
+        {
+            populateTokenInfoAndAdjustUnit(curr, newUnitGroup);
+            curr = curr.next;
+        } while (curr != first);
+
+        return first;
+    }
+
+    private CandidateInfo<Unit> createCandidates(TokenInfo<Unit> tokens, UnitInfo<Unit> newUnitInfo, double initialTokenOwnership)
+    {
+        TokenInfo<Unit> curr = tokens;
+        CandidateInfo<Unit> first = null;
+        CandidateInfo<Unit> prev = null;
+        do
+        {
+            CandidateInfo<Unit> candidate = new CandidateInfo<Unit>(partitioner.midpoint(curr.prev.token, curr.token), curr, newUnitInfo);
+            first = candidate.insertAfter(first, prev);
+
+            candidate.replicatedOwnership = initialTokenOwnership;
+            populateCandidate(candidate);
+
+            prev = candidate;
+            curr = curr.next;
+        } while (curr != tokens);
+        prev.next = first;
+        return first;
+    }
+
+    private void populateCandidate(CandidateInfo<Unit> candidate)
+    {
+        // Only finding replication start would do.
+        populateTokenInfo(candidate, candidate.owningUnit.group);
+    }
+
+    /**
+     * Incorporates the selected candidate into the ring, adjusting ownership information and calculated token
+     * information.
+     */
+    private void confirmCandidate(CandidateInfo<Unit> candidate)
+    {
+        // This process is less efficient than it could be (loops through each vnode's replication span instead
+        // of recalculating replicationStart, replicationThreshold from existing data + new token data in an O(1)
+        // case analysis similar to evaluateImprovement). This is fine as the method does not dominate processing
+        // time.
+
+        // Put the accepted candidate in the token list.
+        UnitInfo<Unit> newUnit = candidate.owningUnit;
+        Token newToken = candidate.token;
+        sortedTokens.put(newToken, newUnit.unit);
+        unitToTokens.put(newUnit.unit, newToken);
+
+        TokenInfo<Unit> prev = candidate.prevInRing();
+        TokenInfo<Unit> newTokenInfo = new TokenInfo<>(newToken, newUnit);
+        newTokenInfo.replicatedOwnership = candidate.replicatedOwnership;
+        newTokenInfo.insertAfter(prev, prev);   // List is not empty so this won't need to change head of list.
+
+        // Update data for candidate.
+        populateTokenInfoAndAdjustUnit(newTokenInfo, newUnit.group);
+
+        ReplicationVisitor replicationVisitor = new ReplicationVisitor();
+        assert newTokenInfo.next == candidate.split;
+        for (TokenInfo<Unit> curr = newTokenInfo.next; !replicationVisitor.visitedAll(); curr = curr.next)
+        {
+            // update the candidate between curr and next
+            candidate = candidate.next;
+            populateCandidate(candidate);
+
+            if (!replicationVisitor.add(curr.owningUnit.group))
+                continue;    // If we've already seen this group, the token cannot be affected.
+
+            populateTokenInfoAndAdjustUnit(curr, newUnit.group);
+        }
+
+        replicationVisitor.clean();
+    }
+
+    /**
+     * Calculates the {@code replicationStart} of a token, as well as {@code replicationThreshold} which is chosen in a way
+     * that permits {@code findUpdatedReplicationStart} to quickly identify changes in ownership.
+     */
+    private Token populateTokenInfo(BaseTokenInfo<Unit, ?> token, GroupInfo newUnitGroup)
+    {
+        GroupInfo tokenGroup = token.owningUnit.group;
+        PopulateVisitor visitor = new PopulateVisitor();
+
+        // Replication start = the end of a token from the RF'th different group seen before the token.
+        Token replicationStart;
+        // The end of a token from the RF-1'th different group seen before the token.
+        Token replicationThreshold = token.token;
+        GroupInfo currGroup;
+        for (TokenInfo<Unit> curr = token.prevInRing(); ; curr = curr.prev)
+        {
+            replicationStart = curr.token;
+            currGroup = curr.owningUnit.group;
+            if (!visitor.add(currGroup))
+                continue; // Group is already seen.
+            if (visitor.visitedAll())
+                break;
+
+            replicationThreshold = replicationStart;
+            // Another instance of the same group precedes us in the replication range of the ring,
+            // so this is where our replication range begins
+            if (currGroup == tokenGroup)
+                break;
+        }
+        if (newUnitGroup == tokenGroup)
+            // new token is always a boundary (as long as it's closer than replicationStart)
+            replicationThreshold = token.token;
+        else if (newUnitGroup != currGroup && visitor.seen(newUnitGroup))
+            // already has the new group in the replication span before the last seen token; cannot be affected
+            replicationThreshold = replicationStart;
+        visitor.clean();
+
+        token.replicationThreshold = replicationThreshold;
+        token.replicationStart = replicationStart;
+        return replicationStart;
+    }
+
+    private void populateTokenInfoAndAdjustUnit(TokenInfo<Unit> populate, GroupInfo newUnitGroup)
+    {
+        Token replicationStart = populateTokenInfo(populate, newUnitGroup);
+        double newOwnership = replicationStart.size(populate.token);
+        double oldOwnership = populate.replicatedOwnership;
+        populate.replicatedOwnership = newOwnership;
+        populate.owningUnit.ownership += newOwnership - oldOwnership;
+    }
+
+    /**
+     * Evaluates the improvement in variance for both units and individual tokens when candidate is inserted into the
+     * ring.
+     */
+    private double evaluateImprovement(CandidateInfo<Unit> candidate, double optTokenOwnership, double newUnitMult)
+    {
+        double tokenChange = 0;
+
+        UnitInfo<Unit> candidateUnit = candidate.owningUnit;
+        Token candidateEnd = candidate.token;
+
+        // Form a chain of units affected by the insertion to be able to qualify change of unit ownership.
+        // A unit may be affected more than once.
+        UnitAdjustmentTracker<Unit> unitTracker = new UnitAdjustmentTracker<>(candidateUnit);
+
+        // Reflect change in ownership of the splitting token (candidate).
+        tokenChange += applyOwnershipAdjustment(candidate, candidateUnit, candidate.replicationStart, candidateEnd, optTokenOwnership, unitTracker);
+
+        // Loop through all vnodes that replicate candidate or split and update their ownership.
+        ReplicationVisitor replicationVisitor = new ReplicationVisitor();
+        for (TokenInfo<Unit> curr = candidate.split; !replicationVisitor.visitedAll(); curr = curr.next)
+        {
+            UnitInfo<Unit> currUnit = curr.owningUnit;
+
+            if (!replicationVisitor.add(currUnit.group))
+                continue;    // If this group is already seen, the token cannot be affected.
+
+            Token replicationEnd = curr.token;
+            Token replicationStart = findUpdatedReplicationStart(curr, candidate);
+            tokenChange += applyOwnershipAdjustment(curr, currUnit, replicationStart, replicationEnd, optTokenOwnership, unitTracker);
+        }
+        replicationVisitor.clean();
+
+        double nodeChange = unitTracker.calculateUnitChange(newUnitMult, optTokenOwnership);
+        return -(tokenChange + nodeChange);
+    }
+
+    /**
+     * Returns the start of the replication span for the token {@code curr} when {@code candidate} is inserted into the
+     * ring.
+     */
+    private Token findUpdatedReplicationStart(TokenInfo<Unit> curr, CandidateInfo<Unit> candidate)
+    {
+        return furtherStartToken(curr.replicationThreshold, candidate.token, curr.token);
+    }
+
+    /**
+     * Applies the ownership adjustment for the given element, updating tracked unit ownership and returning the change
+     * of variance.
+     */
+    private double applyOwnershipAdjustment(BaseTokenInfo<Unit, ?> curr, UnitInfo<Unit> currUnit,
+            Token replicationStart, Token replicationEnd,
+            double optTokenOwnership, UnitAdjustmentTracker<Unit> unitTracker)
+    {
+        double oldOwnership = curr.replicatedOwnership;
+        double newOwnership = replicationStart.size(replicationEnd);
+        double tokenCount = currUnit.tokenCount;
+        assert tokenCount > 0;
+        unitTracker.add(currUnit, newOwnership - oldOwnership);
+        return (sq(newOwnership - optTokenOwnership) - sq(oldOwnership - optTokenOwnership)) / sq(tokenCount);
+    }
+
+    /**
+     * Tracker for unit ownership changes. The changes are tracked by a chain of UnitInfos where the adjustedOwnership
+     * field is being updated as we see changes in token ownership.
+     *
+     * The chain ends with an element that points to itself; this element must be specified as argument to the
+     * constructor as well as be the first unit with which 'add' is called; when calculating the variance change
+     * a separate multiplier is applied to it (used to permit more freedom in choosing the first tokens of a unit).
+     */
+    private static class UnitAdjustmentTracker<Unit>
+    {
+        UnitInfo<Unit> unitsChain;
+
+        UnitAdjustmentTracker(UnitInfo<Unit> newUnit)
+        {
+            unitsChain = newUnit;
+        }
+
+        void add(UnitInfo<Unit> currUnit, double diff)
+        {
+            if (currUnit.prevUsed == null)
+            {
+                assert unitsChain.prevUsed != null || currUnit == unitsChain;
+
+                currUnit.adjustedOwnership = currUnit.ownership + diff;
+                currUnit.prevUsed = unitsChain;
+                unitsChain = currUnit;
+            }
+            else
+            {
+                currUnit.adjustedOwnership += diff;
+            }
+        }
+
+        double calculateUnitChange(double newUnitMult, double optTokenOwnership)
+        {
+            double unitChange = 0;
+            UnitInfo<Unit> unitsChain = this.unitsChain;
+            // Now loop through the units chain and add the unit-level changes. Also clear the groups' seen marks.
+            while (true)
+            {
+                double newOwnership = unitsChain.adjustedOwnership;
+                double oldOwnership = unitsChain.ownership;
+                double tokenCount = unitsChain.tokenCount;
+                double diff = (sq(newOwnership / tokenCount - optTokenOwnership) - sq(oldOwnership / tokenCount - optTokenOwnership));
+                UnitInfo<Unit> prev = unitsChain.prevUsed;
+                unitsChain.prevUsed = null;
+                if (unitsChain != prev)
+                    unitChange += diff;
+                else
+                {
+                    unitChange += diff * newUnitMult;
+                    break;
+                }
+                unitsChain = prev;
+            }
+            this.unitsChain = unitsChain;
+            return unitChange;
+        }
+    }
+
+
+    /**
+     * Helper class for marking/unmarking visited a chain of groups
+     */
+    private abstract class GroupVisitor
+    {
+        GroupInfo groupChain = GroupInfo.TERMINATOR;
+        int seen = 0;
+
+        abstract GroupInfo prevSeen(GroupInfo group);
+        abstract void setPrevSeen(GroupInfo group, GroupInfo prevSeen);
+
+        // true iff this is the first time we've visited this group
+        boolean add(GroupInfo group)
+        {
+            if (prevSeen(group) != null)
+                return false;
+            ++seen;
+            setPrevSeen(group, groupChain);
+            groupChain = group;
+            return true;
+        }
+
+        boolean visitedAll()
+        {
+            return seen >= replicas;
+        }
+
+        boolean seen(GroupInfo group)
+        {
+            return prevSeen(group) != null;
+        }
+
+        // Clean group seen markers.
+        void clean()
+        {
+            GroupInfo groupChain = this.groupChain;
+            while (groupChain != GroupInfo.TERMINATOR)
+            {
+                GroupInfo prev = prevSeen(groupChain);
+                setPrevSeen(groupChain, null);
+                groupChain = prev;
+            }
+            this.groupChain = GroupInfo.TERMINATOR;
+        }
+    }
+
+    private class ReplicationVisitor extends GroupVisitor
+    {
+        GroupInfo prevSeen(GroupInfo group)
+        {
+            return group.prevSeen;
+        }
+
+        void setPrevSeen(GroupInfo group, GroupInfo prevSeen)
+        {
+            group.prevSeen = prevSeen;
+        }
+    }
+
+    private class PopulateVisitor extends GroupVisitor
+    {
+        GroupInfo prevSeen(GroupInfo group)
+        {
+            return group.prevPopulate;
+        }
+
+        void setPrevSeen(GroupInfo group, GroupInfo prevSeen)
+        {
+            group.prevPopulate = prevSeen;
+        }
+    }
+
+    private Map.Entry<Token, Unit> mapEntryFor(Token t)
+    {
+        Map.Entry<Token, Unit> en = sortedTokens.floorEntry(t);
+        if (en == null)
+            en = sortedTokens.lastEntry();
+        return en;
+    }
+
+    Unit unitFor(Token t)
+    {
+        return mapEntryFor(t).getValue();
+    }
+
+    private double optimalTokenOwnership(int tokensToAdd)
+    {
+        return 1.0 * replicas / (sortedTokens.size() + tokensToAdd);
+    }
+
+    /**
+     * Selects from {@code t1}, {@code t2} the token that forms a bigger range with {@code towards} as the upper bound,
+     * taking into account wrapping.
+     * Unlike Token.size(), equality is taken to mean "same as" rather than covering the whole range.
+     */
+    private static Token furtherStartToken(Token t1, Token t2, Token towards)
+    {
+        if (t1.equals(towards))
+            return t2;
+        if (t2.equals(towards))
+            return t1;
+
+        return t1.size(towards) > t2.size(towards) ? t1 : t2;
+    }
+
+    private static double sq(double d)
+    {
+        return d * d;
+    }
+
+
+    /**
+     * For testing, remove the given unit preserving correct state of the allocator.
+     */
+    void removeUnit(Unit n)
+    {
+        Collection<Token> tokens = unitToTokens.removeAll(n);
+        sortedTokens.keySet().removeAll(tokens);
+    }
+
+    int unitCount()
+    {
+        return unitToTokens.asMap().size();
+    }
+
+    public String toString()
+    {
+        return getClass().getSimpleName();
+    }
+
+    // get or initialise the shared GroupInfo associated with the unit
+    private static <Unit> GroupInfo getGroup(Unit unit, Map<Object, GroupInfo> groupMap, ReplicationStrategy<Unit> strategy)
+    {
+        Object groupClass = strategy.getGroup(unit);
+        GroupInfo group = groupMap.get(groupClass);
+        if (group == null)
+            groupMap.put(groupClass, group = new GroupInfo(groupClass));
+        return group;
+    }
+
+    /**
+     * Unique group object that one or more UnitInfo objects link to.
+     */
+    private static class GroupInfo
+    {
+        /**
+         * Group identifier given by ReplicationStrategy.getGroup(Unit).
+         */
+        final Object group;
+
+        /**
+         * Seen marker. When non-null, the group is already seen in replication walks.
+         * Also points to previous seen group to enable walking the seen groups and clearing the seen markers.
+         */
+        GroupInfo prevSeen = null;
+        /**
+         * Same marker/chain used by populateTokenInfo.
+         */
+        GroupInfo prevPopulate = null;
+
+        /**
+         * Value used as terminator for seen chains.
+         */
+        static GroupInfo TERMINATOR = new GroupInfo(null);
+
+        public GroupInfo(Object group)
+        {
+            this.group = group;
+        }
+
+        public String toString()
+        {
+            return group.toString() + (prevSeen != null ? "*" : "");
+        }
+    }
+
+    /**
+     * Unit information created and used by ReplicationAwareTokenAllocator. Contained vnodes all point to the same
+     * instance.
+     */
+    static class UnitInfo<Unit>
+    {
+        final Unit unit;
+        final GroupInfo group;
+        double ownership;
+        int tokenCount;
+
+        /**
+         * During evaluateImprovement this is used to form a chain of units affected by the candidate insertion.
+         */
+        UnitInfo<Unit> prevUsed;
+        /**
+         * During evaluateImprovement this holds the ownership after the candidate insertion.
+         */
+        double adjustedOwnership;
+
+        private UnitInfo(Unit unit, GroupInfo group)
+        {
+            this.unit = unit;
+            this.group = group;
+            this.tokenCount = 0;
+        }
+
+        public UnitInfo(Unit unit, double ownership, Map<Object, GroupInfo> groupMap, ReplicationStrategy<Unit> strategy)
+        {
+            this(unit, getGroup(unit, groupMap, strategy));
+            this.ownership = ownership;
+        }
+
+        public String toString()
+        {
+            return String.format("%s%s(%.2e)%s",
+                    unit, unit == group.group ? (group.prevSeen != null ? "*" : "") : ":" + group.toString(),
+                    ownership, prevUsed != null ? (prevUsed == this ? "#" : "->" + prevUsed.toString()) : "");
+        }
+    }
+
+    private static class CircularList<T extends CircularList<T>>
+    {
+        T prev;
+        T next;
+
+        /**
+         * Inserts this after unit in the circular list which starts at head. Returns the new head of the list, which
+         * only changes if head was null.
+         */
+        @SuppressWarnings("unchecked")
+        T insertAfter(T head, T unit)
+        {
+            if (head == null)
+            {
+                return prev = next = (T) this;
+            }
+            assert unit != null;
+            assert unit.next != null;
+            prev = unit;
+            next = unit.next;
+            prev.next = (T) this;
+            next.prev = (T) this;
+            return head;
+        }
+
+        /**
+         * Removes this from the list that starts at head. Returns the new head of the list, which only changes if the
+         * head was removed.
+         */
+        T removeFrom(T head)
+        {
+            next.prev = prev;
+            prev.next = next;
+            return this == head ? (this == next ? null : next) : head;
+        }
+    }
+
+    private static class BaseTokenInfo<Unit, T extends BaseTokenInfo<Unit, T>> extends CircularList<T>
+    {
+        final Token token;
+        final UnitInfo<Unit> owningUnit;
+
+        /**
+         * Start of the replication span for the vnode, i.e. the first token of the RF'th group seen before the token.
+         * The replicated ownership of the unit is the range between {@code replicationStart} and {@code token}.
+         */
+        Token replicationStart;
+        /**
+         * The closest position that the new candidate can take to become the new replication start. If candidate is
+         * closer, the start moves to this position. Used to determine replicationStart after insertion of new token.
+         *
+         * Usually the RF minus one boundary, i.e. the first token of the RF-1'th group seen before the token.
+         */
+        Token replicationThreshold;
+        /**
+         * Current replicated ownership. This number is reflected in the owning unit's ownership.
+         */
+        double replicatedOwnership = 0;
+
+        public BaseTokenInfo(Token token, UnitInfo<Unit> owningUnit)
+        {
+            this.token = token;
+            this.owningUnit = owningUnit;
+        }
+
+        public String toString()
+        {
+            return String.format("%s(%s)", token, owningUnit);
+        }
+
+        /**
+         * The previous token in the ring. For existing tokens this is prev;
+         * for candidates it is the token preceding the one being split (split.prev).
+         */
+        TokenInfo<Unit> prevInRing()
+        {
+            return null;
+        }
+    }
+
+    /**
+     * TokenInfo about existing tokens/vnodes.
+     */
+    private static class TokenInfo<Unit> extends BaseTokenInfo<Unit, TokenInfo<Unit>>
+    {
+        public TokenInfo(Token token, UnitInfo<Unit> owningUnit)
+        {
+            super(token, owningUnit);
+        }
+
+        TokenInfo<Unit> prevInRing()
+        {
+            return prev;
+        }
+    }
+
+    /**
+     * TokenInfo about candidate new tokens/vnodes.
+     */
+    private static class CandidateInfo<Unit> extends BaseTokenInfo<Unit, CandidateInfo<Unit>>
+    {
+        // the token in the current ring whose range this candidate splits, i.e. the token directly following the candidate
+        final TokenInfo<Unit> split;
+
+        public CandidateInfo(Token token, TokenInfo<Unit> split, UnitInfo<Unit> owningUnit)
+        {
+            super(token, owningUnit);
+            this.split = split;
+        }
+
+        TokenInfo<Unit> prevInRing()
+        {
+            return split.prev;
+        }
+    }
+
+    static void dumpTokens(String lead, BaseTokenInfo<?, ?> tokens)
+    {
+        BaseTokenInfo<?, ?> token = tokens;
+        do
+        {
+            System.out.format("%s%s: rs %s rt %s size %.2e\n", lead, token, token.replicationStart, token.replicationThreshold, token.replicatedOwnership);
+            token = token.next;
+        } while (token != null && token != tokens);
+    }
+
+    static class Weighted<T> implements Comparable<Weighted<T>>
+    {
+        final double weight;
+        final T value;
+
+        public Weighted(double weight, T value)
+        {
+            this.weight = weight;
+            this.value = value;
+        }
+
+        @Override
+        public int compareTo(Weighted<T> o)
+        {
+            int cmp = Double.compare(o.weight, this.weight);
+            return cmp;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("%s<%s>", value, weight);
+        }
+    }
+}
+

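One detail worth calling out in the allocator above: Weighted.compareTo reverses the usual ordering, so the PriorityQueue built in addUnit behaves as a max-heap and remove() always hands back the candidate with the largest expected improvement. A standalone illustration of that ordering (the demo class is hypothetical; the Weighted body mirrors the patch):

    import java.util.PriorityQueue;

    public class WeightedOrderingDemo
    {
        static class Weighted<T> implements Comparable<Weighted<T>>
        {
            final double weight;
            final T value;
            Weighted(double weight, T value) { this.weight = weight; this.value = value; }
            // Reversed comparison: larger weights sort first in a PriorityQueue's natural ordering.
            public int compareTo(Weighted<T> o) { return Double.compare(o.weight, this.weight); }
        }

        public static void main(String[] args)
        {
            PriorityQueue<Weighted<String>> improvements = new PriorityQueue<>();
            improvements.add(new Weighted<>(0.1, "small improvement"));
            improvements.add(new Weighted<>(0.9, "large improvement"));
            improvements.add(new Weighted<>(0.5, "medium improvement"));
            System.out.println(improvements.remove().value); // large improvement
            System.out.println(improvements.remove().value); // medium improvement
        }
    }
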
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationStrategy.java b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationStrategy.java
new file mode 100644
index 0000000..6dbd37c
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationStrategy.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht.tokenallocator;
+
+interface ReplicationStrategy<Unit>
+{
+    int replicas();
+
+    /**
+     * Returns a group identifier. getGroup(a) == getGroup(b) iff a and b are in the same group.
+     * @return Some hashable object.
+     */
+    Object getGroup(Unit unit);
+}
\ No newline at end of file

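For context, the interface is deliberately tiny: the allocator only needs the replication factor and a way to map a unit to its failure group. Below is a sketch of a possible implementation that groups units by rack name, in the same spirit as the StrategyAdapter in TokenAllocation.java; it is hypothetical, for illustration only, and assumes it lives in the same package since the interface is package-private:

    package org.apache.cassandra.dht.tokenallocator; // the interface above is package-private

    import java.util.Map;

    class RackGroupingStrategy implements ReplicationStrategy<String>
    {
        private final int rf;
        private final Map<String, String> nodeToRack; // unit name -> rack name

        RackGroupingStrategy(int rf, Map<String, String> nodeToRack)
        {
            this.rf = rf;
            this.nodeToRack = nodeToRack;
        }

        public int replicas()
        {
            return rf;
        }

        public Object getGroup(String unit)
        {
            // Units in the same rack share a group, so the allocator spreads replicas across racks.
            return nodeToRack.get(unit);
        }
    }
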
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
new file mode 100644
index 0000000..a357cb4
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht.tokenallocator;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.commons.math.stat.descriptive.SummaryStatistics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.locator.TokenMetadata.Topology;
+
+public class TokenAllocation
+{
+    private static final Logger logger = LoggerFactory.getLogger(TokenAllocation.class);
+
+    public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata,
+                                                   final AbstractReplicationStrategy rs,
+                                                   final IPartitioner partitioner,
+                                                   final InetAddress endpoint,
+                                                   int numTokens)
+    {
+        StrategyAdapter strategy = getStrategy(tokenMetadata, rs, endpoint);
+        Collection<Token> tokens = create(tokenMetadata, strategy, partitioner).addUnit(endpoint, numTokens);
+        tokens = adjustForCrossDatacenterClashes(tokenMetadata, strategy, tokens);
+
+        if (logger.isWarnEnabled())
+        {
+            logger.warn("Selected tokens {}", tokens);
+            SummaryStatistics os = replicatedOwnershipStats(tokenMetadata, rs, endpoint);
+            TokenMetadata tokenMetadataCopy = tokenMetadata.cloneOnlyTokenMap();
+            tokenMetadataCopy.updateNormalTokens(tokens, endpoint);
+            SummaryStatistics ns = replicatedOwnershipStats(tokenMetadataCopy, rs, endpoint);
+            logger.warn("Replicated node load in datacentre before allocation " + statToString(os));
+            logger.warn("Replicated node load in datacentre after allocation " + statToString(ns));
+
+            // TODO: Is it worth doing the replicated ownership calculation always to be able to raise this alarm?
+            if (ns.getStandardDeviation() > os.getStandardDeviation())
+                logger.warn("Unexpected growth in standard deviation after allocation.");
+        }
+        return tokens;
+    }
+
+    private static Collection<Token> adjustForCrossDatacenterClashes(final TokenMetadata tokenMetadata,
+                                                                     StrategyAdapter strategy, Collection<Token> tokens)
+    {
+        List<Token> filtered = Lists.newArrayListWithCapacity(tokens.size());
+
+        for (Token t : tokens)
+        {
+            while (tokenMetadata.getEndpoint(t) != null)
+            {
+                InetAddress other = tokenMetadata.getEndpoint(t);
+                if (strategy.inAllocationRing(other))
+                    throw new ConfigurationException(String.format("Allocated token %s already assigned to node %s. Is another node also allocating tokens?", t, other));
+                t = t.increaseSlightly();
+            }
+            filtered.add(t);
+        }
+        return filtered;
+    }
+
+    // return the ratio of ownership for each endpoint
+    public static Map<InetAddress, Double> evaluateReplicatedOwnership(TokenMetadata tokenMetadata, AbstractReplicationStrategy rs)
+    {
+        Map<InetAddress, Double> ownership = Maps.newHashMap();
+        List<Token> sortedTokens = tokenMetadata.sortedTokens();
+        Iterator<Token> it = sortedTokens.iterator();
+        Token current = it.next();
+        while (it.hasNext())
+        {
+            Token next = it.next();
+            addOwnership(tokenMetadata, rs, current, next, ownership);
+            current = next;
+        }
+        addOwnership(tokenMetadata, rs, current, sortedTokens.get(0), ownership);
+
+        return ownership;
+    }
+
+    static void addOwnership(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, Token current, Token next, Map<InetAddress, Double> ownership)
+    {
+        double size = current.size(next);
+        Token representative = current.getPartitioner().midpoint(current, next);
+        for (InetAddress n : rs.calculateNaturalEndpoints(representative, tokenMetadata))
+        {
+            Double v = ownership.get(n);
+            ownership.put(n, v != null ? v + size : size);
+        }
+    }
+
+    public static String statToString(SummaryStatistics stat)
+    {
+        return String.format("max %.2f min %.2f stddev %.4f", stat.getMax() / stat.getMean(), stat.getMin() / stat.getMean(), stat.getStandardDeviation());
+    }
+
+    public static SummaryStatistics replicatedOwnershipStats(TokenMetadata tokenMetadata,
+                                                             AbstractReplicationStrategy rs, InetAddress endpoint)
+    {
+        SummaryStatistics stat = new SummaryStatistics();
+        StrategyAdapter strategy = getStrategy(tokenMetadata, rs, endpoint);
+        for (Map.Entry<InetAddress, Double> en : evaluateReplicatedOwnership(tokenMetadata, rs).entrySet())
+        {
+            // Filter only in the same datacentre.
+            if (strategy.inAllocationRing(en.getKey()))
+                stat.addValue(en.getValue() / tokenMetadata.getTokens(en.getKey()).size());
+        }
+        return stat;
+    }
+
+    static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy, IPartitioner partitioner)
+    {
+        NavigableMap<Token, InetAddress> sortedTokens = new TreeMap<>();
+        for (Map.Entry<Token, InetAddress> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet())
+        {
+            if (strategy.inAllocationRing(en.getValue()))
+                sortedTokens.put(en.getKey(), en.getValue());
+        }
+        return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, partitioner);
+    }
+
+    interface StrategyAdapter extends ReplicationStrategy<InetAddress>
+    {
+        // return true iff the provided endpoint occurs in the same virtual token-ring we are allocating for,
+        // i.e. it is among the nodes that share ownership with the node we are allocating tokens for;
+        // equivalently, return false if the endpoint's ownership is independent of the node we are allocating tokens for
+        boolean inAllocationRing(InetAddress other);
+    }
+
+    static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, final InetAddress endpoint)
+    {
+        if (rs instanceof NetworkTopologyStrategy)
+            return getStrategy(tokenMetadata, (NetworkTopologyStrategy) rs, rs.snitch, endpoint);
+        if (rs instanceof SimpleStrategy)
+            return getStrategy(tokenMetadata, (SimpleStrategy) rs, endpoint);
+        throw new ConfigurationException("Token allocation does not support replication strategy " + rs.getClass().getSimpleName());
+    }
+
+    static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final SimpleStrategy rs, final InetAddress endpoint)
+    {
+        final int replicas = rs.getReplicationFactor();
+
+        return new StrategyAdapter()
+        {
+            @Override
+            public int replicas()
+            {
+                return replicas;
+            }
+
+            @Override
+            public Object getGroup(InetAddress unit)
+            {
+                return unit;
+            }
+
+            @Override
+            public boolean inAllocationRing(InetAddress other)
+            {
+                return true;
+            }
+        };
+    }
+
+    static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final NetworkTopologyStrategy rs, final IEndpointSnitch snitch, final InetAddress endpoint)
+    {
+        final String dc = snitch.getDatacenter(endpoint);
+        final int replicas = rs.getReplicationFactor(dc);
+
+        Topology topology = tokenMetadata.getTopology();
+        int racks = topology.getDatacenterRacks().get(dc).asMap().size(); // count distinct racks, not rack-endpoint entries
+
+        if (racks >= replicas)
+        {
+            return new StrategyAdapter()
+            {
+                @Override
+                public int replicas()
+                {
+                    return replicas;
+                }
+
+                @Override
+                public Object getGroup(InetAddress unit)
+                {
+                    return snitch.getRack(unit);
+                }
+
+                @Override
+                public boolean inAllocationRing(InetAddress other)
+                {
+                    return dc.equals(snitch.getDatacenter(other));
+                }
+            };
+        }
+        else if (racks == 1)
+        {
+            // One rack, each node treated as separate.
+            return new StrategyAdapter()
+            {
+                @Override
+                public int replicas()
+                {
+                    return replicas;
+                }
+
+                @Override
+                public Object getGroup(InetAddress unit)
+                {
+                    return unit;
+                }
+
+                @Override
+                public boolean inAllocationRing(InetAddress other)
+                {
+                    return dc.equals(snitch.getDatacenter(other));
+                }
+            };
+        }
+        else
+            throw new ConfigurationException(
+                                            String.format("Token allocation failed: the number of racks %d in datacentre %s is lower than its replication factor %d.",
+                                                          racks, dc, replicas));
+    }
+}
+

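To make the ownership walk above easier to follow: evaluateReplicatedOwnership credits each ring range's size to every endpoint that replicates it (found via the range's midpoint), and replicatedOwnershipStats then normalises each endpoint's total by its token count. Below is a toy, self-contained sketch of the same walk on a unit-interval ring, assuming simple "next RF distinct nodes" replication; the OwnershipWalk class and its values are illustrative only, not part of the patch.

import java.util.*;

public class OwnershipWalk
{
    public static void main(String[] args)
    {
        // Toy ring on [0, 1): token -> owning node, kept sorted as in TokenMetadata.sortedTokens().
        NavigableMap<Double, String> ring = new TreeMap<>();
        ring.put(0.00, "A"); ring.put(0.20, "B"); ring.put(0.55, "C"); ring.put(0.70, "A"); ring.put(0.90, "B");
        int rf = 2;

        Map<String, Double> ownership = new HashMap<>();
        List<Double> tokens = new ArrayList<>(ring.keySet());
        for (int i = 0; i < tokens.size(); i++)
        {
            double current = tokens.get(i);
            double next = tokens.get((i + 1) % tokens.size());
            double size = next > current ? next - current : 1.0 - current + next;  // wrap the last range

            // Toy "replication": the owners of the next rf distinct nodes starting at `next`.
            Set<String> replicas = new LinkedHashSet<>();
            for (int j = 0; replicas.size() < rf && j < tokens.size(); j++)
                replicas.add(ring.get(tokens.get((i + 1 + j) % tokens.size())));

            // Credit the range's size to every node that replicates it.
            for (String node : replicas)
                ownership.merge(node, size, Double::sum);
        }
        // Totals sum to rf; a perfectly even allocation would give each node ~rf / nodeCount.
        System.out.println(ownership);
    }
}
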
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java
new file mode 100644
index 0000000..580f2ec
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht.tokenallocator;
+
+import java.util.Collection;
+
+import org.apache.cassandra.dht.Token;
+
+public interface TokenAllocator<Unit>
+{
+    public Collection<Token> addUnit(Unit newUnit, int numTokens);
+}
\ No newline at end of file

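The contract here is small: addUnit registers a new unit (node) and returns the numTokens tokens chosen for it, with the allocator's internal view growing so later calls account for the new unit — this is how allocateTokens above uses it with InetAddress units and real Tokens. The following toy analogue uses long tokens on a small ring so it stays self-contained; ToyTokenAllocator and MidpointSplittingAllocator are illustrative names, and the midpoint-splitting heuristic is deliberately not the replication-aware algorithm added by this patch, only the interface shape.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy analogue of TokenAllocator<Unit>, using long tokens so the example is self-contained.
interface ToyTokenAllocator<Unit>
{
    Collection<Long> addUnit(Unit newUnit, int numTokens);
}

// Picks each new token at the midpoint of the currently largest gap.
class MidpointSplittingAllocator<Unit> implements ToyTokenAllocator<Unit>
{
    static final long RING_SIZE = 1_000_000L;
    private final NavigableMap<Long, Unit> sortedTokens = new TreeMap<>();

    public Collection<Long> addUnit(Unit newUnit, int numTokens)
    {
        List<Long> chosen = new ArrayList<>(numTokens);
        for (int i = 0; i < numTokens; i++)
        {
            long token = sortedTokens.isEmpty() ? 0L : midpointOfLargestGap();
            sortedTokens.put(token, newUnit);   // the allocator's view grows as tokens are handed out
            chosen.add(token);
        }
        return chosen;
    }

    private long midpointOfLargestGap()
    {
        List<Long> tokens = new ArrayList<>(sortedTokens.keySet());
        long bestMid = 0, bestSpan = -1;
        for (int i = 0; i < tokens.size(); i++)
        {
            long start = tokens.get(i);
            long end = tokens.get((i + 1) % tokens.size());
            long span = end > start ? end - start : RING_SIZE - start + end;  // wrap the last gap
            if (span > bestSpan)
            {
                bestSpan = span;
                bestMid = (start + span / 2) % RING_SIZE;
            }
        }
        return bestMid;
    }

    public static void main(String[] args)
    {
        MidpointSplittingAllocator<String> allocator = new MidpointSplittingAllocator<>();
        System.out.println(allocator.addUnit("node1", 4));   // [0, 500000, 250000, 750000]
        System.out.println(allocator.addUnit("node2", 4));   // splits the remaining largest gaps
    }
}
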
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index a609c67..1b3c560 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -781,7 +781,8 @@ public class TokenMetadata
     {
         List tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
-        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+//        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+        if (index < 0) index = -index-1;
         return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1));
     }
 
@@ -789,7 +790,8 @@ public class TokenMetadata
     {
         List tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
-        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+//        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+        if (index < 0) return (Token) tokens.get((-index - 1) % tokens.size());
         return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1));
     }
 

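The relaxed lookups above lean on Collections.binarySearch's convention of returning -(insertionPoint) - 1 when the key is absent, so -index - 1 recovers the insertion point; the predecessor/successor then wrap around the ring at either end. A minimal, self-contained illustration of that convention follows (RingNeighbours is a hypothetical example class, not part of the patch).

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RingNeighbours
{
    // Predecessor on a wrapping ring: the element just before the insertion point, wrapping to the last element.
    static long predecessor(List<Long> sorted, long key)
    {
        int index = Collections.binarySearch(sorted, key);
        if (index < 0)
            index = -index - 1;                     // recover the insertion point
        return index == 0 ? sorted.get(sorted.size() - 1) : sorted.get(index - 1);
    }

    // Successor on a wrapping ring: the element at the insertion point, wrapping past the end to the first element.
    static long successor(List<Long> sorted, long key)
    {
        int index = Collections.binarySearch(sorted, key);
        if (index < 0)
            return sorted.get((-index - 1) % sorted.size());
        return index == sorted.size() - 1 ? sorted.get(0) : sorted.get(index + 1);
    }

    public static void main(String[] args)
    {
        List<Long> ring = Arrays.asList(10L, 20L, 30L);
        System.out.println(predecessor(ring, 25));  // 20 (25 absent, insertion point 2)
        System.out.println(successor(ring, 25));    // 30
        System.out.println(predecessor(ring, 5));   // 30 (wraps to the last token)
        System.out.println(successor(ring, 35));    // 10 (wraps to the first token)
    }
}
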
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 60eafb8..25ed849 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -876,7 +876,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     throw new UnsupportedOperationException(s);
                 }
                 setMode(Mode.JOINING, "getting bootstrap token", true);
-                bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata);
+                bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddress());
             }
             else
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/test/conf/logback-test.xml
----------------------------------------------------------------------
diff --git a/test/conf/logback-test.xml b/test/conf/logback-test.xml
index 6d75aaf..b503f04 100644
--- a/test/conf/logback-test.xml
+++ b/test/conf/logback-test.xml
@@ -66,7 +66,6 @@
 
   <root level="DEBUG">
     <appender-ref ref="ASYNCFILE" />
-    <appender-ref ref="STDERR" />
     <appender-ref ref="STDOUT" />
   </root>
 </configuration>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a3fa887/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index f5fd2cf..13f3be6 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -127,7 +127,7 @@ public class CommitLogStressTest
     ReplayPosition discardedPos;
     
     @BeforeClass
-    static public void initialize() throws FileNotFoundException, IOException, InterruptedException
+    public static void initialize() throws FileNotFoundException, IOException, InterruptedException
     {
         try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
         {

