cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [1/3] cassandra git commit: Make sure the same token does not exist in several data directories
Date Tue, 05 Jan 2016 15:56:48 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.3 acc5b63cc -> ea4f64977


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 8b4351f..5e78834 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.compaction.writers;
 
 
+import java.util.List;
 import java.util.Set;
 
 import org.slf4j.Logger;
@@ -39,16 +40,18 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 public class DefaultCompactionWriter extends CompactionAwareWriter
 {
     protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
+    private final int sstableLevel;
 
     public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
     {
-        this(cfs, directories, txn, nonExpiredSSTables, false, false);
+        this(cfs, directories, txn, nonExpiredSSTables, false, false, 0);
     }
 
     @SuppressWarnings("resource")
-    public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals)
+    public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals, int sstableLevel)
     {
         super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
+        this.sstableLevel = sstableLevel;
     }
 
     @Override
@@ -58,14 +61,14 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
     }
 
     @Override
-    protected void switchCompactionLocation(Directories.DataDirectory directory)
+    public void switchCompactionLocation(Directories.DataDirectory directory)
     {
         @SuppressWarnings("resource")
         SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))),
                                                     estimatedTotalKeys,
                                                     minRepairedAt,
                                                     cfs.metadata,
-                                                    new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0),
+                                                    new MetadataCollector(txn.originals(), cfs.metadata.comparator, sstableLevel),
                                                     SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
                                                     cfs.indexManager.listIndexes(),
                                                     txn);
@@ -73,6 +76,12 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
     }
 
     @Override
+    public List<SSTableReader> finish(long repairedAt)
+    {
+        return sstableWriter.setRepairedAt(repairedAt).finish();
+    }
+
+    @Override
     public long estimatedKeys()
     {
         return estimatedTotalKeys;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index b0c4562..0c88ac6 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -17,12 +17,9 @@
  */
 package org.apache.cassandra.db.compaction.writers;
 
-import java.io.File;
+import java.util.List;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.RowIndexEntry;
@@ -37,15 +34,14 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 
 public class MajorLeveledCompactionWriter extends CompactionAwareWriter
 {
-    private static final Logger logger = LoggerFactory.getLogger(MajorLeveledCompactionWriter.class);
     private final long maxSSTableSize;
-    private final long expectedWriteSize;
-    private final Set<SSTableReader> allSSTables;
     private int currentLevel = 1;
     private long averageEstimatedKeysPerSSTable;
     private long partitionsWritten = 0;
     private long totalWrittenInLevel = 0;
     private int sstablesWritten = 0;
+    private final long keysPerSSTable;
+    private Directories.DataDirectory sstableDirectory;
 
     public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
                                         Directories directories,
@@ -67,8 +63,8 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
     {
         super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
         this.maxSSTableSize = maxSSTableSize;
-        this.allSSTables = txn.originals();
-        expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()));
+        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
+        keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
     }
 
     @Override
@@ -86,28 +82,33 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                 totalWrittenInLevel = 0;
                 currentLevel++;
             }
-
-            averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
-            switchCompactionLocation(getWriteDirectory(expectedWriteSize));
-            partitionsWritten = 0;
-            sstablesWritten++;
+            switchCompactionLocation(sstableDirectory);
         }
         return rie != null;
 
     }
 
-    public void switchCompactionLocation(Directories.DataDirectory directory)
+    @Override
+    public void switchCompactionLocation(Directories.DataDirectory location)
+    {
+        this.sstableDirectory = location;
+        averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
+        sstableWriter.switchWriter(SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))),
+                keysPerSSTable,
+                minRepairedAt,
+                cfs.metadata,
+                new MetadataCollector(txn.originals(), cfs.metadata.comparator, currentLevel),
+                SerializationHeader.make(cfs.metadata, txn.originals()),
+                cfs.indexManager.listIndexes(),
+                txn));
+        partitionsWritten = 0;
+        sstablesWritten = 0;
+
+    }
+
+    @Override
+    public List<SSTableReader> finish(long repairedAt)
     {
-        File sstableDirectory = getDirectories().getLocationForDisk(directory);
-        @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                    averageEstimatedKeysPerSSTable,
-                                                    minRepairedAt,
-                                                    cfs.metadata,
-                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                    cfs.indexManager.listIndexes(),
-                                                    txn);
-        sstableWriter.switchWriter(writer);
+        return sstableWriter.setRepairedAt(repairedAt).finish();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 1dc72e7..ac83cc6 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.db.compaction.writers;
 
+import java.io.File;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -33,11 +35,11 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 public class MaxSSTableSizeWriter extends CompactionAwareWriter
 {
     private final long estimatedTotalKeys;
-    private final long expectedWriteSize;
     private final long maxSSTableSize;
     private final int level;
     private final long estimatedSSTables;
     private final Set<SSTableReader> allSSTables;
+    private Directories.DataDirectory sstableDirectory;
 
     public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
                                 Directories directories,
@@ -63,25 +65,25 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         this.allSSTables = txn.originals();
         this.level = level;
         this.maxSSTableSize = maxSSTableSize;
-        long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
-        expectedWriteSize = Math.min(maxSSTableSize, totalSize);
         estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
     }
 
-    @Override
-    public boolean realAppend(UnfilteredRowIterator partition)
+    protected boolean realAppend(UnfilteredRowIterator partition)
     {
         RowIndexEntry rie = sstableWriter.append(partition);
         if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
-            switchCompactionLocation(getWriteDirectory(expectedWriteSize));
+        {
+            switchCompactionLocation(sstableDirectory);
+        }
         return rie != null;
     }
 
+    @Override
     public void switchCompactionLocation(Directories.DataDirectory location)
     {
-        @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
+        sstableDirectory = location;
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))),
                                                     estimatedTotalKeys / estimatedSSTables,
                                                     minRepairedAt,
                                                     cfs.metadata,
@@ -91,7 +93,11 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
                                                     txn);
 
         sstableWriter.switchWriter(writer);
+    }
 
+    public List<SSTableReader> finish(long repairedAt)
+    {
+        return sstableWriter.setRepairedAt(repairedAt).finish();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 3a7f526..46183dc 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -17,8 +17,8 @@
  */
 package org.apache.cassandra.db.compaction.writers;
 
-import java.io.File;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Set;
 
 import org.slf4j.Logger;
@@ -51,6 +51,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
     private final Set<SSTableReader> allSSTables;
     private long currentBytesToWrite;
     private int currentRatioIndex = 0;
+    private Directories.DataDirectory location;
 
     public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
     {
@@ -82,10 +83,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
             }
         }
         ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex);
