cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject svn commit: r1089993 - in /cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/sstable/ test/unit/org/apache/cassandra/io/
Date Thu, 07 Apr 2011 20:31:51 GMT
Author: slebresne
Date: Thu Apr  7 20:31:51 2011
New Revision: 1089993

URL: http://svn.apache.org/viewvc?rev=1089993&view=rev
Log:
Use {Lazy|Pre}CompactedRow for CommutativeRowIndexer
patch by slebresne; reviewed by stuhood for CASSANDRA-2313

Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1089993&r1=1089992&r2=1089993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Apr  7 20:31:51 2011
@@ -844,7 +844,7 @@ public class ColumnFamilyStore implement
      * present in the checked SSTables, but positive ones doesn't ensure key
      * presence.
      */
-    public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<SSTable> sstablesToIgnore)
+    public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<? extends SSTable> sstablesToIgnore)
     {
         for (SSTableReader sstable : data.getSSTables())
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1089993&r1=1089992&r2=1089993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Thu Apr  7 20:31:51 2011
@@ -430,7 +430,8 @@ public class CompactionManager implement
           logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
         SSTableWriter writer;
-        CompactionIterator ci = new CompactionIterator(cfs, sstables, gcBefore, major); // retain a handle so we can call close()
+        CompactionController controller = new CompactionController(cfs, sstables, major, gcBefore, false);
+        CompactionIterator ci = new CompactionIterator(sstables, controller); // retain a handle so we can call close()
         Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
         executor.beginCompaction(cfs.columnFamily, ci);
 
@@ -590,7 +591,7 @@ public class CompactionManager implement
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size " + dataSize));
                     SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile,
key, dataStart, dataSize, true);
-                    AbstractCompactedRow compactedRow = getCompactedRow(row, cfs, sstable.descriptor, true);
+                    AbstractCompactedRow compactedRow = getCompactedRow(row, sstable.descriptor, true);
                     if (compactedRow.isEmpty())
                     {
                         emptyRows++;
@@ -618,7 +619,7 @@ public class CompactionManager implement
                         try
                         {
                             SSTableIdentityIterator row = new SSTableIdentityIterator(sstable,
dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = getCompactedRow(row, cfs, sstable.descriptor, true);
+                            AbstractCompactedRow compactedRow = getCompactedRow(row, sstable.descriptor, true);
                             if (compactedRow.isEmpty())
                             {
                                 emptyRows++;
@@ -715,7 +716,7 @@ public class CompactionManager implement
                     if (Range.isTokenInRanges(row.getKey().token, ranges))
                     {
                         writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize,
writer);
-                        writer.append(getCompactedRow(row, cfs, sstable.descriptor, false));
+                        writer.append(getCompactedRow(row, sstable.descriptor, false));
                         totalkeysWritten++;
                     }
                     else
@@ -776,16 +777,16 @@ public class CompactionManager implement
     /**
      * @return an AbstractCompactedRow implementation to write the row in question.
      * If the data is from a current-version sstable, write it unchanged.  Otherwise,
-     * re-serialize it in the latest version.
+     * re-serialize it in the latest version. The returned AbstractCompactedRow will not purge data.
      */
-    private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, ColumnFamilyStore cfs, Descriptor descriptor, boolean forceDeserialize)
+    private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, Descriptor descriptor, boolean forceDeserialize)
     {
         if (descriptor.isLatestVersion && !forceDeserialize)
             return new EchoedRow(row);
 
         return row.dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()
-               ? new LazilyCompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs), forceDeserialize)
-               : new PrecompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs), forceDeserialize);
+               ? new LazilyCompactedRow(CompactionController.getBasicController(forceDeserialize), Arrays.asList(row))
+               : new PrecompactedRow(CompactionController.getBasicController(forceDeserialize), Arrays.asList(row));
     }
 
     private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer)
