cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject svn commit: r1126363 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/marshal/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/i...
Date Mon, 23 May 2011 07:53:49 GMT
Author: slebresne
Date: Mon May 23 07:53:48 2011
New Revision: 1126363

URL: http://svn.apache.org/viewvc?rev=1126363&view=rev
Log:
merge from 0.8

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java
      - copied unchanged from r1126356, cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May 23 07:53:48 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7:1026516-1125002
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1125101
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1125101,1126356
 /cassandra/branches/cassandra-0.8.0:1125021-1125100
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1126363&r1=1126362&r2=1126363&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon May 23 07:53:48 2011
@@ -14,6 +14,7 @@
    buffers again, especially on CL writes (CASSANDRA-2660)
  * add DROP INDEX support to CLI (CASSANDRA-2616)
  * don't perform HH to client-mode [storageproxy] nodes (CASSANDRA-2668)
+ * Improve forceDeserialize/getCompactedRow encapsulation (CASSANDRA-2659)
 
 
 0.8.0-final

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May 23 07:53:48 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
 /cassandra/branches/cassandra-0.7/contrib:1026516-1125002
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1125101
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1125101,1126356
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1125100
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May 23 07:53:48 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1125002
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1125101
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1125101,1126356
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1125100
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May 23 07:53:48 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1125002
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1125101
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1125101,1126356
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1125100
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May 23 07:53:48 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1125002
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1125101
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1125101,1126356
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1125100
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May 23 07:53:48 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1125002
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1125101
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1125101,1126356
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1125100
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May 23 07:53:48 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1125002
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1125101
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1125101,1126356
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1125100
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1126363&r1=1126362&r2=1126363&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May 23 07:53:48 2011
@@ -968,9 +968,9 @@ public class ColumnFamilyStore implement
         data.markCompacted(sstables);
     }
 
-    boolean isCompleteSSTables(Collection<SSTableReader> sstables)
+    public boolean isCompleteSSTables(Set<SSTableReader> sstables)
     {
-        return data.getSSTables().equals(new HashSet<SSTableReader>(sstables));
+        return data.getSSTables().equals(sstables);
     }
 
     void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1126363&r1=1126362&r2=1126363&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Mon May 23 07:53:48 2011
@@ -18,13 +18,11 @@
 
 package org.apache.cassandra.db;
 
-import java.io.DataOutput;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
@@ -128,7 +126,7 @@ public class CompactionManager implement
                     logger.debug("Checking to see if compaction of " + cfs.columnFamily + " would be useful");
                     Set<List<SSTableReader>> buckets = getBuckets(convertSSTablesToPairs(cfs.getSSTables()),
50L * 1024L * 1024L);
                     updateEstimateFor(cfs, buckets);
