cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tylerho...@apache.org
Subject git commit: Ensure safe resource cleanup when replacing SSTables
Date Fri, 04 Apr 2014 20:37:50 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 57b18e600 -> 5ebadc11e


Ensure safe resource cleanup when replacing SSTables

Patch by belliottsmith; reviewed by Tyler Hobbs for CASSANDRA-6912


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ebadc11
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ebadc11
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ebadc11

Branch: refs/heads/cassandra-2.1
Commit: 5ebadc11e36749e6479f9aba19406db3aacdaf41
Parents: 57b18e6
Author: belliottsmith <github@sub.laerad.com>
Authored: Fri Apr 4 15:37:09 2014 -0500
Committer: Tyler Hobbs <tyler@datastax.com>
Committed: Fri Apr 4 15:37:09 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |  28 +-
 .../cassandra/io/sstable/IndexSummary.java      |   2 +-
 .../io/sstable/IndexSummaryManager.java         |  22 +-
 .../cassandra/io/sstable/SSTableReader.java     | 318 +++++++++++++------
 .../cassandra/utils/AlwaysPresentFilter.java    |   3 +-
 .../org/apache/cassandra/utils/BloomFilter.java |   3 +-
 .../org/apache/cassandra/utils/IFilter.java     |   2 +
 .../org/apache/cassandra/utils/obs/IBitSet.java |   2 +
 .../cassandra/utils/obs/OffHeapBitSet.java      |   2 +-
 .../apache/cassandra/utils/obs/OpenBitSet.java  |   2 +-
 .../io/sstable/IndexSummaryManagerTest.java     |   2 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |  28 +-
 13 files changed, 278 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4cfc957..0f1ae93 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,7 @@
  * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
  * Lock counter cells, not partitions (CASSANDRA-6880)
  * Track presence of legacy counter shards in sstables (CASSANDRA-6888)
+ * Ensure safe resource cleanup when replacing sstables (CASSANDRA-6912)
 Merged from 2.0:
  * Allow compaction of system tables during startup (CASSANDRA-6913)
  * Restrict Windows to parallel repairs (CASSANDRA-6907)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index c8fc699..9c8f9a0 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -192,14 +192,17 @@ public class DataTracker
     public boolean markCompacting(Iterable<SSTableReader> sstables)
     {
         assert sstables != null && !Iterables.isEmpty(sstables);
+        while (true)
+        {
+            View currentView = view.get();
+            Set<SSTableReader> inactive = Sets.difference(ImmutableSet.copyOf(sstables),
currentView.compacting);
+            if (inactive.size() < Iterables.size(sstables))
+                return false;
 
-        View currentView = view.get();
-        Set<SSTableReader> inactive = Sets.difference(ImmutableSet.copyOf(sstables),
currentView.compacting);
-        if (inactive.size() < Iterables.size(sstables))
-            return false;
-
-        View newView = currentView.markCompacting(inactive);
-        return view.compareAndSet(currentView, newView);
+            View newView = currentView.markCompacting(inactive);
+            if (view.compareAndSet(currentView, newView))
+                return true;
+        }
     }
 
     /**
@@ -333,14 +336,6 @@ public class DataTracker
      */
     public void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader>
newSSTables)
     {
-        // data component will be unchanged but the index summary will be a different size
-        // (since we save that to make restart fast)
-        long sizeIncrease = 0;
-        for (SSTableReader sstable : oldSSTables)
-            sizeIncrease -= sstable.bytesOnDisk();
-        for (SSTableReader sstable : newSSTables)
-            sizeIncrease += sstable.bytesOnDisk();
-
         View currentView, newView;
         do
         {
@@ -349,9 +344,6 @@ public class DataTracker
         }
         while (!view.compareAndSet(currentView, newView));
 
-        StorageMetrics.load.inc(sizeIncrease);
-        cfstore.metric.liveDiskSpaceUsed.inc(sizeIncrease);
-
         for (SSTableReader sstable : newSSTables)
             sstable.setTrackedBy(this);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index f87f356..0696fb7 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -258,7 +258,7 @@ public class IndexSummary implements Closeable
     }
 
     @Override
-    public void close() throws IOException
+    public void close()
     {
         bytes.free();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index b35f5f4..d5b7364 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -19,17 +19,24 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
-
-import org.apache.cassandra.config.CFMetaData;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -409,8 +416,9 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
             logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original
number of entries",
                          sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
                          entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
-            SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(entry.newSamplingLevel);
-            DataTracker tracker = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName()).getDataTracker();
+            ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+            SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+            DataTracker tracker = cfs.getDataTracker();
 
             replacedByTracker.put(tracker, sstable);
             replacementsByTracker.put(tracker, replacement);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 82a0bc8..d29d5ac 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -17,16 +17,34 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.ICardinality;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
