cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject svn commit: r1143352 - in /cassandra/branches/cassandra-0.8: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ test/unit/org/apache/cassandra/db/compaction/
Date Wed, 06 Jul 2011 11:34:50 GMT
Author: slebresne
Date: Wed Jul  6 11:34:50 2011
New Revision: 1143352

URL: http://svn.apache.org/viewvc?rev=1143352&view=rev
Log:
Handle row tombstones correctly in EchoedRow
patch by slebresne; reviewed by jbellis for CASSANDRA-2786

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Jul  6 11:34:50 2011
@@ -15,6 +15,7 @@
  * fix index-building status display (CASSANDRA-2853)
  * fix CLI perpetuating obsolete KsDef.replication_factor (CASSANDRA-2846)
  * improve cli treatment of multiline comments (CASSANDRA-2852)
+ * handle row tombstones correctly in EchoedRow (CASSANDRA-2786)
 
 
 0.8.1

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java Wed Jul  6 11:34:50 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.security.MessageDigest;
 
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 
 /**
@@ -35,11 +36,13 @@ import org.apache.cassandra.io.sstable.S
 public class EchoedRow extends AbstractCompactedRow
 {
     private final SSTableIdentityIterator row;
+    private final int gcBefore;
 
-    public EchoedRow(SSTableIdentityIterator row)
+    public EchoedRow(CompactionController controller, SSTableIdentityIterator row)
     {
         super(row.getKey());
         this.row = row;
+        this.gcBefore = controller.gcBefore;
         // Reset SSTableIdentityIterator because we have not guarantee the filePointer hasn't moved since the Iterator was built
         row.reset();
     }
@@ -59,7 +62,7 @@ public class EchoedRow extends AbstractC
 
     public boolean isEmpty()
     {
-        return !row.hasNext();
+        return !row.hasNext() && ColumnFamilyStore.removeDeletedCF(row.getColumnFamily(), gcBefore) == null;
     }
 
     public int columnCount()

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java Wed Jul  6 11:34:50 2011
@@ -113,7 +113,7 @@ public class CompactionController
     public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
     {
         if (rows.size() == 1 && !needDeserialize() && !shouldPurge(rows.get(0).getKey()))
-            return new EchoedRow(rows.get(0));
+            return new EchoedRow(this, rows.get(0));
 
         long rowSize = 0;
         for (SSTableIdentityIterator row : rows)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Wed Jul  6 11:34:50 2011
@@ -412,7 +412,8 @@ public class CompactionManager implement
                         // success: perform the compaction
                         try
                         {
-                            doCompactionWithoutSizeEstimation(cfs, sstables, gcBefore, location);
+                            // Forcing deserialization because in case the user wants expired columns to be transformed to tombstones
+                            doCompactionWithoutSizeEstimation(cfs, sstables, gcBefore, location, true);
                         }
                         finally
                         {
@@ -501,7 +502,7 @@ public class CompactionManager implement
         {
             String compactionFileLocation = table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables));
             if (compactionFileLocation != null)
-                return doCompactionWithoutSizeEstimation(cfs, smallerSSTables, gcBefore, compactionFileLocation);
+                return doCompactionWithoutSizeEstimation(cfs, smallerSSTables, gcBefore, compactionFileLocation, false);
 
             logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", "));
             smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables));
@@ -515,7 +516,7 @@ public class CompactionManager implement
      * For internal use and testing only.  The rest of the system should go through the submit* methods,
      * which are properly serialized.
      */
-    int doCompactionWithoutSizeEstimation(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, String compactionFileLocation) throws IOException
+    int doCompactionWithoutSizeEstimation(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, String compactionFileLocation, boolean forceDeserialize) throws IOException
     {
         // The collection of sstables passed may be empty (but not null); even if
         // it is not empty, it may compact down to nothing if all rows are deleted.
@@ -529,10 +530,6 @@ public class CompactionManager implement
         for (SSTableReader sstable : sstables)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
-        // compaction won't normally compact a single sstable, so if that's what we're doing
-        // it must have been requested manually by the user, which probably means he wants to force
-        // tombstone purge, which won't happen unless we force deserializing the rows.
-        boolean forceDeserialize = sstables.size() == 1;
         CompactionController controller = new CompactionController(cfs, sstables, gcBefore, forceDeserialize);
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting

Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Wed Jul  6 11:34:50 2011
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Set;
@@ -181,27 +182,54 @@ public class CompactionsTest extends Cle
             if (i % 2 == 0)
                 store.forceBlockingFlush();
         }