-                    int gcBefore = cfs.isIndex() ? Integer.MAX_VALUE : getDefaultGcBefore(cfs);
+                    int gcBefore = getDefaultGcBefore(cfs);
                     
                     for (List<SSTableReader> sstables : buckets)
                     {
@@ -529,11 +527,15 @@ public class CompactionManager implement
         for (SSTableReader sstable : sstables)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
+        // compaction won't normally compact a single sstable, so if that's what we're doing
+        // it must have been requested manually by the user, which probably means he wants to force
+        // tombstone purge, which won't happen unless we force deserializing the rows.
+        boolean forceDeserialize = sstables.size() == 1;
+        CompactionController controller = new CompactionController(cfs, sstables, gcBefore, forceDeserialize);
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
-        boolean major = cfs.isCompleteSSTables(sstables);
-        CompactionType type = major
+        CompactionType type = controller.isMajor()
                             ? CompactionType.MAJOR
                             : CompactionType.MINOR;
         logger.info("Compacting {}: {}", type, sstables);
@@ -547,7 +549,6 @@ public class CompactionManager implement
           logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
         SSTableWriter writer;
-        CompactionController controller = new CompactionController(cfs, sstables, major,
gcBefore, false);
         CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain
a handle so we can call close()
         Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
@@ -630,6 +631,7 @@ public class CompactionManager implement
         for (final SSTableReader sstable : sstables)
         {
             logger.info("Scrubbing " + sstable);
+            CompactionController controller = new CompactionController(cfs, Collections.singletonList(sstable), getDefaultGcBefore(cfs), true);
 
             // Calculate the expected compacted filesize
             String compactionFileLocation = cfs.table.getDataFileLocation(sstable.length());
@@ -708,7 +710,7 @@ public class CompactionManager implement
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size " + dataSize));
                     SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile,
key, dataStart, dataSize, true);
-                    AbstractCompactedRow compactedRow = getCompactedRow(row, sstable.descriptor,
true);
+                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
                     if (compactedRow.isEmpty())
                     {
                         emptyRows++;
@@ -736,7 +738,7 @@ public class CompactionManager implement
                         try
                         {
                             SSTableIdentityIterator row = new SSTableIdentityIterator(sstable,
dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = getCompactedRow(row, sstable.descriptor,
true);
+                            AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
                             if (compactedRow.isEmpty())
                             {
                                 emptyRows++;
@@ -811,7 +813,9 @@ public class CompactionManager implement
 
         for (SSTableReader sstable : sstables)
         {
+            CompactionController controller = new CompactionController(cfs, Collections.singletonList(sstable), getDefaultGcBefore(cfs), false);
             long startTime = System.currentTimeMillis();
+
             long totalkeysWritten = 0;
 
             int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
@@ -841,7 +845,7 @@ public class CompactionManager implement
                         if (Range.isTokenInRanges(row.getKey().token, ranges))
                         {
                             writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize,
writer, Collections.singletonList(sstable));
-                            writer.append(getCompactedRow(row, sstable.descriptor, false));
+                            writer.append(controller.getCompactedRow(row));
                             totalkeysWritten++;
                         }
                         else
@@ -906,21 +910,6 @@ public class CompactionManager implement
         }
     }
 
-    /**
-     * @return an AbstractCompactedRow implementation to write the row in question.
-     * If the data is from a current-version sstable, write it unchanged.  Otherwise,
-     * re-serialize it in the latest version. The returned AbstractCompactedRow will not
purge data.
-     */
-    private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, Descriptor
descriptor, boolean forceDeserialize)
-    {
-        if (descriptor.isLatestVersion && !forceDeserialize)
-            return new EchoedRow(row);
-
-        return row.dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()
-               ? new LazilyCompactedRow(CompactionController.getBasicController(forceDeserialize),
Arrays.asList(row))
-               : new PrecompactedRow(CompactionController.getBasicController(forceDeserialize),
Arrays.asList(row));
-    }
-
     private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation,
int expectedBloomFilterSize, SSTableWriter writer, Collection<SSTableReader> sstables)
             throws IOException
     {
@@ -1146,7 +1135,9 @@ public class CompactionManager implement
 
     private static int getDefaultGcBefore(ColumnFamilyStore cfs)
     {
-        return (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+        return cfs.isIndex()
+               ? Integer.MAX_VALUE
+               : (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
     }
 
     private static class ValidationCompactionIterator extends CompactionIterator
@@ -1155,7 +1146,7 @@ public class CompactionManager implement
         {
             super(CompactionType.VALIDATION,
                   getCollatingIterator(cfs.getSSTables(), range),
-                  new CompactionController(cfs, cfs.getSSTables(), true, getDefaultGcBefore(cfs),
false));
+                  new CompactionController(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true));
         }
 
         protected static CollatingIterator getCollatingIterator(Iterable<SSTableReader>
sstables, Range range) throws IOException
@@ -1277,40 +1268,6 @@ public class CompactionManager implement
         }
     }
 
-    private static class EchoedRow extends AbstractCompactedRow
-    {
-        private final SSTableIdentityIterator row;
-
-        public EchoedRow(SSTableIdentityIterator row)
-        {
-            super(row.getKey());
-            this.row = row;
-        }
-
-        public void write(DataOutput out) throws IOException
-        {
-            assert row.dataSize > 0;
-            out.writeLong(row.dataSize);
-            row.echoData(out);
-        }
-
-        public void update(MessageDigest digest)
-        {
-            // EchoedRow is not used in anti-entropy validation
-            throw new UnsupportedOperationException();
-        }
-
-        public boolean isEmpty()
-        {
-            return !row.hasNext();
-        }
-
-        public int columnCount()
-        {
-            return row.columnCount;
-        }
-    }
-
     private static class CleanupInfo implements CompactionInfo.Holder
     {
         private final SSTableReader sstable;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java?rev=1126363&r1=1126362&r2=1126363&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java Mon May 23 07:53:48 2011
@@ -100,7 +100,7 @@ public abstract class AbstractType<T> im
 
     /** get a string representation of a particular type. */
     public abstract String toString(T t);
-    
+
     /** get a string representation of the bytes suitable for log messages */
     public abstract String getString(ByteBuffer bytes);
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java?rev=1126363&r1=1126362&r2=1126363&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java Mon May 23 07:53:48 2011
@@ -19,21 +19,28 @@
  */
 package org.apache.cassandra.io;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EchoedRow;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Manage compaction options.
  */
 public class CompactionController
 {
+    private static Logger logger = LoggerFactory.getLogger(CompactionController.class);
+
     private final ColumnFamilyStore cfs;
     private final Set<SSTableReader> sstables;
     private final boolean forceDeserialize;
@@ -41,41 +48,31 @@ public class CompactionController
     public final boolean isMajor;
     public final int gcBefore;
 
-    private static final CompactionController basicController = new CompactionController(null,
Collections.<SSTableReader>emptySet(), false, Integer.MAX_VALUE, false);
-    private static final CompactionController basicDeserializingController = new CompactionController(null,
Collections.<SSTableReader>emptySet(), false, Integer.MAX_VALUE, true);
-
-    public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables,
boolean isMajor, int gcBefore, boolean forceDeserialize)
+    public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
     {
+        assert cfs != null;
         this.cfs = cfs;
-        this.isMajor = isMajor;
         this.sstables = new HashSet<SSTableReader>(sstables);
         this.gcBefore = gcBefore;
         this.forceDeserialize = forceDeserialize;
+        isMajor = cfs.isCompleteSSTables(this.sstables);
     }
 
-    /**
-     * Returns a controller that never purge
-     */
-    public static CompactionController getBasicController(boolean forceDeserialize)
-    {
-        return forceDeserialize ? basicDeserializingController : basicController;
-    }
-
-    /** @return The keyspace name: only valid if created with a non-null CFS. */
+    /** @return the keyspace name */
     public String getKeyspace()
     {
-        return cfs != null ? cfs.table.name : "n/a";
+        return cfs.table.name;
     }
 
-    /** @return The column family name: only valid if created with a non-null CFS. */
+    /** @return the column family name */
     public String getColumnFamily()
     {
-        return cfs != null ? cfs.columnFamily : "n/a";
+        return cfs.columnFamily;
     }
 
     public boolean shouldPurge(DecoratedKey key)
     {
-        return isMajor || (cfs != null && !cfs.isKeyInRemainingSSTables(key, sstables));
+        return isMajor || !cfs.isKeyInRemainingSSTables(key, sstables);
     }
 
     public boolean needDeserialize()
@@ -92,18 +89,50 @@ public class CompactionController
 
     public void invalidateCachedRow(DecoratedKey key)
     {
-        if (cfs != null)
-            cfs.invalidateCachedRow(key);
+        cfs.invalidateCachedRow(key);
     }
 
     public void removeDeletedInCache(DecoratedKey key)
     {
-        if (cfs != null)
+        ColumnFamily cachedRow = cfs.getRawCachedRow(key);
+        if (cachedRow != null)
+            ColumnFamilyStore.removeDeleted(cachedRow, gcBefore);
+    }
+
+    public boolean isMajor()
+    {
+        return isMajor;
+    }
+
+    /**
+     * @return an AbstractCompactedRow implementation to write the merged rows in question.
+     *
+     * If there is a single source row, the data is from a current-version sstable,
+     * and we aren't forcing deserialization for scrub,
+     * write it unchanged.  Otherwise, we deserialize, purge tombstones, and
+     * reserialize in the latest version.
+     */
+    public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
+    {
+        if (rows.size() == 1 && !needDeserialize())
+            return new EchoedRow(rows.get(0));
+
+        long rowSize = 0;
+        for (SSTableIdentityIterator row : rows)
+            rowSize += row.dataSize;
+
+        if (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit())
         {
-            ColumnFamily cachedRow = cfs.getRawCachedRow(key);
-            if (cachedRow != null)
-                ColumnFamilyStore.removeDeleted(cachedRow, gcBefore);
+            logger.info(String.format("Compacting large row %s (%d bytes) incrementally",
+                                      ByteBufferUtil.bytesToHex(rows.get(0).getKey().key), rowSize));
+            return new LazilyCompactedRow(this, rows);
         }
+        return new PrecompactedRow(this, rows);
     }
 
+    /** convenience method for single-sstable compactions */
+    public AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row)
+    {
+        return getCompactedRow(Collections.singletonList(row));
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1126363&r1=1126362&r2=1126363&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Mon May 23 07:53:48 2011
@@ -38,7 +38,6 @@ import org.apache.cassandra.io.sstable.S
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ReducingIterator;
 
@@ -122,7 +121,7 @@ implements Closeable, CompactionInfo.Hol
 
         try
         {
-            AbstractCompactedRow compactedRow = getCompactedRow();
+            AbstractCompactedRow compactedRow = controller.getCompactedRow(rows);
             if (compactedRow.isEmpty())
             {
                 controller.invalidateCachedRow(compactedRow.key);
@@ -151,23 +150,6 @@ implements Closeable, CompactionInfo.Hol
         }
     }
 
-    protected AbstractCompactedRow getCompactedRow()
-    {
-        long rowSize = 0;
-        for (SSTableIdentityIterator row : rows)
-        {
-            rowSize += row.dataSize;
-        }
-
-        if (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit())
-        {
-            logger.info(String.format("Compacting large row %s (%d bytes) incrementally",
-                                      ByteBufferUtil.bytesToHex(rows.get(0).getKey().key),
rowSize));
-            return new LazilyCompactedRow(controller, rows);
-        }
-        return new PrecompactedRow(controller, rows);
-    }
-
     private void throttle()
     {
         if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=1126363&r1=1126362&r2=1126363&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Mon May 23 07:53:48 2011
@@ -93,15 +93,6 @@ public class LazilyCompactedRow extends 
 
     public void write(DataOutput out) throws IOException
     {
-        if (rows.size() == 1 && !shouldPurge && !controller.needDeserialize())
-        {
-            SSTableIdentityIterator row = rows.get(0);
-            assert row.dataSize > 0;
-            out.writeLong(row.dataSize);
-            row.echoData(out);
-            return;
-        }
-
         DataOutputBuffer clockOut = new DataOutputBuffer();
         ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut);
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1126363&r1=1126362&r2=1126363&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon May 23 07:53:48 2011
@@ -494,9 +494,7 @@ public class SSTableWriter extends SSTab
             long rows = 0L;
             DecoratedKey key;
 
-            CompactionController controller = CompactionController.getBasicController(true);
-
-            long dfileLength = dfile.length();
+            CompactionController controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
             while (!dfile.isEOF())
             {
                 // read key
@@ -506,17 +504,7 @@ public class SSTableWriter extends SSTab
                 long dataSize = SSTableReader.readRowSize(dfile, desc);
                 SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, dfile, key, dfile.getFilePointer(), dataSize, true);
 
-                AbstractCompactedRow row;
-                if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
-                {
-                    logger.info(String.format("Rebuilding post-streaming large counter row %s (%d bytes) incrementally", ByteBufferUtil.bytesToHex(key.key), dataSize));
-                    row = new LazilyCompactedRow(controller, Collections.singletonList(iter));
-                }
-                else
-                {
-                    row = new PrecompactedRow(controller, Collections.singletonList(iter));
-                }
-
+                AbstractCompactedRow row = controller.getCompactedRow(iter);
                 updateCache(key, dataSize, row);
 
                 rowSizes.add(dataSize);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1126363&r1=1126362&r2=1126363&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Mon May 23 07:53:48 2011
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.CleanupHelper;
@@ -40,6 +41,7 @@ import org.apache.cassandra.db.RowMutati
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.MappedFileDataInput;
@@ -53,12 +55,11 @@ import org.junit.Test;
 
 public class LazilyCompactedRowTest extends CleanupHelper
 {
-    private void assertBytes(ColumnFamilyStore cfs, int gcBefore, boolean major) throws IOException
+    private void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionController controller = new CompactionController(cfs, sstables, major, gcBefore, false);
-        CompactionIterator ci1 = new PreCompactingIterator(sstables, controller);
-        CompactionIterator ci2 = new LazyCompactionIterator(sstables, controller);
+        CompactionIterator ci1 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false));
+        CompactionIterator ci2 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false));
 
         while (true)
         {
@@ -129,12 +130,11 @@ public class LazilyCompactedRowTest exte
         }
     }
     
