Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EDEF69A09 for ; Fri, 22 Jun 2012 18:16:20 +0000 (UTC) Received: (qmail 48039 invoked by uid 500); 22 Jun 2012 18:16:20 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 48005 invoked by uid 500); 22 Jun 2012 18:16:20 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 47995 invoked by uid 99); 22 Jun 2012 18:16:20 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Jun 2012 18:16:20 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 52A2D69E9; Fri, 22 Jun 2012 18:16:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: fixes for small-sstable compaction patch by slebresne; reviewed by jbellis for CASSANDRA-4341 Message-Id: <20120622181620.52A2D69E9@tyr.zones.apache.org> Date: Fri, 22 Jun 2012 18:16:20 +0000 (UTC) Updated Branches: refs/heads/trunk 71a50b73d -> 2ce8274c6 fixes for small-sstable compaction patch by slebresne; reviewed by jbellis for CASSANDRA-4341 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ce8274c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ce8274c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ce8274c Branch: refs/heads/trunk Commit: 2ce8274c600dba00228c4c14116840a8ec6dceb0 Parents: 71a50b7 
Author: Jonathan Ellis Authored: Fri Jun 22 13:15:51 2012 -0500 Committer: Jonathan Ellis Committed: Fri Jun 22 13:15:51 2012 -0500 ---------------------------------------------------------------------- .../cassandra/db/compaction/CompactionTask.java | 4 +- .../db/compaction/LeveledCompactionTask.java | 4 +- .../cassandra/db/compaction/LeveledManifest.java | 21 ++++++++++----- .../io/compress/CompressedSequentialWriter.java | 6 ++++ .../apache/cassandra/io/sstable/SSTableWriter.java | 5 +++ .../apache/cassandra/io/util/SequentialWriter.java | 12 ++++++++ 6 files changed, 41 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 4e167f3..468c9a9 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -166,7 +166,7 @@ public class CompactionTask extends AbstractCompactionTask } } } - if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, indexEntry.position)) + if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer)) { SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact)); cachedKeyMap.put(toIndex, cachedKeys); @@ -226,7 +226,7 @@ public class CompactionTask extends AbstractCompactionTask } //extensibility point for other strategies that may want to limit the upper bounds of the sstable segment size - protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position) + protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException { return false; } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java index 59cd55f..ef290f9 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java @@ -62,9 +62,9 @@ public class LeveledCompactionTask extends CompactionTask } @Override - protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position) + protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException { - return position > sstableSizeInMB * 1024L * 1024L; + return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index b8297c7..eb82e0d 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -176,7 +176,7 @@ public class LeveledManifest return; int newLevel; - if (minimumLevel == 0 && maximumLevel == 0 && SSTable.getTotalBytes(removed) < maxSSTableSizeInBytes) + if (minimumLevel == 0 && maximumLevel == 0 && SSTable.getTotalBytes(removed) <= maxSSTableSizeInBytes) { // special case for tiny L0 sstables; see CASSANDRA-4341 newLevel = 0; @@ -278,7 +278,7 @@ public class LeveledManifest logger.debug("Compaction score for level {} is {}", i, score); // L0 gets a special case that if we don't have anything 
more important to do, - // we'll go ahead and compact even just one sstable + // we'll go ahead and compact if we have more than one sstable if (score > 1.001 || (i == 0 && sstables.size() > 1)) { Collection candidates = getCandidatesFor(i); @@ -355,9 +355,9 @@ public class LeveledManifest sstableGenerations.put(sstable, Integer.valueOf(level)); } - private static List overlapping(SSTableReader sstable, Iterable candidates) + private static Set overlapping(SSTableReader sstable, Iterable candidates) { - List overlapped = new ArrayList(); + Set overlapped = new HashSet(); overlapped.add(sstable); Range promotedRange = new Range(sstable.first.token, sstable.last.token); @@ -383,7 +383,7 @@ public class LeveledManifest // 1a. add sstables to the candidate set until we have at least maxSSTableSizeInMB // 1b. prefer choosing older sstables as candidates, to newer ones // 1c. any L0 sstables that overlap a candidate, will also become candidates - // 2. At most MAX_COMPACTING_L0 sstables will be compacted at once + // 2. At most MAX_COMPACTING_L0 sstables from L0 will be compacted at once // 3. 
If total candidate size is less than maxSSTableSizeInMB, we won't bother compacting with L1, // and the result of the compaction will stay in L0 instead of being promoted (see promote()) // @@ -413,7 +413,14 @@ public class LeveledManifest // limit to only the MAX_COMPACTING_L0 oldest candidates List ageSortedCandidates = new ArrayList(candidates); Collections.sort(ageSortedCandidates, SSTable.maxTimestampComparator); - return ageSortedCandidates.subList(0, MAX_COMPACTING_L0); + candidates = new HashSet(ageSortedCandidates.subList(0, MAX_COMPACTING_L0)); + if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes) + { + // add sstables from L1 that overlap candidates + for (SSTableReader candidate : new ArrayList(candidates)) + candidates.addAll(overlapping(candidate, generations[1])); + } + return candidates; } if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes) @@ -448,7 +455,7 @@ public class LeveledManifest while (true) { SSTableReader sstable = generations[level].get(i); - List candidates = overlapping(sstable, generations[(level + 1)]); + Set candidates = overlapping(sstable, generations[(level + 1)]); for (SSTableReader candidate : candidates) { if (candidate.isMarkedSuspect()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 67ec3f0..fdcf1b8 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -68,6 +68,12 @@ public class CompressedSequentialWriter extends SequentialWriter } @Override + public long getOnDiskFilePointer() throws IOException + { + return out.getFilePointer(); + } + + @Override public void 
sync() throws IOException { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 4219f64..2fc9771 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -373,6 +373,11 @@ public class SSTableWriter extends SSTable return dataFile.getFilePointer(); } + public long getOnDiskFilePointer() throws IOException + { + return dataFile.getOnDiskFilePointer(); + } + /** * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index 961cc48..8b78730 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -244,6 +244,18 @@ public class SequentialWriter extends OutputStream return current; } + /** + * Return the current file pointer of the underlying on-disk file. + * Note that since write works by buffering data, the value of this will increase by buffer + * size and not every write to the writer will modify this value. 
+ * Furthermore, for compressed files, this value refers to compressed data, while the + * writer's getFilePointer() refers to the uncompressed data. + */ + public long getOnDiskFilePointer() throws IOException + { + return getFilePointer(); + } + public long length() throws IOException { return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);