-        long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
         currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
-        switchCompactionLocation(getWriteDirectory(currentBytesToWrite));
-        logger.trace("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
     }
 
     @Override
@@ -96,15 +94,17 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
         {
             currentRatioIndex++;
             currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
-            switchCompactionLocation(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
+            switchCompactionLocation(location);
+            logger.debug("Switching writer, currentBytesToWrite = {}", currentBytesToWrite);
         }
         return rie != null;
     }
 
+    @Override
     public void switchCompactionLocation(Directories.DataDirectory location)
     {
+        this.location = location;
         long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
-        @SuppressWarnings("resource")
         SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
                                                     currentPartitionsToWrite,
                                                     minRepairedAt,
@@ -115,6 +115,11 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
                                                     txn);
         logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
         sstableWriter.switchWriter(writer);
+    }
 
+    @Override
+    public List<SSTableReader> finish(long repairedAt)
+    {
+        return sstableWriter.setRepairedAt(repairedAt).finish();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index c09d49c..4c73472 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -327,10 +327,10 @@ public class Tracker
         apply(View.markFlushing(memtable));
     }
 
-    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
+    public void replaceFlushed(Memtable memtable, Iterable<SSTableReader> sstables)
     {
         assert !isDummy();
-        if (sstables == null || sstables.isEmpty())
+        if (sstables == null || Iterables.isEmpty(sstables))
         {
             // sstable may be null if we flushed batchlog and nothing needed to be retained
             // if it's null, we don't care what state the cfstore is in, we just replace it and continue

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index b62c7e3..63926ed 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -310,7 +310,7 @@ public class View
     }
 
     // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
-    static Function<View, View> replaceFlushed(final Memtable memtable, final Collection<SSTableReader> flushed)
+    static Function<View, View> replaceFlushed(final Memtable memtable, final Iterable<SSTableReader> flushed)
     {
         return new Function<View, View>()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/IPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java
index e0a08dc..b559a6f 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.dht;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -49,6 +50,17 @@ public interface IPartitioner
     public Token getMinimumToken();
 
     /**
+     * The biggest token for this partitioner. Unlike getMinimumToken, this token is actually used, so users wanting to
+     * include all tokens need to use getMaximumToken().maxKeyBound().
+     *
+     * Not implemented for the ordered partitioners.
+     */
+    default Token getMaximumToken()
+    {
+        throw new UnsupportedOperationException("If you are using a splitting partitioner, getMaximumToken has to be implemented");
+    }
+
+    /**
      * @return a Token that can be used to route a given key
      * (This is NOT a method to create a Token from its string representation;
      * for that, use TokenFactory.fromString.)
@@ -84,4 +96,9 @@ public interface IPartitioner
      * Used by secondary indices.
      */
     public AbstractType<?> partitionOrdering();
+
+    default Optional<Splitter> splitter()
+    {
+        return Optional.empty();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index d68be3f..f9f6113 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -48,6 +48,19 @@ public class Murmur3Partitioner implements IPartitioner
     public static final Murmur3Partitioner instance = new Murmur3Partitioner();
     public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
 
+    private final Splitter splitter = new Splitter(this)
+    {
+        public Token tokenForValue(BigInteger value)
+        {
+            return new LongToken(value.longValue());
+        }
+
+        public BigInteger valueForToken(Token token)
+        {
+            return BigInteger.valueOf(((LongToken) token).token);
+        }
+    };
+
     public DecoratedKey decorateKey(ByteBuffer key)
     {
         long[] hash = getHash(key);
@@ -291,8 +304,18 @@ public class Murmur3Partitioner implements IPartitioner
         return LongType.instance;
     }
 
+    public Token getMaximumToken()
+    {
+        return new LongToken(Long.MAX_VALUE);
+    }
+
     public AbstractType<?> partitionOrdering()
     {
         return partitionOrdering;
     }
+
+    public Optional<Splitter> splitter()
+    {
+        return Optional.of(splitter);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index b0dea01..96a96ca 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -50,6 +50,19 @@ public class RandomPartitioner implements IPartitioner
     public static final RandomPartitioner instance = new RandomPartitioner();
     public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
 
+    private final Splitter splitter = new Splitter(this)
+    {
+        public Token tokenForValue(BigInteger value)
+        {
+            return new BigIntegerToken(value);
+        }
+
+        public BigInteger valueForToken(Token token)
+        {
+            return ((BigIntegerToken)token).getTokenValue();
+        }
+    };
+
     public DecoratedKey decorateKey(ByteBuffer key)
     {
         return new CachedHashDecoratedKey(getToken(key), key);
@@ -194,6 +207,11 @@ public class RandomPartitioner implements IPartitioner
         return ownerships;
     }
 
+    public Token getMaximumToken()
+    {
+        return new BigIntegerToken(MAXIMUM);
+    }
+
     public AbstractType<?> getTokenValidator()
     {
         return IntegerType.instance;
@@ -203,4 +221,10 @@ public class RandomPartitioner implements IPartitioner
     {
         return partitionOrdering;
     }
+
+    public Optional<Splitter> splitter()
+    {
+        return Optional.of(splitter);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 1fc6c46..3cc3b23 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -473,6 +473,23 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
         return new Range<T>(left, newRight);
     }
 
+    public static <T extends RingPosition<T>> List<Range<T>> sort(Collection<Range<T>> ranges)
+    {
+        List<Range<T>> output = new ArrayList<>(ranges.size());
+        for (Range<T> r : ranges)
+            output.addAll(r.unwrap());
+        // sort by left
+        Collections.sort(output, new Comparator<Range<T>>()
+        {
+            public int compare(Range<T> b1, Range<T> b2)
+            {
+                return b1.left.compareTo(b2.left);
+            }
+        });
+        return output;
+    }
+
+
     /**
      * Compute a range of keys corresponding to a given range of token.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Splitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Splitter.java b/src/java/org/apache/cassandra/dht/Splitter.java
new file mode 100644
index 0000000..67b578d
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/Splitter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Splits locally owned token ranges into a given number of parts, each spanning a roughly equal number of tokens.
+ */
+public abstract class Splitter
+{
+    private final IPartitioner partitioner;
+
+    protected Splitter(IPartitioner partitioner)
+    {
+        this.partitioner = partitioner;
+    }
+
+    protected abstract Token tokenForValue(BigInteger value);
+
+    protected abstract BigInteger valueForToken(Token token);
+
+    public List<Token> splitOwnedRanges(int parts, List<Range<Token>> localRanges, boolean dontSplitRanges)
+    {
+        if (localRanges.isEmpty() || parts == 1)
+            return Collections.singletonList(partitioner.getMaximumToken());
+
+        BigInteger totalTokens = BigInteger.ZERO;
+        for (Range<Token> r : localRanges)
+        {
+            BigInteger right = valueForToken(token(r.right));
+            totalTokens = totalTokens.add(right.subtract(valueForToken(r.left)));
+        }
+        BigInteger perPart = totalTokens.divide(BigInteger.valueOf(parts));
+
+        if (dontSplitRanges)
+            return splitOwnedRangesNoPartialRanges(localRanges, perPart, parts);
+
+        List<Token> boundaries = new ArrayList<>();
+        BigInteger sum = BigInteger.ZERO;
+        for (Range<Token> r : localRanges)
+        {
+            Token right = token(r.right);
+            BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left)).abs();
+            BigInteger left = valueForToken(r.left);
+            while (sum.add(currentRangeWidth).compareTo(perPart) >= 0)
+            {
+                BigInteger withinRangeBoundary = perPart.subtract(sum);
+                left = left.add(withinRangeBoundary);
+                boundaries.add(tokenForValue(left));
+                currentRangeWidth = currentRangeWidth.subtract(withinRangeBoundary);
+                sum = BigInteger.ZERO;
+            }
+            sum = sum.add(currentRangeWidth);
+        }
+        boundaries.set(boundaries.size() - 1, partitioner.getMaximumToken());
+
+        assert boundaries.size() == parts : boundaries.size() +"!="+parts+" "+boundaries+":"+localRanges;
+        return boundaries;
+    }
+
+    private List<Token> splitOwnedRangesNoPartialRanges(List<Range<Token>> localRanges, BigInteger perPart, int parts)
+    {
+        List<Token> boundaries = new ArrayList<>(parts);
+        BigInteger sum = BigInteger.ZERO;
+        int i = 0;
+        while (boundaries.size() < parts - 1)
+        {
+            Range<Token> r = localRanges.get(i);
+            Range<Token> nextRange = localRanges.get(i + 1);
+            Token right = token(r.right);
+            Token nextRight = token(nextRange.right);
+
+            BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left));
+            BigInteger nextRangeWidth = valueForToken(nextRight).subtract(valueForToken(nextRange.left));
+            sum = sum.add(currentRangeWidth);
+            // does this or next range take us beyond the per part limit?
+            if (sum.compareTo(perPart) > 0 || sum.add(nextRangeWidth).compareTo(perPart) > 0)
+            {
+                // Either this or the next range will take us beyond the perPart limit. Will stopping now or
+                // adding the next range create the smallest difference to perPart?
+                BigInteger diffCurrent = sum.subtract(perPart).abs();
+                BigInteger diffNext = sum.add(nextRangeWidth).subtract(perPart).abs();
+                if (diffNext.compareTo(diffCurrent) >= 0)
+                {
+                    sum = BigInteger.ZERO;
+                    boundaries.add(right);
+                }
+            }
+            i++;
+        }
+        boundaries.add(partitioner.getMaximumToken());
+        return boundaries;
+    }
+
+    /**
+     * We avoid calculating wrap-around ranges; instead we use the actual max token and then, when translating
+     * to PartitionPositions, include tokens from .minKeyBound to .maxKeyBound so that all tokens are covered.
+     */
+    private Token token(Token t)
+    {
+        return t.equals(partitioner.getMinimumToken()) ? partitioner.getMaximumToken() : t;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index 68dbd74..2217ae2 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -35,9 +35,11 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 public class SimpleSSTableMultiWriter implements SSTableMultiWriter
 {
     private final SSTableWriter writer;
+    private final LifecycleTransaction txn;
 
-    protected SimpleSSTableMultiWriter(SSTableWriter writer)
+    protected SimpleSSTableMultiWriter(SSTableWriter writer, LifecycleTransaction txn)
     {
+        this.txn = txn;
         this.writer = writer;
     }
 
@@ -90,6 +92,7 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
 
     public Throwable abort(Throwable accumulate)
     {
+        txn.untrackNew(writer);
         return writer.abort(accumulate);
     }
 
@@ -114,6 +117,6 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
                                             LifecycleTransaction txn)
     {
         SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, txn);
-        return new SimpleSSTableMultiWriter(writer);
+        return new SimpleSSTableMultiWriter(writer, txn);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
new file mode 100644
index 0000000..674ed7f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * An {@link SSTableMultiWriter} that spreads the partitions it receives over the table's writeable data
+ * directories: whenever a partition key passes the current disk boundary (as computed by
+ * {@link StorageService#getDiskBoundaries(ColumnFamilyStore, Directories.DataDirectory[])}), the current
+ * sstable is set aside and a new one is opened in the data directory owning that key.
+ *
+ * Partitions must be appended in token order, since the writer only ever moves forward through the
+ * boundaries. When there are no disk boundaries, a single writer is opened in a directory with room for
+ * the expected total size.
+ */
+public class RangeAwareSSTableWriter implements SSTableMultiWriter
+{
+    private final List<PartitionPosition> boundaries;
+    private final Directories.DataDirectory[] directories;
+    private final int sstableLevel;
+    // key estimate for each per-directory writer: the total estimate split evenly over the directories
+    private final long estimatedKeys;
+    private final long repairedAt;
+    private final SSTableFormat.Type format;
+    private final SerializationHeader.Component header;
+    private final LifecycleTransaction txn;
+    // index (into boundaries and directories) of currentWriter's directory; -1 until the first switch
+    private int currentIndex = -1;
+    public final ColumnFamilyStore cfs;
+    private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>();
+    private final List<SSTableReader> finishedReaders = new ArrayList<>();
+    private SSTableMultiWriter currentWriter = null;
+
+    /**
+     * @param cfs           the table the sstables are written for
+     * @param estimatedKeys estimated total number of keys; split evenly over the data directories
+     * @param repairedAt    repairedAt value passed to every per-directory writer
+     * @param format        sstable format of the new files
+     * @param sstableLevel  sstable level passed to every per-directory writer
+     * @param totalSize     expected number of bytes; only used to choose a directory when there are no disk boundaries
+     * @param txn           transaction the new sstables are created in
+     * @param header        serialization header of the incoming partitions
+     * @throws IOException  if there are no disk boundaries and no directory has room for {@code totalSize} bytes
+     */
+    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader.Component header) throws IOException
+    {
+        directories = cfs.getDirectories().getWriteableLocations();
+        this.sstableLevel = sstableLevel;
+        this.cfs = cfs;
+        this.estimatedKeys = estimatedKeys / directories.length;
+        this.repairedAt = repairedAt;
+        this.format = format;
+        this.txn = txn;
+        this.header = header;
+        boundaries = StorageService.getDiskBoundaries(cfs, directories);
+        if (boundaries == null)
+        {
+            Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
+            if (localDir == null)
+                throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
+            // this single writer receives every partition, so it gets the full (unsplit) key estimate
+            currentWriter = createWriter(localDir, estimatedKeys);
+        }
+    }
+
+    /**
+     * Opens a writer for a new sstable in the given data directory, using this writer's format, level,
+     * repairedAt, serialization header and transaction.
+     */
+    private SSTableMultiWriter createWriter(Directories.DataDirectory dataDirectory, long keyCount)
+    {
+        // pass the format to getSSTablePath so the generated path is for the requested format on every code path
+        Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(dataDirectory), format));
+        return cfs.createSSTableMultiWriter(desc, keyCount, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn);
+    }
+
+    /**
+     * Moves forward to the data directory whose boundary covers {@code key}, setting the current writer aside
+     * and opening a new one there if the key lies past the current boundary. Does nothing without disk boundaries.
+     */
+    private void maybeSwitchWriter(DecoratedKey key)
+    {
+        if (boundaries == null)
+            return;
+
+        boolean switched = false;
+        // getDiskBoundaries always ends with the partitioner's maximum key bound, so this stops at the last directory
+        while (currentIndex < 0 || key.compareTo(boundaries.get(currentIndex)) > 0)
+        {
+            switched = true;
+            currentIndex++;
+        }
+
+        if (switched)
+        {
+            retireCurrentWriter();
+            currentWriter = createWriter(directories[currentIndex], estimatedKeys);
+        }
+    }
+
+    /**
+     * Appends a partition to the writer of the data directory owning its key.
+     */
+    @Override
+    public boolean append(UnfilteredRowIterator partition)
+    {
+        maybeSwitchWriter(partition.partitionKey());
+        return currentWriter.append(partition);
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+    {
+        return finishAll(writer -> writer.finish(repairedAt, maxDataAge, openResult));
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(boolean openResult)
+    {
+        return finishAll(writer -> writer.finish(openResult));
+    }
+
+    /**
+     * Finishes, via {@code finisher}, every writer that received data and aborts the ones that stayed empty.
+     *
+     * @return the readers collected so far; the same list {@link #finished()} returns
+     */
+    private Collection<SSTableReader> finishAll(Function<SSTableMultiWriter, Collection<SSTableReader>> finisher)
+    {
+        retireCurrentWriter();
+        for (SSTableMultiWriter writer : finishedWriters)
+        {
+            if (writer.getFilePointer() > 0)
+                finishedReaders.addAll(finisher.apply(writer));
+            else
+                SSTableMultiWriter.abortOrDie(writer);
+        }
+        return finishedReaders;
+    }
+
+    @Override
+    public Collection<SSTableReader> finished()
+    {
+        return finishedReaders;
+    }
+
+    @Override
+    public SSTableMultiWriter setOpenResult(boolean openResult)
+    {
+        finishedWriters.forEach((w) -> w.setOpenResult(openResult));
+        // currentWriter is null before the first append when splitting by disk, and after finish/commit/abort/close
+        if (currentWriter != null)
+            currentWriter.setOpenResult(openResult);
+        return this;
+    }
+
+    @Override
+    public String getFilename()
+    {
+        return String.join("/", cfs.keyspace.getName(), cfs.getTableName());
+    }
+
+    /**
+     * @return the file pointer of the writer currently receiving partitions, or 0 when no writer is open
+     */
+    @Override
+    public long getFilePointer()
+    {
+        return currentWriter == null ? 0 : currentWriter.getFilePointer();
+    }
+
+    @Override
+    public UUID getCfId()
+    {
+        // all per-directory writers are created for cfs, so answer from its metadata; this also works before
+        // the first append, when no writer has been opened yet
+        return cfs.metadata.cfId;
+    }
+
+    @Override
+    public Throwable commit(Throwable accumulate)
+    {
+        retireCurrentWriter();
+        for (SSTableMultiWriter writer : finishedWriters)
+            accumulate = writer.commit(accumulate);
+        return accumulate;
+    }
+
+    @Override
+    public Throwable abort(Throwable accumulate)
+    {
+        retireCurrentWriter();
+        for (SSTableMultiWriter finishedWriter : finishedWriters)
+            accumulate = finishedWriter.abort(accumulate);
+
+        return accumulate;
+    }
+
+    @Override
+    public void prepareToCommit()
+    {
+        retireCurrentWriter();
+        finishedWriters.forEach(SSTableMultiWriter::prepareToCommit);
+    }
+
+    @Override
+    public void close()
+    {
+        retireCurrentWriter();
+        finishedWriters.forEach(SSTableMultiWriter::close);
+    }
+
+    /**
+     * Moves the writer currently receiving partitions, if any, to {@code finishedWriters}, so that the
+     * finish/commit/abort/close paths act on every writer opened so far.
+     */
+    private void retireCurrentWriter()
+    {
+        if (currentWriter != null)
+            finishedWriters.add(currentWriter);
+        currentWriter = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 925efd6..a083218 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -24,7 +24,12 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
 {
     protected Directories.DataDirectory getWriteDirectory(long writeSize)
     {
-        Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize);
+        Directories.DataDirectory directory;
+        directory = getDirectory();
+
+        if (directory == null) // ok panic - write anywhere
+            directory = getDirectories().getWriteableLocation(writeSize);
+
         if (directory == null)
             throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
 
@@ -36,4 +41,14 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
      * @return Directories instance for the CF.
      */
     protected abstract Directories getDirectories();
+    protected abstract Directories.DataDirectory getDirectory();
+
+    /**
+     * Called if no disk is available with free space for the full write size.
+     * @return true if the scope of the task was successfully reduced.
+     */
+    public boolean reduceScopeForLimitedSpace()
+    {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 60ede18..24bebae 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.RangeStreamer;
 import org.apache.cassandra.dht.RingPosition;
+import org.apache.cassandra.dht.Splitter;
 import org.apache.cassandra.dht.StreamStateStore;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.Token.TokenFactory;
@@ -2625,6 +2626,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    public int relocateSSTables(String keyspaceName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
+        for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
+        {
+            CompactionManager.AllSSTableOpStatus oneStatus = cfs.relocateSSTables();
+            if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
+                status = oneStatus;
+        }
+        return status.statusCode;
+    }
+
     /**
      * Takes the snapshot for the given keyspaces. A snapshot name must be specified.
      *
@@ -4459,4 +4472,61 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB));
     }
 
+    public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories)
+    {
+        if (!cfs.getPartitioner().splitter().isPresent())
+            return null;
+
+        Collection<Range<Token>> lr;
+        // bootstrapping: split the ranges this node is about to receive; otherwise the ranges it will own once settled
+        if (StorageService.instance.isBootstrapMode())
+        {
+            lr = StorageService.instance.getTokenMetadata().getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
+        }
+        else
+        {
+            // Use the TokenMetadata as it will look once all pending changes have settled: if a node is decommissioned,
+            // data streamed from it then lands on the correct disk instead of in the wrong place, which minimizes
+            // the amount of data relocatesstables has to move once everything has settled.
+            TokenMetadata tmd = StorageService.instance.getTokenMetadata().cloneAfterAllSettled();
+            lr = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd).get(FBUtilities.getBroadcastAddress());
+        }
+
+        if (lr == null || lr.isEmpty())
+            return null;
+        List<Range<Token>> localRanges = Range.sort(lr);
+
+        return getDiskBoundaries(localRanges, cfs.getPartitioner(), directories);
+    }
+
+    public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs)
+    {
+        return getDiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations());
+    }
+
+    /**
+     * Splits the given local ranges into one contiguous chunk per data directory and returns the chunk upper bounds;
+     * how the split is done depends on whether vnodes are in use (DatabaseDescriptor.getNumTokens() > 1).
+     *
+     * The returned positions are upper bounds: everything from the partitioner's minimum token up to and including
+     * get(0) belongs on the first disk, everything after get(0) up to get(1) on the second disk, and so on.
+     *
+     * The final entry is always the maxKeyBound() of the partitioner's maximum token.
+     *
+     * @param localRanges the locally owned token ranges (the cfs overloads pass them sorted via Range.sort)
+     * @param partitioner the partitioner of the ranges; must provide a Splitter (asserted)
+     * @param dataDirectories the data directories to split over; only their count is used
+     * @return the per-disk upper bounds, in disk order
+     */
+    public static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> localRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
+    {
+        assert partitioner.splitter().isPresent();
+        Splitter splitter = partitioner.splitter().get();
+        List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, DatabaseDescriptor.getNumTokens() > 1);
+        List<PartitionPosition> diskBoundaries = new ArrayList<>();
+        for (int i = 0; i < boundaries.size() - 1; i++)
+            diskBoundaries.add(boundaries.get(i).maxKeyBound());
+        diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
+        return diskBoundaries;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 70691c7..eef34c0 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -248,6 +248,7 @@ public interface StorageServiceMBean extends NotificationEmitter
      */
     public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
 
+    public int relocateSSTables(String keyspace, String ... cfnames) throws IOException, ExecutionException, InterruptedException;
     /**
      * Trigger a cleanup of keys on a single keyspace
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 87dcda0..61eb13f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -35,9 +35,9 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -63,8 +63,6 @@ public class StreamReader
     protected final int sstableLevel;
     protected final SerializationHeader.Component header;
 
-    protected Descriptor desc;
-
     public StreamReader(FileMessageHeader header, StreamSession session)
     {
         this.session = session;
@@ -108,7 +106,7 @@ public class StreamReader
             {
                 writePartition(deserializer, writer);
                 // TODO move this to BytesReadTracker
-                session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+                session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
             }
             return writer;
         }
@@ -132,10 +130,7 @@ public class StreamReader
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
 
-        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
-
-
-        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
+        return new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), header);
     }
 
     protected void drain(InputStream dis, long bytesRead) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 5355c3e..9078acc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -584,9 +584,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         receivers.get(message.header.cfId).received(message.sstable);
     }
 
-    public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total)
+    public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total)
     {
-        ProgressInfo progress = new ProgressInfo(peer, index, desc.filenameFor(Component.DATA), direction, bytes, total);
+        ProgressInfo progress = new ProgressInfo(peer, index, filename, direction, bytes, total);
         streamResult.handleProgress(progress);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 106677c..ca35c0b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -102,7 +102,7 @@ public class StreamWriter
                     long lastBytesRead = write(file, validator, readOffset, length, bytesRead);
                     bytesRead += lastBytesRead;
                     progress += (lastBytesRead - readOffset);
-                    session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
+                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
                     readOffset = 0;
                 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 4d10244..c123102 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,6 +25,7 @@ import java.nio.channels.ReadableByteChannel;
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,7 +96,7 @@ public class CompressedStreamReader extends StreamReader
                 {
                     writePartition(deserializer, writer);
                     // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
-                    session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+                    session.progress(writer.getFilename(), ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                 }
             }
             return writer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index adbd091..93e0f76 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -26,6 +26,7 @@ import java.util.List;
 import com.google.common.base.Function;
 
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -75,7 +76,7 @@ public class CompressedStreamWriter extends StreamWriter
                     long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc));
                     bytesTransferred += lastWrite;
                     progress += lastWrite;
-                    session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
+                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 1078004..6f1c753 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -299,6 +299,11 @@ public class NodeProbe implements AutoCloseable
         ssProxy.forceKeyspaceCompaction(splitOutput, keyspaceName, tableNames);
     }
 
+    public void relocateSSTables(String keyspace, String[] cfnames) throws IOException, ExecutionException, InterruptedException
+    {
+        ssProxy.relocateSSTables(keyspace, cfnames);
+    }
+
     public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
     {
         ssProxy.forceKeyspaceFlush(keyspaceName, tableNames);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 668c075..9728356 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -134,7 +134,8 @@ public class NodeTool
                 DisableHintsForDC.class,
                 EnableHintsForDC.class,
                 FailureDetectorInfo.class,
-                RefreshSizeEstimates.class
+                RefreshSizeEstimates.class,
+                RelocateSSTables.class
         );
 
         Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
index b27b07a..b62512a 100644
--- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
+++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
@@ -17,18 +17,20 @@
  */
 package org.apache.cassandra.tools;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
 
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -96,7 +98,7 @@ public class SSTableOfflineRelevel
         Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
         Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
-        Set<SSTableReader> sstables = new HashSet<>();
+        SetMultimap<File, SSTableReader> sstableMultimap = HashMultimap.create();
         for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet())
         {
             if (sstable.getKey() != null)
@@ -104,7 +106,7 @@ public class SSTableOfflineRelevel
                 try
                 {
                     SSTableReader reader = SSTableReader.open(sstable.getKey());
-                    sstables.add(reader);
+                    sstableMultimap.put(reader.descriptor.directory, reader);
                 }
                 catch (Throwable t)
                 {
@@ -113,13 +115,20 @@ public class SSTableOfflineRelevel
                 }
             }
         }
-        if (sstables.isEmpty())
+        if (sstableMultimap.isEmpty())
         {
             out.println("No sstables to relevel for "+keyspace+"."+columnfamily);
             System.exit(1);
         }
-        Relevel rl = new Relevel(sstables);
-        rl.relevel(dryRun);
+        for (File directory : sstableMultimap.keySet())
+        {
+            if (!sstableMultimap.get(directory).isEmpty())
+            {
+                Relevel rl = new Relevel(sstableMultimap.get(directory));
+                out.println("For sstables in " + directory + ":");
+                rl.relevel(dryRun);
+            }
+        }
         System.exit(0);
 
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java b/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
new file mode 100644
index 0000000..8522bc4
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.airlift.command.Arguments;
+import io.airlift.command.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+
+/**
+ * nodetool command asking the node to relocate sstables to the data directory ("disk") they belong on.
+ *
+ * Arguments are an optional keyspace followed by optional table names, parsed with parseOptionalKeyspace /
+ * parseOptionalTables; each selected keyspace is passed to {@link NodeProbe#relocateSSTables}.
+ */
+@Command(name = "relocatesstables", description = "Relocates sstables to the correct disk")
+public class RelocateSSTables extends NodeTool.NodeToolCmd
+{
+    // the arguments go through the parseOptional* helpers and several tables may be given, so document them as such
+    @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
+    private List<String> args = new ArrayList<>();
+
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        List<String> keyspaces = parseOptionalKeyspace(args, probe);
+        String[] tableNames = parseOptionalTables(args);
+        try
+        {
+            for (String keyspace : keyspaces)
+                probe.relocateSSTables(keyspace, tableNames);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Got error while relocating", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 96ee072..0f05524 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -59,8 +59,8 @@ public class LongLeveledCompactionStrategyTest
         Keyspace keyspace = Keyspace.open(ksname);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore(cfname);
         store.disableAutoCompaction();
-
-        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy)store.getCompactionStrategyManager().getStrategies().get(1);
+        CompactionStrategyManager mgr = store.getCompactionStrategyManager();
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) mgr.getStrategies().get(1).get(0);
 
         ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 27b774d..824c533 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -633,14 +633,14 @@ public class ScrubTest
     {
         SerializationHeader header = new SerializationHeader(true, metadata, metadata.partitionColumns(), EncodingStats.NO_STATS);
         MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(0);
-        return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn));
+        return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn), txn);
     }
 
     private static class TestMultiWriter extends SimpleSSTableMultiWriter
     {
-        TestMultiWriter(SSTableWriter writer)
+        TestMultiWriter(SSTableWriter writer, LifecycleTransaction txn)
         {
-            super(writer);
+            super(writer, txn);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 3c0098b..7fee251 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.junit.Test;
@@ -215,9 +216,9 @@ public class CompactionsCQLTest extends CQLTester
     public boolean verifyStrategies(CompactionStrategyManager manager, Class<? extends AbstractCompactionStrategy> expected)
     {
         boolean found = false;
-        for (AbstractCompactionStrategy actualStrategy : manager.getStrategies())
+        for (List<AbstractCompactionStrategy> strategies : manager.getStrategies())
         {
-            if (!actualStrategy.getClass().equals(expected))
+            if (!strategies.stream().allMatch((strategy) -> strategy.getClass().equals(expected)))
                 return false;
             found = true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index f2ddb00..1676896 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -122,10 +123,11 @@ public class LeveledCompactionStrategyTest
         }
 
         waitForLeveling(cfs);
-        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
+        CompactionStrategyManager strategyManager = cfs.getCompactionStrategyManager();
         // Checking we're not completely bad at math
-        int l1Count = strategy.getSSTableCountPerLevel()[1];
-        int l2Count = strategy.getSSTableCountPerLevel()[2];
+
+        int l1Count = strategyManager.getSSTableCountPerLevel()[1];
+        int l2Count = strategyManager.getSSTableCountPerLevel()[2];
         if (l1Count == 0 || l2Count == 0)
         {
             logger.error("L1 or L2 has 0 sstables. Expected > 0 on both.");
@@ -177,10 +179,10 @@ public class LeveledCompactionStrategyTest
         }
 
         waitForLeveling(cfs);
-        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
+        CompactionStrategyManager strategyManager = cfs.getCompactionStrategyManager();
         // Checking we're not completely bad at math
-        assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
-        assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
+        assertTrue(strategyManager.getSSTableCountPerLevel()[1] > 0);
+        assertTrue(strategyManager.getSSTableCountPerLevel()[2] > 0);
 
         Range<Token> range = new Range<>(Util.token(""), Util.token(""));
         int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
@@ -196,7 +198,7 @@ public class LeveledCompactionStrategyTest
      */
     private void waitForLeveling(ColumnFamilyStore cfs) throws InterruptedException
     {
-        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
+        CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
         // L0 is the lowest priority, so when that's done, we know everything is done
         while (strategy.getSSTableCountPerLevel()[0] > 1)
             Thread.sleep(100);
@@ -224,7 +226,7 @@ public class LeveledCompactionStrategyTest
         }
 
         waitForLeveling(cfs);
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1);
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
         assert strategy.getLevelSize(1) > 0;
 
         // get LeveledScanner for level 1 sstables
@@ -260,7 +262,7 @@ public class LeveledCompactionStrategyTest
             cfs.forceBlockingFlush();
         }
         cfs.forceBlockingFlush();
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1);
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
         cfs.forceMajorCompaction();
 
         for (SSTableReader s : cfs.getLiveSSTables())
@@ -306,14 +308,14 @@ public class LeveledCompactionStrategyTest
         while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
             Thread.sleep(100);
 
-        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
-        List<AbstractCompactionStrategy> strategies = strategy.getStrategies();
-        LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0);
-        LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1);
+        CompactionStrategyManager manager = cfs.getCompactionStrategyManager();
+        List<List<AbstractCompactionStrategy>> strategies = manager.getStrategies();
+        LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0).get(0);
+        LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1).get(0);
         assertEquals(0, repaired.manifest.getLevelCount() );
         assertEquals(2, unrepaired.manifest.getLevelCount());