@@ -36,28 +54,62 @@ import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
 import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.cache.InstrumentingCache;
 import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedThrottledReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.metadata.*;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.ICompressedFile;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.ThrottledReader;
 import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 
@@ -122,15 +174,24 @@ public class SSTableReader extends SSTable implements Closeable
     // but it seems like a good extra layer of protection against reference counting bugs
to not delete data based on that alone
     private final AtomicBoolean isCompacted = new AtomicBoolean(false);
     private final AtomicBoolean isSuspect = new AtomicBoolean(false);
-    private final AtomicBoolean isReplaced = new AtomicBoolean(false);
 
-    private final SSTableDeletingTask deletingTask;
     // not final since we need to be able to change level on a file.
     private volatile StatsMetadata sstableMetadata;
 
     private final AtomicLong keyCacheHit = new AtomicLong(0);
     private final AtomicLong keyCacheRequest = new AtomicLong(0);
 
+    /**
+     * To support replacing this sstablereader with another object that represents that same
underlying sstable, but with different associated resources,
+     * we build a linked-list chain of replacement, which we synchronise using a shared object
to make maintenance of the list across multiple threads simple.
+     * On close we check if any of the closeable resources differ between any chains either
side of us; any that are in neither of the adjacent links (if any) are closed.
+     * Once we've made this decision we remove ourselves from the linked list, so that anybody
behind/ahead will compare against only other still opened resources.
+     */
+    private Object replaceLock = new Object();
+    private SSTableReader replacedBy;
+    private SSTableReader replaces;
+    private SSTableDeletingTask deletingTask;
+
     @VisibleForTesting
     public RestorableMeter readMeter;
     private ScheduledFuture readMeterSyncFuture;
@@ -275,10 +336,10 @@ public class SSTableReader extends SSTable implements Closeable
                                                   statsMetadata);
 
         // special implementation of load to use non-pooled SegmentedFile builders
-        SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
+        SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
         SegmentedFile.Builder dbuilder = sstable.compression
-                                       ? new CompressedSegmentedFile.Builder()
-                                       : new BufferedSegmentedFile.Builder();
+                                       ? SegmentedFile.getCompressedBuilder()
+                                       : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
         if (!sstable.loadSummary(ibuilder, dbuilder))
             sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
         sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
@@ -480,27 +541,95 @@ public class SSTableReader extends SSTable implements Closeable
         return sum;
     }
 
-    /**
-     * Clean up all opened resources.
-     *
-     * @throws IOException
-     */
-    public void close() throws IOException
+    private void tidy(boolean release)
     {
         if (readMeterSyncFuture != null)
             readMeterSyncFuture.cancel(false);
 
-        // if this SSTR was replaced by a new SSTR with a different index summary, the two
instances will share
-        // resources, so don't force unmapping, clear the FileCacheService entry, or close
the BF
-        if (!isReplaced.get())
+        assert references.get() == 0;
+
+        synchronized (replaceLock)
         {
-            // Force finalizing mmapping if necessary
-            ifile.cleanup();
-            dfile.cleanup();
-            // close the BF so it can be opened later.
-            bf.close();
+            boolean closeBf = true, closeSummary = true, closeFiles = true;
+
+            if (replacedBy != null)
+            {
+                closeBf = replacedBy.bf != bf;
+                closeSummary = replacedBy.indexSummary != indexSummary;
+                closeFiles = replacedBy.dfile != dfile;
+            }
+
+            if (replaces != null)
+            {
+                closeBf &= replaces.bf != bf;
+                closeSummary &= replaces.indexSummary != indexSummary;
+                closeFiles &= replaces.dfile != dfile;
+            }
+
+            boolean deleteAll = false;
+            if (release && isCompacted.get())
+            {
+                assert replacedBy == null;
+                if (replaces != null)
+                {
+                    replaces.replacedBy = null;
+                    replaces.deletingTask = deletingTask;
+                    replaces.markObsolete();
+                }
+                else
+                {
+                    deleteAll = true;
+                }
+            }
+            else
+            {
+                if (replaces != null)
+                    replaces.replacedBy = replacedBy;
+                if (replacedBy != null)
+                    replacedBy.replaces = replaces;
+            }
+
+            assert references.get() == 0;
+            if (closeBf)
+                bf.close();
+            if (closeSummary)
+                indexSummary.close();
+            if (closeFiles)
+            {
+                ifile.cleanup();
+                dfile.cleanup();
+            }
+            if (deleteAll)
+            {
+                /**
+                 * Do the OS a favour and suggest (using fadvice call) that we
+                 * don't want to see pages of this SSTable in memory anymore.
+                 *
+                 * NOTE: We can't use madvice in java because it requires the address of
+                 * the mapping, so instead we always open a file and run fadvice(fd, 0, 0)
on it
+                 */
+                dropPageCache();
+                deletingTask.schedule();
+            }
         }
-        indexSummary.close();
+    }
+
+    /**
+     * Schedule clean-up of resources
+     */
+    public void close()
+    {
+        tidy(false);
+    }
+
+    public String getFilename()
+    {
+        return dfile.path;
+    }
+
+    public String getIndexFilename()
+    {
+        return ifile.path;
     }
 
     public void setTrackedBy(DataTracker tracker)
