cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r998692 - in /cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/db/
Date Sun, 19 Sep 2010 16:37:50 GMT
Author: jbellis
Date: Sun Sep 19 16:37:50 2010
New Revision: 998692

URL: http://svn.apache.org/viewvc?rev=998692&view=rev
Log:
backport 1074 from trunk.  patch by Sylvain Lebresne; reviewed by jbellis

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IteratingRow.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
    cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Sun Sep 19 16:37:50 2010
@@ -22,6 +22,8 @@
    newly added nodes (CASSANDRA-1467)
  * use JNA, if present, to take snapshots (CASSANDRA-1371)
  * make IndexInterval configurable (CASSANDRA-1488)
+ * remove tombstones during non-major compactions when bloom filter
+   verifies that row does not exist in other sstables (CASSANDRA-1074)
 
 
 0.6.5

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sun Sep 19 16:37:50 2010
@@ -513,6 +513,24 @@ public class ColumnFamilyStore implement
         }
     }
 
+    /**
+     * Uses bloom filters to check if key may be present in any sstable in this
+     * ColumnFamilyStore, minus a set of provided ones.
+     *
+     * Because BFs are checked, negative returns ensure that the key is not
+     * present in the checked SSTables, but positive ones do not ensure key
+     * presence.
+     */
+    public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<SSTable> sstablesToIgnore)
+    {
+        for (SSTableReader sstable : ssTables_)
+        {
+            if (!sstablesToIgnore.contains(sstable) && sstable.getBloomFilter().isPresent(key.key))
+                return true;
+        }
+        return false;
+    }
+
     /*
      * Called after the Memtable flushes its in-memory data, or we add a file
      * via bootstrap. This information is

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java Sun Sep 19 16:37:50 2010
@@ -275,7 +275,7 @@ public class CompactionManager implement
           logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
         SSTableWriter writer;
-        CompactionIterator ci = new CompactionIterator(sstables, gcBefore, major); // retain a handle so we can call close()
+        CompactionIterator ci = new CompactionIterator(cfs, sstables, gcBefore, major); // retain a handle so we can call close()
         Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
         executor.beginCompaction(cfs, ci);
 
@@ -359,7 +359,7 @@ public class CompactionManager implement
           logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
         SSTableWriter writer = null;
-        CompactionIterator ci = new AntiCompactionIterator(sstables, ranges, getDefaultGCBefore(), cfs.isCompleteSSTables(sstables));
+        CompactionIterator ci = new AntiCompactionIterator(cfs, sstables, ranges, getDefaultGCBefore(), cfs.isCompleteSSTables(sstables));
         Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
         executor.beginCompaction(cfs, ci);
 
@@ -422,7 +422,7 @@ public class CompactionManager implement
     private void doValidationCompaction(ColumnFamilyStore cfs, AntiEntropyService.Validator validator) throws IOException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionIterator ci = new CompactionIterator(sstables, getDefaultGCBefore(), true);
+        CompactionIterator ci = new CompactionIterator(cfs, sstables, getDefaultGCBefore(), true);
         executor.beginCompaction(cfs, ci);
         try
         {
@@ -495,10 +495,10 @@ public class CompactionManager implement
     {
         private Set<SSTableScanner> scanners;
 
-        public AntiCompactionIterator(Collection<SSTableReader> sstables, Collection<Range> ranges, int gcBefore, boolean isMajor)
+        public AntiCompactionIterator(ColumnFamilyStore cfStore, Collection<SSTableReader> sstables, Collection<Range> ranges, int gcBefore, boolean isMajor)
                 throws IOException
         {
-            super(getCollatedRangeIterator(sstables, ranges), gcBefore, isMajor);
+            super(cfStore, getCollatedRangeIterator(sstables, ranges), gcBefore, isMajor);
         }
 
         private static Iterator getCollatedRangeIterator(Collection<SSTableReader> sstables, final Collection<Range> ranges)

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/CompactionIterator.java Sun Sep 19 16:37:50 2010
@@ -26,6 +26,8 @@ import java.io.IOException;
 import java.io.IOError;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.Iterator;
 
 import org.apache.log4j.Logger;
@@ -45,6 +47,7 @@ public class CompactionIterator extends 
     protected static final int FILE_BUFFER_SIZE = 1024 * 1024;
 
     private final List<IteratingRow> rows = new ArrayList<IteratingRow>();
+    private final ColumnFamilyStore cfs;
     private final int gcBefore;
     private final boolean major;
 
@@ -52,13 +55,13 @@ public class CompactionIterator extends 
     private long bytesRead;
     private long row;
 
-    public CompactionIterator(Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
+    public CompactionIterator(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
     {
-        this(getCollatingIterator(sstables), gcBefore, major);
+        this(cfs, getCollatingIterator(sstables), gcBefore, major);
     }
 
     @SuppressWarnings("unchecked")
-    protected CompactionIterator(Iterator iter, int gcBefore, boolean major)
+    protected CompactionIterator(ColumnFamilyStore cfs, Iterator iter, int gcBefore, boolean major)
     {
         super(iter);
         row = 0;
@@ -67,6 +70,7 @@ public class CompactionIterator extends 
         {
             totalBytes += scanner.getFileLength();
         }
+        this.cfs = cfs;
         this.gcBefore = gcBefore;
         this.major = major;
     }
@@ -99,9 +103,14 @@ public class CompactionIterator extends 
         DataOutputBuffer buffer = new DataOutputBuffer();
         DecoratedKey key = rows.get(0).getKey();
 
+        Set<SSTable> sstables = new HashSet<SSTable>();
+        for (IteratingRow row : rows)
+            sstables.add(row.sstable);
+        boolean shouldPurge = major || !cfs.isKeyInRemainingSSTables(key, sstables);
+
         try
         {
-            if (rows.size() > 1 || major)
+            if (rows.size() > 1 || shouldPurge)
             {
                 ColumnFamily cf = null;
                 for (IteratingRow row : rows)
@@ -125,7 +134,7 @@ public class CompactionIterator extends 
                         cf.addAll(thisCF);
                     }
                 }
-                ColumnFamily cfPurged = major ? ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
+                ColumnFamily cfPurged = shouldPurge ? ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
                 if (cfPurged == null)
                     return null;
                 ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IteratingRow.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IteratingRow.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IteratingRow.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IteratingRow.java Sun Sep 19 16:37:50 2010
@@ -37,7 +37,7 @@ public class IteratingRow extends Abstra
     private final DecoratedKey key;
     private final long finishedAt;
     private final BufferedRandomAccessFile file;
-    private SSTableReader sstable;
+    public final SSTableReader sstable;
     private long dataStart;
 
     public IteratingRow(BufferedRandomAccessFile file, SSTableReader sstable) throws IOException

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java Sun Sep 19 16:37:50 2010
@@ -497,6 +497,11 @@ public class SSTableReader extends SSTab
         bf = BloomFilter.alwaysMatchingBloomFilter();
     }
 
+    public BloomFilter getBloomFilter()
+    {
+      return bf;
+    }
+
     public IPartitioner getPartitioner()
     {
         return partitioner;

Modified: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java (original)
+++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java Sun Sep 19 16:37:50 2010
@@ -35,9 +35,10 @@ import static org.apache.cassandra.db.Ta
 public class CompactionsPurgeTest extends CleanupHelper
 {
     public static final String TABLE1 = "Keyspace1";
+    public static final String TABLE2 = "Keyspace2";
 
     @Test
-    public void testCompactionPurge() throws IOException, ExecutionException, InterruptedException
+    public void testMajorCompactionPurge() throws IOException, ExecutionException, InterruptedException
     {
         CompactionManager.instance.disableAutoCompaction();
 
@@ -72,25 +73,71 @@ public class CompactionsPurgeTest extend
         rm.apply();
         cfs.forceBlockingFlush();
 
-        // verify that non-major compaction does no GC to ensure correctness (see CASSANDRA-604)
-        Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
-        rm = new RowMutation(TABLE1, key + "x");
-        rm.add(new QueryPath(cfName, null, "0".getBytes()), new byte[0], 0);
-        rm.apply();
-        cfs.forceBlockingFlush();
-        CompactionManager.instance.doCompaction(cfs, sstablesIncomplete, CompactionManager.getDefaultGCBefore());
-        ColumnFamily cf = cfs.getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
-        assert cf.getColumnCount() == 10;
-
         // major compact and test that all columns but the resurrected one is completely gone
         CompactionManager.instance.submitMajor(cfs, 0, Integer.MAX_VALUE).get();
         cfs.invalidateCachedRow(key);
-        cf = cfs.getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
+        ColumnFamily cf = cfs.getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
         assertColumns(cf, "5");
         assert cf.getColumn(String.valueOf(5).getBytes()) != null;
     }
 
     @Test
+    public void testMinorCompactionPurge() throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        Table table = Table.open(TABLE2);
+        String cfName = "Standard1";
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+        RowMutation rm;
+        for (int k = 1; k <= 2; ++k) {
+            String key = "key" + k;
+
+            // inserts
+            rm = new RowMutation(TABLE2, key);
+            for (int i = 0; i < 10; i++)
+            {
+                rm.add(new QueryPath(cfName, null, String.valueOf(i).getBytes()), new byte[0], 0);
+            }
+            rm.apply();
+            cfs.forceBlockingFlush();
+
+            // deletes
+            for (int i = 0; i < 10; i++)
+            {
+                rm = new RowMutation(TABLE2, key);
+                rm.delete(new QueryPath(cfName, null, String.valueOf(i).getBytes()), 1);
+                rm.apply();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        String key1 = "key1";
+        String key2 = "key2";
+
+        // flush, remember the current sstable and then resurrect one column
+        // for first key. Then submit minor compaction on remembered sstables.
+        cfs.forceBlockingFlush();
+        Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
+        rm = new RowMutation(TABLE2, key1);
+        rm.add(new QueryPath(cfName, null, String.valueOf(5).getBytes()), new byte[0], 2);
+        rm.apply();
+        cfs.forceBlockingFlush();
+        CompactionManager.instance.doCompaction(cfs, sstablesIncomplete, Integer.MAX_VALUE);
+
+        // verify that minor compaction does not GC when key is present
+        // in a non-compacted sstable
+        ColumnFamily cf = cfs.getColumnFamily(new IdentityQueryFilter(key1, new QueryPath(cfName)));
+        assert cf.getColumnCount() == 10;
+
+        // verify that minor compaction does GC when key is provably not
+        // present in a non-compacted sstable
+        cf = cfs.getColumnFamily(new IdentityQueryFilter(key2, new QueryPath(cfName)));
+        assert cf == null;
+    }
+
+    @Test
     public void testCompactionPurgeOneFile() throws IOException, ExecutionException, InterruptedException
     {
         CompactionManager.instance.disableAutoCompaction();



Mime
View raw message