Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0563A200B5E for ; Wed, 10 Aug 2016 19:58:45 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 03EF4160AA4; Wed, 10 Aug 2016 17:58:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A17E8160A8F for ; Wed, 10 Aug 2016 19:58:43 +0200 (CEST) Received: (qmail 25881 invoked by uid 500); 10 Aug 2016 17:58:42 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 25869 invoked by uid 99); 10 Aug 2016 17:58:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Aug 2016 17:58:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7DE48DFDF4; Wed, 10 Aug 2016 17:58:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jake@apache.org To: commits@cassandra.apache.org Message-Id: <3e9b1d1460a0436d85126cb1bc56a914@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: cassandra git commit: Fix compaction throttle Date: Wed, 10 Aug 2016 17:58:42 +0000 (UTC) archived-at: Wed, 10 Aug 2016 17:58:45 -0000 Repository: cassandra Updated Branches: refs/heads/trunk d32a745b2 -> 10649b1ce Fix compaction throttle Patch by tjake reviewed by Marcus Eriksson for CASSANDRA-12366 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/10649b1c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/10649b1c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/10649b1c Branch: refs/heads/trunk Commit: 10649b1ce5aba1683e024c0f9cf23bd9c42704b7 Parents: d32a745 Author: T Jake Luciani Authored: Tue Aug 2 12:50:03 2016 -0400 Committer: T Jake Luciani Committed: Wed Aug 10 13:57:18 2016 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../compaction/AbstractCompactionStrategy.java | 39 ++++++++++++++++++-- .../db/compaction/CompactionController.java | 3 +- .../db/compaction/CompactionManager.java | 19 +++++++++- .../cassandra/db/compaction/CompactionTask.java | 23 +++++++++++- .../compaction/LeveledCompactionStrategy.java | 27 ++++++++++++-- .../cassandra/io/sstable/ISSTableScanner.java | 2 + .../io/sstable/format/big/BigTableScanner.java | 35 ++++++++++++++++++ .../apache/cassandra/metrics/TableMetrics.java | 3 +- .../db/compaction/CompactionIteratorTest.java | 13 +++++++ 10 files changed, 152 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/10649b1c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5738c30..5c1d286 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Fix compaction throughput throttle (CASSANDRA-12366) * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358) * Cassandra stress should dump all setting on startup (CASSANDRA-11914) * Make it possible to compact a given token range (CASSANDRA-10643) http://git-wip-us.apache.org/repos/asf/cassandra/blob/10649b1c/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 48d5eb5..8454147 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -23,7 +23,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.SerializationHeader; @@ -283,12 +282,11 @@ public abstract class AbstractCompactionStrategy @SuppressWarnings("resource") public ScannerList getScanners(Collection sstables, Collection> ranges) { - RateLimiter limiter = CompactionManager.instance.getRateLimiter(); ArrayList scanners = new ArrayList(); try { for (SSTableReader sstable : sstables) - scanners.add(sstable.getScanner(ranges, limiter)); + scanners.add(sstable.getScanner(ranges, null)); } catch (Throwable t) { @@ -341,6 +339,41 @@ public abstract class AbstractCompactionStrategy this.scanners = scanners; } + public long getTotalBytesScanned() + { + long bytesScanned = 0L; + for (ISSTableScanner scanner : scanners) + bytesScanned += scanner.getBytesScanned(); + + return bytesScanned; + } + + public long getTotalCompressedSize() + { + long compressedSize = 0; + for (ISSTableScanner scanner : scanners) + compressedSize += scanner.getCompressedLengthInBytes(); + + return compressedSize; + } + + public double getCompressionRatio() + { + double compressed = 0.0; + double uncompressed = 0.0; + + for (ISSTableScanner scanner : scanners) + { + compressed += scanner.getCompressedLengthInBytes(); + uncompressed += scanner.getLengthInBytes(); + } + + if (compressed == uncompressed || uncompressed == 0) + return MetadataCollector.NO_COMPRESSION_RATIO; + + return compressed / uncompressed; + } + public void close() { Throwable t = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/10649b1c/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index d922d28..b34eee6 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -72,8 +72,7 @@ public class CompactionController implements AutoCloseable public CompactionController(ColumnFamilyStore cfs, Set compacting, int gcBefore) { - this(cfs, compacting, gcBefore, - CompactionManager.instance.getRateLimiter(), + this(cfs, compacting, gcBefore, null, cfs.getCompactionStrategyManager().getCompactionParams().tombstoneOption()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/10649b1c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index ac6c753..f3a5711 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -30,6 +30,7 @@ import javax.management.openmbean.TabularData; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; +import com.google.common.primitives.Ints; import com.google.common.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -483,7 +484,7 @@ public class CompactionManager implements CompactionManagerMBean @Override protected CompactionController getCompactionController(Set toCompact) { - return new CompactionController(cfStore, toCompact, gcBefore, getRateLimiter(), tombstoneOption); + return new CompactionController(cfStore, toCompact, gcBefore, null, tombstoneOption); } }; task.setUserDefined(true); @@ -1079,15 +1080,22 @@ public class CompactionManager implements CompactionManagerMBean logger.info("Cleaning up {}", sstable); File compactionFileLocation = sstable.descriptor.directory; + RateLimiter limiter = getRateLimiter(); + double compressionRatio = sstable.getCompressionRatio(); + if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO) + compressionRatio = 1.0; List finished; + int nowInSec = FBUtilities.nowInSeconds(); try (SSTableRewriter writer = SSTableRewriter.construct(cfs, txn, false, sstable.maxDataAge); - ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter()); + ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, null); CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec)); CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics)) { writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable, txn)); + long lastBytesScanned = 0; + while (ci.hasNext()) { @@ -1102,6 +1110,13 @@ public class CompactionManager implements CompactionManagerMBean if (writer.append(notCleaned) != null) totalkeysWritten++; + + long bytesScanned = scanner.getBytesScanned(); + + int lengthRead = (int) (Ints.checkedCast(bytesScanned - lastBytesScanned) * compressionRatio); + limiter.acquire(lengthRead + 1); + + lastBytesScanned = bytesScanned; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/10649b1c/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 86c8a8f..43a726b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; @@ -41,6 +43,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Refs; @@ -149,14 +152,15 @@ public class CompactionTask extends AbstractCompactionTask logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg); + RateLimiter limiter = CompactionManager.instance.getRateLimiter(); long start = System.nanoTime(); long startTime = System.currentTimeMillis(); long totalKeysWritten = 0; long estimatedKeys = 0; + long inputSizeBytes; try (CompactionController controller = getCompactionController(transaction.originals())) { Set actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables()); - Collection newSStables; long[] mergedRowCounts; @@ -173,6 +177,12 @@ public class CompactionTask extends AbstractCompactionTask if (collector != null) collector.beginCompaction(ci); long lastCheckObsoletion = start; + inputSizeBytes = scanners.getTotalCompressedSize(); + double compressionRatio = scanners.getCompressionRatio(); + if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO) + compressionRatio = 1.0; + + long lastBytesScanned = 0; if (!controller.cfs.getCompactionStrategyManager().isActive()) throw new CompactionInterruptedException(ci.getCompactionInfo()); @@ -188,6 +198,15 @@ public class CompactionTask extends AbstractCompactionTask if (writer.append(ci.next())) totalKeysWritten++; + + long bytesScanned = scanners.getTotalBytesScanned(); + + //Rate limit the scanners, and account for compression + int lengthRead = (int) (Ints.checkedCast(bytesScanned - lastBytesScanned) * compressionRatio); + limiter.acquire(lengthRead + 1); + + lastBytesScanned = bytesScanned; + if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L)) { controller.maybeRefreshOverlaps(); @@ -213,7 +232,7 @@ public class CompactionTask extends AbstractCompactionTask long durationInNano = System.nanoTime() - start; long dTime = TimeUnit.NANOSECONDS.toMillis(durationInNano); - long startsize = SSTableReader.getTotalBytes(transaction.originals()); + long startsize = inputSizeBytes; long endsize = SSTableReader.getTotalBytes(newSStables); double ratio = (double) endsize / (double) startsize; http://git-wip-us.apache.org/repos/asf/cassandra/blob/10649b1c/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 287e387..c224f91 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -265,7 +265,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy { // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each for (SSTableReader sstable : byLevel.get(level)) - scanners.add(sstable.getScanner(ranges, CompactionManager.instance.getRateLimiter())); + scanners.add(sstable.getScanner(ranges, null)); } else { @@ -322,9 +322,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy private final List sstables; private final Iterator sstableIterator; private final long totalLength; + private final long compressedLength; private ISSTableScanner currentScanner; private long positionOffset; + private SSTableReader currentSSTable; + private long totalBytesScanned = 0; public LeveledScanner(Collection sstables, Collection> ranges) { @@ -333,6 +336,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy // add only sstables that intersect our range, and estimate how much data that involves this.sstables = new ArrayList<>(sstables.size()); long length = 0; + long cLength = 0; for (SSTableReader sstable : sstables) { this.sstables.add(sstable); @@ -343,13 +347,17 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy estKeysInRangeRatio = ((double) sstable.estimatedKeysForRanges(ranges)) / estimatedKeys; length += sstable.uncompressedLength() * estKeysInRangeRatio; + cLength += sstable.onDiskLength() * estKeysInRangeRatio; } totalLength = length; + compressedLength = cLength; Collections.sort(this.sstables, SSTableReader.sstableComparator); sstableIterator = this.sstables.iterator(); assert sstableIterator.hasNext(); // caller should check intersecting first - currentScanner = sstableIterator.next().getScanner(ranges, CompactionManager.instance.getRateLimiter()); + currentSSTable = sstableIterator.next(); + currentScanner = currentSSTable.getScanner(ranges, null); + } public static Collection intersecting(Collection sstables, Collection> ranges) @@ -392,6 +400,8 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy return currentScanner.next(); positionOffset += currentScanner.getLengthInBytes(); + totalBytesScanned += currentScanner.getBytesScanned(); + currentScanner.close(); if (!sstableIterator.hasNext()) { @@ -399,7 +409,8 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy currentScanner = null; return endOfData(); } - currentScanner = sstableIterator.next().getScanner(ranges, CompactionManager.instance.getRateLimiter()); + currentSSTable = sstableIterator.next(); + currentScanner = currentSSTable.getScanner(ranges, null); } } @@ -419,6 +430,16 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy return positionOffset + (currentScanner == null ? 0L : currentScanner.getCurrentPosition()); } + public long getCompressedLengthInBytes() + { + return compressedLength; + } + + public long getBytesScanned() + { + return currentScanner == null ? totalBytesScanned : totalBytesScanned + currentScanner.getBytesScanned(); + } + public String getBackingFiles() { return Joiner.on(", ").join(sstables); http://git-wip-us.apache.org/repos/asf/cassandra/blob/10649b1c/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java index 7063057..2dff34e 100644 --- a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java @@ -28,6 +28,8 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; public interface ISSTableScanner extends UnfilteredPartitionIterator { public long getLengthInBytes(); + public long getCompressedLengthInBytes(); public long getCurrentPosition(); + public long getBytesScanned(); public String getBackingFiles(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/10649b1c/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java index 66213a6..e465a02 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@ -62,6 +62,8 @@ public class BigTableScanner implements ISSTableScanner private final DataRange dataRange; private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; private final boolean isForThrift; + private long startScan = -1; + private long bytesScanned = 0; protected Iterator iterator; @@ -223,6 +225,16 @@ public class BigTableScanner implements ISSTableScanner return dfile.getFilePointer(); } + public long getBytesScanned() + { + return bytesScanned; + } + + public long getCompressedLengthInBytes() + { + return sstable.onDiskLength(); + } + public String getBackingFiles() { return sstable.toString(); @@ -277,12 +289,16 @@ public class BigTableScanner implements ISSTableScanner { do { + if (startScan != -1) + bytesScanned += dfile.getFilePointer() - startScan; + // we're starting the first range or we just passed the end of the previous range if (!rangeIterator.hasNext()) return endOfData(); currentRange = rangeIterator.next(); seekToCurrentRangeStart(); + startScan = dfile.getFilePointer(); if (ifile.isEOF()) return endOfData(); @@ -325,14 +341,23 @@ public class BigTableScanner implements ISSTableScanner { protected UnfilteredRowIterator initializeIterator() { + + if (startScan != -1) + bytesScanned += dfile.getFilePointer() - startScan; + try { if (dataRange == null) { dfile.seek(currentEntry.position); + startScan = dfile.getFilePointer(); ByteBufferUtil.skipShortLength(dfile); // key return SSTableIdentityIterator.create(sstable, dfile, partitionKey()); } + else + { + startScan = dfile.getFilePointer(); + } ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(partitionKey()); return sstable.iterator(dfile, partitionKey(), currentEntry, filter.getSlices(BigTableScanner.this.metadata()), columns, filter.isReversed(), isForThrift); @@ -382,6 +407,16 @@ public class BigTableScanner implements ISSTableScanner return 0; } + public long getBytesScanned() + { + return 0; + } + + public long getCompressedLengthInBytes() + { + return 0; + } + public String getBackingFiles() { return sstable.getFilename(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/10649b1c/src/java/org/apache/cassandra/metrics/TableMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 7889077..d1d4a3d 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -36,6 +36,7 @@ import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.index.SecondaryIndexManager; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.repair.SystemDistributedKeyspace; import org.apache.cassandra.utils.EstimatedHistogram; import org.apache.cassandra.utils.TopKSampler; @@ -819,7 +820,7 @@ public class TableMetrics dataLengthSum += compressionMetadata.dataLength; } } - return dataLengthSum != 0 ? compressedLengthSum / dataLengthSum : 0; + return dataLengthSum != 0 ? compressedLengthSum / dataLengthSum : MetadataCollector.NO_COMPRESSION_RATIO; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/10649b1c/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java index 2189e15..129d1e7 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java @@ -39,6 +39,7 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.schema.KeyspaceParams; public class CompactionIteratorTest @@ -369,6 +370,18 @@ public class CompactionIteratorTest } @Override + public long getBytesScanned() + { + return 0; + } + + @Override + public long getCompressedLengthInBytes() + { + return 0; + } + + @Override public String getBackingFiles() { return null;