-        assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
-        assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
+        assertTrue(manager.getSSTableCountPerLevel()[1] > 0);
+        assertTrue(manager.getSSTableCountPerLevel()[2] > 0);
 
         for (SSTableReader sstable : cfs.getLiveSSTables())
             assertFalse(sstable.isRepaired());
@@ -331,7 +333,7 @@ public class LeveledCompactionStrategyTest
         sstable1.reloadSSTableMetadata();
         assertTrue(sstable1.isRepaired());
 
-        strategy.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
+        manager.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
 
         int repairedSSTableCount = 0;
         for (List<SSTableReader> level : repaired.manifest.generations)
@@ -343,7 +345,7 @@ public class LeveledCompactionStrategyTest
         assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
 
         unrepaired.removeSSTable(sstable2);
-        strategy.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable2)), this);
+        manager.handleNotification(new SSTableAddedNotification(singleton(sstable2)), this);
         assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
         assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 7b9b19c..4f49389 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -297,7 +297,7 @@ public class TrackerTest
         Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
 
         SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
-        tracker.replaceFlushed(prev2, Collections.singleton(reader));
+        tracker.replaceFlushed(prev2, singleton(reader));
         Assert.assertEquals(1, tracker.getView().sstables.size());
         Assert.assertEquals(1, listener.received.size());
         Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
