cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject git commit: Ensure compacted sstables are never used
Date Thu, 26 Jul 2012 15:47:27 GMT
Updated Branches:
  refs/heads/cassandra-1.1 cc0be1b40 -> 6a6b7ec1f


Ensure compacted sstables are never used

patch by slebresne; reviewed by jbellis for CASSANDRA-4436


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6a6b7ec1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6a6b7ec1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6a6b7ec1

Branch: refs/heads/cassandra-1.1
Commit: 6a6b7ec1f00d8084fb29379111a798c09c35e6d6
Parents: cc0be1b
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Wed Jul 18 19:54:20 2012 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Thu Jul 26 17:46:19 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   32 ++++++++++--
 .../org/apache/cassandra/io/sstable/Component.java |    2 +
 .../apache/cassandra/io/sstable/Descriptor.java    |   10 +++-
 .../org/apache/cassandra/io/sstable/SSTable.java   |    2 +
 .../cassandra/io/sstable/SSTableMetadata.java      |   35 +++++++++----
 .../apache/cassandra/io/sstable/SSTableReader.java |   39 +++++----------
 7 files changed, 80 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 169f66d..84db73d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * fix "Can't Modify Index Name" problem on CF update (CASSANDRA-4439)
  * Fix assertion error in getOverlappingSSTables during repair (CASSANDRA-4456)
  * fix nodetool's setcompactionthreshold command (CASSANDRA-4455)
+ * Ensure compacted files are never used, to avoid counter overcount (CASSANDRA-4436)
 Merged from 1.0:
  * allow dropping columns shadowed by not-yet-expired supercolumn or row
    tombstones in PrecompactedRow (CASSANDRA-4396)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b93adc1..a39530a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -234,8 +234,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         if (loadSSTables)
         {
-            Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true);
-            data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), savedKeys,
data, metadata, this.partitioner));
+            Directories.SSTableLister sstableFiles = directories.sstableLister().skipCompacted(true).skipTemporary(true);
+            Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(),
savedKeys, data, metadata, this.partitioner);
+
+            // Filter non-compacted sstables, remove compacted ones
+            Set<Integer> compactedSSTables = new HashSet<Integer>();
+            for (SSTableReader sstable : sstables)
+                compactedSSTables.addAll(sstable.getAncestors());
+
+            Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>();
+            for (SSTableReader sstable : sstables)
+            {
+                if (compactedSSTables.contains(sstable.descriptor.generation))
+                    sstable.releaseReference(); // this amount to deleting the sstable
+                else
+                    liveSSTables.add(sstable);
+            }
+            data.addInitialSSTables(liveSSTables);
         }
 
         // compaction strategy should be created after the CFS has been prepared
