Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4087810BD4 for ; Tue, 16 Dec 2014 21:11:40 +0000 (UTC) Received: (qmail 25841 invoked by uid 500); 16 Dec 2014 21:11:40 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 25683 invoked by uid 500); 16 Dec 2014 21:11:40 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 25637 invoked by uid 99); 16 Dec 2014 21:11:39 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Dec 2014 21:11:39 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id AD501A2D958; Tue, 16 Dec 2014 21:11:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jmckenzie@apache.org To: commits@cassandra.apache.org Date: Tue, 16 Dec 2014 21:11:40 -0000 Message-Id: <00c829636b804c199e5e12cb5aed6a3c@git.apache.org> In-Reply-To: <3c8aa35d7b3b479e802ee8c89d62d300@git.apache.org> References: <3c8aa35d7b3b479e802ee8c89d62d300@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/5] cassandra git commit: Merge branch 'cassandra-2.1' into trunk http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index 2488f86,0000000..fc346d1 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@@ -1,256 -1,0 +1,251 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.big; + +import com.google.common.util.concurrent.RateLimiter; +import org.apache.cassandra.cache.KeyCacheKey; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; - import org.apache.cassandra.db.compaction.ICompactionScanner; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.Descriptor; ++import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +/** + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen. + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead. + */ +public class BigTableReader extends SSTableReader +{ + private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class); + + BigTableReader(Descriptor desc, Set components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason) + { + super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason); + } + + public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet columns) + { + return new SSTableNamesIterator(this, key, columns); + } + + public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, SortedSet columns, RowIndexEntry indexEntry ) + { + return new SSTableNamesIterator(this, input, key, columns, indexEntry); + } + + public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse) + { + return new SSTableSliceIterator(this, key, slices, reverse); + } + + public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry) + { + return new SSTableSliceIterator(this, input, key, slices, reverse, indexEntry); + } + /** + * + * @param dataRange filter to use when reading the columns + * @return A Scanner for seeking over the rows of the SSTable. + */ - public ICompactionScanner getScanner(DataRange dataRange, RateLimiter limiter) ++ public ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter) + { - return new BigTableScanner(this, dataRange, limiter); ++ return BigTableScanner.getScanner(this, dataRange, limiter); + } + + + /** + * Direct I/O SSTableScanner over a defined collection of ranges of tokens. + * + * @param ranges the range of keys to cover + * @return A Scanner for seeking over the rows of the SSTable. + */ - public ICompactionScanner getScanner(Collection> ranges, RateLimiter limiter) ++ public ISSTableScanner getScanner(Collection> ranges, RateLimiter limiter) + { - // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249) - List> positions = getPositionsForRanges(Range.normalize(ranges)); - if (positions.isEmpty()) - return new EmptyCompactionScanner(getFilename()); - else - return new BigTableScanner(this, ranges, limiter); ++ return BigTableScanner.getScanner(this, ranges, limiter); + } + + + /** + * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to + * allow key selection by token bounds but only if op != * EQ + * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins. + * @param updateCacheAndStats true if updating stats and cache + * @return The index entry corresponding to the key, or null if the key is not present + */ + public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats) + { + // first, check bloom filter + if (op == Operator.EQ) + { + assert key instanceof DecoratedKey; // EQ only make sense if the key is a valid row key + if (!bf.isPresent(((DecoratedKey)key).getKey())) + { + Tracing.trace("Bloom filter allows skipping sstable {}", descriptor.generation); + return null; + } + } + + // next, the key cache (only make sense for valid row key) + if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey)) + { + DecoratedKey decoratedKey = (DecoratedKey)key; + KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, decoratedKey.getKey()); + RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats); + if (cachedPosition != null) + { + Tracing.trace("Key cache hit for sstable {}", descriptor.generation); + return cachedPosition; + } + } + + // check the smallest and greatest keys in the sstable to see if it can't be present + if (first.compareTo(key) > 0 || last.compareTo(key) < 0) + { + if (op == Operator.EQ && updateCacheAndStats) + bloomFilterTracker.addFalsePositive(); + + if (op.apply(1) < 0) + { + Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation); + return null; + } + } + + int binarySearchResult = indexSummary.binarySearch(key); + long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary); + int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult); + + // if we matched the -1th position, we'll start at the first position + sampledPosition = sampledPosition == -1 ? 0 : sampledPosition; + + int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex); + + // scan the on-disk index, starting at the nearest sampled position. + // The check against IndexInterval is to be exit the loop in the EQ case when the key looked for is not present + // (bloom filter false positive). But note that for non-EQ cases, we might need to check the first key of the + // next index position because the searched key can be greater the last key of the index interval checked if it + // is lesser than the first key of next interval (and in that case we must return the position of the first key + // of the next interval). + int i = 0; + Iterator segments = ifile.iterator(sampledPosition); + while (segments.hasNext() && i <= effectiveInterval) + { + FileDataInput in = segments.next(); + try + { + while (!in.isEOF() && i <= effectiveInterval) + { + i++; + + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); + + boolean opSatisfied; // did we find an appropriate position for the op requested + boolean exactMatch; // is the current position an exact match for the key, suitable for caching + + // Compare raw keys if possible for performance, otherwise compare decorated keys. + if (op == Operator.EQ) + { + opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey()); + } + else + { + DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey); + int comparison = indexDecoratedKey.compareTo(key); + int v = op.apply(comparison); + opSatisfied = (v == 0); + exactMatch = (comparison == 0); + if (v < 0) + { + Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation); + return null; + } + } + + if (opSatisfied) + { + // read data position from index entry + RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, descriptor.version); + if (exactMatch && updateCacheAndStats) + { + assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key + DecoratedKey decoratedKey = (DecoratedKey)key; + + if (logger.isTraceEnabled()) + { + // expensive sanity check! see CASSANDRA-4687 + FileDataInput fdi = dfile.getSegment(indexEntry.position); + DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi)); + if (!keyInDisk.equals(key)) + throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath())); + fdi.close(); + } + + // store exact match for the key + cacheKey(decoratedKey, indexEntry); + } + if (op == Operator.EQ && updateCacheAndStats) + bloomFilterTracker.addTruePositive(); + Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation); + return indexEntry; + } + + RowIndexEntry.Serializer.skip(in); + } + } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, in.getPath()); + } + finally + { + FileUtils.closeQuietly(in); + } + } + + if (op == SSTableReader.Operator.EQ && updateCacheAndStats) + bloomFilterTracker.addFalsePositive(); + Tracing.trace("Partition index lookup complete (bloom filter false positive) for sstable {}", descriptor.generation); + return null; + } + + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java index 7e3c877,0000000..73a5d76 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@@ -1,300 -1,0 +1,354 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.big; + +import java.io.IOException; - import java.util.ArrayList; - import java.util.Collection; - import java.util.Iterator; - import java.util.List; ++import java.util.*; + +import com.google.common.collect.AbstractIterator; +import com.google.common.util.concurrent.RateLimiter; + +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.columniterator.IColumnIteratorFactory; +import org.apache.cassandra.db.columniterator.LazyColumnIterator; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; - import org.apache.cassandra.db.compaction.ICompactionScanner; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.CorruptSSTableException; ++import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableIdentityIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.utils.ByteBufferUtil; ++import org.apache.cassandra.utils.Pair; + - public class BigTableScanner implements ICompactionScanner ++public class BigTableScanner implements ISSTableScanner +{ + protected final RandomAccessReader dfile; + protected final RandomAccessReader ifile; + public final SSTableReader sstable; + + private final Iterator> rangeIterator; + private AbstractBounds currentRange; + + private final DataRange dataRange; + private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; + + protected Iterator iterator; + ++ // We can race with the sstable for deletion during compaction. If it's been ref counted to 0, skip ++ public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) ++ { ++ return sstable.acquireReference() ++ ? new BigTableScanner(sstable, dataRange, limiter) ++ : new BigTableScanner.EmptySSTableScanner(sstable.getFilename()); ++ } ++ public static ISSTableScanner getScanner(SSTableReader sstable, Collection> tokenRanges, RateLimiter limiter) ++ { ++ // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249) ++ List> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges)); ++ if (positions.isEmpty() || !sstable.acquireReference()) ++ return new EmptySSTableScanner(sstable.getFilename()); ++ ++ return new BigTableScanner(sstable, tokenRanges, limiter); ++ } ++ + /** + * @param sstable SSTable to scan; must not be null + * @param dataRange a single range to scan; must not be null + * @param limiter background i/o RateLimiter; may be null + */ - public BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) ++ private BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) + { + assert sstable != null; - sstable.acquireReference(); + + this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter); + this.ifile = sstable.openIndexReader(); + this.sstable = sstable; + this.dataRange = dataRange; + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); + + List> boundsList = new ArrayList<>(2); + if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum()) + { + // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and + // 2) the part that comes before the wrap-around + boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey())); + boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound())); + } + else + { + boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey())); + } + this.rangeIterator = boundsList.iterator(); + } + + /** + * @param sstable SSTable to scan; must not be null + * @param tokenRanges A set of token ranges to scan + * @param limiter background i/o RateLimiter; may be null + */ - public BigTableScanner(SSTableReader sstable, Collection> tokenRanges, RateLimiter limiter) ++ private BigTableScanner(SSTableReader sstable, Collection> tokenRanges, RateLimiter limiter) + { + assert sstable != null; - sstable.acquireReference(); + + this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter); + this.ifile = sstable.openIndexReader(); + this.sstable = sstable; + this.dataRange = null; + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); + + List> normalized = Range.normalize(tokenRanges); + List> boundsList = new ArrayList<>(normalized.size()); + for (Range range : normalized) + boundsList.add(new Range(range.left.maxKeyBound(), range.right.maxKeyBound())); + + this.rangeIterator = boundsList.iterator(); + } + + private void seekToCurrentRangeStart() + { + if (currentRange.left.isMinimum()) + return; + + long indexPosition = sstable.getIndexScanPosition(currentRange.left); + // -1 means the key is before everything in the sstable. So just start from the beginning. + if (indexPosition == -1) + { + // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and + // the seeks are no-op anyway if we are. + ifile.seek(0); + dfile.seek(0); + return; + } + + ifile.seek(indexPosition); + try + { + + while (!ifile.isEOF()) + { + indexPosition = ifile.getFilePointer(); + DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); + int comparison = indexDecoratedKey.compareTo(currentRange.left); + // because our range start may be inclusive or exclusive, we need to also contains() + // instead of just checking (comparison >= 0) + if (comparison > 0 || currentRange.contains(indexDecoratedKey)) + { + // Found, just read the dataPosition and seek into index and data files + long dataPosition = ifile.readLong(); + ifile.seek(indexPosition); + dfile.seek(dataPosition); + break; + } + else + { + RowIndexEntry.Serializer.skip(ifile); + } + } + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } + } + + public void close() throws IOException + { + FileUtils.close(dfile, ifile); + sstable.releaseReference(); + } + + public long getLengthInBytes() + { + return dfile.length(); + } + + public long getCurrentPosition() + { + return dfile.getFilePointer(); + } + + public String getBackingFiles() + { + return sstable.toString(); + } + + public boolean hasNext() + { + if (iterator == null) + iterator = createIterator(); + return iterator.hasNext(); + } + + public OnDiskAtomIterator next() + { + if (iterator == null) + iterator = createIterator(); + return iterator.next(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + private Iterator createIterator() + { + return new KeyScanningIterator(); + } + + protected class KeyScanningIterator extends AbstractIterator + { + private DecoratedKey nextKey; + private RowIndexEntry nextEntry; + private DecoratedKey currentKey; + private RowIndexEntry currentEntry; + + protected OnDiskAtomIterator computeNext() + { + try + { + if (nextEntry == null) + { + do + { + // we're starting the first range or we just passed the end of the previous range + if (!rangeIterator.hasNext()) + return endOfData(); + + currentRange = rangeIterator.next(); + seekToCurrentRangeStart(); + + if (ifile.isEOF()) + return endOfData(); + + currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); + currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version); + } while (!currentRange.contains(currentKey)); + } + else + { + // we're in the middle of a range + currentKey = nextKey; + currentEntry = nextEntry; + } + + long readEnd; + if (ifile.isEOF()) + { + nextEntry = null; + nextKey = null; + readEnd = dfile.length(); + } + else + { + // we need the position of the start of the next key, regardless of whether it falls in the current range + nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); + nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version); + readEnd = nextEntry.position; + + if (!currentRange.contains(nextKey)) + { + nextKey = null; + nextEntry = null; + } + } + + if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey())) + { + dfile.seek(currentEntry.position + currentEntry.headerOffset()); + ByteBufferUtil.readWithShortLength(dfile); // key + return new SSTableIdentityIterator(sstable, dfile, currentKey); + } + + return new LazyColumnIterator(currentKey, new IColumnIteratorFactory() + { + public OnDiskAtomIterator create() + { + return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry); + } + }); + + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } + } + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "(" + + "dfile=" + dfile + + " ifile=" + ifile + + " sstable=" + sstable + + ")"; + } ++ ++ public static class EmptySSTableScanner implements ISSTableScanner ++ { ++ private final String filename; ++ ++ public EmptySSTableScanner(String filename) ++ { ++ this.filename = filename; ++ } ++ ++ public long getLengthInBytes() ++ { ++ return 0; ++ } ++ ++ public long getCurrentPosition() ++ { ++ return 0; ++ } ++ ++ public String getBackingFiles() ++ { ++ return filename; ++ } ++ ++ public boolean hasNext() ++ { ++ return false; ++ } ++ ++ public OnDiskAtomIterator next() ++ { ++ return null; ++ } ++ ++ public void close() throws IOException { } ++ ++ public void remove() { } ++ } ++ ++ +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/tools/SSTableExport.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/SSTableExport.java index 3a04a81,22aebdb..fa6b973 --- a/src/java/org/apache/cassandra/tools/SSTableExport.java +++ b/src/java/org/apache/cassandra/tools/SSTableExport.java @@@ -22,8 -22,6 +22,7 @@@ import java.io.IOException import java.io.PrintStream; import java.util.*; - import org.apache.cassandra.db.compaction.ICompactionScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.cli.*; import org.apache.cassandra.config.CFMetaData; @@@ -320,10 -318,10 +319,10 @@@ public class SSTableExpor Set excludeSet = new HashSet(); if (excludes != null) - excludeSet = new HashSet(Arrays.asList(excludes)); + excludeSet = new HashSet<>(Arrays.asList(excludes)); SSTableIdentityIterator row; - ICompactionScanner scanner = reader.getScanner(); + ISSTableScanner scanner = reader.getScanner(); try { outs.println("["); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 2396acb,a09d8b4..63a49de --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@@ -46,13 -37,15 +46,13 @@@ import org.apache.cassandra.db.ColumnFa import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.dht.BytesToken; +import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; - import org.apache.cassandra.io.sstable.SSTableIdentityIterator; + import org.apache.cassandra.io.sstable.*; -import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; -import org.junit.After; -import org.junit.Test; import com.google.common.collect.Iterables; @@@ -207,38 -150,14 +207,38 @@@ public class AntiCompactionTes List> ranges = Arrays.asList(range); SSTableReader.acquireReferences(sstables); - CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1); - - assertThat(store.getSSTables().size(), is(1)); - assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false)); - assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1)); - assertThat(store.getDataTracker().getCompacting().size(), is(0)); + long repairedAt = 1000; + CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt); + /* + Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time + so there will be no net change in the number of sstables + */ + assertEquals(10, store.getSSTables().size()); + int repairedKeys = 0; + int nonRepairedKeys = 0; + for (SSTableReader sstable : store.getSSTables()) + { - ICompactionScanner scanner = sstable.getScanner(); ++ ISSTableScanner scanner = sstable.getScanner(); + while (scanner.hasNext()) + { + SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); + if (sstable.isRepaired()) + { + assertTrue(range.contains(row.getKey().getToken())); + assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt); + repairedKeys++; + } + else + { + assertFalse(range.contains(row.getKey().getToken())); + assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt); + nonRepairedKeys++; + } + } + } + assertEquals(repairedKeys, 40); + assertEquals(nonRepairedKeys, 60); } - @Test public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index faf3808,4659b5c..a6ee3f9 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@@ -189,11 -158,11 +189,11 @@@ public class CompactionsTes // check that the shadowed column is gone SSTableReader sstable = cfs.getSSTables().iterator().next(); Range keyRange = new Range(key, sstable.partitioner.getMinimumToken().maxKeyBound()); - ICompactionScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange)); + ISSTableScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange)); OnDiskAtomIterator iter = scanner.next(); assertEquals(key, iter.getKey()); - assert iter.next() instanceof RangeTombstone; - assert !iter.hasNext(); + assertTrue(iter.next() instanceof RangeTombstone); + assertFalse(iter.hasNext()); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index d49d8bb,4c2236b..1eca4e6 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@@ -36,15 -31,11 +36,16 @@@ import org.junit.runner.RunWith import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; -import org.apache.cassandra.db.*; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.io.sstable.ISSTableScanner; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.notifications.SSTableAddedNotification; import org.apache.cassandra.notifications.SSTableRepairStatusChanged; import org.apache.cassandra.repair.RepairJobDesc; http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java index 924c4b5,678601b..c56c910 --- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java @@@ -29,11 -27,10 +29,12 @@@ import org.junit.runner.RunWith import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.io.sstable.ISSTableScanner; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.utils.ByteBufferUtil; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index 2a0c3e6,03b5553..647a67b --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@@ -59,10 -56,9 +59,9 @@@ import org.apache.cassandra.db.Row import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.compaction.CompactionManager; - import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.db.composites.Composites; import org.apache.cassandra.dht.LocalPartitioner; -import org.apache.cassandra.dht.LocalToken; +import org.apache.cassandra.dht.LocalPartitioner.LocalToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.FileDataInput; @@@ -370,7 -346,7 +369,7 @@@ public class SSTableReaderTes boolean foundScanner = false; for (SSTableReader s : store.getSSTables()) { - ICompactionScanner scanner = s.getScanner(new Range(t(0), t(1)), null); - ISSTableScanner scanner = s.getScanner(new Range(t(0), t(1), s.partitioner), null); ++ ISSTableScanner scanner = s.getScanner(new Range(t(0), t(1)), null); scanner.next(); // throws exception pre 5407 foundScanner = true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 5eae831,ecf97c3..24a0091 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@@ -41,14 -37,10 +40,13 @@@ import org.apache.cassandra.db.Mutation import org.apache.cassandra.db.compaction.AbstractCompactedRow; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.compaction.CompactionController; - import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.db.compaction.LazilyCompactedRow; import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.db.compaction.SSTableSplitter; -import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java index d4053eb,ff60481..9da895e --- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java @@@ -27,15 -25,12 +27,14 @@@ import org.junit.Test import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; - import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.dht.Bounds; -import org.apache.cassandra.dht.BytesToken; +import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.utils.ByteBufferUtil; import static org.junit.Assert.*; http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index 4f3d22a,b9a3821..00f07ff --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@@ -25,9 -25,6 +25,8 @@@ import java.util.* import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; - import org.apache.cassandra.db.compaction.ICompactionScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil;