cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [5/7] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Date Wed, 28 Jan 2015 15:20:29 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 8b9a88f,0000000..005e4c4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,587 -1,0 +1,587 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.DataInput;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 +import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.SegmentedFile;
 +import org.apache.cassandra.io.util.SequentialWriter;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.FilterFactory;
 +import org.apache.cassandra.utils.IFilter;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.StreamingHistogram;
 +
 +public class BigTableWriter extends SSTableWriter
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
 +
 +    // not very random, but the only value that can't be mistaken for a legal column-name length
 +    public static final int END_OF_ROW = 0x0000;
 +
 +    private IndexWriter iwriter;
 +    private SegmentedFile.Builder dbuilder;
 +    private final SequentialWriter dataFile;
 +    private DecoratedKey lastWrittenKey;
 +    private FileMark dataMark;
 +
 +    BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
 +    {
 +        super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
 +
 +        iwriter = new IndexWriter(keyCount);
 +
 +        if (compression)
 +        {
 +            dataFile = SequentialWriter.open(getFilename(),
 +                                             descriptor.filenameFor(Component.COMPRESSION_INFO),
 +                                             metadata.compressionParameters(),
 +                                             metadataCollector);
 +            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
 +        }
 +        else
 +        {
 +            dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
 +            dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +        }
 +    }
 +
 +    public void mark()
 +    {
 +        dataMark = dataFile.mark();
 +        iwriter.mark();
 +    }
 +
 +    public void resetAndTruncate()
 +    {
 +        dataFile.resetAndTruncate(dataMark);
 +        iwriter.resetAndTruncate();
 +    }
 +
 +    /**
 +     * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
 +     */
 +    private long beforeAppend(DecoratedKey decoratedKey)
 +    {
 +        assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
 +        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
 +            throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
 +        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
 +    }
 +
 +    private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
 +    {
 +        metadataCollector.addKey(decoratedKey.getKey());
 +        lastWrittenKey = decoratedKey;
 +        last = lastWrittenKey;
 +        if (first == null)
 +            first = lastWrittenKey;
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("wrote {} at {}", decoratedKey, dataPosition);
 +        iwriter.append(decoratedKey, index);
 +        dbuilder.addPotentialBoundary(dataPosition);
 +    }
 +
 +    /**
 +     * @param row
 +     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
 +     */
 +    public RowIndexEntry append(AbstractCompactedRow row)
 +    {
 +        long currentPosition = beforeAppend(row.key);
 +        RowIndexEntry entry;
 +        try
 +        {
 +            entry = row.write(currentPosition, dataFile);
 +            if (entry == null)
 +                return null;
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
 +        afterAppend(row.key, currentPosition, entry);
 +        return entry;
 +    }
 +
 +    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
 +    {
 +        if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
 +        {
 +            logger.error("Key size {} exceeds maximum of {}, skipping row",
 +                         decoratedKey.getKey().remaining(),
 +                         FBUtilities.MAX_UNSIGNED_SHORT);
 +            return;
 +        }
 +
 +        long startPosition = beforeAppend(decoratedKey);
 +        try
 +        {
 +            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
 +            afterAppend(decoratedKey, startPosition, entry);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
 +    }
 +
 +    private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
 +    {
 +        assert cf.hasColumns() || cf.isMarkedForDelete();
 +
 +        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
 +        ColumnIndex index = builder.build(cf);
 +
 +        out.writeShort(END_OF_ROW);
 +        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
 +    }
 +
 +    /**
 +     * @throws IOException if a read from the DataInput fails
 +     * @throws FSWriteError if a write to the dataFile fails
 +     */
 +    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
 +    {
 +        long currentPosition = beforeAppend(key);
 +
 +        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
 +        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
 +        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
 +        List<ByteBuffer> minColumnNames = Collections.emptyList();
 +        List<ByteBuffer> maxColumnNames = Collections.emptyList();
 +        StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
 +        boolean hasLegacyCounterShards = false;
 +
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
 +        cf.delete(DeletionTime.serializer.deserialize(in));
 +
 +        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
 +
 +        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
 +        {
 +            tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +            maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +        }
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
 +        while (rangeTombstoneIterator.hasNext())
 +        {
 +            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
 +            tombstones.update(rangeTombstone.getLocalDeletionTime());
 +            minTimestampTracker.update(rangeTombstone.timestamp());
 +            maxTimestampTracker.update(rangeTombstone.timestamp());
 +            maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
 +            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
 +            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
 +        }
 +
 +        Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
 +        try
 +        {
 +            while (iter.hasNext())
 +            {
 +                OnDiskAtom atom = iter.next();
 +                if (atom == null)
 +                    break;
 +
 +                if (atom instanceof CounterCell)
 +                {
 +                    atom = ((CounterCell) atom).markLocalToBeCleared();
 +                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
 +                }
 +
 +                int deletionTime = atom.getLocalDeletionTime();
 +                if (deletionTime < Integer.MAX_VALUE)
 +                    tombstones.update(deletionTime);
 +                minTimestampTracker.update(atom.timestamp());
 +                maxTimestampTracker.update(atom.timestamp());
 +                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
 +                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
 +                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
 +
 +                columnIndexer.add(atom); // This writes the atom to disk too
 +            }
 +
 +            columnIndexer.maybeWriteEmptyRowHeader();
 +            dataFile.stream.writeShort(END_OF_ROW);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +
 +        metadataCollector.updateMinTimestamp(minTimestampTracker.get())
 +                         .updateMaxTimestamp(maxTimestampTracker.get())
 +                         .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
 +                         .addRowSize(dataFile.getFilePointer() - currentPosition)
 +                         .addColumnCount(columnIndexer.writtenAtomCount())
 +                         .mergeTombstoneHistogram(tombstones)
 +                         .updateMinColumnNames(minColumnNames)
 +                         .updateMaxColumnNames(maxColumnNames)
 +                         .updateHasLegacyCounterShards(hasLegacyCounterShards);
 +
 +        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
 +        return currentPosition;
 +    }
 +
 +    /**
 +     * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
 +     */
 +    public void abort(boolean closeBf)
 +    {
 +        assert descriptor.type.isTemporary;
 +        if (iwriter == null && dataFile == null)
 +            return;
 +
 +        if (iwriter != null)
 +            iwriter.abort(closeBf);
 +
 +        if (dataFile!= null)
 +            dataFile.abort();
 +
 +        Set<Component> components = SSTable.componentsFor(descriptor);
 +        try
 +        {
 +            if (!components.isEmpty())
 +                SSTable.delete(descriptor, components);
 +        }
 +        catch (FSWriteError e)
 +        {
 +            logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
 +            throw e;
 +        }
 +    }
 +
 +    // we use this method to ensure any managed data we may have retained references to during the write are no
 +    // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
 +    public void isolateReferences()
 +    {
 +        // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
 +        // data retention is done through copying
 +        first = getMinimalKey(first);
 +        last = lastWrittenKey = getMinimalKey(last);
 +    }
 +
 +    private Descriptor makeTmpLinks()
 +    {
 +        // create temp links if they don't already exist
 +        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
 +        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
 +        {
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
 +        }
 +        return link;
 +    }
 +
 +    public SSTableReader openEarly(long maxDataAge)
 +    {
 +        StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
 +                                                  metadata.getBloomFilterFpChance(),
 +                                                  repairedAt).get(MetadataType.STATS);
 +
 +        // find the max (exclusive) readable key
 +        DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
 +        if (exclusiveUpperBoundOfReadableIndex == null)
 +            return null;
 +
 +        Descriptor link = makeTmpLinks();
 +        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
 +        SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
 +        SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
 +        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
 +                                                           components, metadata,
 +                                                           partitioner, ifile,
 +                                                           dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
 +                                                           iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
 +
 +        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
 +        DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
 +        if (inclusiveUpperBoundOfReadableData == null)
 +        {
 +            // Prevent leaving tmplink files on disk
-             sstable.releaseReference();
++            sstable.sharedRef().release();
 +            return null;
 +        }
 +        int offset = 2;
 +        while (true)
 +        {
 +            RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
 +            if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
 +                break;
 +            inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
 +            if (inclusiveUpperBoundOfReadableData == null)
 +            {
-                 sstable.releaseReference();
++                sstable.sharedRef().release();
 +                return null;
 +            }
 +        }
 +        sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
 +        return sstable;
 +    }
 +
 +    public SSTableReader closeAndOpenReader()
 +    {
 +        return closeAndOpenReader(System.currentTimeMillis());
 +    }
 +
 +    public SSTableReader closeAndOpenReader(long maxDataAge)
 +    {
 +        return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
 +    }
 +
 +    public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
 +    {
 +        Pair<Descriptor, StatsMetadata> p;
 +
 +        p = close(finishType, repairedAt < 0 ? this.repairedAt : repairedAt);
 +        Descriptor desc = p.left;
 +        StatsMetadata metadata = p.right;
 +
 +        if (finishType == FinishType.EARLY)
 +            desc = makeTmpLinks();
 +
 +        // finalize in-memory state for the reader
 +        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
 +        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
 +        SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
 +                                                           components,
 +                                                           this.metadata,
 +                                                           partitioner,
 +                                                           ifile,
 +                                                           dfile,
 +                                                           iwriter.summary.build(partitioner),
 +                                                           iwriter.bf,
 +                                                           maxDataAge,
 +                                                           metadata,
 +                                                           finishType.openReason);
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(last);
 +
 +        switch (finishType)
 +        {
 +            case NORMAL: case FINISH_EARLY:
 +            // try to save the summaries to disk
 +            sstable.saveSummary(iwriter.builder, dbuilder);
 +            iwriter = null;
 +            dbuilder = null;
 +        }
 +        return sstable;
 +    }
 +
 +    // Close the writer and return the descriptor to the new sstable and its metadata
 +    public Pair<Descriptor, StatsMetadata> close()
 +    {
 +        return close(FinishType.NORMAL, this.repairedAt);
 +    }
 +
 +    private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
 +    {
 +        switch (type)
 +        {
 +            case EARLY: case NORMAL:
 +            iwriter.close();
 +            dataFile.close();
 +        }
 +
 +        // write sstable statistics
 +        Map<MetadataType, MetadataComponent> metadataComponents;
 +        metadataComponents = metadataCollector
 +                             .finalizeMetadata(partitioner.getClass().getCanonicalName(),
 +                                               metadata.getBloomFilterFpChance(),repairedAt);
 +
 +        // remove the 'tmp' marker from all components
 +        Descriptor descriptor = this.descriptor;
 +        switch (type)
 +        {
 +            case NORMAL: case FINISH_EARLY:
 +            dataFile.writeFullChecksum(descriptor);
 +            writeMetadata(descriptor, metadataComponents);
 +            // save the table of components
 +            SSTable.appendTOC(descriptor, components);
 +            descriptor = rename(descriptor, components);
 +        }
 +
 +        return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
 +    }
 +
 +    private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
 +    {
 +        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
 +        try
 +        {
 +            desc.getMetadataSerializer().serialize(components, out.stream);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, out.getPath());
 +        }
 +        finally
 +        {
 +            out.close();
 +        }
 +    }
 +
 +    public long getFilePointer()
 +    {
 +        return dataFile.getFilePointer();
 +    }
 +
 +    public long getOnDiskFilePointer()
 +    {
 +        return dataFile.getOnDiskFilePointer();
 +    }
 +
 +    /**
 +     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
 +     */
 +    class IndexWriter
 +    {
 +        private final SequentialWriter indexFile;
 +        public final SegmentedFile.Builder builder;
 +        public final IndexSummaryBuilder summary;
 +        public final IFilter bf;
 +        private FileMark mark;
 +
 +        IndexWriter(long keyCount)
 +        {
 +            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +            summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
 +            bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
 +        }
 +
 +        // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
 +        DecoratedKey getMaxReadableKey(int offset)
 +        {
 +            long maxIndexLength = indexFile.getLastFlushOffset();
 +            return summary.getMaxReadableKey(maxIndexLength, offset);
 +        }
 +
 +        public void append(DecoratedKey key, RowIndexEntry indexEntry)
 +        {
 +            bf.add(key.getKey());
 +            long indexPosition = indexFile.getFilePointer();
 +            try
 +            {
 +                ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
 +                rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new FSWriteError(e, indexFile.getPath());
 +            }
 +
 +            if (logger.isTraceEnabled())
 +                logger.trace("wrote index entry: {} at {}", indexEntry, indexPosition);
 +
 +            summary.maybeAddEntry(key, indexPosition);
 +            builder.addPotentialBoundary(indexPosition);
 +        }
 +
 +        public void abort(boolean closeBf)
 +        {
 +            indexFile.abort();
 +            if (closeBf)
 +                bf.close();
 +        }
 +
 +        /**
 +         * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
 +         */
 +        public void close()
 +        {
 +            if (components.contains(Component.FILTER))
 +            {
 +                String path = descriptor.filenameFor(Component.FILTER);
 +                try
 +                {
 +                    // bloom filter
 +                    FileOutputStream fos = new FileOutputStream(path);
 +                    DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos);
 +                    FilterFactory.serialize(bf, stream);
 +                    stream.flush();
 +                    fos.getFD().sync();
 +                    stream.close();
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new FSWriteError(e, path);
 +                }
 +            }
 +
 +            // index
 +            long position = indexFile.getFilePointer();
 +            indexFile.close(); // calls force
 +            FileUtils.truncate(indexFile.getPath(), position);
 +        }
 +
 +        public void mark()
 +        {
 +            mark = indexFile.mark();
 +        }
 +
 +        public void resetAndTruncate()
 +        {
 +            // we can't un-set the bloom filter addition, but extra keys in there are harmless.
 +            // we can't reset dbuilder either, but that is the last thing called in afterAppend so
 +            // we assume that if that worked then we won't be trying to reset.
 +            indexFile.resetAndTruncate(mark);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 35ddeef,1c5138b..7c7b0b6
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -25,12 -25,10 +25,13 @@@ import java.util.*
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicBoolean;
  
+ import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Multimap;
  import com.google.common.collect.Sets;
 +import com.google.common.util.concurrent.ListeningExecutorService;
 +import com.google.common.util.concurrent.MoreExecutors;
  
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -48,12 -48,18 +49,16 @@@ import org.apache.cassandra.net.IAsyncC
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.repair.*;
 -import org.apache.cassandra.repair.messages.AnticompactionRequest;
 -import org.apache.cassandra.repair.messages.PrepareMessage;
 -import org.apache.cassandra.repair.messages.RepairMessage;
 -import org.apache.cassandra.repair.messages.SyncComplete;
 -import org.apache.cassandra.repair.messages.ValidationComplete;
 +import org.apache.cassandra.repair.RepairJobDesc;
 +import org.apache.cassandra.repair.RepairParallelism;
 +import org.apache.cassandra.repair.RepairSession;
 +import org.apache.cassandra.repair.messages.*;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.UUIDGen;
+ import org.apache.cassandra.utils.concurrent.Ref;
+ import org.apache.cassandra.utils.concurrent.RefCounted;
+ 
+ import org.apache.cassandra.utils.concurrent.Refs;
  
  /**
   * ActiveRepairService is the starting point for manual "active" repairs.
@@@ -335,17 -376,13 +340,17 @@@ public class ActiveRepairServic
      {
          assert parentRepairSession != null;
          ParentRepairSession prs = getParentRepairSession(parentRepairSession);
 +        assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges";
  
          List<Future<?>> futures = new ArrayList<>();
 +        // if we don't have successful repair ranges, then just skip anticompaction
 +        if (successfulRanges.isEmpty())
 +            return futures;
          for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
          {
-             Collection<SSTableReader> sstables = new HashSet<>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()));
+             Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey());
              ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
 -            futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
 +            futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt));
          }
  
          return futures;
@@@ -386,20 -422,11 +391,20 @@@
              for (ColumnFamilyStore cfs : columnFamilyStores)
                  this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
              this.ranges = ranges;
 -            this.sstableMap = sstables;
              this.repairedAt = repairedAt;
 +            this.isIncremental = isIncremental;
 +        }
 +
 +        public void addSSTables(UUID cfId, Set<SSTableReader> sstables)
 +        {
 +            Set<SSTableReader> existingSSTables = this.sstableMap.get(cfId);
 +            if (existingSSTables == null)
 +                existingSSTables = new HashSet<>();
 +            existingSSTables.addAll(sstables);
 +            this.sstableMap.put(cfId, existingSSTables);
          }
  
-         public synchronized Collection<SSTableReader> getAndReferenceSSTables(UUID cfId)
+         public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId)
          {
              Set<SSTableReader> sstables = sstableMap.get(cfId);
              Iterator<SSTableReader> sstableIterator = sstables.iterator();
@@@ -412,27 -440,16 +418,29 @@@
                  }
                  else
                  {
-                     if (!sstable.acquireReference())
+                     Ref ref = sstable.tryRef();
+                     if (ref == null)
                          sstableIterator.remove();
+                     else
+                         references.put(sstable, ref);
                  }
              }
-             return sstables;
+             return new Refs<>(references.build());
          }
  
-         public synchronized Set<SSTableReader> getAndReferenceSSTablesInRange(UUID cfId, Range<Token> range)
++        public synchronized Refs<SSTableReader> getAndReferenceSSTablesInRange(UUID cfId, Range<Token> range)
 +        {
-             Collection<SSTableReader> allSSTables = getAndReferenceSSTables(cfId);
-             Set<SSTableReader> sstables = new HashSet<>();
-             for (SSTableReader sstable : allSSTables)
++            Refs<SSTableReader> sstables = getAndReferenceSSTables(cfId);
++            for (SSTableReader sstable : new ArrayList<>(sstables))
 +            {
 +                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Arrays.asList(range)))
 +                    sstables.add(sstable);
 +                else
-                     sstable.releaseReference();
++                    sstables.release(sstable);
 +            }
 +            return sstables;
 +        }
 +
          @Override
          public String toString()
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index d018786,44b83f9..3186291
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -31,10 -31,13 +31,12 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 -import org.apache.cassandra.io.sstable.SSTableWriter;
 -import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.utils.Pair;
  
+ import org.apache.cassandra.utils.concurrent.Refs;
+ 
  /**
   * Task that manages receiving files for the session for certain ColumnFamily.
   */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 454629e,6108dea..ccc2f89
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -294,9 -291,9 +298,9 @@@ public class StreamSession implements I
          return stores;
      }
  
 -    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, boolean isIncremental)
 +    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt)
      {
-         List<SSTableReader> sstables = new ArrayList<>();
+         Refs<SSTableReader> refs = new Refs<>();
          try
          {
              for (ColumnFamilyStore cfStore : stores)
@@@ -304,11 -301,11 +308,11 @@@
                  List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
                  for (Range<Token> range : ranges)
                      rowBoundsList.add(range.toRowBounds());
-                 ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
-                 sstables.addAll(view.sstables);
 -                refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs);
++                refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)).refs);
              }