@@ -492,7 +507,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             SSTableReader reader;
             try
             {
-                reader = SSTableReader.open(newDescriptor, entry.getValue(), Collections.<DecoratedKey>emptySet(),
data, metadata, partitioner);
+                reader = SSTableReader.open(newDescriptor, entry.getValue(), Collections.<DecoratedKey>emptySet(),
metadata, partitioner);
             }
             catch (IOException e)
             {
@@ -1969,9 +1984,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         ReplayPosition rp = ReplayPosition.getReplayPosition(sstables);
         SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(rp);
 
-        // get the max timestamp of the precompacted sstables
+        // Get the max timestamp of the precompacted sstables
+        // and record the generations of these sstables and of their ancestors whose data files still exist
         for (SSTableReader sstable : sstables)
+        {
             sstableMetadataCollector.updateMaxTimestamp(sstable.getMaxTimestamp());
+            sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
+            for (Integer i : sstable.getAncestors())
+            {
+                if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+                    sstableMetadataCollector.addAncestor(i);
+            }
+        }
 
         return new SSTableWriter(getTempSSTablePath(location), estimatedRows, metadata, partitioner,
sstableMetadataCollector);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 4517244..a57f8c0 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -48,6 +48,8 @@ public class Component
         // serialized bloom filter for the row keys in the sstable
         FILTER("Filter.db"),
         // 0-length file that is created when an sstable is ready to be deleted
+        // @deprecated: deletion of compacted files is based on the lineage information stored
in the compacted sstable's
+        // metadata. This ensures we never use both an sstable and any of its parents,
even in case of failure.
         COMPACTED_MARKER("Compacted"),
         // file to hold information about uncompressed data length, chunk offsets etc.
         COMPRESSION_INFO("CompressionInfo.db"),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 3c63c8b..07bd6e8 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -58,7 +58,8 @@ public class Descriptor
     // hb (1.0.3): records compression ration in metadata component
     // hc (1.0.4): records partitioner in metadata component
     // hd (1.0.10): includes row tombstones in maxtimestamp
-    public static final String CURRENT_VERSION = "hd";
+    // he (1.0.11): records ancestors (generations of the sstables it was compacted from) in metadata component
+    public static final String CURRENT_VERSION = "he";
 
     public final File directory;
     /** version has the following format: <code>[a-z]+</code> */
@@ -78,6 +79,7 @@ public class Descriptor
     public final boolean tracksMaxTimestamp;
     public final boolean hasCompressionRatio;
     public final boolean hasPartitioner;
+    public final boolean hasAncestors;
 
     /**
      * A descriptor that assumes CURRENT_VERSION.
@@ -106,9 +108,15 @@ public class Descriptor
         tracksMaxTimestamp = version.compareTo("hd") >= 0;
         hasCompressionRatio = version.compareTo("hb") >= 0;
         hasPartitioner = version.compareTo("hc") >= 0;
+        hasAncestors = version.compareTo("he") >= 0;
         isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
     }
 
+    public Descriptor withGeneration(int newGeneration)
+    {
+        return new Descriptor(version, directory, ksname, cfname, newGeneration, temporary);
+    }
+
     public String filenameFor(Component component)
     {
         return filenameFor(component.name());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index aed3c62..9a29066 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -140,6 +140,8 @@ public abstract class SSTable
             FileUtils.deleteWithConfirm(desc.filenameFor(component));
         }
         // remove the COMPACTED_MARKER component last if it exists
+        // Note: newly created sstables should not have a marker, but we keep this for now
to make sure
+        // we don't leave older markers around
         FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER));
 
         logger.debug("Deleted {}", desc);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
index bf17c6f..147f2b2 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -19,12 +19,8 @@
 
 package org.apache.cassandra.io.sstable;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
+import java.io.*;
+import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +38,7 @@ import org.apache.cassandra.utils.EstimatedHistogram;
  *  - max column timestamp
  *  - compression ratio
  *  - partitioner
+ *  - generations of sstables from which this sstable was compacted, if any
  *
  * An SSTableMetadata should be instantiated via the Collector, openFromDescriptor()
  * or createDefaultInstance()
@@ -58,6 +55,7 @@ public class SSTableMetadata
     public final long maxTimestamp;
     public final double compressionRatio;
     public final String partitioner;
+    public final Set<Integer> ancestors;
 
     private SSTableMetadata()
     {
@@ -66,10 +64,11 @@ public class SSTableMetadata
              ReplayPosition.NONE,
              Long.MIN_VALUE,
              Double.MIN_VALUE,
-             null);
+             null,
+             Collections.<Integer>emptySet());
     }
 
-    private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts,
ReplayPosition replayPosition, long maxTimestamp, double cr, String partitioner)
+    private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts,
ReplayPosition replayPosition, long maxTimestamp, double cr, String partitioner, Set<Integer>
ancestors)
     {
         this.estimatedRowSize = rowSizes;
         this.estimatedColumnCount = columnCounts;
@@ -77,6 +76,7 @@ public class SSTableMetadata
         this.maxTimestamp = maxTimestamp;
         this.compressionRatio = cr;
         this.partitioner = partitioner;
+        this.ancestors = ancestors;
     }
 
     public static SSTableMetadata createDefaultInstance()
@@ -108,6 +108,7 @@ public class SSTableMetadata
         protected ReplayPosition replayPosition = ReplayPosition.NONE;
         protected long maxTimestamp = Long.MIN_VALUE;
         protected double compressionRatio = Double.MIN_VALUE;
+        protected Set<Integer> ancestors = new HashSet<Integer>();
 
         public void addRowSize(long rowSize)
         {
@@ -140,7 +141,8 @@ public class SSTableMetadata
                                        replayPosition,
                                        maxTimestamp,
                                        compressionRatio,
-                                       partitioner);
+                                       partitioner,
+                                       ancestors);
         }
 
         public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize)
@@ -160,6 +162,12 @@ public class SSTableMetadata
             this.replayPosition = replayPosition;
             return this;
         }
+
+        public Collector addAncestor(int generation)
+        {
+            this.ancestors.add(generation);
+            return this;
+        }
     }
 
     public static class SSTableMetadataSerializer
@@ -176,6 +184,9 @@ public class SSTableMetadata
             dos.writeLong(sstableStats.maxTimestamp);
             dos.writeDouble(sstableStats.compressionRatio);
             dos.writeUTF(sstableStats.partitioner);
+            dos.writeInt(sstableStats.ancestors.size());
+            for (Integer g : sstableStats.ancestors)
+                dos.writeInt(g);
         }
 
         public SSTableMetadata deserialize(Descriptor descriptor) throws IOException
@@ -213,7 +224,11 @@ public class SSTableMetadata
                                     ? dis.readDouble()
                                     : Double.MIN_VALUE;
             String partitioner = desc.hasPartitioner ? dis.readUTF() : null;
-            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp,
compressionRatio, partitioner);
+            int nbAncestors = desc.hasAncestors ? dis.readInt() : 0;
+            Set<Integer> ancestors = new HashSet<Integer>(nbAncestors);
+            for (int i = 0; i < nbAncestors; i++)
+                ancestors.add(dis.readInt());
+            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp,
compressionRatio, partitioner, ancestors);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 0aaa932..21dc71d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -140,23 +140,22 @@ public class SSTableReader extends SSTable
 
     public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component>
components, CFMetaData metadata) throws IOException
     {
-        return open(descriptor, components, Collections.<DecoratedKey>emptySet(), null,
metadata, StorageService.getPartitioner(), false);
+        return open(descriptor, components, Collections.<DecoratedKey>emptySet(), metadata,
StorageService.getPartitioner(), false);
     }
 
     public static SSTableReader open(Descriptor descriptor, Set<Component> components,
CFMetaData metadata, IPartitioner partitioner) throws IOException
     {
-        return open(descriptor, components, Collections.<DecoratedKey>emptySet(), null,
metadata, partitioner);
+        return open(descriptor, components, Collections.<DecoratedKey>emptySet(), metadata,
partitioner);
     }
 
-    public static SSTableReader open(Descriptor descriptor, Set<Component> components,
Set<DecoratedKey> savedKeys, DataTracker tracker, CFMetaData metadata, IPartitioner
partitioner) throws IOException
+    public static SSTableReader open(Descriptor descriptor, Set<Component> components,
Set<DecoratedKey> savedKeys, CFMetaData metadata, IPartitioner partitioner) throws IOException
     {
-        return open(descriptor, components, savedKeys, tracker, metadata, partitioner, true);
+        return open(descriptor, components, savedKeys, metadata, partitioner, true);
     }
 
     private static SSTableReader open(Descriptor descriptor,
                                       Set<Component> components,
                                       Set<DecoratedKey> savedKeys,
-                                      DataTracker tracker,
                                       CFMetaData metadata,
                                       IPartitioner partitioner,
                                       boolean validate) throws IOException
@@ -191,8 +190,6 @@ public class SSTableReader extends SSTable
                                                   null,
                                                   System.currentTimeMillis(),
                                                   sstableMetadata);
-        sstable.setTrackedBy(tracker);
-
         // versions before 'c' encoded keys as utf-16 before hashing to the filter
         if (descriptor.hasStringsInBloomFilter)
         {
@@ -242,7 +239,7 @@ public class SSTableReader extends SSTable
                     SSTableReader sstable;
                     try
                     {
-                        sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker,
metadata, partitioner);
+                        sstable = open(entry.getKey(), entry.getValue(), savedKeys, metadata,
partitioner);
                     }
                     catch (IOException ex)
                     {
@@ -320,11 +317,8 @@ public class SSTableReader extends SSTable
 
     public void setTrackedBy(DataTracker tracker)
     {
-        if (tracker != null)
-        {
-            keyCache = CacheService.instance.keyCache;
-            deletingTask.setTracker(tracker);
-        }
+        keyCache = CacheService.instance.keyCache;
+        deletingTask.setTracker(tracker);
     }
 
     void loadBloomFilter() throws IOException
@@ -827,19 +821,7 @@ public class SSTableReader extends SSTable
         if (logger.isDebugEnabled())
             logger.debug("Marking " + getFilename() + " compacted");
 
-        if (isCompacted.getAndSet(true))
-            return false;
-
-        try
-        {
-            if (!new File(descriptor.filenameFor(Component.COMPACTED_MARKER)).createNewFile())
-                throw new IOException("Compaction marker already exists");
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-        return true;
+        return !isCompacted.getAndSet(true);
     }
 
     public void markSuspect()
@@ -1015,6 +997,11 @@ public class SSTableReader extends SSTable
         return sstableMetadata.maxTimestamp;
     }
 
+    public Set<Integer> getAncestors()
+    {
+        return sstableMetadata.ancestors;
+    }
+
     public RandomAccessReader openDataReader(boolean skipIOCache) throws IOException
     {
         return compression


Mime
View raw message