From: benedict@apache.org
To: commits@cassandra.apache.org
Reply-To: dev@cassandra.apache.org
Date: Wed, 28 Jan 2015 15:20:29 -0000
Message-Id: <2ebf2da4b71647d8b7ad26d6dc57c900@git.apache.org>
In-Reply-To: <0ed75320de5147da8b51a02ab2b8ee23@git.apache.org>
References: <0ed75320de5147da8b51a02ab2b8ee23@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [5/7] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 8b9a88f,0000000..005e4c4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,587 -1,0 +1,587 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.cassandra.io.sstable.format.big; + +import java.io.DataInput; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.format.Version; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.MetadataComponent; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.FilterFactory; +import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.StreamingHistogram; + +public class BigTableWriter extends SSTableWriter +{ + private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class); + + // not very random, but the only value that can't be mistaken for a legal column-name length + public static final int END_OF_ROW = 0x0000; + + private IndexWriter iwriter; + private SegmentedFile.Builder dbuilder; + private final SequentialWriter dataFile; + private DecoratedKey lastWrittenKey; + private FileMark dataMark; + + BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector) + { + super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector); + + iwriter = new IndexWriter(keyCount); + + if (compression) + { + dataFile = SequentialWriter.open(getFilename(), + descriptor.filenameFor(Component.COMPRESSION_INFO), + metadata.compressionParameters(), + metadataCollector); + dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile); + } + else + { + dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC))); + dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + } + } + + public void mark() + { + dataMark = dataFile.mark(); + iwriter.mark(); + } + + public void resetAndTruncate() + { + dataFile.resetAndTruncate(dataMark); + iwriter.resetAndTruncate(); + } + + /** + * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written + */ + private long beforeAppend(DecoratedKey decoratedKey) + { + assert decoratedKey != null : "Keys must not be null"; // empty keys 
ARE allowed b/c of indexed column values + if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0) + throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename()); + return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer(); + } + + private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index) + { + metadataCollector.addKey(decoratedKey.getKey()); + lastWrittenKey = decoratedKey; + last = lastWrittenKey; + if (first == null) + first = lastWrittenKey; + + if (logger.isTraceEnabled()) + logger.trace("wrote {} at {}", decoratedKey, dataPosition); + iwriter.append(decoratedKey, index); + dbuilder.addPotentialBoundary(dataPosition); + } + + /** + * @param row + * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row + */ + public RowIndexEntry append(AbstractCompactedRow row) + { + long currentPosition = beforeAppend(row.key); + RowIndexEntry entry; + try + { + entry = row.write(currentPosition, dataFile); + if (entry == null) + return null; + } + catch (IOException e) + { + throw new FSWriteError(e, dataFile.getPath()); + } + metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats()); + afterAppend(row.key, currentPosition, entry); + return entry; + } + + public void append(DecoratedKey decoratedKey, ColumnFamily cf) + { + if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT) + { + logger.error("Key size {} exceeds maximum of {}, skipping row", + decoratedKey.getKey().remaining(), + FBUtilities.MAX_UNSIGNED_SHORT); + return; + } + + long startPosition = beforeAppend(decoratedKey); + try + { + RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream); + afterAppend(decoratedKey, startPosition, entry); + } + catch (IOException e) + { + throw new FSWriteError(e, dataFile.getPath()); + } + metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats()); + } + + private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException + { + assert cf.hasColumns() || cf.isMarkedForDelete(); + + ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out); + ColumnIndex index = builder.build(cf); + + out.writeShort(END_OF_ROW); + return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index); + } + + /** + * @throws IOException if a read from the DataInput fails + * @throws FSWriteError if a write to the dataFile fails + */ + public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException + { + long currentPosition = beforeAppend(key); + + ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE); + ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE); + ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE); + List minColumnNames = Collections.emptyList(); + List maxColumnNames = Collections.emptyList(); + StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE); + boolean hasLegacyCounterShards = false; + + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata); + cf.delete(DeletionTime.serializer.deserialize(in)); + + ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream); + + 
if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE) + { + tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime); + maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime); + minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt); + maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt); + } + + Iterator rangeTombstoneIterator = cf.deletionInfo().rangeIterator(); + while (rangeTombstoneIterator.hasNext()) + { + RangeTombstone rangeTombstone = rangeTombstoneIterator.next(); + tombstones.update(rangeTombstone.getLocalDeletionTime()); + minTimestampTracker.update(rangeTombstone.timestamp()); + maxTimestampTracker.update(rangeTombstone.timestamp()); + maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime()); + minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator); + maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator); + } + + Iterator iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator); + try + { + while (iter.hasNext()) + { + OnDiskAtom atom = iter.next(); + if (atom == null) + break; + + if (atom instanceof CounterCell) + { + atom = ((CounterCell) atom).markLocalToBeCleared(); + hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards(); + } + + int deletionTime = atom.getLocalDeletionTime(); + if (deletionTime < Integer.MAX_VALUE) + tombstones.update(deletionTime); + minTimestampTracker.update(atom.timestamp()); + maxTimestampTracker.update(atom.timestamp()); + minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator); + maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator); + maxDeletionTimeTracker.update(atom.getLocalDeletionTime()); + + columnIndexer.add(atom); // This write the atom on disk too + } + + columnIndexer.maybeWriteEmptyRowHeader(); + dataFile.stream.writeShort(END_OF_ROW); + } + catch (IOException e) + { + throw new FSWriteError(e, dataFile.getPath()); + } + + metadataCollector.updateMinTimestamp(minTimestampTracker.get()) + .updateMaxTimestamp(maxTimestampTracker.get()) + .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get()) + .addRowSize(dataFile.getFilePointer() - currentPosition) + .addColumnCount(columnIndexer.writtenAtomCount()) + .mergeTombstoneHistogram(tombstones) + .updateMinColumnNames(minColumnNames) + .updateMaxColumnNames(maxColumnNames) + .updateHasLegacyCounterShards(hasLegacyCounterShards); + + afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build())); + return currentPosition; + } + + /** + * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable + */ + public void abort(boolean closeBf) + { + assert descriptor.type.isTemporary; + if (iwriter == null && dataFile == null) + return; + + if (iwriter != null) + iwriter.abort(closeBf); + + if (dataFile!= null) + dataFile.abort(); + + Set components = SSTable.componentsFor(descriptor); + try + { + if (!components.isEmpty()) + SSTable.delete(descriptor, components); + } + catch (FSWriteError e) + { + logger.error(String.format("Failed deleting temp components for %s", descriptor), e); + throw e; + } + } + + // we use this method 
to ensure any managed data we may have retained references to during the write are no + // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction + public void isolateReferences() + { + // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other + // data retention is done through copying + first = getMinimalKey(first); + last = lastWrittenKey = getMinimalKey(last); + } + + private Descriptor makeTmpLinks() + { + // create temp links if they don't already exist + Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK); + if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists()) + { + FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX))); + FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA))); + } + return link; + } + + public SSTableReader openEarly(long maxDataAge) + { + StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(), + metadata.getBloomFilterFpChance(), + repairedAt).get(MetadataType.STATS); + + // find the max (exclusive) readable key + DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0); + if (exclusiveUpperBoundOfReadableIndex == null) + return null; + + Descriptor link = makeTmpLinks(); + // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers + SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY); + SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY); + SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL), + components, metadata, + partitioner, ifile, + dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex), + iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY); + + // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed) + sstable.first = getMinimalKey(first); + sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex); + DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1); + if (inclusiveUpperBoundOfReadableData == null) + { + // Prevent leaving tmplink files on disk - sstable.releaseReference(); ++ sstable.sharedRef().release(); + return null; + } + int offset = 2; + while (true) + { + RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT); + if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset()) + break; + inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++); + if (inclusiveUpperBoundOfReadableData == null) + { - sstable.releaseReference(); ++ sstable.sharedRef().release(); + return null; + } + } + sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData); + return sstable; + } + + public SSTableReader closeAndOpenReader() + { + return closeAndOpenReader(System.currentTimeMillis()); + } + + public SSTableReader closeAndOpenReader(long maxDataAge) + { + return finish(FinishType.NORMAL, maxDataAge, this.repairedAt); + } + + public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt) + { + Pair p; + + p = close(finishType, repairedAt < 0 ? 
this.repairedAt : repairedAt); + Descriptor desc = p.left; + StatsMetadata metadata = p.right; + + if (finishType == FinishType.EARLY) + desc = makeTmpLinks(); + + // finalize in-memory state for the reader + SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType); + SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType); + SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL), + components, + this.metadata, + partitioner, + ifile, + dfile, + iwriter.summary.build(partitioner), + iwriter.bf, + maxDataAge, + metadata, + finishType.openReason); + sstable.first = getMinimalKey(first); + sstable.last = getMinimalKey(last); + + switch (finishType) + { + case NORMAL: case FINISH_EARLY: + // try to save the summaries to disk + sstable.saveSummary(iwriter.builder, dbuilder); + iwriter = null; + dbuilder = null; + } + return sstable; + } + + // Close the writer and return the descriptor to the new sstable and it's metadata + public Pair close() + { + return close(FinishType.NORMAL, this.repairedAt); + } + + private Pair close(FinishType type, long repairedAt) + { + switch (type) + { + case EARLY: case NORMAL: + iwriter.close(); + dataFile.close(); + } + + // write sstable statistics + Map metadataComponents; + metadataComponents = metadataCollector + .finalizeMetadata(partitioner.getClass().getCanonicalName(), + metadata.getBloomFilterFpChance(),repairedAt); + + // remove the 'tmp' marker from all components + Descriptor descriptor = this.descriptor; + switch (type) + { + case NORMAL: case FINISH_EARLY: + dataFile.writeFullChecksum(descriptor); + writeMetadata(descriptor, metadataComponents); + // save the table of components + SSTable.appendTOC(descriptor, components); + descriptor = rename(descriptor, components); + } + + return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS)); + } + + private static void writeMetadata(Descriptor desc, Map components) + { + SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS))); + try + { + desc.getMetadataSerializer().serialize(components, out.stream); + } + catch (IOException e) + { + throw new FSWriteError(e, out.getPath()); + } + finally + { + out.close(); + } + } + + public long getFilePointer() + { + return dataFile.getFilePointer(); + } + + public long getOnDiskFilePointer() + { + return dataFile.getOnDiskFilePointer(); + } + + /** + * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed. 
+ */ + class IndexWriter + { + private final SequentialWriter indexFile; + public final SegmentedFile.Builder builder; + public final IndexSummaryBuilder summary; + public final IFilter bf; + private FileMark mark; + + IndexWriter(long keyCount) + { + indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); + builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); + summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL); + bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true); + } + + // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file + DecoratedKey getMaxReadableKey(int offset) + { + long maxIndexLength = indexFile.getLastFlushOffset(); + return summary.getMaxReadableKey(maxIndexLength, offset); + } + + public void append(DecoratedKey key, RowIndexEntry indexEntry) + { + bf.add(key.getKey()); + long indexPosition = indexFile.getFilePointer(); + try + { + ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream); + rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream); + } + catch (IOException e) + { + throw new FSWriteError(e, indexFile.getPath()); + } + + if (logger.isTraceEnabled()) + logger.trace("wrote index entry: {} at {}", indexEntry, indexPosition); + + summary.maybeAddEntry(key, indexPosition); + builder.addPotentialBoundary(indexPosition); + } + + public void abort(boolean closeBf) + { + indexFile.abort(); + if (closeBf) + bf.close(); + } + + /** + * Closes the index and bloomfilter, making the public state of this writer valid for consumption. + */ + public void close() + { + if (components.contains(Component.FILTER)) + { + String path = descriptor.filenameFor(Component.FILTER); + try + { + // bloom filter + FileOutputStream fos = new FileOutputStream(path); + DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos); + FilterFactory.serialize(bf, stream); + stream.flush(); + fos.getFD().sync(); + stream.close(); + } + catch (IOException e) + { + throw new FSWriteError(e, path); + } + } + + // index + long position = indexFile.getFilePointer(); + indexFile.close(); // calls force + FileUtils.truncate(indexFile.getPath(), position); + } + + public void mark() + { + mark = indexFile.mark(); + } + + public void resetAndTruncate() + { + // we can't un-set the bloom filter addition, but extra keys in there are harmless. + // we can't reset dbuilder either, but that is the last thing called in afterappend so + // we assume that if that worked then we won't be trying to reset. 
+ indexFile.resetAndTruncate(mark); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index 35ddeef,1c5138b..7c7b0b6 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -25,12 -25,10 +25,13 @@@ import java.util.* import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; + import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -48,12 -48,18 +49,16 @@@ import org.apache.cassandra.net.IAsyncC import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.*; -import org.apache.cassandra.repair.messages.AnticompactionRequest; -import org.apache.cassandra.repair.messages.PrepareMessage; -import org.apache.cassandra.repair.messages.RepairMessage; -import org.apache.cassandra.repair.messages.SyncComplete; -import org.apache.cassandra.repair.messages.ValidationComplete; +import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.repair.RepairParallelism; +import org.apache.cassandra.repair.RepairSession; +import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; + import org.apache.cassandra.utils.concurrent.Ref; + import org.apache.cassandra.utils.concurrent.RefCounted; + + import org.apache.cassandra.utils.concurrent.Refs; /** * ActiveRepairService is the starting point for manual "active" repairs. 
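Background for the reference-counting changes in the hunks around here: this merge replaces the old paired sstable.acquireReference()/releaseReference() calls with explicit handles (org.apache.cassandra.utils.concurrent.Ref and Refs, and SSTableReader.sharedRef()). Below is a minimal, self-contained sketch of that handle-based idiom, assuming nothing beyond the JDK; every name in it (RefCountedSSTable, Handle, tryRef, releaseOwnerRef) is illustrative only and is not Cassandra's actual API.

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only -- not Cassandra's Ref/Refs implementation.
final class RefCountedSSTable
{
    private final AtomicInteger refs = new AtomicInteger(1); // the owner holds the initial reference

    // Attempt to take a reference; returns null if the resource has already been released.
    Handle tryRef()
    {
        while (true)
        {
            int current = refs.get();
            if (current <= 0)
                return null;                          // too late: already released, caller must skip it
            if (refs.compareAndSet(current, current + 1))
                return new Handle();
        }
    }

    // Release the owner's initial reference (e.g. once the sstable is obsoleted).
    void releaseOwnerRef()
    {
        unref();
    }

    private void unref()
    {
        if (refs.decrementAndGet() == 0)
            System.out.println("last reference gone -> delete files / free off-heap state");
    }

    // A handle that is released at most once, similar in spirit to Ref<SSTableReader>.
    final class Handle implements AutoCloseable
    {
        private boolean released;

        @Override
        public void close()
        {
            if (!released)
            {
                released = true;
                unref();
            }
        }
    }

    public static void main(String[] args)
    {
        RefCountedSSTable sstable = new RefCountedSSTable();
        try (Handle h = sstable.tryRef())             // analogous to sstable.tryRef() in the surrounding hunks
        {
            if (h != null)
                System.out.println("safe to stream/repair while the handle is held");
        }
        sstable.releaseOwnerRef();                    // analogous to sstable.sharedRef().release()
        System.out.println("further tryRef() calls now return: " + sstable.tryRef());
    }
}

The practical difference from paired acquire/release calls is that a handle can only be released once and has an identity of its own, which is what makes the leak-detection style tests (leakDetect) added further down in this commit workable.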
@@@ -335,17 -376,13 +340,17 @@@ public class ActiveRepairServic { assert parentRepairSession != null; ParentRepairSession prs = getParentRepairSession(parentRepairSession); + assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges"; List> futures = new ArrayList<>(); + // if we don't have successful repair ranges, then just skip anticompaction + if (successfulRanges.isEmpty()) + return futures; for (Map.Entry columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) { - Collection sstables = new HashSet<>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey())); + Refs sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()); ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue(); - futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt)); + futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt)); } return futures; @@@ -386,20 -422,11 +391,20 @@@ for (ColumnFamilyStore cfs : columnFamilyStores) this.columnFamilyStores.put(cfs.metadata.cfId, cfs); this.ranges = ranges; - this.sstableMap = sstables; this.repairedAt = repairedAt; + this.isIncremental = isIncremental; + } + + public void addSSTables(UUID cfId, Set sstables) + { + Set existingSSTables = this.sstableMap.get(cfId); + if (existingSSTables == null) + existingSSTables = new HashSet<>(); + existingSSTables.addAll(sstables); + this.sstableMap.put(cfId, existingSSTables); } - public synchronized Collection getAndReferenceSSTables(UUID cfId) + public synchronized Refs getAndReferenceSSTables(UUID cfId) { Set sstables = sstableMap.get(cfId); Iterator sstableIterator = sstables.iterator(); @@@ -412,27 -440,16 +418,29 @@@ } else { - if (!sstable.acquireReference()) + Ref ref = sstable.tryRef(); + if (ref == null) sstableIterator.remove(); + else + references.put(sstable, ref); } } - return sstables; + return new Refs<>(references.build()); } - public synchronized Set getAndReferenceSSTablesInRange(UUID cfId, Range range) ++ public synchronized Refs getAndReferenceSSTablesInRange(UUID cfId, Range range) + { - Collection allSSTables = getAndReferenceSSTables(cfId); - Set sstables = new HashSet<>(); - for (SSTableReader sstable : allSSTables) ++ Refs sstables = getAndReferenceSSTables(cfId); ++ for (SSTableReader sstable : new ArrayList<>(sstables)) + { + if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Arrays.asList(range))) + sstables.add(sstable); + else - sstable.releaseReference(); ++ sstables.release(sstable); + } + return sstables; + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index d018786,44b83f9..3186291 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@@ -31,10 -31,13 +31,12 @@@ import org.apache.cassandra.concurrent. 
import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableWriter; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.utils.Pair; + import org.apache.cassandra.utils.concurrent.Refs; + /** * Task that manages receiving files for the session for certain ColumnFamily. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java index 454629e,6108dea..ccc2f89 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@@ -294,9 -291,9 +298,9 @@@ public class StreamSession implements I return stores; } - private List getSSTableSectionsForRanges(Collection> ranges, Collection stores, long overriddenRepairedAt, boolean isIncremental) + private List getSSTableSectionsForRanges(Collection> ranges, Collection stores, long overriddenRepairedAt) { - List sstables = new ArrayList<>(); + Refs refs = new Refs<>(); try { for (ColumnFamilyStore cfStore : stores) @@@ -304,11 -301,11 +308,11 @@@ List> rowBoundsList = new ArrayList<>(ranges.size()); for (Range range : ranges) rowBoundsList.add(range.toRowBounds()); - ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)); - sstables.addAll(view.sstables); - refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs); ++ refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)).refs); } - List sections = new ArrayList<>(sstables.size()); - for (SSTableReader sstable : sstables) + + List sections = new ArrayList<>(refs.size()); + for (SSTableReader sstable : refs) { long repairedAt = overriddenRepairedAt; if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java index a3dd10f,b00042e..30c77e9 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@@ -23,9 -23,11 +23,11 @@@ import java.util.concurrent.ScheduledFu import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.streaming.messages.OutgoingFileMessage; import org.apache.cassandra.utils.Pair; + import org.apache.cassandra.utils.concurrent.Ref; + import org.apache.cassandra.utils.concurrent.RefCounted; /** * StreamTransferTask sends sections of SSTable files in certain ColumnFamily. 
@@@ -47,10 -49,10 +49,10 @@@ public class StreamTransferTask extend super(session, cfId); } - public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List> sections, long repairedAt) + public synchronized void addTransferFile(SSTableReader sstable, Ref ref, long estimatedKeys, List> sections, long repairedAt) { assert sstable != null && cfId.equals(sstable.metadata.cfId); - OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel()); - OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt); ++ OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel()); files.put(message.header.sequenceNumber, message); totalSize += message.header.size(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index ed4c4ce,5ebf289..43db5af --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@@ -57,10 -60,11 +59,11 @@@ public class OutgoingFileMessage extend } }; - public FileMessageHeader header; - public SSTableReader sstable; + public final FileMessageHeader header; + public final SSTableReader sstable; + public final Ref ref; - public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List> sections, long repairedAt, boolean keepSSTableLevel) - public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List> sections, long repairedAt) ++ public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List> sections, long repairedAt, boolean keepSSTableLevel) { super(Type.FILE); this.sstable = sstable; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/SchemaLoader.java index f7b10a2,ce65d5a..60f4703 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@@ -56,8 -58,22 +58,17 @@@ public class SchemaLoade // Migrations aren't happy if gossiper is not started. Even if we don't use migrations though, // some tests now expect us to start gossip for them. 
startGossiper(); - - // if you're messing with low-level sstable stuff, it can be useful to inject the schema directly - // Schema.instance.load(schemaDefinition()); - for (KSMetaData ksm : schemaDefinition()) - MigrationManager.announceNewKeyspace(ksm); } + @After + public void leakDetect() throws InterruptedException + { + System.gc(); + System.gc(); + System.gc(); + Thread.sleep(10); + } + public static void prepareServer() { // Cleanup first http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/KeyCacheTest.java index c370e4f,1f7024e..9695b4a --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@@ -43,11 -38,12 +43,12 @@@ import org.apache.cassandra.locator.Sim import org.apache.cassandra.service.CacheService; import org.apache.cassandra.utils.ByteBufferUtil; + import org.apache.cassandra.utils.concurrent.Refs; import static org.junit.Assert.assertEquals; -public class KeyCacheTest extends SchemaLoader +public class KeyCacheTest { - private static final String KEYSPACE1 = "KeyCacheSpace"; + private static final String KEYSPACE1 = "KeyCacheTest1"; private static final String COLUMN_FAMILY1 = "Standard1"; private static final String COLUMN_FAMILY2 = "Standard2"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 6ec56c7,d9442c7..018d643 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@@ -17,6 -17,7 +17,8 @@@ */ package org.apache.cassandra.db.compaction; ++import junit.framework.Assert; + import org.apache.cassandra.utils.concurrent.Refs; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@@ -89,9 -64,11 +91,9 @@@ public class AntiCompactionTes Range range = new Range(new BytesToken("0".getBytes()), new BytesToken("4".getBytes())); List> ranges = Arrays.asList(range); - SSTableReader.acquireReferences(sstables); - Refs refs = Refs.tryRef(sstables); - if (refs == null) - throw new IllegalStateException(); ++ Refs refs = Refs.ref(sstables); long repairedAt = 1000; - CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt); + CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt); assertEquals(2, store.getSSTables().size()); int repairedKeys = 0; @@@ -162,85 -142,27 +163,87 @@@ return writer.closeAndOpenReader(); } + public void generateSStable(ColumnFamilyStore store, String Suffix) + { + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < 10; i++) + { + DecoratedKey key = Util.dk(Integer.toString(i) + "-" + Suffix); + Mutation rm = new Mutation(KEYSPACE1, key.getKey()); + for (int j = 0; j < 10; j++) + rm.add("Standard1", Util.cellname(Integer.toString(j)), + ByteBufferUtil.EMPTY_BYTE_BUFFER, + timestamp, + 0); + rm.apply(); + } + store.forceBlockingFlush(); + } + + @Test + public void antiCompactTenSTC() throws InterruptedException, IOException{ + antiCompactTen("SizeTieredCompactionStrategy"); + } + @Test - public void 
shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException + public void antiCompactTenLC() throws InterruptedException, IOException{ + antiCompactTen("LeveledCompactionStrategy"); + } + + public void antiCompactTen(String compactionStrategy) throws InterruptedException, IOException { - ColumnFamilyStore store = prepareColumnFamilyStore(); + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.setCompactionStrategyClass(compactionStrategy); + store.disableAutoCompaction(); + + for (int table = 0; table < 10; table++) + { + generateSStable(store,Integer.toString(table)); + } Collection sstables = store.getUnrepairedSSTables(); assertEquals(store.getSSTables().size(), sstables.size()); - Range range = new Range(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes())); + + Range range = new Range(new BytesToken("0".getBytes()), new BytesToken("4".getBytes())); List> ranges = Arrays.asList(range); - SSTableReader.acquireReferences(sstables); + Refs refs = Refs.tryRef(sstables); - if (refs == null) - throw new IllegalStateException(); - CompactionManager.instance.performAnticompaction(store, ranges, refs, 1); - assertThat(store.getSSTables().size(), is(1)); - assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false)); - assertThat(Iterables.get(store.getSSTables(), 0).sharedRef().globalCount(), is(1)); - assertThat(store.getDataTracker().getCompacting().size(), is(0)); ++ Assert.assertNotNull(refs); + long repairedAt = 1000; - CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt); ++ CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt); + /* + Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time + so there will be no net change in the number of sstables + */ + assertEquals(10, store.getSSTables().size()); + int repairedKeys = 0; + int nonRepairedKeys = 0; + for (SSTableReader sstable : store.getSSTables()) + { + ISSTableScanner scanner = sstable.getScanner(); + while (scanner.hasNext()) + { + SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); + if (sstable.isRepaired()) + { + assertTrue(range.contains(row.getKey().getToken())); + assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt); + repairedKeys++; + } + else + { + assertFalse(range.contains(row.getKey().getToken())); + assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt); + nonRepairedKeys++; + } + } + } + assertEquals(repairedKeys, 40); + assertEquals(nonRepairedKeys, 60); } + @Test - public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException + public void shouldMutateRepairedAt() throws InterruptedException, IOException { ColumnFamilyStore store = prepareColumnFamilyStore(); Collection sstables = store.getUnrepairedSSTables(); @@@ -258,30 -179,6 +260,30 @@@ } + @Test + public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.disableAutoCompaction(); + + for (int table = 0; table < 10; table++) + { + generateSStable(store,Integer.toString(table)); + } + Collection sstables = store.getUnrepairedSSTables(); + assertEquals(store.getSSTables().size(), sstables.size()); + + Range range = new Range(new BytesToken("-10".getBytes()), new 
BytesToken("-1".getBytes())); + List> ranges = Arrays.asList(range); + - SSTableReader.acquireReferences(sstables); - CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0); ++ Refs refs = Refs.ref(sstables); ++ CompactionManager.instance.performAnticompaction(store, ranges, refs, 0); + + assertThat(store.getSSTables().size(), is(10)); + assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false)); + } + private ColumnFamilyStore prepareColumnFamilyStore() { Keyspace keyspace = Keyspace.open(KEYSPACE1); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java index 1116c9e,08e3fb3..5420b1b --- a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java @@@ -26,7 -26,8 +26,9 @@@ import java.util.Collection import java.util.HashSet; import java.util.Set; +import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.junit.After; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@@ -43,22 -42,20 +45,31 @@@ import static org.junit.Assert.assertEq import static org.junit.Assert.assertNotNull; import static org.apache.cassandra.Util.cellname; -public class BlacklistingCompactionsTest extends SchemaLoader +public class BlacklistingCompactionsTest { - public static final String KEYSPACE = "Keyspace1"; + private static final String KEYSPACE1 = "BlacklistingCompactionsTest"; + private static final String CF_STANDARD1 = "Standard1"; + @After + public void leakDetect() throws InterruptedException + { + System.gc(); + System.gc(); + System.gc(); + Thread.sleep(10); + } + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1)); + closeStdErr(); + } + public static void closeStdErr() { // These tests generate an error message per CorruptSSTableException since it goes through http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 3703d54,acf8c90..97625f4 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@@ -224,17 -197,13 +224,17 @@@ public class SSTableRewriterTest extend for (int i = 500; i < 1000; i++) writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf); SSTableReader s2 = writer.openEarly(1000); + assertTrue(s != s2); assertFileCounts(dir.list(), 2, 3); - s.markObsolete(); + + s.setReplacedBy(s2); + s2.markObsolete(); - s.releaseReference(); - s2.releaseReference(); + s.sharedRef().release(); - Thread.sleep(1000); - assertFileCounts(dir.list(), 0, 3); ++ s2.sharedRef().release(); + writer.abort(false); + Thread.sleep(1000); int 
datafiles = assertFileCounts(dir.list(), 0, 0);
assertEquals(datafiles, 0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
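The streaming and repair hunks above (getAndReferenceSSTables, getSSTableSectionsForRanges, and the AntiCompactionTest changes) build a Refs<SSTableReader> collection so that every reference taken for an operation can be released in one place, including when the operation fails part-way through. Here is a tiny sketch of that bundle-and-release-all idiom, again with purely illustrative names (RefBundle) and no dependency on Cassandra classes; it is not the real Refs implementation.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: collect acquired references and release them together.
final class RefBundle implements AutoCloseable
{
    // each acquired reference is modelled as a Runnable that performs its release
    private final List<Runnable> releases = new ArrayList<>();

    // Record a successfully acquired reference so it is released with the bundle.
    void add(Runnable release)
    {
        releases.add(release);
    }

    // Release every acquired reference; run from try-with-resources or a finally block.
    @Override
    public void close()
    {
        for (Runnable release : releases)
            release.run();
        releases.clear();
    }

    public static void main(String[] args)
    {
        try (RefBundle refs = new RefBundle())
        {
            for (int i = 0; i < 3; i++)
            {
                int id = i;
                // in the real code this step is sstable.tryRef(); here we only log acquire/release
                System.out.println("acquired reference to sstable-" + id);
                refs.add(() -> System.out.println("released reference to sstable-" + id));
            }
            // if building the stream sections threw here, close() would still release everything
        }
    }
}

Routing every release through a single owning object keeps the failure path trivial: whether the section list is built successfully or an exception escapes, the same close() runs and no sstable stays pinned.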