Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B77B7105EC for ; Wed, 19 Jun 2013 16:01:29 +0000 (UTC) Received: (qmail 23576 invoked by uid 500); 19 Jun 2013 16:01:29 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 23317 invoked by uid 500); 19 Jun 2013 16:01:29 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 23293 invoked by uid 99); 19 Jun 2013 16:01:28 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Jun 2013 16:01:28 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7B73524780; Wed, 19 Jun 2013 16:01:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Wed, 19 Jun 2013 16:01:28 -0000 Message-Id: <1aa330329e2047e9b71dcb353c0fa7a2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] git commit: revert 079ae68fb7086259439491e6a10bc2d8a947f52c Updated Branches: refs/heads/cassandra-1.2 7861f1731 -> f932aa24b refs/heads/trunk cfc163df0 -> 0b2359959 revert 079ae68fb7086259439491e6a10bc2d8a947f52c Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f932aa24 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f932aa24 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f932aa24 Branch: refs/heads/cassandra-1.2 Commit: f932aa24b56a4683f90bf808889188dfa12c44dc Parents: 7861f17 Author: Jonathan Ellis Authored: Wed Jun 19 10:25:23 2013 -0500 Committer: Jonathan Ellis Committed: Wed Jun 19 10:25:23 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 - .../apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../cassandra/io/sstable/SSTableLoader.java | 2 +- .../cassandra/io/sstable/SSTableReader.java | 150 ++++++------------- 4 files changed, 47 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5d36bd9..e1282aa 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,5 @@ 1.2.6 * Fix cross-DC mutation forwarding (CASSANDRA-5632) - * Reduce SSTableLoader memory usage (CASSANDRA-5555) * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272) * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622) * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 81ced05..429859e 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -232,7 +232,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (loadSSTables) { Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true); - Collection sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner); + Collection sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner); if (metadata.getDefaultValidator().isCommutative()) { // Filter non-compacted sstables, remove compacted ones http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 68bb423..b91e288 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -97,7 +97,7 @@ public class SSTableLoader try { - sstables.add(SSTableReader.openForBatch(desc, components, client.getPartitioner())); + sstables.add(SSTableReader.open(desc, components, null, client.getPartitioner())); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 574465d..ea9c451 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.*; +import com.google.common.primitives.Longs; import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,33 +154,41 @@ public class SSTableReader extends SSTable return open(descriptor, components, metadata, partitioner, true); } - public static SSTableReader openForBatch(Descriptor descriptor, Set components, IPartitioner partitioner) throws IOException - { - SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner); - SSTableReader sstable = new SSTableReader(descriptor, - components, - null, - partitioner, - System.currentTimeMillis(), - sstableMetadata); - sstable.bf = new AlwaysPresentFilter(); - sstable.loadForBatch(); - return sstable; - } - private static SSTableReader open(Descriptor descriptor, Set components, CFMetaData metadata, IPartitioner partitioner, boolean validate) throws IOException { + assert partitioner != null; + // Minimum components without which we can't do anything + assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor; + assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; + long start = System.currentTimeMillis(); - SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner); + logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length()); + + SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor); + + // Check if sstable is created using same partitioner. + // Partitioner can be null, which indicates older version of sstable or no stats available. + // In that case, we skip the check. + String partitionerName = partitioner.getClass().getCanonicalName(); + if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner)) + { + logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", + descriptor, sstableMetadata.partitioner, partitionerName)); + System.exit(1); + } SSTableReader sstable = new SSTableReader(descriptor, components, metadata, partitioner, + null, + null, + null, + null, System.currentTimeMillis(), sstableMetadata); // versions before 'c' encoded keys as utf-16 before hashing to the filter @@ -205,30 +214,6 @@ public class SSTableReader extends SSTable return sstable; } - private static SSTableMetadata openMetadata(Descriptor descriptor, Set components, IPartitioner partitioner) throws IOException - { - assert partitioner != null; - // Minimum components without which we can't do anything - assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor; - assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; - - logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length()); - - SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor); - - // Check if sstable is created using same partitioner. - // Partitioner can be null, which indicates older version of sstable or no stats available. - // In that case, we skip the check. - String partitionerName = partitioner.getClass().getCanonicalName(); - if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner)) - { - logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", - descriptor, sstableMetadata.partitioner, partitionerName)); - System.exit(1); - } - return sstableMetadata; - } - public static void logOpenException(Descriptor descriptor, IOException e) { if (e instanceof FileNotFoundException) @@ -237,9 +222,9 @@ public class SSTableReader extends SSTable logger.error("Corrupt sstable " + descriptor + "; skipped", e); } - public static Collection openAll(Set>> entries, - final CFMetaData metadata, - final IPartitioner partitioner) + public static Collection batchOpen(Set>> entries, + final CFMetaData metadata, + final IPartitioner partitioner) { final Collection sstables = new LinkedBlockingQueue(); @@ -310,20 +295,6 @@ public class SSTableReader extends SSTable Set components, CFMetaData metadata, IPartitioner partitioner, - long maxDataAge, - SSTableMetadata sstableMetadata) - { - super(desc, components, metadata, partitioner); - this.sstableMetadata = sstableMetadata; - this.maxDataAge = maxDataAge; - - this.deletingTask = new SSTableDeletingTask(this); - } - - private SSTableReader(Descriptor desc, - Set components, - CFMetaData metadata, - IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary indexSummary, @@ -331,12 +302,15 @@ public class SSTableReader extends SSTable long maxDataAge, SSTableMetadata sstableMetadata) { - this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata); + super(desc, components, metadata, partitioner); + this.sstableMetadata = sstableMetadata; + this.maxDataAge = maxDataAge; this.ifile = ifile; this.dfile = dfile; this.indexSummary = indexSummary; this.bf = bloomFilter; + this.deletingTask = new SSTableDeletingTask(this); } public void setTrackedBy(DataTracker tracker) @@ -375,56 +349,16 @@ public class SSTableReader extends SSTable { SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); SegmentedFile.Builder dbuilder = compression - ? SegmentedFile.getCompressedBuilder() - : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); - + ? SegmentedFile.getCompressedBuilder() + : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); - boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder); - if (recreatebloom || !summaryLoaded) - buildSummary(recreatebloom, ibuilder, dbuilder, summaryLoaded); - - ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); - dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); - if (recreatebloom || !summaryLoaded) // save summary information to disk - saveSummary(this, ibuilder, dbuilder); - } - - /** - * A simplified load that creates a minimal partition index - */ - private void loadForBatch() throws IOException - { - // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments - SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder(); - SegmentedFile.Builder dbuilder = compression - ? SegmentedFile.getCompressedBuilder() - : new BufferedSegmentedFile.Builder(); - - // build a bare-bones IndexSummary - IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1); - RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true); - try - { - ByteBuffer key = ByteBufferUtil.readWithShortLength(in); - first = decodeKey(partitioner, descriptor, key); - summaryBuilder.maybeAddEntry(first, 0); - indexSummary = summaryBuilder.build(partitioner); - } - finally - { - FileUtils.closeQuietly(in); - } - - last = null; // shouldn't need this for batch operations - - ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); - dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); - } - - private void buildSummary(boolean recreatebloom, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException - { // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary. RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true); + + // try to load summaries from the disk and check if we need + // to read primary index because we should re-create a BloomFilter or pre-load KeyCache + final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder); + final boolean readIndex = recreatebloom || !summaryLoaded; try { long indexSize = primaryIndex.length(); @@ -440,7 +374,7 @@ public class SSTableReader extends SSTable summaryBuilder = new IndexSummaryBuilder(estimatedKeys); long indexPosition; - while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) + while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize) { ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version); @@ -471,6 +405,12 @@ public class SSTableReader extends SSTable first = getMinimalKey(first); last = getMinimalKey(last); + // finalize the load. + // finalize the state of the reader + ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); + dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); + if (readIndex) // save summary information to disk + saveSummary(this, ibuilder, dbuilder); } public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)