Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AAAE318704 for ; Tue, 5 Jan 2016 15:56:48 +0000 (UTC) Received: (qmail 53249 invoked by uid 500); 5 Jan 2016 15:56:48 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 53214 invoked by uid 500); 5 Jan 2016 15:56:48 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 53193 invoked by uid 99); 5 Jan 2016 15:56:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Jan 2016 15:56:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 32EC9DFFF7; Tue, 5 Jan 2016 15:56:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marcuse@apache.org To: commits@cassandra.apache.org Date: Tue, 05 Jan 2016 15:56:48 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] cassandra git commit: Make sure the same token does not exist in several data directories Repository: cassandra Updated Branches: refs/heads/cassandra-3.3 acc5b63cc -> ea4f64977 http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index 8b4351f..5e78834 100644 --- 
a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.compaction.writers; +import java.util.List; import java.util.Set; import org.slf4j.Logger; @@ -39,16 +40,18 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector; public class DefaultCompactionWriter extends CompactionAwareWriter { protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class); + private final int sstableLevel; public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set nonExpiredSSTables) { - this(cfs, directories, txn, nonExpiredSSTables, false, false); + this(cfs, directories, txn, nonExpiredSSTables, false, false, 0); } @SuppressWarnings("resource") - public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set nonExpiredSSTables, boolean offline, boolean keepOriginals) + public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set nonExpiredSSTables, boolean offline, boolean keepOriginals, int sstableLevel) { super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals); + this.sstableLevel = sstableLevel; } @Override @@ -58,14 +61,14 @@ public class DefaultCompactionWriter extends CompactionAwareWriter } @Override - protected void switchCompactionLocation(Directories.DataDirectory directory) + public void switchCompactionLocation(Directories.DataDirectory directory) { @SuppressWarnings("resource") SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))), estimatedTotalKeys, minRepairedAt, cfs.metadata, - new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0), + new MetadataCollector(txn.originals(), cfs.metadata.comparator, sstableLevel), 
SerializationHeader.make(cfs.metadata, nonExpiredSSTables), cfs.indexManager.listIndexes(), txn); @@ -73,6 +76,12 @@ public class DefaultCompactionWriter extends CompactionAwareWriter } @Override + public List finish(long repairedAt) + { + return sstableWriter.setRepairedAt(repairedAt).finish(); + } + + @Override public long estimatedKeys() { return estimatedTotalKeys; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index b0c4562..0c88ac6 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -17,12 +17,9 @@ */ package org.apache.cassandra.db.compaction.writers; -import java.io.File; +import java.util.List; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.RowIndexEntry; @@ -37,15 +34,14 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector; public class MajorLeveledCompactionWriter extends CompactionAwareWriter { - private static final Logger logger = LoggerFactory.getLogger(MajorLeveledCompactionWriter.class); private final long maxSSTableSize; - private final long expectedWriteSize; - private final Set allSSTables; private int currentLevel = 1; private long averageEstimatedKeysPerSSTable; private long partitionsWritten = 0; private long totalWrittenInLevel = 0; private int sstablesWritten = 0; + private final long keysPerSSTable; + private Directories.DataDirectory sstableDirectory; public 
MajorLeveledCompactionWriter(ColumnFamilyStore cfs, Directories directories, @@ -67,8 +63,8 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter { super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals); this.maxSSTableSize = maxSSTableSize; - this.allSSTables = txn.originals(); - expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())); + long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize); + keysPerSSTable = estimatedTotalKeys / estimatedSSTables; } @Override @@ -86,28 +82,33 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter totalWrittenInLevel = 0; currentLevel++; } - - averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1)); - switchCompactionLocation(getWriteDirectory(expectedWriteSize)); - partitionsWritten = 0; - sstablesWritten++; + switchCompactionLocation(sstableDirectory); } return rie != null; } - public void switchCompactionLocation(Directories.DataDirectory directory) + @Override + public void switchCompactionLocation(Directories.DataDirectory location) + { + this.sstableDirectory = location; + averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1)); + sstableWriter.switchWriter(SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))), + keysPerSSTable, + minRepairedAt, + cfs.metadata, + new MetadataCollector(txn.originals(), cfs.metadata.comparator, currentLevel), + SerializationHeader.make(cfs.metadata, txn.originals()), + cfs.indexManager.listIndexes(), + txn)); + partitionsWritten = 0; + sstablesWritten = 0; + + } + + @Override + public List finish(long repairedAt) { - File sstableDirectory = 
getDirectories().getLocationForDisk(directory); - @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), - averageEstimatedKeysPerSSTable, - minRepairedAt, - cfs.metadata, - new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel), - SerializationHeader.make(cfs.metadata, nonExpiredSSTables), - cfs.indexManager.listIndexes(), - txn); - sstableWriter.switchWriter(writer); + return sstableWriter.setRepairedAt(repairedAt).finish(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index 1dc72e7..ac83cc6 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.db.compaction.writers; +import java.io.File; +import java.util.List; import java.util.Set; import org.apache.cassandra.db.ColumnFamilyStore; @@ -33,11 +35,11 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector; public class MaxSSTableSizeWriter extends CompactionAwareWriter { private final long estimatedTotalKeys; - private final long expectedWriteSize; private final long maxSSTableSize; private final int level; private final long estimatedSSTables; private final Set allSSTables; + private Directories.DataDirectory sstableDirectory; public MaxSSTableSizeWriter(ColumnFamilyStore cfs, Directories directories, @@ -63,25 +65,25 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter this.allSSTables = txn.originals(); this.level = level; this.maxSSTableSize = maxSSTableSize; - long totalSize = 
cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()); - expectedWriteSize = Math.min(maxSSTableSize, totalSize); estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables); estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize); } - @Override - public boolean realAppend(UnfilteredRowIterator partition) + protected boolean realAppend(UnfilteredRowIterator partition) { RowIndexEntry rie = sstableWriter.append(partition); if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize) - switchCompactionLocation(getWriteDirectory(expectedWriteSize)); + { + switchCompactionLocation(sstableDirectory); + } return rie != null; } + @Override public void switchCompactionLocation(Directories.DataDirectory location) { - @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))), + sstableDirectory = location; + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))), estimatedTotalKeys / estimatedSSTables, minRepairedAt, cfs.metadata, @@ -91,7 +93,11 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter txn); sstableWriter.switchWriter(writer); + } + public List finish(long repairedAt) + { + return sstableWriter.setRepairedAt(repairedAt).finish(); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 3a7f526..46183dc 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ 
b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -17,8 +17,8 @@ */ package org.apache.cassandra.db.compaction.writers; -import java.io.File; import java.util.Arrays; +import java.util.List; import java.util.Set; import org.slf4j.Logger; @@ -51,6 +51,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter private final Set allSSTables; private long currentBytesToWrite; private int currentRatioIndex = 0; + private Directories.DataDirectory location; public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set nonExpiredSSTables) { @@ -82,10 +83,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter } } ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex); - long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]); currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]); - switchCompactionLocation(getWriteDirectory(currentBytesToWrite)); - logger.trace("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite); } @Override @@ -96,15 +94,17 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter { currentRatioIndex++; currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]); - switchCompactionLocation(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex]))); + switchCompactionLocation(location); + logger.debug("Switching writer, currentBytesToWrite = {}", currentBytesToWrite); } return rie != null; } + @Override public void switchCompactionLocation(Directories.DataDirectory location) { + this.location = location; long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys); - @SuppressWarnings("resource") SSTableWriter writer = 
SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))), currentPartitionsToWrite, minRepairedAt, @@ -115,6 +115,11 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter txn); logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite); sstableWriter.switchWriter(writer); + } + @Override + public List finish(long repairedAt) + { + return sstableWriter.setRepairedAt(repairedAt).finish(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index c09d49c..4c73472 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -327,10 +327,10 @@ public class Tracker apply(View.markFlushing(memtable)); } - public void replaceFlushed(Memtable memtable, Collection sstables) + public void replaceFlushed(Memtable memtable, Iterable sstables) { assert !isDummy(); - if (sstables == null || sstables.isEmpty()) + if (sstables == null || Iterables.isEmpty(sstables)) { // sstable may be null if we flushed batchlog and nothing needed to be retained // if it's null, we don't care what state the cfstore is in, we just replace it and continue http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/lifecycle/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java index b62c7e3..63926ed 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@ -310,7 +310,7 @@ public class View } // called 
after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set - static Function replaceFlushed(final Memtable memtable, final Collection flushed) + static Function replaceFlushed(final Memtable memtable, final Iterable flushed) { return new Function() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/IPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java index e0a08dc..b559a6f 100644 --- a/src/java/org/apache/cassandra/dht/IPartitioner.java +++ b/src/java/org/apache/cassandra/dht/IPartitioner.java @@ -20,6 +20,7 @@ package org.apache.cassandra.dht; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; @@ -49,6 +50,17 @@ public interface IPartitioner public Token getMinimumToken(); /** + * The biggest token for this partitioner, unlike getMinimumToken, this token is actually used and users wanting to + * include all tokens need to do getMaximumToken().maxKeyBound() + * + * Not implemented for the ordered partitioners + */ + default Token getMaximumToken() + { + throw new UnsupportedOperationException("If you are using a splitting partitioner, getMaximumToken has to be implemented"); + } + + /** * @return a Token that can be used to route a given key * (This is NOT a method to create a Token from its string representation; * for that, use TokenFactory.fromString.) @@ -84,4 +96,9 @@ public interface IPartitioner * Used by secondary indices. 
*/ public AbstractType partitionOrdering(); + + default Optional splitter() + { + return Optional.empty(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java index d68be3f..f9f6113 100644 --- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java +++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java @@ -48,6 +48,19 @@ public class Murmur3Partitioner implements IPartitioner public static final Murmur3Partitioner instance = new Murmur3Partitioner(); public static final AbstractType partitionOrdering = new PartitionerDefinedOrder(instance); + private final Splitter splitter = new Splitter(this) + { + public Token tokenForValue(BigInteger value) + { + return new LongToken(value.longValue()); + } + + public BigInteger valueForToken(Token token) + { + return BigInteger.valueOf(((LongToken) token).token); + } + }; + public DecoratedKey decorateKey(ByteBuffer key) { long[] hash = getHash(key); @@ -291,8 +304,18 @@ public class Murmur3Partitioner implements IPartitioner return LongType.instance; } + public Token getMaximumToken() + { + return new LongToken(Long.MAX_VALUE); + } + public AbstractType partitionOrdering() { return partitionOrdering; } + + public Optional splitter() + { + return Optional.of(splitter); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/RandomPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java index b0dea01..96a96ca 100644 --- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java +++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java 
@@ -50,6 +50,19 @@ public class RandomPartitioner implements IPartitioner public static final RandomPartitioner instance = new RandomPartitioner(); public static final AbstractType partitionOrdering = new PartitionerDefinedOrder(instance); + private final Splitter splitter = new Splitter(this) + { + public Token tokenForValue(BigInteger value) + { + return new BigIntegerToken(value); + } + + public BigInteger valueForToken(Token token) + { + return ((BigIntegerToken)token).getTokenValue(); + } + }; + public DecoratedKey decorateKey(ByteBuffer key) { return new CachedHashDecoratedKey(getToken(key), key); @@ -194,6 +207,11 @@ public class RandomPartitioner implements IPartitioner return ownerships; } + public Token getMaximumToken() + { + return new BigIntegerToken(MAXIMUM); + } + public AbstractType getTokenValidator() { return IntegerType.instance; @@ -203,4 +221,10 @@ public class RandomPartitioner implements IPartitioner { return partitionOrdering; } + + public Optional splitter() + { + return Optional.of(splitter); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Range.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java index 1fc6c46..3cc3b23 100644 --- a/src/java/org/apache/cassandra/dht/Range.java +++ b/src/java/org/apache/cassandra/dht/Range.java @@ -473,6 +473,23 @@ public class Range> extends AbstractBounds implemen return new Range(left, newRight); } + public static > List> sort(Collection> ranges) + { + List> output = new ArrayList<>(ranges.size()); + for (Range r : ranges) + output.addAll(r.unwrap()); + // sort by left + Collections.sort(output, new Comparator>() + { + public int compare(Range b1, Range b2) + { + return b1.left.compareTo(b2.left); + } + }); + return output; + } + + /** * Compute a range of keys corresponding to a given range of token. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Splitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Splitter.java b/src/java/org/apache/cassandra/dht/Splitter.java new file mode 100644 index 0000000..67b578d --- /dev/null +++ b/src/java/org/apache/cassandra/dht/Splitter.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.dht; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Partition splitter. 
+ */ +public abstract class Splitter +{ + private final IPartitioner partitioner; + + protected Splitter(IPartitioner partitioner) + { + this.partitioner = partitioner; + } + + protected abstract Token tokenForValue(BigInteger value); + + protected abstract BigInteger valueForToken(Token token); + + public List splitOwnedRanges(int parts, List> localRanges, boolean dontSplitRanges) + { + if (localRanges.isEmpty() || parts == 1) + return Collections.singletonList(partitioner.getMaximumToken()); + + BigInteger totalTokens = BigInteger.ZERO; + for (Range r : localRanges) + { + BigInteger right = valueForToken(token(r.right)); + totalTokens = totalTokens.add(right.subtract(valueForToken(r.left))); + } + BigInteger perPart = totalTokens.divide(BigInteger.valueOf(parts)); + + if (dontSplitRanges) + return splitOwnedRangesNoPartialRanges(localRanges, perPart, parts); + + List boundaries = new ArrayList<>(); + BigInteger sum = BigInteger.ZERO; + for (Range r : localRanges) + { + Token right = token(r.right); + BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left)).abs(); + BigInteger left = valueForToken(r.left); + while (sum.add(currentRangeWidth).compareTo(perPart) >= 0) + { + BigInteger withinRangeBoundary = perPart.subtract(sum); + left = left.add(withinRangeBoundary); + boundaries.add(tokenForValue(left)); + currentRangeWidth = currentRangeWidth.subtract(withinRangeBoundary); + sum = BigInteger.ZERO; + } + sum = sum.add(currentRangeWidth); + } + boundaries.set(boundaries.size() - 1, partitioner.getMaximumToken()); + + assert boundaries.size() == parts : boundaries.size() +"!="+parts+" "+boundaries+":"+localRanges; + return boundaries; + } + + private List splitOwnedRangesNoPartialRanges(List> localRanges, BigInteger perPart, int parts) + { + List boundaries = new ArrayList<>(parts); + BigInteger sum = BigInteger.ZERO; + int i = 0; + while (boundaries.size() < parts - 1) + { + Range r = localRanges.get(i); + Range nextRange = 
localRanges.get(i + 1); + Token right = token(r.right); + Token nextRight = token(nextRange.right); + + BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left)); + BigInteger nextRangeWidth = valueForToken(nextRight).subtract(valueForToken(nextRange.left)); + sum = sum.add(currentRangeWidth); + // does this or next range take us beyond the per part limit? + if (sum.compareTo(perPart) > 0 || sum.add(nextRangeWidth).compareTo(perPart) > 0) + { + // Either this or the next range will take us beyond the perPart limit. Will stopping now or + // adding the next range create the smallest difference to perPart? + BigInteger diffCurrent = sum.subtract(perPart).abs(); + BigInteger diffNext = sum.add(nextRangeWidth).subtract(perPart).abs(); + if (diffNext.compareTo(diffCurrent) >= 0) + { + sum = BigInteger.ZERO; + boundaries.add(right); + } + } + i++; + } + boundaries.add(partitioner.getMaximumToken()); + return boundaries; + } + + /** + * We avoid calculating for wrap around ranges, instead we use the actual max token, and then, when translating + * to PartitionPositions, we include tokens from .minKeyBound to .maxKeyBound to make sure we include all tokens. + */ + private Token token(Token t) + { + return t.equals(partitioner.getMinimumToken()) ? 
partitioner.getMaximumToken() : t; + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java index 68dbd74..2217ae2 100644 --- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java @@ -35,9 +35,11 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector; public class SimpleSSTableMultiWriter implements SSTableMultiWriter { private final SSTableWriter writer; + private final LifecycleTransaction txn; - protected SimpleSSTableMultiWriter(SSTableWriter writer) + protected SimpleSSTableMultiWriter(SSTableWriter writer, LifecycleTransaction txn) { + this.txn = txn; this.writer = writer; } @@ -90,6 +92,7 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter public Throwable abort(Throwable accumulate) { + txn.untrackNew(writer); return writer.abort(accumulate); } @@ -114,6 +117,6 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter LifecycleTransaction txn) { SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, txn); - return new SimpleSSTableMultiWriter(writer); + return new SimpleSSTableMultiWriter(writer, txn); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java new file mode 100644 index 0000000..674ed7f --- /dev/null +++ 
b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.service.StorageService; + +public class RangeAwareSSTableWriter implements SSTableMultiWriter +{ + private final List boundaries; + private final Directories.DataDirectory[] directories; + private final int sstableLevel; + private final long estimatedKeys; + private final long repairedAt; + private final SSTableFormat.Type format; + private final SerializationHeader.Component header; 
+ private final LifecycleTransaction txn; + private int currentIndex = -1; + public final ColumnFamilyStore cfs; + private final List finishedWriters = new ArrayList<>(); + private final List finishedReaders = new ArrayList<>(); + private SSTableMultiWriter currentWriter = null; + + public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader.Component header) throws IOException + { + directories = cfs.getDirectories().getWriteableLocations(); + this.sstableLevel = sstableLevel; + this.cfs = cfs; + this.estimatedKeys = estimatedKeys / directories.length; + this.repairedAt = repairedAt; + this.format = format; + this.txn = txn; + this.header = header; + boundaries = StorageService.getDiskBoundaries(cfs, directories); + if (boundaries == null) + { + Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); + if (localDir == null) + throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); + Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format)); + currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn); + } + } + + private void maybeSwitchWriter(DecoratedKey key) + { + if (boundaries == null) + return; + + boolean switched = false; + while (currentIndex < 0 || key.compareTo(boundaries.get(currentIndex)) > 0) + { + switched = true; + currentIndex++; + } + + if (switched) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + + Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format); + currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn); + } + } + + public boolean 
append(UnfilteredRowIterator partition) + { + maybeSwitchWriter(partition.partitionKey()); + return currentWriter.append(partition); + } + + @Override + public Collection finish(long repairedAt, long maxDataAge, boolean openResult) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter writer : finishedWriters) + { + if (writer.getFilePointer() > 0) + finishedReaders.addAll(writer.finish(repairedAt, maxDataAge, openResult)); + else + SSTableMultiWriter.abortOrDie(writer); + } + return finishedReaders; + } + + @Override + public Collection finish(boolean openResult) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter writer : finishedWriters) + { + if (writer.getFilePointer() > 0) + finishedReaders.addAll(writer.finish(openResult)); + else + SSTableMultiWriter.abortOrDie(writer); + } + return finishedReaders; + } + + @Override + public Collection finished() + { + return finishedReaders; + } + + @Override + public SSTableMultiWriter setOpenResult(boolean openResult) + { + finishedWriters.forEach((w) -> w.setOpenResult(openResult)); + currentWriter.setOpenResult(openResult); + return this; + } + + public String getFilename() + { + return String.join("/", cfs.keyspace.getName(), cfs.getTableName()); + } + + @Override + public long getFilePointer() + { + return currentWriter.getFilePointer(); + } + + @Override + public UUID getCfId() + { + return currentWriter.getCfId(); + } + + @Override + public Throwable commit(Throwable accumulate) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter writer : finishedWriters) + accumulate = writer.commit(accumulate); + return accumulate; + } + + @Override + public Throwable abort(Throwable accumulate) + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + for (SSTableMultiWriter 
finishedWriter : finishedWriters) + accumulate = finishedWriter.abort(accumulate); + + return accumulate; + } + + @Override + public void prepareToCommit() + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + finishedWriters.forEach(SSTableMultiWriter::prepareToCommit); + } + + @Override + public void close() + { + if (currentWriter != null) + finishedWriters.add(currentWriter); + currentWriter = null; + finishedWriters.forEach(SSTableMultiWriter::close); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java index 925efd6..a083218 100644 --- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java +++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java @@ -24,7 +24,12 @@ public abstract class DiskAwareRunnable extends WrappedRunnable { protected Directories.DataDirectory getWriteDirectory(long writeSize) { - Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize); + Directories.DataDirectory directory; + directory = getDirectory(); + + if (directory == null) // ok panic - write anywhere + directory = getDirectories().getWriteableLocation(writeSize); + if (directory == null) throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes"); @@ -36,4 +41,14 @@ public abstract class DiskAwareRunnable extends WrappedRunnable * @return Directories instance for the CF. */ protected abstract Directories getDirectories(); + protected abstract Directories.DataDirectory getDirectory(); + + /** + * Called if no disk is available with free space for the full write size. + * @return true if the scope of the task was successfully reduced. 
+ */ + public boolean reduceScopeForLimitedSpace() + { + return false; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 60ede18..24bebae 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -64,6 +64,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.RangeStreamer; import org.apache.cassandra.dht.RingPosition; +import org.apache.cassandra.dht.Splitter; import org.apache.cassandra.dht.StreamStateStore; import org.apache.cassandra.dht.Token; import org.apache.cassandra.dht.Token.TokenFactory; @@ -2625,6 +2626,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } + public int relocateSSTables(String keyspaceName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; + for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) + { + CompactionManager.AllSSTableOpStatus oneStatus = cfs.relocateSSTables(); + if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) + status = oneStatus; + } + return status.statusCode; + } + /** * Takes the snapshot for the given keyspaces. A snapshot name must be specified. 
* @@ -4459,4 +4472,61 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB)); } + public static List getDiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories) + { + if (!cfs.getPartitioner().splitter().isPresent()) + return null; + + Collection> lr; + + if (StorageService.instance.isBootstrapMode()) + { + lr = StorageService.instance.getTokenMetadata().getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress()); + } + else + { + // Reason we use the future settled TMD is that if we decommission a node, we want to stream + // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places. + // We do this to minimize the amount of data we need to move in rebalancedisks once everything has settled + TokenMetadata tmd = StorageService.instance.getTokenMetadata().cloneAfterAllSettled(); + lr = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd).get(FBUtilities.getBroadcastAddress()); + } + + if (lr == null || lr.isEmpty()) + return null; + List> localRanges = Range.sort(lr); + + return getDiskBoundaries(localRanges, cfs.getPartitioner(), directories); + } + + public static List getDiskBoundaries(ColumnFamilyStore cfs) + { + return getDiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations()); + } + + /** + * Returns a list of disk boundaries; the result will differ depending on whether vnodes are enabled or not. + * + * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to + * getDiskBoundaries(..).get(0) should be on the first disk, everything between 0 to 1 should be on the second disk + * etc.
+ * + * The final entry in the returned list will always be the partitioner maximum tokens upper key bound + * + * @param localRanges + * @param partitioner + * @param dataDirectories + * @return + */ + public static List getDiskBoundaries(List> localRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories) + { + assert partitioner.splitter().isPresent(); + Splitter splitter = partitioner.splitter().get(); + List boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, DatabaseDescriptor.getNumTokens() > 1); + List diskBoundaries = new ArrayList<>(); + for (int i = 0; i < boundaries.size() - 1; i++) + diskBoundaries.add(boundaries.get(i).maxKeyBound()); + diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound()); + return diskBoundaries; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 70691c7..eef34c0 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -248,6 +248,7 @@ public interface StorageServiceMBean extends NotificationEmitter */ public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException; + public int relocateSSTables(String keyspace, String ... 
cfnames) throws IOException, ExecutionException, InterruptedException; /** * Trigger a cleanup of keys on a single keyspace */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index 87dcda0..61eb13f 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -35,9 +35,9 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SSTableSimpleIterator; +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataInputPlus; @@ -63,8 +63,6 @@ public class StreamReader protected final int sstableLevel; protected final SerializationHeader.Component header; - protected Descriptor desc; - public StreamReader(FileMessageHeader header, StreamSession session) { this.session = session; @@ -108,7 +106,7 @@ public class StreamReader { writePartition(deserializer, writer); // TODO move this to BytesReadTracker - session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); + session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); } return writer; } @@ -132,10 +130,7 @@ public class StreamReader if (localDir == null) throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); - desc = 
Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format)); - - - return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId)); + return new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), header); } protected void drain(InputStream dis, long bytesRead) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 5355c3e..9078acc 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -584,9 +584,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber receivers.get(message.header.cfId).received(message.sstable); } - public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total) + public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total) { - ProgressInfo progress = new ProgressInfo(peer, index, desc.filenameFor(Component.DATA), direction, bytes, total); + ProgressInfo progress = new ProgressInfo(peer, index, filename, direction, bytes, total); streamResult.handleProgress(progress); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java index 106677c..ca35c0b 100644 --- a/src/java/org/apache/cassandra/streaming/StreamWriter.java +++ 
b/src/java/org/apache/cassandra/streaming/StreamWriter.java @@ -102,7 +102,7 @@ public class StreamWriter long lastBytesRead = write(file, validator, readOffset, length, bytesRead); bytesRead += lastBytesRead; progress += (lastBytesRead - readOffset); - session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize); + session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); readOffset = 0; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 4d10244..c123102 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -25,6 +25,7 @@ import java.nio.channels.ReadableByteChannel; import com.google.common.base.Throwables; import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +96,7 @@ public class CompressedStreamReader extends StreamReader { writePartition(deserializer, writer); // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred - session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize); + session.progress(writer.getFilename(), ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize); } } return writer; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java index adbd091..93e0f76 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java @@ -26,6 +26,7 @@ import java.util.List; import com.google.common.base.Function; import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.io.util.DataOutputStreamPlus; @@ -75,7 +76,7 @@ public class CompressedStreamWriter extends StreamWriter long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc)); bytesTransferred += lastWrite; progress += lastWrite; - session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize); + session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 1078004..6f1c753 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -299,6 +299,11 @@ public class NodeProbe implements AutoCloseable ssProxy.forceKeyspaceCompaction(splitOutput, keyspaceName, tableNames); } + public void relocateSSTables(String keyspace, String[] cfnames) throws IOException, ExecutionException, InterruptedException + { + 
ssProxy.relocateSSTables(keyspace, cfnames); + } + public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException { ssProxy.forceKeyspaceFlush(keyspaceName, tableNames); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 668c075..9728356 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -134,7 +134,8 @@ public class NodeTool DisableHintsForDC.class, EnableHintsForDC.class, FailureDetectorInfo.class, - RefreshSizeEstimates.class + RefreshSizeEstimates.class, + RelocateSSTables.class ); Cli.CliBuilder builder = Cli.builder("nodetool"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java index b27b07a..b62512a 100644 --- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java +++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java @@ -17,18 +17,20 @@ */ package org.apache.cassandra.tools; +import java.io.File; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import com.google.common.base.Throwables; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; import org.apache.cassandra.config.Schema; import 
org.apache.cassandra.db.ColumnFamilyStore; @@ -96,7 +98,7 @@ public class SSTableOfflineRelevel Keyspace ks = Keyspace.openWithoutSSTables(keyspace); ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily); Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); - Set sstables = new HashSet<>(); + SetMultimap sstableMultimap = HashMultimap.create(); for (Map.Entry> sstable : lister.list().entrySet()) { if (sstable.getKey() != null) @@ -104,7 +106,7 @@ public class SSTableOfflineRelevel try { SSTableReader reader = SSTableReader.open(sstable.getKey()); - sstables.add(reader); + sstableMultimap.put(reader.descriptor.directory, reader); } catch (Throwable t) { @@ -113,13 +115,20 @@ public class SSTableOfflineRelevel } } } - if (sstables.isEmpty()) + if (sstableMultimap.isEmpty()) { out.println("No sstables to relevel for "+keyspace+"."+columnfamily); System.exit(1); } - Relevel rl = new Relevel(sstables); - rl.relevel(dryRun); + for (File directory : sstableMultimap.keySet()) + { + if (!sstableMultimap.get(directory).isEmpty()) + { + Relevel rl = new Relevel(sstableMultimap.get(directory)); + out.println("For sstables in " + directory + ":"); + rl.relevel(dryRun); + } + } System.exit(0); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java b/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java new file mode 100644 index 0000000..8522bc4 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import java.util.ArrayList; +import java.util.List; + +import io.airlift.command.Arguments; +import io.airlift.command.Command; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool; + +@Command(name = "relocatesstables", description = "Relocates sstables to the correct disk") +public class RelocateSSTables extends NodeTool.NodeToolCmd +{ + @Arguments(usage = " ", description = "The keyspace and table name") + private List args = new ArrayList<>(); + + @Override + public void execute(NodeProbe probe) + { + List keyspaces = parseOptionalKeyspace(args, probe); + String[] cfnames = parseOptionalTables(args); + try + { + for (String keyspace : keyspaces) + probe.relocateSSTables(keyspace, cfnames); + } + catch (Exception e) + { + throw new RuntimeException("Got error while relocating", e); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java index 96ee072..0f05524 100644 --- 
a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -59,8 +59,8 @@ public class LongLeveledCompactionStrategyTest Keyspace keyspace = Keyspace.open(ksname); ColumnFamilyStore store = keyspace.getColumnFamilyStore(cfname); store.disableAutoCompaction(); - - LeveledCompactionStrategy lcs = (LeveledCompactionStrategy)store.getCompactionStrategyManager().getStrategies().get(1); + CompactionStrategyManager mgr = store.getCompactionStrategyManager(); + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) mgr.getStrategies().get(1).get(0); ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index 27b774d..824c533 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -633,14 +633,14 @@ public class ScrubTest { SerializationHeader header = new SerializationHeader(true, metadata, metadata.partitionColumns(), EncodingStats.NO_STATS); MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(0); - return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn)); + return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn), txn); } private static class TestMultiWriter extends SimpleSSTableMultiWriter { - TestMultiWriter(SSTableWriter writer) + TestMultiWriter(SSTableWriter writer, LifecycleTransaction txn) { - super(writer); + super(writer, txn); } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java index 3c0098b..7fee251 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.compaction; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.junit.Test; @@ -215,9 +216,9 @@ public class CompactionsCQLTest extends CQLTester public boolean verifyStrategies(CompactionStrategyManager manager, Class expected) { boolean found = false; - for (AbstractCompactionStrategy actualStrategy : manager.getStrategies()) + for (List strategies : manager.getStrategies()) { - if (!actualStrategy.getClass().equals(expected)) + if (!strategies.stream().allMatch((strategy) -> strategy.getClass().equals(expected))) return false; found = true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index f2ddb00..1676896 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -56,6 +56,7 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.FBUtilities; +import static java.util.Collections.singleton; import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -122,10 +123,11 @@ public class LeveledCompactionStrategyTest } waitForLeveling(cfs); - CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); + CompactionStrategyManager strategyManager = cfs.getCompactionStrategyManager(); // Checking we're not completely bad at math - int l1Count = strategy.getSSTableCountPerLevel()[1]; - int l2Count = strategy.getSSTableCountPerLevel()[2]; + + int l1Count = strategyManager.getSSTableCountPerLevel()[1]; + int l2Count = strategyManager.getSSTableCountPerLevel()[2]; if (l1Count == 0 || l2Count == 0) { logger.error("L1 or L2 has 0 sstables. Expected > 0 on both."); @@ -177,10 +179,10 @@ public class LeveledCompactionStrategyTest } waitForLeveling(cfs); - CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); + CompactionStrategyManager strategyManager = cfs.getCompactionStrategyManager(); // Checking we're not completely bad at math - assertTrue(strategy.getSSTableCountPerLevel()[1] > 0); - assertTrue(strategy.getSSTableCountPerLevel()[2] > 0); + assertTrue(strategyManager.getSSTableCountPerLevel()[1] > 0); + assertTrue(strategyManager.getSSTableCountPerLevel()[2] > 0); Range range = new Range<>(Util.token(""), Util.token("")); int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds()); @@ -196,7 +198,7 @@ public class LeveledCompactionStrategyTest */ private void waitForLeveling(ColumnFamilyStore cfs) throws InterruptedException { - CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); + CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); // L0 is the lowest priority, so when that's done, we know everything is done while (strategy.getSSTableCountPerLevel()[0] > 1) Thread.sleep(100); @@ -224,7 +226,7 @@ public class LeveledCompactionStrategyTest } waitForLeveling(cfs); - 
LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1); + LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getStrategies().get(1).get(0); assert strategy.getLevelSize(1) > 0; // get LeveledScanner for level 1 sstables @@ -260,7 +262,7 @@ public class LeveledCompactionStrategyTest cfs.forceBlockingFlush(); } cfs.forceBlockingFlush(); - LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1); + LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getStrategies().get(1).get(0); cfs.forceMajorCompaction(); for (SSTableReader s : cfs.getLiveSSTables()) @@ -306,14 +308,14 @@ public class LeveledCompactionStrategyTest while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) Thread.sleep(100); - CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); - List strategies = strategy.getStrategies(); - LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0); - LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1); + CompactionStrategyManager manager = cfs.getCompactionStrategyManager(); + List> strategies = manager.getStrategies(); + LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0).get(0); + LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1).get(0); assertEquals(0, repaired.manifest.getLevelCount() ); assertEquals(2, unrepaired.manifest.getLevelCount()); - assertTrue(strategy.getSSTableCountPerLevel()[1] > 0); - assertTrue(strategy.getSSTableCountPerLevel()[2] > 0); + assertTrue(manager.getSSTableCountPerLevel()[1] > 0); + assertTrue(manager.getSSTableCountPerLevel()[2] > 0); for (SSTableReader sstable : cfs.getLiveSSTables()) assertFalse(sstable.isRepaired()); @@ -331,7 +333,7 @@ public class 
LeveledCompactionStrategyTest sstable1.reloadSSTableMetadata(); assertTrue(sstable1.isRepaired()); - strategy.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this); + manager.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this); int repairedSSTableCount = 0; for (List level : repaired.manifest.generations) @@ -343,7 +345,7 @@ public class LeveledCompactionStrategyTest assertFalse(unrepaired.manifest.generations[2].contains(sstable1)); unrepaired.removeSSTable(sstable2); - strategy.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable2)), this); + manager.handleNotification(new SSTableAddedNotification(singleton(sstable2)), this); assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2)); assertFalse(repaired.manifest.getLevel(1).contains(sstable2)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java index 7b9b19c..4f49389 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java @@ -297,7 +297,7 @@ public class TrackerTest Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2)); SSTableReader reader = MockSchema.sstable(0, 10, false, cfs); - tracker.replaceFlushed(prev2, Collections.singleton(reader)); + tracker.replaceFlushed(prev2, singleton(reader)); Assert.assertEquals(1, tracker.getView().sstables.size()); Assert.assertEquals(1, listener.received.size()); Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added); @@ -314,13 +314,13 @@ public class TrackerTest tracker.markFlushing(prev1); reader = MockSchema.sstable(0, 10, true, cfs); cfs.invalidate(false); - 
tracker.replaceFlushed(prev1, Collections.singleton(reader)); + tracker.replaceFlushed(prev1, singleton(reader)); Assert.assertEquals(0, tracker.getView().sstables.size()); Assert.assertEquals(0, tracker.getView().flushingMemtables.size()); Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); Assert.assertEquals(3, listener.received.size()); Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added); - Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification); + Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification); Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size()); DatabaseDescriptor.setIncrementalBackupsEnabled(backups); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java index 523c203..e787cc4 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java @@ -42,6 +42,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import static com.google.common.collect.ImmutableSet.copyOf; import static com.google.common.collect.ImmutableSet.of; import static com.google.common.collect.Iterables.concat; +import static java.util.Collections.singleton; import static org.apache.cassandra.db.lifecycle.Helpers.emptySet; public class ViewTest @@ -195,7 +196,7 @@ public class ViewTest Assert.assertEquals(memtable3, cur.getCurrentMemtable()); SSTableReader sstable = MockSchema.sstable(1, cfs); - cur = View.replaceFlushed(memtable1, Collections.singleton(sstable)).apply(cur); + cur = View.replaceFlushed(memtable1, singleton(sstable)).apply(cur); Assert.assertEquals(0, 
cur.flushingMemtables.size()); Assert.assertEquals(1, cur.liveMemtables.size()); Assert.assertEquals(memtable3, cur.getCurrentMemtable()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/dht/LengthPartitioner.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java index 9cefbf2..e2202fe 100644 --- a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java +++ b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java @@ -61,6 +61,12 @@ public class LengthPartitioner implements IPartitioner return MINIMUM; } + @Override + public Token getMaximumToken() + { + return null; + } + public BigIntegerToken getRandomToken() { return new BigIntegerToken(BigInteger.valueOf(new Random().nextInt(15))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/dht/SplitterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/SplitterTest.java b/test/unit/org/apache/cassandra/dht/SplitterTest.java new file mode 100644 index 0000000..751a7d7 --- /dev/null +++ b/test/unit/org/apache/cassandra/dht/SplitterTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.dht; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class SplitterTest +{ + + @Test + public void randomSplitTestNoVNodesRandomPartitioner() + { + randomSplitTestNoVNodes(new RandomPartitioner()); + } + + @Test + public void randomSplitTestNoVNodesMurmur3Partitioner() + { + randomSplitTestNoVNodes(new Murmur3Partitioner()); + } + + @Test + public void randomSplitTestVNodesRandomPartitioner() + { + randomSplitTestVNodes(new RandomPartitioner()); + } + @Test + public void randomSplitTestVNodesMurmur3Partitioner() + { + randomSplitTestVNodes(new Murmur3Partitioner()); + } + + public void randomSplitTestNoVNodes(IPartitioner partitioner) + { + Splitter splitter = partitioner.splitter().get(); + Random r = new Random(); + for (int i = 0; i < 10000; i++) + { + List> localRanges = generateLocalRanges(1, r.nextInt(4)+1, splitter, r, partitioner instanceof RandomPartitioner); + List boundaries = splitter.splitOwnedRanges(r.nextInt(9) + 1, localRanges, false); + assertTrue("boundaries = "+boundaries+" ranges = "+localRanges, assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, true)); + } + } + + public void randomSplitTestVNodes(IPartitioner partitioner) + { + Splitter splitter = partitioner.splitter().get(); + Random r = new Random(); + for (int i 
= 0; i < 10000; i++) + { + // we need many tokens to be able to split evenly over the disks + int numTokens = 172 + r.nextInt(128); + int rf = r.nextInt(4) + 2; + int parts = r.nextInt(5)+1; + List> localRanges = generateLocalRanges(numTokens, rf, splitter, r, partitioner instanceof RandomPartitioner); + List boundaries = splitter.splitOwnedRanges(parts, localRanges, true); + if (!assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, false)) + fail(String.format("Could not split %d tokens with rf=%d into %d parts (localRanges=%s, boundaries=%s)", numTokens, rf, parts, localRanges, boundaries)); + } + } + + private boolean assertRangeSizeEqual(List> localRanges, List tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges) + { + Token start = partitioner.getMinimumToken(); + List splits = new ArrayList<>(); + + for (int i = 0; i < tokens.size(); i++) + { + Token end = i == tokens.size() - 1 ? partitioner.getMaximumToken() : tokens.get(i); + splits.add(sumOwnedBetween(localRanges, start, end, splitter, splitIndividualRanges)); + start = end; + } + // when we dont need to keep around full ranges, the difference is small between the partitions + BigDecimal delta = splitIndividualRanges ? 
BigDecimal.valueOf(0.001) : BigDecimal.valueOf(0.2); + boolean allBalanced = true; + for (BigInteger b : splits) + { + for (BigInteger i : splits) + { + BigDecimal bdb = new BigDecimal(b); + BigDecimal bdi = new BigDecimal(i); + BigDecimal q = bdb.divide(bdi, 2, BigDecimal.ROUND_HALF_DOWN); + if (q.compareTo(BigDecimal.ONE.add(delta)) > 0 || q.compareTo(BigDecimal.ONE.subtract(delta)) < 0) + allBalanced = false; + } + } + return allBalanced; + } + + private BigInteger sumOwnedBetween(List> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges) + { + BigInteger sum = BigInteger.ZERO; + for (Range range : localRanges) + { + if (splitIndividualRanges) + { + Set> intersections = new Range<>(start, end).intersectionWith(range); + for (Range intersection : intersections) + sum = sum.add(splitter.valueForToken(intersection.right).subtract(splitter.valueForToken(intersection.left))); + } + else + { + if (new Range<>(start, end).contains(range.left)) + sum = sum.add(splitter.valueForToken(range.right).subtract(splitter.valueForToken(range.left))); + } + } + return sum; + } + + private List> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner) + { + int localTokens = numTokens * rf; + List randomTokens = new ArrayList<>(); + + for (int i = 0; i < localTokens * 2; i++) + { + Token t = splitter.tokenForValue(randomPartitioner ? new BigInteger(127, r) : BigInteger.valueOf(r.nextLong())); + randomTokens.add(t); + } + + Collections.sort(randomTokens); + + List> localRanges = new ArrayList<>(localTokens); + for (int i = 0; i < randomTokens.size() - 1; i++) + { + assert randomTokens.get(i).compareTo(randomTokens.get(i+1)) < 0; + localRanges.add(new Range<>(randomTokens.get(i), randomTokens.get(i+1))); + i++; + } + return localRanges; + } +}