@@ -726,6 +855,17 @@ public class SSTableReader extends SSTable implements Closeable
         }
     }
 
+    public void setReplacedBy(SSTableReader replacement)
+    {
+        synchronized (replaceLock)
+        {
+            assert replacedBy == null;
+            replacedBy = replacement;
+            replacement.replaces = this;
+            replacement.replaceLock = replaceLock;
+        }
+    }
+
     /**
      * Returns a new SSTableReader with the same properties as this SSTableReader except
that a new IndexSummary will
      * be built at the target samplingLevel.  This (original) SSTableReader instance will
be marked as replaced, have
@@ -734,49 +874,59 @@ public class SSTableReader extends SSTable implements Closeable
      * @return a new SSTableReader
      * @throws IOException
      */
-    public SSTableReader cloneWithNewSummarySamplingLevel(int samplingLevel) throws IOException
+    public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel)
throws IOException
     {
-        int minIndexInterval = metadata.getMinIndexInterval();
-        int maxIndexInterval = metadata.getMaxIndexInterval();
-        double effectiveInterval = indexSummary.getEffectiveIndexInterval();
+        synchronized (replaceLock)
+        {
+            assert replacedBy == null;
 
-        IndexSummary newSummary;
+            int minIndexInterval = metadata.getMinIndexInterval();
+            int maxIndexInterval = metadata.getMaxIndexInterval();
+            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 
-         // We have to rebuild the summary from the on-disk primary index in three cases:
-         // 1. The sampling level went up, so we need to read more entries off disk
-         // 2. The min_index_interval changed (in either direction); this changes what entries
would be in the summary
-         //    at full sampling (and consequently at any other sampling level)
-         // 3. The max_index_interval was lowered, forcing us to raise the sampling level
-        if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval()
!= minIndexInterval || effectiveInterval > maxIndexInterval)
-        {
-            newSummary = buildSummaryAtLevel(samplingLevel);
-        }
-        else if (samplingLevel < indexSummary.getSamplingLevel())
-        {
-            // we can use the existing index summary to make a smaller one
-            newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval,
partitioner);
+            IndexSummary newSummary;
+            long oldSize = bytesOnDisk();
 
-            SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
-            SegmentedFile.Builder dbuilder = compression
-                                           ? SegmentedFile.getCompressedBuilder()
-                                           : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-            saveSummary(ibuilder, dbuilder, newSummary);
-        }
-        else
-        {
-            throw new AssertionError("Attempted to clone SSTableReader with the same index
summary sampling level and " +
-                                     "no adjustments to min/max_index_interval");
-        }
+            // We have to rebuild the summary from the on-disk primary index in three cases:
+            // 1. The sampling level went up, so we need to read more entries off disk
+            // 2. The min_index_interval changed (in either direction); this changes what
entries would be in the summary
+            //    at full sampling (and consequently at any other sampling level)
+            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
+            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval()
!= minIndexInterval || effectiveInterval > maxIndexInterval)
+            {
+                newSummary = buildSummaryAtLevel(samplingLevel);
+            }
+            else if (samplingLevel < indexSummary.getSamplingLevel())
+            {
+                // we can use the existing index summary to make a smaller one
+                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel,
minIndexInterval, partitioner);
+
+                SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+                SegmentedFile.Builder dbuilder = compression
+                                                 ? SegmentedFile.getCompressedBuilder()
+                                                 : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+                saveSummary(ibuilder, dbuilder, newSummary);
+            }
+            else
+            {
+                throw new AssertionError("Attempted to clone SSTableReader with the same
index summary sampling level and " +
+                                         "no adjustments to min/max_index_interval");
+            }
 
-        markReplaced();
-        if (readMeterSyncFuture != null)
-            readMeterSyncFuture.cancel(false);
+            long newSize = bytesOnDisk();
+            StorageMetrics.load.inc(newSize - oldSize);
+            parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
 
-        SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner,
ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata);
-        replacement.readMeter = this.readMeter;
-        replacement.first = this.first;
-        replacement.last = this.last;
-        return replacement;
+            if (readMeterSyncFuture != null)
+                readMeterSyncFuture.cancel(false);
+
+            SSTableReader replacement = new SSTableReader(descriptor, components, metadata,
partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata);
+            replacement.readMeter = this.readMeter;
+            replacement.first = this.first;
+            replacement.last = this.last;
+            setReplacedBy(replacement);
+            return replacement;
+        }
     }
 
     private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