@@ -314,13 +314,13 @@ public class TrackerTest
         tracker.markFlushing(prev1);
         reader = MockSchema.sstable(0, 10, true, cfs);
         cfs.invalidate(false);
-        tracker.replaceFlushed(prev1, Collections.singleton(reader));
+        tracker.replaceFlushed(prev1, singleton(reader));
         Assert.assertEquals(0, tracker.getView().sstables.size());
         Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
         Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
         Assert.assertEquals(3, listener.received.size());
         Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
-        Assert.assertTrue(listener.received.get(1) instanceof  SSTableDeletingNotification);
+        Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
         Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
         DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 523c203..e787cc4 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import static com.google.common.collect.ImmutableSet.copyOf;
 import static com.google.common.collect.ImmutableSet.of;
 import static com.google.common.collect.Iterables.concat;
+import static java.util.Collections.singleton;
 import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
 
 public class ViewTest
@@ -195,7 +196,7 @@ public class ViewTest
         Assert.assertEquals(memtable3, cur.getCurrentMemtable());
 
         SSTableReader sstable = MockSchema.sstable(1, cfs);
-        cur = View.replaceFlushed(memtable1, Collections.singleton(sstable)).apply(cur);
+        cur = View.replaceFlushed(memtable1, singleton(sstable)).apply(cur);
         Assert.assertEquals(0, cur.flushingMemtables.size());
         Assert.assertEquals(1, cur.liveMemtables.size());
         Assert.assertEquals(memtable3, cur.getCurrentMemtable());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