-             List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
-             for (SSTableReader sstable : sstables)
+ 
+             List<SSTableStreamingSections> sections = new ArrayList<>(refs.size());
+             for (SSTableReader sstable : refs)
              {
                  long repairedAt = overriddenRepairedAt;
                  if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index a3dd10f,b00042e..30c77e9
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -23,9 -23,11 +23,11 @@@ import java.util.concurrent.ScheduledFu
  import java.util.concurrent.atomic.AtomicInteger;
  
  import org.apache.cassandra.concurrent.NamedThreadFactory;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
  import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.utils.concurrent.Ref;
+ import org.apache.cassandra.utils.concurrent.RefCounted;
  
  /**
   * StreamTransferTask sends sections of SSTable files in certain ColumnFamily.
@@@ -47,10 -49,10 +49,10 @@@ public class StreamTransferTask extend
          super(session, cfId);
      }
  
-     public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+     public synchronized void addTransferFile(SSTableReader sstable, Ref ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
      {
          assert sstable != null && cfId.equals(sstable.metadata.cfId);
-         OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel());
 -        OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
++        OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel());
          files.put(message.header.sequenceNumber, message);
          totalSize += message.header.size();
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index ed4c4ce,5ebf289..43db5af
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@@ -57,10 -60,11 +59,11 @@@ public class OutgoingFileMessage extend
          }
      };
  
-     public FileMessageHeader header;
-     public SSTableReader sstable;
+     public final FileMessageHeader header;
+     public final SSTableReader sstable;
+     public final Ref ref;
  
-     public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
 -    public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
++    public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
      {
          super(Type.FILE);
          this.sstable = sstable;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/SchemaLoader.java
index f7b10a2,ce65d5a..60f4703
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@@ -56,8 -58,22 +58,17 @@@ public class SchemaLoade
          // Migrations aren't happy if gossiper is not started.  Even if we don't use migrations though,
          // some tests now expect us to start gossip for them.
          startGossiper();
 -
 -        // if you're messing with low-level sstable stuff, it can be useful to inject the schema directly
 -        // Schema.instance.load(schemaDefinition());
 -        for (KSMetaData ksm : schemaDefinition())
 -            MigrationManager.announceNewKeyspace(ksm);
      }
  
+     @After
+     public void leakDetect() throws InterruptedException
+     {
+         System.gc();
+         System.gc();
+         System.gc();
+         Thread.sleep(10);
+     }
+ 
      public static void prepareServer()
      {
          // Cleanup first

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/KeyCacheTest.java
index c370e4f,1f7024e..9695b4a
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@@ -43,11 -38,12 +43,12 @@@ import org.apache.cassandra.locator.Sim
  import org.apache.cassandra.service.CacheService;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
+ import org.apache.cassandra.utils.concurrent.Refs;
  import static org.junit.Assert.assertEquals;
  
 -public class KeyCacheTest extends SchemaLoader
 +public class KeyCacheTest
  {
 -    private static final String KEYSPACE1 = "KeyCacheSpace";
 +    private static final String KEYSPACE1 = "KeyCacheTest1";
      private static final String COLUMN_FAMILY1 = "Standard1";
      private static final String COLUMN_FAMILY2 = "Standard2";
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 6ec56c7,d9442c7..018d643
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -17,6 -17,7 +17,8 @@@
   */
  package org.apache.cassandra.db.compaction;
  
++import junit.framework.Assert;
+ import org.apache.cassandra.utils.concurrent.Refs;
  import static org.hamcrest.CoreMatchers.is;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
@@@ -89,9 -64,11 +91,9 @@@ public class AntiCompactionTes
          Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
          List<Range<Token>> ranges = Arrays.asList(range);
  
-         SSTableReader.acquireReferences(sstables);
 -        Refs<SSTableReader> refs = Refs.tryRef(sstables);
 -        if (refs == null)
 -            throw new IllegalStateException();
++        Refs<SSTableReader> refs = Refs.ref(sstables);
          long repairedAt = 1000;
-         CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
+         CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt);
  
          assertEquals(2, store.getSSTables().size());
          int repairedKeys = 0;