@@ -980,7 +981,7 @@ public class CompactionManager implement
     {
         public ValidationCompactionIterator(ColumnFamilyStore cfs) throws IOException
         {
-            super(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true);
+            super(cfs.getSSTables(), new CompactionController(cfs, cfs.getSSTables(), true, getDefaultGcBefore(cfs), false));
         }
 
         @Override

Added: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java?rev=1089993&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java Thu Apr  7 20:31:51 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.io;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.sstable.SSTableReader;
+
+/**
+ * Manage compaction options.
+ */
+public class CompactionController
+{
+    private final ColumnFamilyStore cfs;
+    private final Set<SSTableReader> sstables;
+    private final boolean forceDeserialize;
+
+    public final boolean isMajor;
+    public final int gcBefore;
+
+    private static final CompactionController basicController = new CompactionController(null, Collections.<SSTableReader>emptySet(), false, Integer.MAX_VALUE, false);
+    private static final CompactionController basicDeserializingController = new CompactionController(null, Collections.<SSTableReader>emptySet(), false, Integer.MAX_VALUE, true);
+
+    public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, boolean isMajor, int gcBefore, boolean forceDeserialize)
+    {
+        this.cfs = cfs;
+        this.isMajor = isMajor;
+        this.sstables = new HashSet<SSTableReader>(sstables);
+        this.gcBefore = gcBefore;
+        this.forceDeserialize = forceDeserialize;
+    }
+
+    /**
+     * Returns a controller that never purge
+     */
+    public static CompactionController getBasicController(boolean forceDeserialize)
+    {
+        return forceDeserialize ? basicDeserializingController : basicController;
+    }
+
+    public boolean shouldPurge(DecoratedKey key)
+    {
+        return isMajor || (cfs != null && !cfs.isKeyInRemainingSSTables(key, sstables));
+    }
+
+    public boolean needDeserialize()
+    {
+        if (forceDeserialize)
+            return true;
+
+        for (SSTableReader sstable : sstables)
+            if (!sstable.descriptor.isLatestVersion)
+                return true;
+
+        return false;
+    }
+
+    public void invalidateCachedRow(DecoratedKey key)
+    {
+        if (cfs != null)
+            cfs.invalidateCachedRow(key);
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1089993&r1=1089992&r2=1089993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Thu Apr  7 20:31:51 2011
@@ -24,8 +24,10 @@ package org.apache.cassandra.io;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.collections.iterators.CollatingIterator;
 import org.slf4j.Logger;
@@ -49,32 +51,28 @@ implements Closeable, ICompactionInfo
     public static final int FILE_BUFFER_SIZE = 1024 * 1024;
 
     protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
-    private final ColumnFamilyStore cfs;
-    private final int gcBefore;
-    private final boolean major;
+    protected final CompactionController controller;
 
     private long totalBytes;
     private long bytesRead;
     private long row;
 
-    public CompactionIterator(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
+    public CompactionIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
     {
-        this(cfs, getCollatingIterator(sstables), gcBefore, major);
+        this(getCollatingIterator(sstables), controller);
     }
 
     @SuppressWarnings("unchecked")
-    protected CompactionIterator(ColumnFamilyStore cfs, Iterator iter, int gcBefore, boolean major)
+    protected CompactionIterator(Iterator iter, CompactionController controller)
     {
         super(iter);
+        this.controller = controller;
         row = 0;
         totalBytes = bytesRead = 0;
         for (SSTableScanner scanner : getScanners())
         {
             totalBytes += scanner.getFileLength();
         }
-        this.cfs = cfs;
-        this.gcBefore = gcBefore;
-        this.major = major;
     }
 
     @SuppressWarnings("unchecked")
@@ -109,7 +107,7 @@ implements Closeable, ICompactionInfo
             AbstractCompactedRow compactedRow = getCompactedRow();
             if (compactedRow.isEmpty())
             {
-                cfs.invalidateCachedRow(compactedRow.key);
+                controller.invalidateCachedRow(compactedRow.key);
                 return null;
             }
             else
@@ -143,9 +141,9 @@ implements Closeable, ICompactionInfo
         {
             logger.info(String.format("Compacting large row %s (%d bytes) incrementally",
                                       ByteBufferUtil.bytesToHex(rows.get(0).getKey().key),
rowSize));
-            return new LazilyCompactedRow(cfs, rows, major, gcBefore, false);
+            return new LazilyCompactedRow(controller, rows);
         }
-        return new PrecompactedRow(cfs, rows, major, gcBefore, false);
+        return new PrecompactedRow(controller, rows);
     }
 
     public void close() throws IOException
@@ -170,6 +168,6 @@ implements Closeable, ICompactionInfo
 
     public String getTaskType()
     {
-        return major ? "Major" : "Minor";
+        return controller.isMajor ? "Major" : "Minor";
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=1089993&r1=1089992&r2=1089993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Thu Apr  7 20:31:51 2011
@@ -57,26 +57,23 @@ import org.apache.cassandra.utils.Reduci
 public class LazilyCompactedRow extends AbstractCompactedRow implements IIterableColumns
 {
     private final List<SSTableIdentityIterator> rows;
+    private final CompactionController controller;
     private final boolean shouldPurge;
-    private final int gcBefore;
     private final DataOutputBuffer headerBuffer;
-    private final boolean forceDeserialize;
     private ColumnFamily emptyColumnFamily;
     private LazyColumnIterator iter;
     private int columnCount;
     private long columnSerializedSize;
 
-    public LazilyCompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore, boolean forceDeserialize)
+    public LazilyCompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
     {
         super(rows.get(0).getKey());
-        this.gcBefore = gcBefore;
-        this.forceDeserialize = forceDeserialize;
+        this.controller = controller;
+        this.shouldPurge = controller.shouldPurge(key);
         this.rows = new ArrayList<SSTableIdentityIterator>(rows);
 
-        Set<SSTable> sstables = new HashSet<SSTable>();
         for (SSTableIdentityIterator row : rows)
         {
-            sstables.add(row.sstable);
             ColumnFamily cf = row.getColumnFamily();
 
             if (emptyColumnFamily == null)
@@ -84,7 +81,6 @@ public class LazilyCompactedRow extends 
             else
                 emptyColumnFamily.delete(cf);
         }
-        this.shouldPurge = major || !cfStore.isKeyInRemainingSSTables(key, sstables);
 
         // initialize row header so isEmpty can be called
         headerBuffer = new DataOutputBuffer();
@@ -97,7 +93,7 @@ public class LazilyCompactedRow extends 
 
     public void write(DataOutput out) throws IOException
     {
-        if (rows.size() == 1 && !shouldPurge && rows.get(0).sstable.descriptor.isLatestVersion && !forceDeserialize)
+        if (rows.size() == 1 && !shouldPurge && !controller.needDeserialize())
         {
             SSTableIdentityIterator row = rows.get(0);
             assert row.dataSize > 0;
@@ -150,7 +146,7 @@ public class LazilyCompactedRow extends 
 
     public boolean isEmpty()
     {
-        boolean cfIrrelevant = ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, gcBefore) == null;
+        boolean cfIrrelevant = ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null;
         return cfIrrelevant && columnCount == 0;
     }
 
@@ -173,14 +169,7 @@ public class LazilyCompactedRow extends 
         {
             row.reset();
         }
-        Comparator<IColumn> nameComparator = new Comparator<IColumn>()
-        {
-            public int compare(IColumn o1, IColumn o2)
-            {
-                return getComparator().compare(o1.name(), o2.name());
-            }
-        };
-        iter = new LazyColumnIterator(new CollatingIterator(nameComparator, rows));
+        iter = new LazyColumnIterator(new CollatingIterator(getComparator().columnComparator, rows));
         return Iterators.filter(iter, Predicates.notNull());
     }
 
@@ -215,10 +204,10 @@ public class LazilyCompactedRow extends 
         {
             assert container != null;
             IColumn reduced = container.iterator().next();
-            ColumnFamily purged = shouldPurge ? ColumnFamilyStore.removeDeleted(container, gcBefore) : container;
+            ColumnFamily purged = shouldPurge ? ColumnFamilyStore.removeDeleted(container, controller.gcBefore) : container;
             if (purged != null && purged.metadata().getDefaultValidator().isCommutative())
             {
-                CounterColumn.removeOldShards(purged, gcBefore);
+                CounterColumn.removeOldShards(purged, controller.gcBefore);
             }
             if (purged == null || !purged.iterator().hasNext())
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1089993&r1=1089992&r2=1089993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Thu Apr  7 20:31:51 2011
@@ -56,17 +56,10 @@ public class PrecompactedRow extends Abs
         this.compactedCf = compacted;
     }
 
-    public PrecompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore, boolean forceDeserialize)
+    public PrecompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
     {
         super(rows.get(0).getKey());
 
-        Set<SSTable> sstables = new HashSet<SSTable>();
-        for (SSTableIdentityIterator row : rows)
-        {
-            sstables.add(row.sstable);
-        }
-        boolean shouldPurge = major || !cfStore.isKeyInRemainingSSTables(key, sstables);
-
         ColumnFamily cf = null;
         for (SSTableIdentityIterator row : rows)
         {
@@ -89,10 +82,10 @@ public class PrecompactedRow extends Abs
                 cf.addAll(thisCF);
             }
         }
-        compactedCf = shouldPurge ? ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
+        compactedCf = controller.shouldPurge(key) ? ColumnFamilyStore.removeDeleted(cf, controller.gcBefore) : cf;
         if (compactedCf != null && compactedCf.metadata().getDefaultValidator().isCommutative())
         {
-            CounterColumn.removeOldShards(compactedCf, gcBefore);
+            CounterColumn.removeOldShards(compactedCf, controller.gcBefore);
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1089993&r1=1089992&r2=1089993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Thu Apr  7 20:31:51 2011
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
@@ -44,9 +45,9 @@ public class SSTableIdentityIterator imp
     private final DecoratedKey key;
     private final long finishedAt;
     private final BufferedRandomAccessFile file;
-    public final SSTableReader sstable;
     private final long dataStart;
     public final long dataSize;
+    public final boolean fromRemote;
 
     private final ColumnFamily columnFamily;
     public final int columnCount;
@@ -73,12 +74,25 @@ public class SSTableIdentityIterator imp
     public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean deserializeRowHeader)
     throws IOException
     {
-        this.sstable = sstable;
+        this(sstable.metadata, file, key, dataStart, dataSize, deserializeRowHeader, sstable, false);
+    }
+
+    public SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean fromRemote)
+    throws IOException
+    {
+        this(metadata, file, key, dataStart, dataSize, false, null, fromRemote);
+    }
+
+    // sstable may be null *if* deserializeRowHeader is false
+    private SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean deserializeRowHeader, SSTableReader sstable, boolean fromRemote)
+    throws IOException
+    {
         this.file = file;
         this.key = key;
         this.dataStart = dataStart;
         this.dataSize = dataSize;
         this.expireBefore = (int)(System.currentTimeMillis() / 1000);
+        this.fromRemote = fromRemote;
         finishedAt = dataStart + dataSize;
 
         try
@@ -111,7 +125,7 @@ public class SSTableIdentityIterator imp
 
             IndexHelper.skipBloomFilter(file);
             IndexHelper.skipIndex(file);
-            columnFamily = sstable.createColumnFamily();
+            columnFamily = ColumnFamily.create(metadata);
             ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, file);
             columnCount = file.readInt();
             columnPosition = file.getFilePointer();
@@ -141,7 +155,7 @@ public class SSTableIdentityIterator imp
     {
         try
         {
-            return sstable.getColumnSerializer().deserialize(file, null, false, expireBefore);
+            return columnFamily.getColumnSerializer().deserialize(file, null, fromRemote, expireBefore);
         }
         catch (IOException e)
         {
@@ -177,7 +191,7 @@ public class SSTableIdentityIterator imp
     {
         file.seek(columnPosition - 4); // seek to before column count int
         ColumnFamily cf = columnFamily.cloneMeShallow();
-        ColumnFamily.serializer().deserializeColumns(file, cf, false, false);
+        ColumnFamily.serializer().deserializeColumns(file, cf, false, fromRemote);
         return cf;
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=1089993&r1=1089992&r2=1089993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Thu Apr  7 20:31:51 2011
@@ -41,7 +41,7 @@ public class SSTableScanner implements I
     private static Logger logger = LoggerFactory.getLogger(SSTableScanner.class);
 
     private final BufferedRandomAccessFile file;
-    private final SSTableReader sstable;
+    public final SSTableReader sstable;
     private IColumnIterator row;
     private boolean exhausted = false;
     private Iterator<IColumnIterator> iterator;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1089993&r1=1089992&r2=1089993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Thu Apr  7 20:31:51 2011
@@ -41,7 +41,10 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.marshal.AbstractCommutativeType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.AbstractCompactedRow;
+import org.apache.cassandra.io.CompactionController;
 import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.LazilyCompactedRow;
+import org.apache.cassandra.io.PrecompactedRow;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.FileUtils;
@@ -446,69 +449,54 @@ public class SSTableWriter extends SSTab
             EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
             EstimatedHistogram columnCounts = SSTable.defaultColumnHistogram();
             long rows = 0L;
-            ByteBuffer diskKey;
             DecoratedKey key;
 
-            long readRowPosition  = 0L;
-            long writeRowPosition = 0L;
-
-            writerDfile.seek(writeRowPosition);
-            dfile.seek(readRowPosition);
+            CompactionController controller = CompactionController.getBasicController(true);
 
             long dfileLength = dfile.length();
-            while (readRowPosition < dfileLength)
+            while (!dfile.isEOF())
             {
                 // read key
-                diskKey = ByteBufferUtil.readWithShortLength(dfile);
+                key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, ByteBufferUtil.readWithShortLength(dfile));
 
                 // skip data size, bloom filter, column index
                 long dataSize = SSTableReader.readRowSize(dfile, desc);
-                dfile.skipBytes(dfile.readInt());
-                dfile.skipBytes(dfile.readInt());
+                SSTableIdentityIterator iter = new SSTableIdentityIterator(metadata, dfile, key, dfile.getFilePointer(), dataSize, true);
 
-                // deserialize CF
-                ColumnFamily cf = ColumnFamily.create(desc.ksname, desc.cfname);
-                ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile);
-                // The data is coming from another host
-                ColumnFamily.serializer().deserializeColumns(dfile, cf, false, true);
-                rowSizes.add(dataSize);
-                columnCounts.add(cf.getEstimatedColumnCount());
+                AbstractCompactedRow row;
+                if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
+                {
+                    logger.info(String.format("Rebuilding post-streaming large counter row %s (%d bytes) incrementally", ByteBufferUtil.bytesToHex(key.key), dataSize));
+                    row = new LazilyCompactedRow(controller, Collections.singletonList(iter));
+                }
+                else
+                {
+                    row = new PrecompactedRow(controller, Collections.singletonList(iter));
+                }
 
-                readRowPosition = dfile.getFilePointer();
+                rowSizes.add(dataSize);
+                columnCounts.add(row.columnCount());
 
                 // update index writer
-                key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, diskKey);
-                iwriter.afterAppend(key, writeRowPosition);
-
-                // write key
-                ByteBufferUtil.writeWithShortLength(diskKey, writerDfile);
-
-                // write data size; serialize CF w/ bloom filter, column index
-                long writeSizePosition = writerDfile.getFilePointer();
-                writerDfile.writeLong(-1L);
-                ColumnFamily.serializer().serializeWithIndexes(cf, writerDfile);
-                long writeEndPosition = writerDfile.getFilePointer();
-                writerDfile.seek(writeSizePosition);
-                writerDfile.writeLong(writeEndPosition - (writeSizePosition + 8L));
-
-                writeRowPosition = writeEndPosition;
-                writerDfile.seek(writeRowPosition);
+                iwriter.afterAppend(key, writerDfile.getFilePointer());
+                // write key and row
+                ByteBufferUtil.writeWithShortLength(key.key, writerDfile);
+                row.write(writerDfile);
 
                 rows++;
             }
             writeStatistics(desc, rowSizes, columnCounts);
 
-            if (writeRowPosition != readRowPosition)
+            if (writerDfile.getFilePointer() != dfile.getFilePointer())
             {
                 // truncate file to new, reduced length
-                writerDfile.setLength(writeRowPosition);
+                writerDfile.setLength(writerDfile.getFilePointer());
             }
             writerDfile.sync();
 
             return rows;
         }
 