index 9cefbf2..e2202fe 100644
--- a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
+++ b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
@@ -61,6 +61,12 @@ public class LengthPartitioner implements IPartitioner
         return MINIMUM;
     }
 
+    @Override
+    public Token getMaximumToken()
+    {
+        return null; // NOTE(review): this test partitioner defines no maximum token and returns null; confirm no caller dereferences the result
+    }
+
     public BigIntegerToken getRandomToken()
     {
         return new BigIntegerToken(BigInteger.valueOf(new Random().nextInt(15)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/dht/SplitterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/SplitterTest.java b/test/unit/org/apache/cassandra/dht/SplitterTest.java
new file mode 100644
index 0000000..751a7d7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/dht/SplitterTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Randomized tests for {@link Splitter#splitOwnedRanges}: the boundary tokens it returns should
+ * divide the locally owned token space into parts of (roughly) equal size.
+ * <p>
+ * Each randomized test draws a fresh seed and reports it in its failure message, so a failing
+ * case can be replayed by hard-coding that seed.
+ */
+public class SplitterTest
+{
+    @Test
+    public void randomSplitTestNoVNodesRandomPartitioner()
+    {
+        randomSplitTestNoVNodes(new RandomPartitioner());
+    }
+
+    @Test
+    public void randomSplitTestNoVNodesMurmur3Partitioner()
+    {
+        randomSplitTestNoVNodes(new Murmur3Partitioner());
+    }
+
+    @Test
+    public void randomSplitTestVNodesRandomPartitioner()
+    {
+        randomSplitTestVNodes(new RandomPartitioner());
+    }
+
+    @Test
+    public void randomSplitTestVNodesMurmur3Partitioner()
+    {
+        randomSplitTestVNodes(new Murmur3Partitioner());
+    }
+
+    /**
+     * Splits 1-4 random local ranges (one token with an rf of 1-4) into 1-9 parts and checks that the
+     * parts are tightly balanced, counting only the overlapping portion of each range. The trailing
+     * {@code false} passed to {@code splitOwnedRanges} presumably allows it to cut individual ranges;
+     * confirm against {@link Splitter}.
+     */
+    public void randomSplitTestNoVNodes(IPartitioner partitioner)
+    {
+        Splitter splitter = partitioner.splitter().get();
+        long seed = System.nanoTime();
+        Random r = new Random(seed);
+        for (int i = 0; i < 10000; i++)
+        {
+            List<Range<Token>> localRanges = generateLocalRanges(1, r.nextInt(4) + 1, splitter, r, partitioner instanceof RandomPartitioner);
+            List<Token> boundaries = splitter.splitOwnedRanges(r.nextInt(9) + 1, localRanges, false);
+            // only build the (large) failure message when the check actually fails
+            if (!assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, true))
+                fail("seed = " + seed + " boundaries = " + boundaries + " ranges = " + localRanges);
+        }
+    }
+
+    /**
+     * Splits the ranges of 172-299 random tokens with an rf of 2-5 into 1-5 parts. The balance check
+     * attributes each range whole to a single part, so its tolerance is looser than in
+     * {@link #randomSplitTestNoVNodes}.
+     */
+    public void randomSplitTestVNodes(IPartitioner partitioner)
+    {
+        Splitter splitter = partitioner.splitter().get();
+        long seed = System.nanoTime();
+        Random r = new Random(seed);
+        for (int i = 0; i < 10000; i++)
+        {
+            // we need many tokens to be able to split evenly over the disks
+            int numTokens = 172 + r.nextInt(128);
+            int rf = r.nextInt(4) + 2;
+            int parts = r.nextInt(5) + 1;
+            List<Range<Token>> localRanges = generateLocalRanges(numTokens, rf, splitter, r, partitioner instanceof RandomPartitioner);
+            List<Token> boundaries = splitter.splitOwnedRanges(parts, localRanges, true);
+            if (!assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, false))
+                fail(String.format("Could not split %d tokens with rf=%d into %d parts (seed=%d, localRanges=%s, boundaries=%s)", numTokens, rf, parts, seed, localRanges, boundaries));
+        }
+    }
+
+    /**
+     * Returns whether {@code tokens} split the space owned by {@code localRanges} into balanced parts.
+     * <p>
+     * Part {@code i} runs from the previous boundary (the partitioner's minimum token for the first
+     * part) to {@code tokens.get(i)}, except that the last part always ends at the partitioner's
+     * maximum token. Every pairwise ratio of part sizes, rounded to two decimals (HALF_DOWN), must lie
+     * within {@code 1 +/- delta}, where delta is 0.001 when ranges may be split and 0.2 otherwise.
+     * Because of that rounding the tightest effective tolerance is about 0.5%, not 0.1%.
+     * <p>
+     * Despite its name this method does not assert: it returns {@code false} on imbalance, including
+     * when one part is empty and another is not (rather than dividing by zero).
+     */
+    private boolean assertRangeSizeEqual(List<Range<Token>> localRanges, List<Token> tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges)
+    {
+        Token start = partitioner.getMinimumToken();
+        List<BigInteger> splits = new ArrayList<>(tokens.size());
+
+        for (int i = 0; i < tokens.size(); i++)
+        {
+            Token end = i == tokens.size() - 1 ? partitioner.getMaximumToken() : tokens.get(i);
+            splits.add(sumOwnedBetween(localRanges, start, end, splitter, splitIndividualRanges));
+            start = end;
+        }
+        // when we don't need to keep around full ranges, the difference between the parts is small
+        BigDecimal delta = splitIndividualRanges ? BigDecimal.valueOf(0.001) : BigDecimal.valueOf(0.2);
+        BigDecimal upper = BigDecimal.ONE.add(delta);
+        BigDecimal lower = BigDecimal.ONE.subtract(delta);
+        for (BigInteger b : splits)
+        {
+            for (BigInteger i : splits)
+            {
+                if (i.signum() == 0)
+                {
+                    // two empty parts are balanced; an empty and a non-empty part are not
+                    if (b.signum() != 0)
+                        return false;
+                    continue;
+                }
+                BigDecimal q = new BigDecimal(b).divide(new BigDecimal(i), 2, RoundingMode.HALF_DOWN);
+                if (q.compareTo(upper) > 0 || q.compareTo(lower) < 0)
+                    return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Sums the width (in the splitter's numeric token space) of the local ranges that fall between
+     * {@code start} and {@code end}.
+     * <p>
+     * With {@code splitIndividualRanges} only the overlapping portion of each range is counted;
+     * otherwise each range is counted whole, in the part that contains its left token.
+     */
+    private BigInteger sumOwnedBetween(List<Range<Token>> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges)
+    {
+        Range<Token> part = new Range<>(start, end);
+        BigInteger sum = BigInteger.ZERO;
+        for (Range<Token> range : localRanges)
+        {
+            if (splitIndividualRanges)
+            {
+                Set<Range<Token>> intersections = part.intersectionWith(range);
+                for (Range<Token> intersection : intersections)
+                    sum = sum.add(width(intersection, splitter));
+            }
+            else if (part.contains(range.left))
+            {
+                sum = sum.add(width(range, splitter));
+            }
+        }
+        return sum;
+    }
+
+    /** Width of {@code range} in the splitter's numeric token space: value(right) - value(left). */
+    private static BigInteger width(Range<Token> range, Splitter splitter)
+    {
+        return splitter.valueForToken(range.right).subtract(splitter.valueForToken(range.left));
+    }
+
+    /**
+     * Generates {@code numTokens * rf} random, non-overlapping local ranges.
+     * <p>
+     * Twice that many random tokens are drawn and sorted, and each consecutive pair (0,1), (2,3), ...
+     * becomes one range, so the space between one range's right token and the next range's left
+     * token is left unowned.
+     */
+    private List<Range<Token>> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner)
+    {
+        int localTokens = numTokens * rf;
+        List<Token> randomTokens = new ArrayList<>(localTokens * 2);
+
+        for (int i = 0; i < localTokens * 2; i++)
+        {
+            // uniform in [0, 2^127) for RandomPartitioner, any long value otherwise
+            Token t = splitter.tokenForValue(randomPartitioner ? new BigInteger(127, r) : BigInteger.valueOf(r.nextLong()));
+            randomTokens.add(t);
+        }
+
+        Collections.sort(randomTokens);
+
+        List<Range<Token>> localRanges = new ArrayList<>(localTokens);
+        for (int i = 0; i < randomTokens.size() - 1; i += 2)
+        {
+            // requires the two drawn tokens to differ; random collisions are very unlikely
+            assert randomTokens.get(i).compareTo(randomTokens.get(i + 1)) < 0;
+            localRanges.add(new Range<>(randomTokens.get(i), randomTokens.get(i + 1)));
+        }
+        return localRanges;
+    }
+}


Mime
View raw message