@@ -1342,12 +1492,6 @@ public class SSTableReader extends SSTable implements Closeable
         return dfile.onDiskLength;
     }
 
-    public void markReplaced()
-    {
-        boolean success = isReplaced.compareAndSet(false, true);
-        assert success : "Attempted to mark an SSTableReader as replaced more than once";
-    }
-
     public boolean acquireReference()
     {
         while (true)
@@ -1368,27 +1512,7 @@ public class SSTableReader extends SSTable implements Closeable
     public void releaseReference()
     {
         if (references.decrementAndGet() == 0)
-        {
-            FileUtils.closeQuietly(this);
-
-            // if this SSTR instance was replaced by another with a different index summary,
let the new instance
-            // handle clearing the page cache and deleting the files
-            if (isCompacted.get())
-            {
-                assert !isReplaced.get();
-
-                /**
-                 * Do the OS a favour and suggest (using fadvice call) that we
-                 * don't want to see pages of this SSTable in memory anymore.
-                 *
-                 * NOTE: We can't use madvice in java because it requires the address of
-                 * the mapping, so instead we always open a file and run fadvice(fd, 0, 0)
on it
-                 */
-                dropPageCache();
-
-                deletingTask.schedule();
-            }
-        }
+            tidy(true);
         assert references.get() >= 0 : "Reference counter " +  references.get() + " for
" + dfile.path;
     }
 
@@ -1406,6 +1530,10 @@ public class SSTableReader extends SSTable implements Closeable
         if (logger.isDebugEnabled())
             logger.debug("Marking {} compacted", getFilename());
 
+        synchronized (replaceLock)
+        {
+            assert replacedBy == null;
+        }
         return !isCompacted.getAndSet(true);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 0f5136b..83d8f3a 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class AlwaysPresentFilter implements IFilter
@@ -31,7 +30,7 @@ public class AlwaysPresentFilter implements IFilter
 
     public void clear() { }
 
-    public void close() throws IOException { }
+    public void close() { }
 
     public long serializedSize() { return 0; }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 9fbb38e..ceba89b 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -112,7 +111,7 @@ public abstract class BloomFilter implements IFilter
         bitset.clear();
     }
 
-    public void close() throws IOException
+    public void close()
     {
         bitset.close();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java
index 10f6df2..91c0e36 100644
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@ -29,4 +29,6 @@ public interface IFilter extends Closeable
     void clear();
 
     long serializedSize();
+
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index c6fbddd..96aac6b 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -49,4 +49,6 @@ public interface IBitSet extends Closeable
     public long serializedSize(TypeSizes type);
 
     public void clear();
+
+    public void close();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 29dd848..de8da01 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -139,7 +139,7 @@ public class OffHeapBitSet implements IBitSet
         return new OffHeapBitSet(memory);
     }
 
-    public void close() throws IOException
+    public void close()
     {
         bytes.free();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index 5657d41..1d2f690 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -387,7 +387,7 @@ public class OpenBitSet implements IBitSet
     return (int)((h>>32) ^ h) + 0x98761234;
   }
 
-  public void close() throws IOException {
+  public void close() {
     // noop, let GC do the cleanup.
   }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 35fd9bd..9b2b492 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -421,7 +421,7 @@ public class IndexSummaryManagerTest extends SchemaLoader
         SSTableReader sstable = original;
         for (int samplingLevel = 1; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++)
         {
-            sstable = sstable.cloneWithNewSummarySamplingLevel(samplingLevel);
+            sstable = sstable.cloneWithNewSummarySamplingLevel(cfs, samplingLevel);
             assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel());
             int expectedSize = (numRows * samplingLevel) / (sstable.metadata.getMinIndexInterval() * BASE_SAMPLING_LEVEL);
             assertEquals(expectedSize, sstable.getIndexSummarySize(), 1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ebadc11/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index bd50538..8429d37 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -23,17 +23,20 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import org.junit.Assert;
 import com.google.common.collect.Sets;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,27 +44,34 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.composites.Composites;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.db.composites.Composites;
 import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.LocalToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.MmappedSegmentedFile;
 import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
+import static org.apache.cassandra.Util.cellname;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
-import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class SSTableReaderTest extends SchemaLoader
@@ -400,7 +410,7 @@ public class SSTableReaderTest extends SchemaLoader
             }));
         }
 
-        SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(1);
+        SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
         store.getDataTracker().replaceReaders(Arrays.asList(sstable), Arrays.asList(replacement));
         for (Future future : futures)
             future.get();


Mime
View raw message