Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4F4C010469 for ; Fri, 4 Apr 2014 20:37:54 +0000 (UTC) Received: (qmail 18013 invoked by uid 500); 4 Apr 2014 20:37:53 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 17988 invoked by uid 500); 4 Apr 2014 20:37:53 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 17962 invoked by uid 99); 4 Apr 2014 20:37:51 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Apr 2014 20:37:51 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id ED90F9432BC; Fri, 4 Apr 2014 20:37:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tylerhobbs@apache.org To: commits@cassandra.apache.org Message-Id: <43e2d06a4c734a7d9633e4f137c8baa0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: Ensure safe resource cleanup when replacing SSTables Date: Fri, 4 Apr 2014 20:37:50 +0000 (UTC) Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 57b18e600 -> 5ebadc11e Ensure safe resource cleanup when replacing SSTables Patch by belliotsmith; reviewed by Tyler Hobbs for CASSANDRA-6912 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ebadc11 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ebadc11 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ebadc11 Branch: refs/heads/cassandra-2.1 Commit: 5ebadc11e36749e6479f9aba19406db3aacdaf41 Parents: 57b18e6 Author: belliottsmith Authored: Fri Apr 4 15:37:09 2014 -0500 Committer: Tyler Hobbs Committed: Fri Apr 4 15:37:09 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/DataTracker.java | 28 +- .../cassandra/io/sstable/IndexSummary.java | 2 +- .../io/sstable/IndexSummaryManager.java | 22 +- .../cassandra/io/sstable/SSTableReader.java | 318 +++++++++++++------ .../cassandra/utils/AlwaysPresentFilter.java | 3 +- .../org/apache/cassandra/utils/BloomFilter.java | 3 +- .../org/apache/cassandra/utils/IFilter.java | 2 + .../org/apache/cassandra/utils/obs/IBitSet.java | 2 + .../cassandra/utils/obs/OffHeapBitSet.java | 2 +- .../apache/cassandra/utils/obs/OpenBitSet.java | 2 +- .../io/sstable/IndexSummaryManagerTest.java | 2 +- .../cassandra/io/sstable/SSTableReaderTest.java | 28 +- 13 files changed, 278 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4cfc957..0f1ae93 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -41,6 +41,7 @@ * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869) * Lock counter cells, not partitions (CASSANDRA-6880) * Track presence of legacy counter shards in sstables (CASSANDRA-6888) + * Ensure safe resource cleanup when replacing sstables (CASSANDRA-6912) Merged from 2.0: * Allow compaction of system tables during startup (CASSANDRA-6913) * Restrict Windows to parallel repairs (CASSANDRA-6907) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index c8fc699..9c8f9a0 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -192,14 +192,17 @@ public class DataTracker public boolean markCompacting(Iterable sstables) { assert sstables != null && !Iterables.isEmpty(sstables); + while (true) + { + View currentView = view.get(); + Set inactive = Sets.difference(ImmutableSet.copyOf(sstables), currentView.compacting); + if (inactive.size() < Iterables.size(sstables)) + return false; - View currentView = view.get(); - Set inactive = Sets.difference(ImmutableSet.copyOf(sstables), currentView.compacting); - if (inactive.size() < Iterables.size(sstables)) - return false; - - View newView = currentView.markCompacting(inactive); - return view.compareAndSet(currentView, newView); + View newView = currentView.markCompacting(inactive); + if (view.compareAndSet(currentView, newView)) + return true; + } } /** @@ -333,14 +336,6 @@ public class DataTracker */ public void replaceReaders(Collection oldSSTables, Collection newSSTables) { - // data component will be unchanged but the index summary will be a different size - // (since we save that to make restart fast) - long sizeIncrease = 0; - for (SSTableReader sstable : oldSSTables) - sizeIncrease -= sstable.bytesOnDisk(); - for (SSTableReader sstable : newSSTables) - sizeIncrease += sstable.bytesOnDisk(); - View currentView, newView; do { @@ -349,9 +344,6 @@ public class DataTracker } while (!view.compareAndSet(currentView, newView)); - StorageMetrics.load.inc(sizeIncrease); - cfstore.metric.liveDiskSpaceUsed.inc(sizeIncrease); - for (SSTableReader sstable : newSSTables) sstable.setTrackedBy(this); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/io/sstable/IndexSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java index f87f356..0696fb7 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java @@ -258,7 +258,7 @@ public class IndexSummary implements Closeable } @Override - public void close() throws IOException + public void close() { bytes.free(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index b35f5f4..d5b7364 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -19,17 +19,24 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; - import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.*; - -import org.apache.cassandra.config.CFMetaData; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -409,8 +416,9 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries", sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL, entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL); - SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(entry.newSamplingLevel); - DataTracker tracker = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName()).getDataTracker(); + ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName()); + SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel); + DataTracker tracker = cfs.getDataTracker(); replacedByTracker.put(tracker, sstable); replacementsByTracker.put(tracker, replacement); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 82a0bc8..d29d5ac 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -17,16 +17,34 @@ */ package org.apache.cassandra.io.sstable; -import java.io.*; +import java.io.BufferedInputStream; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; -import com.clearspring.analytics.stream.cardinality.ICardinality; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; @@ -36,28 +54,62 @@ import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; +import com.clearspring.analytics.stream.cardinality.ICardinality; import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.cache.InstrumentingCache; import org.apache.cassandra.cache.KeyCacheKey; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; -import org.apache.cassandra.config.*; -import org.apache.cassandra.db.*; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DataTracker; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.db.index.SecondaryIndex; -import org.apache.cassandra.dht.*; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.apache.cassandra.io.compress.CompressedThrottledReader; import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.io.sstable.metadata.*; -import org.apache.cassandra.io.util.*; +import org.apache.cassandra.io.sstable.metadata.CompactionMetadata; +import org.apache.cassandra.io.sstable.metadata.MetadataComponent; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.sstable.metadata.ValidationMetadata; +import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.ICompressedFile; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.io.util.ThrottledReader; import org.apache.cassandra.metrics.RestorableMeter; +import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.CLibrary; +import org.apache.cassandra.utils.EstimatedHistogram; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.FilterFactory; +import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR; @@ -122,15 +174,24 @@ public class SSTableReader extends SSTable implements Closeable // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone private final AtomicBoolean isCompacted = new AtomicBoolean(false); private final AtomicBoolean isSuspect = new AtomicBoolean(false); - private final AtomicBoolean isReplaced = new AtomicBoolean(false); - private final SSTableDeletingTask deletingTask; // not final since we need to be able to change level on a file. private volatile StatsMetadata sstableMetadata; private final AtomicLong keyCacheHit = new AtomicLong(0); private final AtomicLong keyCacheRequest = new AtomicLong(0); + /** + * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources, + * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple. + * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed. + * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources. + */ + private Object replaceLock = new Object(); + private SSTableReader replacedBy; + private SSTableReader replaces; + private SSTableDeletingTask deletingTask; + @VisibleForTesting public RestorableMeter readMeter; private ScheduledFuture readMeterSyncFuture; @@ -275,10 +336,10 @@ public class SSTableReader extends SSTable implements Closeable statsMetadata); // special implementation of load to use non-pooled SegmentedFile builders - SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder(); + SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); SegmentedFile.Builder dbuilder = sstable.compression - ? new CompressedSegmentedFile.Builder() - : new BufferedSegmentedFile.Builder(); + ? SegmentedFile.getCompressedBuilder() + : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); if (!sstable.loadSummary(ibuilder, dbuilder)) sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL); sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)); @@ -480,27 +541,95 @@ public class SSTableReader extends SSTable implements Closeable return sum; } - /** - * Clean up all opened resources. - * - * @throws IOException - */ - public void close() throws IOException + private void tidy(boolean release) { if (readMeterSyncFuture != null) readMeterSyncFuture.cancel(false); - // if this SSTR was replaced by a new SSTR with a different index summary, the two instances will share - // resources, so don't force unmapping, clear the FileCacheService entry, or close the BF - if (!isReplaced.get()) + assert references.get() == 0; + + synchronized (replaceLock) { - // Force finalizing mmapping if necessary - ifile.cleanup(); - dfile.cleanup(); - // close the BF so it can be opened later. - bf.close(); + boolean closeBf = true, closeSummary = true, closeFiles = true; + + if (replacedBy != null) + { + closeBf = replacedBy.bf != bf; + closeSummary = replacedBy.indexSummary != indexSummary; + closeFiles = replacedBy.dfile != dfile; + } + + if (replaces != null) + { + closeBf &= replaces.bf != bf; + closeSummary &= replaces.indexSummary != indexSummary; + closeFiles &= replaces.dfile != dfile; + } + + boolean deleteAll = false; + if (release && isCompacted.get()) + { + assert replacedBy == null; + if (replaces != null) + { + replaces.replacedBy = null; + replaces.deletingTask = deletingTask; + replaces.markObsolete(); + } + else + { + deleteAll = true; + } + } + else + { + if (replaces != null) + replaces.replacedBy = replacedBy; + if (replacedBy != null) + replacedBy.replaces = replaces; + } + + assert references.get() == 0; + if (closeBf) + bf.close(); + if (closeSummary) + indexSummary.close(); + if (closeFiles) + { + ifile.cleanup(); + dfile.cleanup(); + } + if (deleteAll) + { + /** + * Do the OS a favour and suggest (using fadvice call) that we + * don't want to see pages of this SSTable in memory anymore. + * + * NOTE: We can't use madvice in java because it requires the address of + * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it + */ + dropPageCache(); + deletingTask.schedule(); + } } - indexSummary.close(); + } + + /** + * Schedule clean-up of resources + */ + public void close() + { + tidy(false); + } + + public String getFilename() + { + return dfile.path; + } + + public String getIndexFilename() + { + return ifile.path; } public void setTrackedBy(DataTracker tracker) @@ -726,6 +855,17 @@ public class SSTableReader extends SSTable implements Closeable } } + public void setReplacedBy(SSTableReader replacement) + { + synchronized (replaceLock) + { + assert replacedBy == null; + replacedBy = replacement; + replacement.replaces = this; + replacement.replaceLock = replaceLock; + } + } + /** * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will * be built at the target samplingLevel. This (original) SSTableReader instance will be marked as replaced, have @@ -734,49 +874,59 @@ public class SSTableReader extends SSTable implements Closeable * @return a new SSTableReader * @throws IOException */ - public SSTableReader cloneWithNewSummarySamplingLevel(int samplingLevel) throws IOException + public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException { - int minIndexInterval = metadata.getMinIndexInterval(); - int maxIndexInterval = metadata.getMaxIndexInterval(); - double effectiveInterval = indexSummary.getEffectiveIndexInterval(); + synchronized (replaceLock) + { + assert replacedBy == null; - IndexSummary newSummary; + int minIndexInterval = metadata.getMinIndexInterval(); + int maxIndexInterval = metadata.getMaxIndexInterval(); + double effectiveInterval = indexSummary.getEffectiveIndexInterval(); - // We have to rebuild the summary from the on-disk primary index in three cases: - // 1. The sampling level went up, so we need to read more entries off disk - // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary - // at full sampling (and consequently at any other sampling level) - // 3. The max_index_interval was lowered, forcing us to raise the sampling level - if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval) - { - newSummary = buildSummaryAtLevel(samplingLevel); - } - else if (samplingLevel < indexSummary.getSamplingLevel()) - { - // we can use the existing index summary to make a smaller one - newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner); + IndexSummary newSummary; + long oldSize = bytesOnDisk(); - SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); - SegmentedFile.Builder dbuilder = compression - ? SegmentedFile.getCompressedBuilder() - : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); - saveSummary(ibuilder, dbuilder, newSummary); - } - else - { - throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " + - "no adjustments to min/max_index_interval"); - } + // We have to rebuild the summary from the on-disk primary index in three cases: + // 1. The sampling level went up, so we need to read more entries off disk + // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary + // at full sampling (and consequently at any other sampling level) + // 3. The max_index_interval was lowered, forcing us to raise the sampling level + if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval) + { + newSummary = buildSummaryAtLevel(samplingLevel); + } + else if (samplingLevel < indexSummary.getSamplingLevel()) + { + // we can use the existing index summary to make a smaller one + newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner); + + SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); + SegmentedFile.Builder dbuilder = compression + ? SegmentedFile.getCompressedBuilder() + : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + saveSummary(ibuilder, dbuilder, newSummary); + } + else + { + throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " + + "no adjustments to min/max_index_interval"); + } - markReplaced(); - if (readMeterSyncFuture != null) - readMeterSyncFuture.cancel(false); + long newSize = bytesOnDisk(); + StorageMetrics.load.inc(newSize - oldSize); + parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize); - SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata); - replacement.readMeter = this.readMeter; - replacement.first = this.first; - replacement.last = this.last; - return replacement; + if (readMeterSyncFuture != null) + readMeterSyncFuture.cancel(false); + + SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata); + replacement.readMeter = this.readMeter; + replacement.first = this.first; + replacement.last = this.last; + setReplacedBy(replacement); + return replacement; + } } private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException @@ -1342,12 +1492,6 @@ public class SSTableReader extends SSTable implements Closeable return dfile.onDiskLength; } - public void markReplaced() - { - boolean success = isReplaced.compareAndSet(false, true); - assert success : "Attempted to mark an SSTableReader as replaced more than once"; - } - public boolean acquireReference() { while (true) @@ -1368,27 +1512,7 @@ public class SSTableReader extends SSTable implements Closeable public void releaseReference() { if (references.decrementAndGet() == 0) - { - FileUtils.closeQuietly(this); - - // if this SSTR instance was replaced by another with a different index summary, let the new instance - // handle clearing the page cache and deleting the files - if (isCompacted.get()) - { - assert !isReplaced.get(); - - /** - * Do the OS a favour and suggest (using fadvice call) that we - * don't want to see pages of this SSTable in memory anymore. - * - * NOTE: We can't use madvice in java because it requires the address of - * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it - */ - dropPageCache(); - - deletingTask.schedule(); - } - } + tidy(true); assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path; } @@ -1406,6 +1530,10 @@ public class SSTableReader extends SSTable implements Closeable if (logger.isDebugEnabled()) logger.debug("Marking {} compacted", getFilename()); + synchronized (replaceLock) + { + assert replacedBy == null; + } return !isCompacted.getAndSet(true); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java index 0f5136b..83d8f3a 100644 --- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java +++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.utils; -import java.io.IOException; import java.nio.ByteBuffer; public class AlwaysPresentFilter implements IFilter @@ -31,7 +30,7 @@ public class AlwaysPresentFilter implements IFilter public void clear() { } - public void close() throws IOException { } + public void close() { } public long serializedSize() { return 0; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/BloomFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java index 9fbb38e..ceba89b 100644 --- a/src/java/org/apache/cassandra/utils/BloomFilter.java +++ b/src/java/org/apache/cassandra/utils/BloomFilter.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.utils; -import java.io.IOException; import java.nio.ByteBuffer; import com.google.common.annotations.VisibleForTesting; @@ -112,7 +111,7 @@ public abstract class BloomFilter implements IFilter bitset.clear(); } - public void close() throws IOException + public void close() { bitset.close(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/IFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java index 10f6df2..91c0e36 100644 --- a/src/java/org/apache/cassandra/utils/IFilter.java +++ b/src/java/org/apache/cassandra/utils/IFilter.java @@ -29,4 +29,6 @@ public interface IFilter extends Closeable void clear(); long serializedSize(); + + void close(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/obs/IBitSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java index c6fbddd..96aac6b 100644 --- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java +++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java @@ -49,4 +49,6 @@ public interface IBitSet extends Closeable public long serializedSize(TypeSizes type); public void clear(); + + public void close(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java index 29dd848..de8da01 100644 --- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java +++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java @@ -139,7 +139,7 @@ public class OffHeapBitSet implements IBitSet return new OffHeapBitSet(memory); } - public void close() throws IOException + public void close() { bytes.free(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java index 5657d41..1d2f690 100644 --- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java +++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java @@ -387,7 +387,7 @@ public class OpenBitSet implements IBitSet return (int)((h>>32) ^ h) + 0x98761234; } - public void close() throws IOException { + public void close() { // noop, let GC do the cleanup. } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index 35fd9bd..9b2b492 100644 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@ -421,7 +421,7 @@ public class IndexSummaryManagerTest extends SchemaLoader SSTableReader sstable = original; for (int samplingLevel = 1; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++) { - sstable = sstable.cloneWithNewSummarySamplingLevel(samplingLevel); + sstable = sstable.cloneWithNewSummarySamplingLevel(cfs, samplingLevel); assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel()); int expectedSize = (numRows * samplingLevel) / (sstable.metadata.getMinIndexInterval() * BASE_SAMPLING_LEVEL); assertEquals(expectedSize, sstable.getIndexSummarySize(), 1); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index bd50538..8429d37 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -23,17 +23,20 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; -import org.junit.Assert; import com.google.common.collect.Sets; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,27 +44,34 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IndexExpression; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.Row; +import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; -import org.apache.cassandra.db.composites.Composites; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.ICompactionScanner; +import org.apache.cassandra.db.composites.Composites; import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.LocalToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.service.CacheService; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.MmappedSegmentedFile; import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; +import static org.apache.cassandra.Util.cellname; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; -import static org.apache.cassandra.Util.cellname; +import static org.junit.Assert.assertTrue; @RunWith(OrderedJUnit4ClassRunner.class) public class SSTableReaderTest extends SchemaLoader @@ -400,7 +410,7 @@ public class SSTableReaderTest extends SchemaLoader })); } - SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(1); + SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1); store.getDataTracker().replaceReaders(Arrays.asList(sstable), Arrays.asList(replacement)); for (Future future : futures) future.get();