-    private void assertDigest(ColumnFamilyStore cfs, int gcBefore, boolean major) throws IOException, NoSuchAlgorithmException
+    private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws IOException, NoSuchAlgorithmException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionController controller = new CompactionController(cfs, sstables, major, gcBefore, false);
-        CompactionIterator ci1 = new PreCompactingIterator(sstables, controller);
-        CompactionIterator ci2 = new LazyCompactionIterator(sstables, controller);
+        CompactionIterator ci1 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false));
+        CompactionIterator ci2 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false));
 
         while (true)
         {
@@ -170,8 +170,8 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -189,8 +189,8 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -211,8 +211,8 @@ public class LazilyCompactedRowTest exte
         assert out.getLength() > DatabaseDescriptor.getColumnIndexSize();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -232,8 +232,8 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -254,8 +254,8 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -277,8 +277,8 @@ public class LazilyCompactedRowTest exte
             cfs.forceBlockingFlush();
         }
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
-        assertDigest(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
+        assertDigest(cfs, Integer.MAX_VALUE);
     }
 
     @Test
@@ -299,35 +299,35 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs, Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE);
     }
 
 
-    private static class LazyCompactionIterator extends CompactionIterator
+    private static class LazilyCompactingController extends CompactionController
     {
-        public LazyCompactionIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
+        public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
         {
-            super(CompactionType.UNKNOWN, sstables, controller);
+            super(cfs, sstables, gcBefore, forceDeserialize);
         }
 
         @Override
-        protected AbstractCompactedRow getCompactedRow()
+        public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
         {
-            return new LazilyCompactedRow(controller, rows);
+            return new LazilyCompactedRow(this, rows);
         }
     }
 
-    private static class PreCompactingIterator extends CompactionIterator
+    private static class PreCompactingController extends CompactionController
     {
-        public PreCompactingIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
+        public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
         {
-            super(CompactionType.UNKNOWN, sstables, controller);
+            super(cfs, sstables, gcBefore, forceDeserialize);
         }
 
         @Override
-        protected AbstractCompactedRow getCompactedRow()
+        public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
         {
-            return new PrecompactedRow(controller, rows);
+            return new PrecompactedRow(this, rows);
         }
     }
 }



Mime
View raw message