@@@ -162,85 -142,27 +163,87 @@@
          return writer.closeAndOpenReader();
      }
  
 +    public void generateSStable(ColumnFamilyStore store, String Suffix)
 +    {
 +    long timestamp = System.currentTimeMillis();
 +    for (int i = 0; i < 10; i++)
 +        {
 +            DecoratedKey key = Util.dk(Integer.toString(i) + "-" + Suffix);
 +            Mutation rm = new Mutation(KEYSPACE1, key.getKey());
 +            for (int j = 0; j < 10; j++)
 +                rm.add("Standard1", Util.cellname(Integer.toString(j)),
 +                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
 +                        timestamp,
 +                        0);
 +            rm.apply();
 +        }
 +        store.forceBlockingFlush();
 +    }
 +
 +    @Test
 +    public void antiCompactTenSTC() throws InterruptedException, IOException{
 +        antiCompactTen("SizeTieredCompactionStrategy");
 +    }
 +
      @Test
 -    public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException
 +    public void antiCompactTenLC() throws InterruptedException, IOException{
 +        antiCompactTen("LeveledCompactionStrategy");
 +    }
 +
 +    public void antiCompactTen(String compactionStrategy) throws InterruptedException, IOException
      {
 -        ColumnFamilyStore store = prepareColumnFamilyStore();
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 +        store.setCompactionStrategyClass(compactionStrategy);
 +        store.disableAutoCompaction();
 +
 +        for (int table = 0; table < 10; table++)
 +        {
 +            generateSStable(store,Integer.toString(table));
 +        }
          Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
          assertEquals(store.getSSTables().size(), sstables.size());
 -        Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
 +
 +        Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
          List<Range<Token>> ranges = Arrays.asList(range);
  
-         SSTableReader.acquireReferences(sstables);
+         Refs<SSTableReader> refs = Refs.tryRef(sstables);
 -        if (refs == null)
 -            throw new IllegalStateException();
 -        CompactionManager.instance.performAnticompaction(store, ranges, refs, 1);
 -        assertThat(store.getSSTables().size(), is(1));
 -        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
 -        assertThat(Iterables.get(store.getSSTables(), 0).sharedRef().globalCount(), is(1));
 -        assertThat(store.getDataTracker().getCompacting().size(), is(0));
++        Assert.assertNotNull(refs);
 +        long repairedAt = 1000;
-         CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
++        CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt);
 +        /*
 +        Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
 +        so there will be no net change in the number of sstables
 +         */
 +        assertEquals(10, store.getSSTables().size());
 +        int repairedKeys = 0;
 +        int nonRepairedKeys = 0;
 +        for (SSTableReader sstable : store.getSSTables())
 +        {
 +            ISSTableScanner scanner = sstable.getScanner();
 +            while (scanner.hasNext())
 +            {
 +                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
 +                if (sstable.isRepaired())
 +                {
 +                    assertTrue(range.contains(row.getKey().getToken()));
 +                    assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt);
 +                    repairedKeys++;
 +                }
 +                else
 +                {
 +                    assertFalse(range.contains(row.getKey().getToken()));
 +                    assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
 +                    nonRepairedKeys++;
 +                }
 +            }
 +        }
 +        assertEquals(repairedKeys, 40);
 +        assertEquals(nonRepairedKeys, 60);
      }