+        Collection<SSTableReader> toCompact = store.getSSTables();
 
-        // Force compaction. Since each row is in only one sstable, we will be using EchoedRow.
-        CompactionManager.instance.performMajor(store);
+        // Reinserting the same keys. We will compact only the previous sstable, but we need those new ones
+        // to make sure we use EchoedRow, otherwise it won't be used because purge can be done.
+        for (int i=1; i < 5; i++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(i));
+            RowMutation rm = new RowMutation(TABLE1, key.key);
+            rm.add(new QueryPath("Standard2", null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+        SSTableReader tmpSSTable = null;
+        for (SSTableReader sstable : store.getSSTables())
+            if (!toCompact.contains(sstable))
+                tmpSSTable = sstable;
+
+        // Force compaction on first sstables. Since each row is in only one sstable, we will be using EchoedRow.
+        CompactionManager.instance.doCompaction(store, toCompact, (int) (System.currentTimeMillis() / 1000) - store.metadata.getGcGraceSeconds());
+
+        // Now, we remove the sstable that was just created to force the use of EchoedRow (so that it doesn't hide the problem)
+        store.markCompacted(Collections.singleton(tmpSSTable));
 
-        // Now assert we do have the two keys
+        // Now assert we do have the 4 keys
         assertEquals(4, Util.getRangeSlice(store).size());
     }
 
     @Test
     public void testDontPurgeAccidentaly() throws IOException, ExecutionException, InterruptedException
     {
+        // Testing with and without forcing deserialization. Without deserialization, EchoedRow will be used.
+        testDontPurgeAccidentaly("test1", false);
+        testDontPurgeAccidentaly("test2", true);
+    }
+
+    private void testDontPurgeAccidentaly(String k, boolean forceDeserialize) throws IOException, ExecutionException, InterruptedException
+    {
         // This test catches the regression of CASSANDRA-2786
         Table table = Table.open(TABLE1);
         String cfname = "Super5";
         ColumnFamilyStore store = table.getColumnFamilyStore(cfname);
 
         // disable compaction while flushing
+        store.removeAllSSTables();
         store.disableAutoCompaction();
 
         // Add test row
-        DecoratedKey key = Util.dk("test");
+        DecoratedKey key = Util.dk(k);
         RowMutation rm = new RowMutation(TABLE1, key.key);
         rm.add(new QueryPath(cfname, ByteBufferUtil.bytes("sc"), ByteBufferUtil.bytes("c")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
         rm.apply();
@@ -210,11 +238,10 @@ public class CompactionsTest extends Cle
 
         Collection<SSTableReader> sstablesBefore = store.getSSTables();
 
-        QueryFilter filter = QueryFilter.getIdentityFilter(Util.dk("test"), new QueryPath(cfname, null, null));
+        QueryFilter filter = QueryFilter.getIdentityFilter(key, new QueryPath(cfname, null, null));
         assert !store.getColumnFamily(filter).isEmpty();
 
         // Remove key
-        key = Util.dk("test");
         rm = new RowMutation(TABLE1, key.key);
         rm.delete(new QueryPath(cfname, null, null), 2);
         rm.apply();
@@ -225,12 +252,13 @@ public class CompactionsTest extends Cle
         store.forceBlockingFlush();
 
         Collection<SSTableReader> sstablesAfter = store.getSSTables();
-        Collection<Descriptor> toCompact = new ArrayList<Descriptor>();
+        Collection<SSTableReader> toCompact = new ArrayList<SSTableReader>();
         for (SSTableReader sstable : sstablesAfter)
             if (!sstablesBefore.contains(sstable))
-                toCompact.add(sstable.descriptor);
+                toCompact.add(sstable);
 
-        CompactionManager.instance.submitUserDefined(store, toCompact, (int) (System.currentTimeMillis() / 1000) - store.metadata.getGcGraceSeconds()).get();
+        String location = store.table.getDataFileLocation(1);
+        CompactionManager.instance.doCompactionWithoutSizeEstimation(store, toCompact, (int) (System.currentTimeMillis() / 1000) - store.metadata.getGcGraceSeconds(), location, forceDeserialize);
 
         cf = store.getColumnFamily(filter);
         assert cf.isEmpty() : "should be empty: " + cf;



Mime
View raw message