-
         @Override
         void close() throws IOException
         {

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1089993&r1=1089992&r2=1089993&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Thu Apr  7 20:31:51 2011
@@ -27,7 +27,9 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.HashSet;
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.CleanupHelper;
@@ -56,8 +58,9 @@ public class LazilyCompactedRowTest exte
     private void assertBytes(ColumnFamilyStore cfs, int gcBefore, boolean major) throws IOException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionIterator ci1 = new PreCompactingIterator(cfs, sstables, gcBefore, major);
-        CompactionIterator ci2 = new LazyCompactionIterator(cfs, sstables, gcBefore, major);
+        CompactionController controller = new CompactionController(cfs, sstables, major, gcBefore, false);
+        CompactionIterator ci1 = new PreCompactingIterator(sstables, controller);
+        CompactionIterator ci2 = new LazyCompactionIterator(sstables, controller);
 
         while (true)
         {
@@ -131,8 +134,9 @@ public class LazilyCompactedRowTest exte
     private void assertDigest(ColumnFamilyStore cfs, int gcBefore, boolean major) throws IOException, NoSuchAlgorithmException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionIterator ci1 = new PreCompactingIterator(cfs, sstables, gcBefore, major);
-        CompactionIterator ci2 = new LazyCompactionIterator(cfs, sstables, gcBefore, major);
+        CompactionController controller = new CompactionController(cfs, sstables, major, gcBefore, false);
+        CompactionIterator ci1 = new PreCompactingIterator(sstables, controller);
+        CompactionIterator ci2 = new LazyCompactionIterator(sstables, controller);
 
         while (true)
         {
@@ -303,35 +307,29 @@ public class LazilyCompactedRowTest exte
 
     private static class LazyCompactionIterator extends CompactionIterator
     {
-        private final ColumnFamilyStore cfStore;
-
-        public LazyCompactionIterator(ColumnFamilyStore cfStore, Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
+        public LazyCompactionIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
         {
-            super(cfStore, sstables, gcBefore, major);
-            this.cfStore = cfStore;
+            super(sstables, controller);
         }
 
         @Override
         protected AbstractCompactedRow getCompactedRow()
         {
-            return new LazilyCompactedRow(cfStore, rows, true, Integer.MAX_VALUE, true);
+            return new LazilyCompactedRow(controller, rows);
         }
     }
 
     private static class PreCompactingIterator extends CompactionIterator
     {
-        private final ColumnFamilyStore cfStore;
-
-        public PreCompactingIterator(ColumnFamilyStore cfStore, Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
+        public PreCompactingIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
         {
-            super(cfStore, sstables, gcBefore, major);
-            this.cfStore = cfStore;
+            super(sstables, controller);
         }
 
         @Override
         protected AbstractCompactedRow getCompactedRow()
         {
-            return new PrecompactedRow(cfStore, rows, true, Integer.MAX_VALUE, true);
+            return new PrecompactedRow(controller, rows);
         }
     }
 }



Mime
View raw message