+ 
      @Test
 -    public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException
 +    public void shouldMutateRepairedAt() throws InterruptedException, IOException
      {
          ColumnFamilyStore store = prepareColumnFamilyStore();
          Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
@@@ -258,30 -179,6 +260,30 @@@
      }
  
  
 +    @Test
 +    public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, IOException
 +    {
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 +        store.disableAutoCompaction();
 +
 +        for (int table = 0; table < 10; table++)
 +        {
 +            generateSStable(store,Integer.toString(table));
 +        }
 +        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
 +        assertEquals(store.getSSTables().size(), sstables.size());
 +        
 +        Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
 +        List<Range<Token>> ranges = Arrays.asList(range);
 +
-         SSTableReader.acquireReferences(sstables);
-         CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0);
++        Refs<SSTableReader> refs = Refs.ref(sstables);
++        CompactionManager.instance.performAnticompaction(store, ranges, refs, 0);
 +
 +        assertThat(store.getSSTables().size(), is(10));
 +        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
 +    }
 +
      private ColumnFamilyStore prepareColumnFamilyStore()
      {
          Keyspace keyspace = Keyspace.open(KEYSPACE1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
index 1116c9e,08e3fb3..5420b1b
--- a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
@@@ -26,7 -26,8 +26,9 @@@ import java.util.Collection
  import java.util.HashSet;
  import java.util.Set;
  
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.junit.After;
+ import org.junit.AfterClass;
  import org.junit.BeforeClass;
  import org.junit.Test;
  
@@@ -43,22 -42,20 +45,31 @@@ import static org.junit.Assert.assertEq
  import static org.junit.Assert.assertNotNull;
  import static org.apache.cassandra.Util.cellname;
  
 -public class BlacklistingCompactionsTest extends SchemaLoader
 +public class BlacklistingCompactionsTest
  {
 -    public static final String KEYSPACE = "Keyspace1";
 +    private static final String KEYSPACE1 = "BlacklistingCompactionsTest";
 +    private static final String CF_STANDARD1 = "Standard1";
  
+     @After
+     public void leakDetect() throws InterruptedException
+     {
+         System.gc();
+         System.gc();
+         System.gc();
+         Thread.sleep(10);
+     }
+ 
      @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    SimpleStrategy.class,
 +                                    KSMetaData.optsWithRF(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
 +        closeStdErr();
 +    }
 +
      public static void closeStdErr()
      {
          // These tests generate an error message per CorruptSSTableException since it goes through

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 3703d54,acf8c90..97625f4
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -224,17 -197,13 +224,17 @@@ public class SSTableRewriterTest extend
          for (int i = 500; i < 1000; i++)
              writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
          SSTableReader s2 = writer.openEarly(1000);
 +
          assertTrue(s != s2);
          assertFileCounts(dir.list(), 2, 3);
 -        s.markObsolete();
 +
 +        s.setReplacedBy(s2);
 +        s2.markObsolete();
-         s.releaseReference();
-         s2.releaseReference();
+         s.sharedRef().release();
 -        Thread.sleep(1000);
 -        assertFileCounts(dir.list(), 0, 3);
++        s2.sharedRef().release();
 +
          writer.abort(false);
 +
          Thread.sleep(1000);
          int datafiles = assertFileCounts(dir.list(), 0, 0);
          assertEquals(datafiles, 0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------


Mime
View raw message