cassandra-commits mailing list archives

From slebre...@apache.org
Subject [32/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
Date Tue, 30 Jun 2015 10:47:56 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
new file mode 100644
index 0000000..0e18d4a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.columniterator;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.AbstractPartitionData;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * An iterator over a single sstable partition, returning its contents in reversed clustering order.
+ */
+public class SSTableReversedIterator extends AbstractSSTableIterator
+{
+    private static final Logger logger = LoggerFactory.getLogger(SSTableReversedIterator.class);
+
+    public SSTableReversedIterator(SSTableReader sstable, DecoratedKey key, ColumnFilter columns, boolean isForThrift)
+    {
+        this(sstable, null, key, sstable.getPosition(key, SSTableReader.Operator.EQ), columns, isForThrift);
+    }
+
+    public SSTableReversedIterator(SSTableReader sstable,
+                                   FileDataInput file,
+                                   DecoratedKey key,
+                                   RowIndexEntry indexEntry,
+                                   ColumnFilter columns,
+                                   boolean isForThrift)
+    {
+        super(sstable, file, key, indexEntry, columns, isForThrift);
+    }
+
+    protected Reader createReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
+    {
+        return indexEntry.isIndexed()
+             ? new ReverseIndexedReader(indexEntry, file, isAtPartitionStart, shouldCloseFile)
+             : new ReverseReader(file, isAtPartitionStart, shouldCloseFile);
+    }
+
+    public boolean isReverseOrder()
+    {
+        return true;
+    }
+
+    private ReusablePartitionData createBuffer(int blocksCount)
+    {
+        int estimatedRowCount = 16;
+        int columnCount = metadata().partitionColumns().regulars.columnCount();
+        if (columnCount == 0 || metadata().clusteringColumns().size() == 0)
+        {
+            estimatedRowCount = 1;
+        }
+        else
+        {
+            try
+            {
+                // To avoid wasteful resizing, we estimate the number of rows we're likely to read. Ideally
+                // we would use per-partition row count stats for this sstable.
+                // FIXME: so far we only keep stats on cells, so to get a rough estimate of the number of rows
+                // we divide the cell count by the number of regular columns in the table. We should fix this
+                // once we collect stats on rows.
+                int estimatedRowsPerPartition = (int)(sstable.getEstimatedColumnCount().percentile(0.75) / columnCount);
+                estimatedRowCount = Math.max(estimatedRowsPerPartition / blocksCount, 1);
+            }
+            catch (IllegalStateException e)
+            {
+                // The EstimatedHistogram percentile() method can throw this if the histogram has overflowed.
+                // While such overflow shouldn't happen, it's not worth the risk of letting the exception bubble up.
+            }
+        }
+        return new ReusablePartitionData(metadata(), partitionKey(), DeletionTime.LIVE, columns(), estimatedRowCount);
+    }
+
+    private class ReverseReader extends Reader
+    {
+        private ReusablePartitionData partition;
+        private UnfilteredRowIterator iterator;
+
+        private ReverseReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
+        {
+            super(file, shouldCloseFile);
+            assert isAtPartitionStart;
+        }
+
+        public boolean hasNext() throws IOException
+        {
+            if (partition == null)
+            {
+                partition = createBuffer(1);
+                partition.populateFrom(this, null, null, new Tester()
+                {
+                    public boolean isDone()
+                    {
+                        return false;
+                    }
+                });
+                iterator = partition.unfilteredIterator(columns, Slices.ALL, true);
+            }
+            return iterator.hasNext();
+        }
+
+        public Unfiltered next() throws IOException
+        {
+            if (!hasNext())
+                throw new NoSuchElementException();
+            return iterator.next();
+        }
+
+        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+        {
+            if (partition == null)
+            {
+                partition = createBuffer(1);
+                partition.populateFrom(this, slice.start(), slice.end(), new Tester()
+                {
+                    public boolean isDone()
+                    {
+                        return false;
+                    }
+                });
+            }
+
+            return partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
+        }
+    }
+
+    private class ReverseIndexedReader extends IndexedReader
+    {
+        private ReusablePartitionData partition;
+        private UnfilteredRowIterator iterator;
+
+        private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
+        {
+            super(file, shouldCloseFile, indexEntry, isAtPartitionStart);
+            this.currentIndexIdx = indexEntry.columnsIndex().size();
+        }
+
+        public boolean hasNext() throws IOException
+        {
+            // If this is called before the reader has been initialized, initialize it now. This means
+            // we're reading from the end of the partition.
+            if (!isInit)
+            {
+                seekToPosition(indexEntry.position);
+                ByteBufferUtil.skipShortLength(file); // partition key
+                DeletionTime.serializer.skip(file);   // partition deletion
+                if (sstable.header.hasStatic())
+                    UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper);
+                isInit = true;
+            }
+
+            if (partition == null)
+            {
+                partition = createBuffer(indexes.size());
+                partition.populateFrom(this, null, null, new Tester()
+                {
+                    public boolean isDone()
+                    {
+                        return false;
+                    }
+                });
+                iterator = partition.unfilteredIterator(columns, Slices.ALL, true);
+            }
+
+            return iterator.hasNext();
+        }
+
+        public Unfiltered next() throws IOException
+        {
+            if (!hasNext())
+                throw new NoSuchElementException();
+            return iterator.next();
+        }
+
+        private void prepareBlock(int blockIdx, Slice.Bound start, Slice.Bound end) throws IOException
+        {
+            updateBlock(blockIdx);
+
+            if (partition == null)
+                partition = createBuffer(indexes.size());
+            else
+                partition.clear();
+
+            final FileMark fileMark = mark;
+            final long width = currentIndex().width;
+
+            partition.populateFrom(this, start, end, new Tester()
+            {
+                public boolean isDone()
+                {
+                    return file.bytesPastMark(fileMark) >= width;
+                }
+            });
+        }
+
+        @Override
+        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+        {
+            // if our previous slicing already got us the smallest row in the sstable, we're done
+            if (currentIndexIdx < 0)
+                return Collections.emptyIterator();
+
+            final List<IndexHelper.IndexInfo> indexes = indexEntry.columnsIndex();
+
+            // Find the first index block we'll need to read for the slice.
+            final int startIdx = IndexHelper.indexFor(slice.end(), indexes, sstable.metadata.comparator, true, currentIndexIdx);
+            if (startIdx < 0)
+                return Collections.emptyIterator();
+
+            // Find the last index block we'll need to read for the slice.
+            int lastIdx = IndexHelper.indexFor(slice.start(), indexes, sstable.metadata.comparator, true, startIdx);
+
+            // The index search is on firstName, so lastIdx is such that
+            //   indexes[lastIdx].firstName < slice.start <= indexes[lastIdx + 1].firstName
+            // However, if indexes[lastIdx].lastName < slice.start, we can bump lastIdx.
+            if (lastIdx >= 0 && metadata().comparator.compare(indexes.get(lastIdx).lastName, slice.start()) < 0)
+                ++lastIdx;
+
+            final int endIdx = lastIdx;
+
+            // Because we're reading in reverse, we must re-prepare the block even if it is our current one,
+            // since we would have skipped anything not in the previous slice.
+            prepareBlock(startIdx, slice.start(), slice.end());
+
+            return new AbstractIterator<Unfiltered>()
+            {
+                private Iterator<Unfiltered> currentBlockIterator = partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
+
+                protected Unfiltered computeNext()
+                {
+                    try
+                    {
+                        if (currentBlockIterator.hasNext())
+                            return currentBlockIterator.next();
+
+                        --currentIndexIdx;
+                        if (currentIndexIdx < 0 || currentIndexIdx < endIdx)
+                            return endOfData();
+
+                        // Note that since we know we read blocks backward, there is no point in checking the slice end, so we pass null
+                        prepareBlock(currentIndexIdx, slice.start(), null);
+                        currentBlockIterator = partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
+                        return computeNext();
+                    }
+                    catch (IOException e)
+                    {
+                        try
+                        {
+                            close();
+                        }
+                        catch (IOException suppressed)
+                        {
+                            e.addSuppressed(suppressed);
+                        }
+                        sstable.markSuspect();
+                        throw new CorruptSSTableException(e, file.getPath());
+                    }
+                }
+            };
+        }
+    }
+
+    private abstract class Tester
+    {
+        public abstract boolean isDone();
+    }
+
+    private class ReusablePartitionData extends AbstractPartitionData
+    {
+        private final Writer rowWriter;
+        private final RangeTombstoneCollector markerWriter;
+
+        private ReusablePartitionData(CFMetaData metadata,
+                                      DecoratedKey partitionKey,
+                                      DeletionTime deletionTime,
+                                      PartitionColumns columns,
+                                      int initialRowCapacity)
+        {
+            super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, false);
+
+            this.rowWriter = new Writer(true);
+            // Note that even though the iterator handles the reverse case, this object holds the data for a
+            // single index block, and we read index blocks in forward clustering order.
+            this.markerWriter = new RangeTombstoneCollector(false);
+        }
+
+        // Note that this method is here rather than in the readers because we want to use it for both readers and they
+        // don't extend one another
+        private void populateFrom(Reader reader, Slice.Bound start, Slice.Bound end, Tester tester) throws IOException
+        {
+            // If we have a start bound, skip everything that comes before it.
+            while (reader.deserializer.hasNext() && start != null && reader.deserializer.compareNextTo(start) <= 0 && !tester.isDone())
+            {
+                if (reader.deserializer.nextIsRow())
+                    reader.deserializer.skipNext();
+                else
+                    reader.updateOpenMarker((RangeTombstoneMarker)reader.deserializer.readNext());
+            }
+
+            // If we have an open marker, it's either one from what we just skipped (if start != null), or it's from the previous index block.
+            if (reader.openMarker != null)
+            {
+                // If we have no start but still an openMarker, this means we're indexed and it's coming from the previous block
+                Slice.Bound markerStart = start;
+                if (start == null)
+                {
+                    ClusteringPrefix c = ((IndexedReader)reader).previousIndex().lastName;
+                    markerStart = Slice.Bound.exclusiveStartOf(c);
+                }
+                writeMarker(markerStart, reader.openMarker);
+            }
+
+            // Now deserialize everything until we reach our requested end (if we have one)
+            while (reader.deserializer.hasNext()
+                   && (end == null || reader.deserializer.compareNextTo(end) <= 0)
+                   && !tester.isDone())
+            {
+                Unfiltered unfiltered = reader.deserializer.readNext();
+                if (unfiltered.kind() == Unfiltered.Kind.ROW)
+                {
+                    ((Row) unfiltered).copyTo(rowWriter);
+                }
+                else
+                {
+                    RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
+                    reader.updateOpenMarker(marker);
+                    marker.copyTo(markerWriter);
+                }
+            }
+
+            // If we have an open marker, we should close it before finishing
+            if (reader.openMarker != null)
+            {
+                // If we have no end but still an open marker, this means we're indexed and the marker can be closed using the block's end.
+                Slice.Bound markerEnd = end;
+                if (end == null)
+                {
+                    ClusteringPrefix c = ((IndexedReader)reader).currentIndex().lastName;
+                    markerEnd = Slice.Bound.inclusiveEndOf(c);
+                }
+                writeMarker(markerEnd, reader.getAndClearOpenMarker());
+            }
+        }
+
+        private void writeMarker(Slice.Bound bound, DeletionTime dt)
+        {
+            bound.writeTo(markerWriter);
+            markerWriter.writeBoundDeletion(dt);
+            markerWriter.endOfMarker();
+        }
+
+        @Override
+        public void clear()
+        {
+            super.clear();
+            rowWriter.reset();
+            markerWriter.reset();
+        }
+    }
+}
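
A note on the approach above: the reversed iterator never seeks backwards within an index
block. It reads each block in forward clustering order into a ReusablePartitionData buffer,
iterates that buffer in reverse, and visits the blocks themselves from last to first (for
buffer sizing, the 75th-percentile cell count divided by the regular column count, then by the
block count, yields the row estimate). Below is a minimal, self-contained sketch of that
buffer-and-reverse pattern; the names are illustrative and not part of the patch.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Reverses a forward-only source by buffering one "block" at a time.
    // Blocks are supplied last-to-first by the caller; each block is itself
    // read in forward order, mirroring how an index block is deserialized
    // sequentially from disk.
    final class ReverseBlockIterator<T> implements Iterator<T>
    {
        private final Iterator<Iterator<T>> blocksInReverse; // last block first
        private final Deque<T> buffer = new ArrayDeque<>();

        ReverseBlockIterator(Iterator<Iterator<T>> blocksInReverse)
        {
            this.blocksInReverse = blocksInReverse;
        }

        public boolean hasNext()
        {
            while (buffer.isEmpty() && blocksInReverse.hasNext())
            {
                Iterator<T> block = blocksInReverse.next();
                while (block.hasNext())
                    buffer.addFirst(block.next()); // forward read, reversed storage
            }
            return !buffer.isEmpty();
        }

        public T next()
        {
            if (!hasNext())
                throw new NoSuchElementException();
            return buffer.pollFirst();
        }

        public void remove()
        {
            throw new UnsupportedOperationException();
        }
    }

This is the same trade ReverseIndexedReader makes: re-deserialize a whole block rather than
seek backwards in the file.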

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 02072de..ec270dd 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -206,7 +206,7 @@ public class CommitLogArchiver
                     descriptor = fromHeader;
                 else descriptor = fromName;
 
-                if (descriptor.version > CommitLogDescriptor.VERSION_22)
+                if (descriptor.version > CommitLogDescriptor.current_version)
                     throw new IllegalStateException("Unsupported commit log version: " + descriptor.version);
 
                 if (descriptor.compression != null) {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index c4728fd..1eb640e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -57,12 +57,13 @@ public class CommitLogDescriptor
     public static final int VERSION_20 = 3;
     public static final int VERSION_21 = 4;
     public static final int VERSION_22 = 5;
+    public static final int VERSION_30 = 6;
     /**
      * Increment this number if there is a change in the commit log disk layout or if MessagingVersion changes.
      * Note: make sure to handle {@link #getMessagingVersion()}
      */
     @VisibleForTesting
-    public static final int current_version = VERSION_22;
+    public static final int current_version = VERSION_30;
 
     final int version;
     public final long id;
@@ -195,6 +196,8 @@ public class CommitLogDescriptor
                 return MessagingService.VERSION_21;
             case VERSION_22:
                 return MessagingService.VERSION_22;
+            case VERSION_30:
+                return MessagingService.VERSION_30;
             default:
                 throw new IllegalStateException("Unknown commitlog version " + version);
         }
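
The guard in CommitLogArchiver above is likewise switched from the literal VERSION_22 to
current_version, so the check no longer needs touching on every format bump: older segments
stay replayable (each version has a decoder), and only segments newer than what this build
writes are rejected. A hedged sketch of that gate, with the constants mirrored from the diff
and the method name invented for illustration:

    final class CommitLogVersionGate
    {
        static final int VERSION_20 = 3;
        static final int VERSION_21 = 4;
        static final int VERSION_22 = 5;
        static final int VERSION_30 = 6;
        static final int CURRENT_VERSION = VERSION_30;

        // Fail fast on segments from a future version; anything at or below
        // the current version is assumed to have a working deserializer.
        static void checkReplayable(int segmentVersion)
        {
            if (segmentVersion > CURRENT_VERSION)
                throw new IllegalStateException("Unsupported commit log version: " + segmentVersion);
        }
    }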

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 176f64b..902f1c4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -47,6 +47,8 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.ICompressor;
@@ -55,7 +57,6 @@ import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CRC32Factory;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -195,7 +196,7 @@ public class CommitLogReplayer
     
     abstract static class ReplayFilter
     {
-        public abstract Iterable<ColumnFamily> filter(Mutation mutation);
+        public abstract Iterable<PartitionUpdate> filter(Mutation mutation);
 
         public abstract boolean includes(CFMetaData metadata);
 
@@ -227,9 +228,9 @@ public class CommitLogReplayer
 
     private static class AlwaysReplayFilter extends ReplayFilter
     {
-        public Iterable<ColumnFamily> filter(Mutation mutation)
+        public Iterable<PartitionUpdate> filter(Mutation mutation)
         {
-            return mutation.getColumnFamilies();
+            return mutation.getPartitionUpdates();
         }
 
         public boolean includes(CFMetaData metadata)
@@ -247,17 +248,17 @@ public class CommitLogReplayer
             this.toReplay = toReplay;
         }
 
-        public Iterable<ColumnFamily> filter(Mutation mutation)
+        public Iterable<PartitionUpdate> filter(Mutation mutation)
         {
             final Collection<String> cfNames = toReplay.get(mutation.getKeyspaceName());
             if (cfNames == null)
                 return Collections.emptySet();
 
-            return Iterables.filter(mutation.getColumnFamilies(), new Predicate<ColumnFamily>()
+            return Iterables.filter(mutation.getPartitionUpdates(), new Predicate<PartitionUpdate>()
             {
-                public boolean apply(ColumnFamily cf)
+                public boolean apply(PartitionUpdate upd)
                 {
-                    return cfNames.contains(cf.metadata().cfName);
+                    return cfNames.contains(upd.metadata().cfName);
                 }
             });
         }
@@ -330,7 +331,8 @@ public class CommitLogReplayer
                 {
                     int uncompressedLength = reader.readInt();
                     replayEnd = replayPos + uncompressedLength;
-                } else
+                }
+                else
                 {
                     replayEnd = end;
                 }
@@ -478,11 +480,10 @@ public class CommitLogReplayer
         {
             mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
                                                        desc.getMessagingVersion(),
-                                                       ColumnSerializer.Flag.LOCAL);
+                                                       SerializationHelper.Flag.LOCAL);
             // doublecheck that what we read is [still] valid for the current schema
-            for (ColumnFamily cf : mutation.getColumnFamilies())
-                for (Cell cell : cf)
-                    cf.getComparator().validate(cell.name());
+            for (PartitionUpdate upd : mutation.getPartitionUpdates())
+                upd.validate();
         }
         catch (UnknownColumnFamilyException ex)
         {
@@ -515,7 +516,7 @@ public class CommitLogReplayer
         }
 
         if (logger.isDebugEnabled())
-            logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
+            logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
 
         Runnable runnable = new WrappedRunnable()
         {
@@ -534,12 +535,12 @@ public class CommitLogReplayer
                 // or c) are part of a cf that was dropped.
                 // Keep in mind that the cf.name() is suspect; do everything based on the cfid instead.
                 Mutation newMutation = null;
-                for (ColumnFamily columnFamily : replayFilter.filter(mutation))
+                for (PartitionUpdate update : replayFilter.filter(mutation))
                 {
-                    if (Schema.instance.getCF(columnFamily.id()) == null)
+                    if (Schema.instance.getCF(update.metadata().cfId) == null)
                         continue; // dropped
 
-                    ReplayPosition rp = cfPositions.get(columnFamily.id());
+                    ReplayPosition rp = cfPositions.get(update.metadata().cfId);
 
                     // replay if current segment is newer than last flushed one or,
                     // if it is the last known segment, if we are after the replay position
@@ -547,7 +548,7 @@ public class CommitLogReplayer
                     {
                         if (newMutation == null)
                             newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
-                        newMutation.add(columnFamily);
+                        newMutation.add(update);
                         replayedCount.incrementAndGet();
                     }
                 }
@@ -571,9 +572,9 @@ public class CommitLogReplayer
     {
         long restoreTarget = CommitLog.instance.archiver.restorePointInTime;
 
-        for (ColumnFamily families : fm.getColumnFamilies())
+        for (PartitionUpdate upd : fm.getPartitionUpdates())
         {
-            if (CommitLog.instance.archiver.precision.toMillis(families.maxTimestamp()) > restoreTarget)
+            if (CommitLog.instance.archiver.precision.toMillis(upd.maxTimestamp()) > restoreTarget)
                 return true;
         }
         return false;
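
Two things in the replayer change shape here: filtering now fans a Mutation out into one
PartitionUpdate per table, and point-in-time restore compares each update's maxTimestamp()
against the restore target. A minimal sketch of the filtering half, using plain collections
and String table names as stand-ins for the real Mutation/PartitionUpdate types:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    final class ReplayFilterSketch
    {
        // keyspace -> tables the operator asked to replay
        private final Map<String, Set<String>> toReplay;

        ReplayFilterSketch(Map<String, Set<String>> toReplay)
        {
            this.toReplay = toReplay;
        }

        // Analogue of CustomReplayFilter.filter(): given the tables one
        // mutation touches in a keyspace, keep only the requested ones.
        List<String> filter(String keyspace, List<String> mutatedTables)
        {
            Set<String> wanted = toReplay.get(keyspace);
            if (wanted == null)
                return Collections.emptyList();
            List<String> kept = new ArrayList<String>();
            for (String table : mutatedTables)
                if (wanted.contains(table))
                    kept.add(table);
            return kept;
        }
    }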

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index d748006..7473845 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -45,8 +45,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.CLibrary;
@@ -398,12 +398,12 @@ public abstract class CommitLogSegment
 
     void markDirty(Mutation mutation, int allocatedPosition)
     {
-        for (ColumnFamily columnFamily : mutation.getColumnFamilies())
+        for (PartitionUpdate update : mutation.getPartitionUpdates())
         {
             // check for deleted CFS
-            CFMetaData cfm = columnFamily.metadata();
+            CFMetaData cfm = update.metadata();
             if (cfm.isPurged())
-                logger.error("Attempted to write commit log entry for unrecognized table: {}", columnFamily.id());
+                logger.error("Attempted to write commit log entry for unrecognized table: {}", cfm.cfId);
             else
                 ensureAtleast(cfDirty, cfm.cfId, allocatedPosition);
         }
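
markDirty() above records, per table id, the highest position in this segment that still
carries unflushed data; a segment can be recycled once every table has flushed past its mark.
The diff relies on ensureAtleast(cfDirty, ...) for the monotonic update; a rough sketch of
that high-water-mark semantics, under the assumption of a simple CAS loop (names illustrative):

    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    final class DirtyMarks
    {
        private final ConcurrentHashMap<UUID, AtomicInteger> dirty =
                new ConcurrentHashMap<UUID, AtomicInteger>();

        // Raise the per-table mark to at least allocatedPosition, never lower it.
        void markDirty(UUID cfId, int allocatedPosition)
        {
            AtomicInteger mark = dirty.get(cfId);
            if (mark == null)
            {
                AtomicInteger fresh = new AtomicInteger(allocatedPosition);
                mark = dirty.putIfAbsent(cfId, fresh);
                if (mark == null)
                    return; // we won the race to install the first mark
            }
            int cur;
            while ((cur = mark.get()) < allocatedPosition
                   && !mark.compareAndSet(cur, allocatedPosition))
                ; // another thread moved the mark; re-check and retry
        }
    }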

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
deleted file mode 100644
index 16b5fac..0000000
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.compaction;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.io.sstable.ColumnStats;
-import org.apache.cassandra.io.util.SequentialWriter;
-
-/**
- * a CompactedRow is an object that takes a bunch of rows (keys + columnfamilies)
- * and can write a compacted version of those rows to an output stream.  It does
- * NOT necessarily require creating a merged CF object in memory.
- */
-public abstract class AbstractCompactedRow implements Closeable
-{
-    public final DecoratedKey key;
-
-    public AbstractCompactedRow(DecoratedKey key)
-    {
-        this.key = key;
-    }
-
-    /**
-     * write the row (size + column index + filter + column data, but NOT row key) to @param out.
-     *
-     * write() may change internal state; it is NOT valid to call write() or update() a second time.
-     *
-     * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
-     */
-    public abstract RowIndexEntry write(long currentPosition, SequentialWriter out) throws IOException;
-
-    /**
-     * update @param digest with the data bytes of the row (not including row key or row size).
-     * May be called even if empty.
-     *
-     * update() may change internal state; it is NOT valid to call write() or update() a second time.
-     */
-    public abstract void update(MessageDigest digest);
-
-    /**
-     * @return aggregate information about the columns in this row.  Some fields may
-     * contain default values if computing them value would require extra effort we're not willing to make.
-     */
-    public abstract ColumnStats columnStats();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
deleted file mode 100644
index 9fe8fd9..0000000
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.compaction;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.utils.CloseableIterator;
-
-public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
-{
-    protected final OperationType type;
-    protected final CompactionController controller;
-    protected final long totalBytes;
-    protected volatile long bytesRead;
-    protected final List<ISSTableScanner> scanners;
-    protected final UUID compactionId;
-    /*
-     * counters for merged rows.
-     * array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row),
-     * index 1 is counter for 2 rows merged, and so on.
-     */
-    protected final AtomicLong[] mergeCounters;
-
-    public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ISSTableScanner> scanners, UUID compactionId)
-    {
-        this.controller = controller;
-        this.type = type;
-        this.scanners = scanners;
-        this.bytesRead = 0;
-        this.compactionId = compactionId;
-
-        long bytes = 0;
-        for (ISSTableScanner scanner : scanners)
-            bytes += scanner.getLengthInBytes();
-        this.totalBytes = bytes;
-        mergeCounters = new AtomicLong[scanners.size()];
-        for (int i = 0; i < mergeCounters.length; i++)
-            mergeCounters[i] = new AtomicLong();
-    }
-
-    public CompactionInfo getCompactionInfo()
-    {
-        return new CompactionInfo(controller.cfs.metadata,
-                                  type,
-                                  bytesRead,
-                                  totalBytes,
-                                  compactionId);
-    }
-
-    protected void updateCounterFor(int rows)
-    {
-        assert rows > 0 && rows - 1 < mergeCounters.length;
-        mergeCounters[rows - 1].incrementAndGet();
-    }
-
-    public long[] getMergedRowCounts()
-    {
-        long[] counters = new long[mergeCounters.length];
-        for (int i = 0; i < counters.length; i++)
-            counters[i] = mergeCounters[i].get();
-        return counters;
-    }
-
-    public abstract CloseableIterator<AbstractCompactedRow> iterator();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 28144ca..d8499ea 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 81d8b7c..303de15 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
 import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 
 import org.apache.cassandra.utils.OverlapIterator;
@@ -44,7 +44,7 @@ public class CompactionController implements AutoCloseable
 
     public final ColumnFamilyStore cfs;
     private Refs<SSTableReader> overlappingSSTables;
-    private OverlapIterator<RowPosition, SSTableReader> overlapIterator;
+    private OverlapIterator<PartitionPosition, SSTableReader> overlapIterator;
     private final Iterable<SSTableReader> compacting;
 
     public final int gcBefore;
@@ -189,9 +189,9 @@ public class CompactionController implements AutoCloseable
         return min;
     }
 
-    public void invalidateCachedRow(DecoratedKey key)
+    public void invalidateCachedPartition(DecoratedKey key)
     {
-        cfs.invalidateCachedRow(key);
+        cfs.invalidateCachedPartition(key);
     }
 
     public void close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
deleted file mode 100644
index 23d8a4a..0000000
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.compaction;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.UUID;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.MergeIterator;
-
-public class CompactionIterable extends AbstractCompactionIterable
-{
-    final SSTableFormat format;
-
-    private static final Comparator<OnDiskAtomIterator> comparator = new Comparator<OnDiskAtomIterator>()
-    {
-        public int compare(OnDiskAtomIterator i1, OnDiskAtomIterator i2)
-        {
-            return i1.getKey().compareTo(i2.getKey());
-        }
-    };
-
-    public CompactionIterable(OperationType type,
-                              List<ISSTableScanner> scanners,
-                              CompactionController controller,
-                              SSTableFormat.Type formatType,
-                              UUID compactionId)
-    {
-        super(controller, type, scanners, compactionId);
-        this.format = formatType.info;
-    }
-
-    public CloseableIterator<AbstractCompactedRow> iterator()
-    {
-        return MergeIterator.get(scanners, comparator, new Reducer());
-    }
-
-    public String toString()
-    {
-        return this.getCompactionInfo().toString();
-    }
-
-    protected class Reducer extends MergeIterator.Reducer<OnDiskAtomIterator, AbstractCompactedRow>
-    {
-        protected final List<OnDiskAtomIterator> rows = new ArrayList<>();
-
-        public void reduce(OnDiskAtomIterator current)
-        {
-            rows.add(current);
-        }
-
-        protected AbstractCompactedRow getReduced()
-        {
-            assert !rows.isEmpty();
-
-            CompactionIterable.this.updateCounterFor(rows.size());
-            try
-            {
-                // create a new container for rows, since we're going to clear ours for the next one,
-                // and the AbstractCompactionRow code should be able to assume that the collection it receives
-                // won't be pulled out from under it.
-                return format.getCompactedRowWriter(controller, ImmutableList.copyOf(rows));
-            }
-            finally
-            {
-                rows.clear();
-                long n = 0;
-                for (ISSTableScanner scanner : scanners)
-                    n += scanner.getCurrentPosition();
-                bytesRead = n;
-            }
-        }
-    }
-}
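
The deleted Reducer above captures the pattern the old pipeline was built on: k scanners
sorted by key are merged, rows with equal keys are collected, and each group is reduced to one
compacted row. A generic sketch of that k-way merge-and-reduce, standing in for the
MergeIterator machinery rather than reproducing its real API:

    import java.util.*;

    final class KWayMergeReduce
    {
        // Merge k sorted lists, grouping equal elements and reducing each group,
        // the way the old CompactionIterable reduced equal-keyed rows.
        static <T, R> List<R> merge(List<List<T>> sources, Comparator<T> cmp,
                                    java.util.function.Function<List<T>, R> reducer)
        {
            // Heap entries: { source index, position within that source }
            PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) ->
                cmp.compare(sources.get(a[0]).get(a[1]), sources.get(b[0]).get(b[1])));
            for (int i = 0; i < sources.size(); i++)
                if (!sources.get(i).isEmpty())
                    heap.add(new int[]{ i, 0 });

            List<R> out = new ArrayList<>();
            List<T> group = new ArrayList<>();
            while (!heap.isEmpty())
            {
                int[] top = heap.poll();
                T value = sources.get(top[0]).get(top[1]);
                if (!group.isEmpty() && cmp.compare(group.get(0), value) != 0)
                {
                    out.add(reducer.apply(group)); // flush the finished group
                    group = new ArrayList<>();
                }
                group.add(value);
                if (top[1] + 1 < sources.get(top[0]).size())
                    heap.add(new int[]{ top[0], top[1] + 1 });
            }
            if (!group.isEmpty())
                out.add(reducer.apply(group));
            return out;
        }

        public static void main(String[] args)
        {
            List<List<Integer>> src = Arrays.asList(
                Arrays.asList(1, 3, 5), Arrays.asList(3, 4), Arrays.asList(1, 5));
            // Reduce each equal-valued group to its size, mimicking merge counters.
            System.out.println(merge(src, Integer::compare, List::size));
            // -> [2, 2, 1, 2]  (two 1s, two 3s, one 4, two 5s)
        }
    }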

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
new file mode 100644
index 0000000..b3cb370
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.util.UUID;
+import java.util.List;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.metrics.CompactionMetrics;
+
+/**
+ * Merges multiple iterators over the contents of sstables into a single "compacted" iterator.
+ * <p>
+ * On top of actually merging the source iterators, this class:
+ * <ul>
+ *   <li>purges GC-able tombstones when possible (see PurgingPartitionIterator below);</li>
+ *   <li>updates secondary indexes when necessary (as we don't read-before-write on index updates, index entries
+ *       are not deleted when the base table data is deleted, which is ok because we fix index inconsistencies
+ *       on reads. This however means that potentially obsolete index entries could be kept around for a long
+ *       time for data that is rarely read, so compaction "pro-actively" fixes such entries. This is mainly
+ *       an optimization);</li>
+ *   <li>invalidates cached partitions that are empty post-compaction, which avoids keeping partitions with
+ *       only purgeable tombstones in the row cache;</li>
+ *   <li>keeps track of compaction progress.</li>
+ * </ul>
+ */
+public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator
+{
+    private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;
+
+    private final OperationType type;
+    private final CompactionController controller;
+    private final List<ISSTableScanner> scanners;
+    private final int nowInSec;
+    private final UUID compactionId;
+
+    private final long totalBytes;
+    private long bytesRead;
+
+    /*
+     * counters for merged rows.
+     * array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row),
+     * index 1 is counter for 2 rows merged, and so on.
+     */
+    private final long[] mergeCounters;
+
+    private final UnfilteredPartitionIterator mergedIterator;
+    private final CompactionMetrics metrics;
+
+    // The number of rows/range tombstones merged by the iterator so far
+    private int merged;
+
+    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
+    {
+        this(type, scanners, controller, nowInSec, compactionId, null);
+    }
+
+    @SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
+    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics)
+    {
+        this.controller = controller;
+        this.type = type;
+        this.scanners = scanners;
+        this.nowInSec = nowInSec;
+        this.compactionId = compactionId;
+        this.bytesRead = 0;
+
+        long bytes = 0;
+        for (ISSTableScanner scanner : scanners)
+            bytes += scanner.getLengthInBytes();
+        this.totalBytes = bytes;
+        this.mergeCounters = new long[scanners.size()];
+        this.metrics = metrics;
+
+        if (metrics != null)
+            metrics.beginCompaction(this);
+
+        this.mergedIterator = scanners.isEmpty()
+                            ? UnfilteredPartitionIterators.EMPTY
+                            : UnfilteredPartitionIterators.convertExpiredCellsToTombstones(new PurgingPartitionIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller), nowInSec);
+    }
+
+    public boolean isForThrift()
+    {
+        return false;
+    }
+
+    public CompactionInfo getCompactionInfo()
+    {
+        return new CompactionInfo(controller.cfs.metadata,
+                                  type,
+                                  bytesRead,
+                                  totalBytes,
+                                  compactionId);
+    }
+
+    private void updateCounterFor(int rows)
+    {
+        assert rows > 0 && rows - 1 < mergeCounters.length;
+        mergeCounters[rows - 1] += 1;
+    }
+
+    public long[] getMergedRowCounts()
+    {
+        return mergeCounters;
+    }
+
+    private UnfilteredPartitionIterators.MergeListener listener()
+    {
+        return new UnfilteredPartitionIterators.MergeListener()
+        {
+            public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+            {
+                int merged = 0;
+                for (UnfilteredRowIterator iter : versions)
+                {
+                    if (iter != null)
+                        merged++;
+                }
+
+                assert merged > 0;
+
+                CompactionIterator.this.updateCounterFor(merged);
+
+                /*
+                 * The row-level listener does two things:
+                 *  - It updates secondary indexes for deleted/shadowed cells
+                 *  - It updates progress regularly (every UNFILTERED_TO_UPDATE_PROGRESS merged unfiltereds)
+                 */
+                final SecondaryIndexManager.Updater indexer = type == OperationType.COMPACTION
+                                                            ? controller.cfs.indexManager.gcUpdaterFor(partitionKey, nowInSec)
+                                                            : SecondaryIndexManager.nullUpdater;
+
+                return new UnfilteredRowIterators.MergeListener()
+                {
+                    private Clustering clustering;
+
+                    public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+                    {
+                    }
+
+                    public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions)
+                    {
+                        this.clustering = clustering;
+                    }
+
+                    public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedCompositeDeletion, DeletionTime[] versions)
+                    {
+                    }
+
+                    public void onMergedCells(Cell mergedCell, Cell[] versions)
+                    {
+                        if (indexer == SecondaryIndexManager.nullUpdater)
+                            return;
+
+                        for (int i = 0; i < versions.length; i++)
+                        {
+                            Cell version = versions[i];
+                            if (version != null && (mergedCell == null || !mergedCell.equals(version)))
+                                indexer.remove(clustering, version);
+                        }
+                    }
+
+                    public void onRowDone()
+                    {
+                        int merged = ++CompactionIterator.this.merged;
+                        if (merged % UNFILTERED_TO_UPDATE_PROGRESS == 0)
+                            updateBytesRead();
+                    }
+
+                    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)
+                    {
+                        int merged = ++CompactionIterator.this.merged;
+                        if (merged % UNFILTERED_TO_UPDATE_PROGRESS == 0)
+                            updateBytesRead();
+                    }
+
+                    public void close()
+                    {
+                    }
+                };
+            }
+
+            public void close()
+            {
+            }
+        };
+    }
+
+    private void updateBytesRead()
+    {
+        long n = 0;
+        for (ISSTableScanner scanner : scanners)
+            n += scanner.getCurrentPosition();
+        bytesRead = n;
+    }
+
+    public boolean hasNext()
+    {
+        return mergedIterator.hasNext();
+    }
+
+    public UnfilteredRowIterator next()
+    {
+        return mergedIterator.next();
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void close()
+    {
+        try
+        {
+            mergedIterator.close();
+        }
+        finally
+        {
+            if (metrics != null)
+                metrics.finishCompaction(this);
+        }
+    }
+
+    public String toString()
+    {
+        return this.getCompactionInfo().toString();
+    }
+
+    private class PurgingPartitionIterator extends TombstonePurgingPartitionIterator
+    {
+        private final CompactionController controller;
+
+        private DecoratedKey currentKey;
+        private long maxPurgeableTimestamp;
+        private boolean hasCalculatedMaxPurgeableTimestamp;
+
+        private PurgingPartitionIterator(UnfilteredPartitionIterator toPurge, CompactionController controller)
+        {
+            super(toPurge, controller.gcBefore);
+            this.controller = controller;
+        }
+
+        @Override
+        protected void onEmpty(DecoratedKey key)
+        {
+            if (type == OperationType.COMPACTION)
+                controller.cfs.invalidateCachedPartition(key);
+        }
+
+        @Override
+        protected boolean shouldFilter(UnfilteredRowIterator iterator)
+        {
+            currentKey = iterator.partitionKey();
+            hasCalculatedMaxPurgeableTimestamp = false;
+
+            // TODO: we might be able to skip filtering if UnfilteredRowIterator gave us some stats
+            // (like the smallest local deletion time).
+            return true;
+        }
+
+        /*
+         * Tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp of any
+         * sstable containing `currentKey` outside of the set of sstables involved in this compaction. It is
+         * computed lazily, on demand, as we only need it if there are tombstones and it is a bit expensive (see #8914).
+         */
+        protected long getMaxPurgeableTimestamp()
+        {
+            if (!hasCalculatedMaxPurgeableTimestamp)
+            {
+                hasCalculatedMaxPurgeableTimestamp = true;
+                maxPurgeableTimestamp = controller.maxPurgeableTimestamp(currentKey);
+            }
+            return maxPurgeableTimestamp;
+        }
+    }
+}
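
The mergeCounters indexing above is easy to misread: slot i counts partitions that were
assembled from i+1 source sstables (and getMaxPurgeableTimestamp() is memoized per partition
precisely because most partitions carry no tombstones at all). A tiny self-contained
illustration of the counter bookkeeping, in the spirit of updateCounterFor() rather than a
copy of it:

    import java.util.Arrays;

    final class MergeCounters
    {
        private final long[] counters; // slot i = partitions merged from i + 1 sources

        MergeCounters(int sourceCount)
        {
            counters = new long[sourceCount];
        }

        void onPartitionMerged(int versionsPresent)
        {
            assert versionsPresent > 0 && versionsPresent <= counters.length;
            counters[versionsPresent - 1]++;
        }

        public static void main(String[] args)
        {
            // Compacting 3 sstables: two partitions live in a single sstable,
            // one partition appears in all three.
            MergeCounters c = new MergeCounters(3);
            c.onPartitionMerged(1);
            c.onPartitionMerged(1);
            c.onPartitionMerged(3);
            System.out.println(Arrays.toString(c.counters));
            // -> [2, 0, 1]: 2 one-way, 0 two-way, 1 three-way merges
        }
    }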

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 763871e..a6c3d8c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -41,7 +41,6 @@ import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
-import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
@@ -57,6 +56,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.index.SecondaryIndexBuilder;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Bounds;
@@ -71,7 +71,6 @@ import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.MerkleTree;
@@ -235,7 +234,7 @@ public class CompactionManager implements CompactionManagerMBean
                 }
 
                 CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
-                AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs));
+                AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds()));
                 if (task == null)
                 {
                     logger.debug("No tasks available");
@@ -426,7 +425,7 @@ public class CompactionManager implements CompactionManagerMBean
             @Override
             public void execute(LifecycleTransaction txn) throws IOException
             {
-                CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges);
+                CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges, FBUtilities.nowInSeconds());
                 doCleanupOne(cfStore, txn, cleanupStrategy, ranges, hasIndexes);
             }
         }, OperationType.CLEANUP);
@@ -541,7 +540,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     public void performMaximal(final ColumnFamilyStore cfStore, boolean splitOutput)
     {
-        FBUtilities.waitOnFutures(submitMaximal(cfStore, getDefaultGcBefore(cfStore), splitOutput));
+        FBUtilities.waitOnFutures(submitMaximal(cfStore, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()), splitOutput));
     }
 
     public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore, boolean splitOutput)
@@ -595,8 +594,9 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         List<Future<?>> futures = new ArrayList<>();
+        int nowInSec = FBUtilities.nowInSeconds();
         for (ColumnFamilyStore cfs : descriptors.keySet())
-            futures.add(submitUserDefined(cfs, descriptors.get(cfs), getDefaultGcBefore(cfs)));
+            futures.add(submitUserDefined(cfs, descriptors.get(cfs), getDefaultGcBefore(cfs, nowInSec)));
         FBUtilities.waitOnFutures(futures);
     }
 
@@ -817,29 +817,29 @@ public class CompactionManager implements CompactionManagerMBean
         if (compactionFileLocation == null)
             throw new IOException("disk full");
 
-        ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
-        CleanupInfo ci = new CleanupInfo(sstable, scanner);
-
-        metrics.beginCompaction(ci);
         List<SSTableReader> finished;
+        int nowInSec = FBUtilities.nowInSeconds();
         try (SSTableRewriter writer = new SSTableRewriter(cfs, txn, sstable.maxDataAge, false);
-             CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs)))
+             ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
+             CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec));
+             CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
         {
             writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
 
-            while (scanner.hasNext())
+            while (ci.hasNext())
             {
                 if (ci.isStopRequested())
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                @SuppressWarnings("resource")
-                SSTableIdentityIterator row = cleanupStrategy.cleanup((SSTableIdentityIterator) scanner.next());
-                if (row == null)
-                    continue;
-                @SuppressWarnings("resource")
-                AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
-                if (writer.append(compactedRow) != null)
-                    totalkeysWritten++;
+                try (UnfilteredRowIterator partition = ci.next();
+                     UnfilteredRowIterator notCleaned = cleanupStrategy.cleanup(partition))
+                {
+                    if (notCleaned == null)
+                        continue;
+
+                    if (writer.append(notCleaned) != null)
+                        totalkeysWritten++;
+                }
             }
 
             // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
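[Editor's sketch] The hunk above folds the writer, the scanner, the controller and the new CompactionIterator into one try-with-resources, so they are closed in reverse declaration order (iterator first, writer last) even on an exception. A minimal sketch of that closing order, with illustrative names only:

    // resources declared later close earlier: "inner" closes before "outer"
    try (AutoCloseable outer = () -> System.out.println("outer closed");
         AutoCloseable inner = () -> System.out.println("inner closed"))
    {
        // body runs with both resources open
    }
    catch (Exception e) // AutoCloseable.close() declares Exception
    {
        throw new RuntimeException(e);
    }
    // prints "inner closed", then "outer closed"

This ordering is what lets the old finally block (scanner.close(); metrics.finishCompaction(ci)) disappear in the next hunk.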
@@ -847,11 +847,6 @@ public class CompactionManager implements CompactionManagerMBean
 
             finished = writer.finish();
         }
-        finally
-        {
-            scanner.close();
-            metrics.finishCompaction(ci);
-        }
 
         if (!finished.isEmpty())
         {
@@ -869,23 +864,30 @@ public class CompactionManager implements CompactionManagerMBean
 
     private static abstract class CleanupStrategy
     {
-        public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges)
+        protected final Collection<Range<Token>> ranges;
+        protected final int nowInSec;
+
+        protected CleanupStrategy(Collection<Range<Token>> ranges, int nowInSec)
+        {
+            this.ranges = ranges;
+            this.nowInSec = nowInSec;
+        }
+
+        public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec)
         {
             return cfs.indexManager.hasIndexes()
-                 ? new Full(cfs, ranges)
-                 : new Bounded(cfs, ranges);
+                 ? new Full(cfs, ranges, nowInSec)
+                 : new Bounded(cfs, ranges, nowInSec);
         }
 
         public abstract ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter);
-        public abstract SSTableIdentityIterator cleanup(SSTableIdentityIterator row);
+        public abstract UnfilteredRowIterator cleanup(UnfilteredRowIterator partition);
 
         private static final class Bounded extends CleanupStrategy
         {
-            private final Collection<Range<Token>> ranges;
-
-            public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges)
+            public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec)
             {
-                this.ranges = ranges;
+                super(ranges, nowInSec);
                 cacheCleanupExecutor.submit(new Runnable()
                 {
                     @Override
@@ -894,8 +896,8 @@ public class CompactionManager implements CompactionManagerMBean
                         cfs.cleanupCache();
                     }
                 });
-
             }
+
             @Override
             public ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter)
             {
@@ -903,23 +905,20 @@ public class CompactionManager implements CompactionManagerMBean
             }
 
             @Override
-            public SSTableIdentityIterator cleanup(SSTableIdentityIterator row)
+            public UnfilteredRowIterator cleanup(UnfilteredRowIterator partition)
             {
-                return row;
+                return partition;
             }
         }
 
         private static final class Full extends CleanupStrategy
         {
-            private final Collection<Range<Token>> ranges;
             private final ColumnFamilyStore cfs;
-            private List<Cell> indexedColumnsInRow;
 
-            public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges)
+            public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec)
             {
+                super(ranges, nowInSec);
                 this.cfs = cfs;
-                this.ranges = ranges;
-                this.indexedColumnsInRow = null;
             }
 
             @Override
@@ -929,36 +928,17 @@ public class CompactionManager implements CompactionManagerMBean
             }
 
             @Override
-            public SSTableIdentityIterator cleanup(SSTableIdentityIterator row)
+            public UnfilteredRowIterator cleanup(UnfilteredRowIterator partition)
             {
-                if (Range.isInRanges(row.getKey().getToken(), ranges))
-                    return row;
+                if (Range.isInRanges(partition.partitionKey().getToken(), ranges))
+                    return partition;
 
-                cfs.invalidateCachedRow(row.getKey());
+                cfs.invalidateCachedPartition(partition.partitionKey());
 
-                if (indexedColumnsInRow != null)
-                    indexedColumnsInRow.clear();
-
-                while (row.hasNext())
-                {
-                    OnDiskAtom column = row.next();
-
-                    if (column instanceof Cell && cfs.indexManager.indexes((Cell) column))
-                    {
-                        if (indexedColumnsInRow == null)
-                            indexedColumnsInRow = new ArrayList<>();
-
-                        indexedColumnsInRow.add((Cell) column);
-                    }
-                }
-
-                if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
+                // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
+                try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
                 {
-                    // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
-                    try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
-                    {
-                        cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow, opGroup);
-                    }
+                    cfs.indexManager.deleteFromIndexes(partition, opGroup, nowInSec);
                 }
                 return null;
             }
@@ -978,14 +958,15 @@ public class CompactionManager implements CompactionManagerMBean
                                     expectedBloomFilterSize,
                                     repairedAt,
                                     sstable.getSSTableLevel(),
-                                    cfs.partitioner);
+                                    cfs.partitioner,
+                                    sstable.header);
     }
 
     public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs,
-                                             File compactionFileLocation,
-                                             int expectedBloomFilterSize,
-                                             long repairedAt,
-                                             Collection<SSTableReader> sstables)
+                                                              File compactionFileLocation,
+                                                              int expectedBloomFilterSize,
+                                                              long repairedAt,
+                                                              Collection<SSTableReader> sstables)
     {
         FileUtils.createDirectory(compactionFileLocation);
         int minLevel = Integer.MAX_VALUE;
@@ -1008,7 +989,8 @@ public class CompactionManager implements CompactionManagerMBean
                                     repairedAt,
                                     cfs.metadata,
                                     cfs.partitioner,
-                                    new MetadataCollector(sstables, cfs.metadata.comparator, minLevel));
+                                    new MetadataCollector(sstables, cfs.metadata.comparator, minLevel),
+                                    SerializationHeader.make(cfs.metadata, sstables));
     }
 
 
@@ -1033,6 +1015,7 @@ public class CompactionManager implements CompactionManagerMBean
 
             String snapshotName = validator.desc.sessionId.toString();
             int gcBefore;
+            int nowInSec = FBUtilities.nowInSeconds();
             boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
             if (isSnapshotValidation)
             {
@@ -1046,7 +1029,7 @@ public class CompactionManager implements CompactionManagerMBean
                // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
                // time of the snapshot, which should give us roughly the same time on each replica (roughly being, in that case,
                // 'as good as' in the non-snapshot case)
-                gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
+                gcBefore = cfs.gcBefore((int)(cfs.getSnapshotCreationTime(snapshotName) / 1000));
             }
             else
             {
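[Editor's sketch] The cast in the replacement line is the real fix here: gcBefore now works in seconds, while getSnapshotCreationTime returns epoch milliseconds. Spelled out as a sketch (assuming gcBefore subtracts the table's gc_grace_seconds from the time it is handed):

    long creationMillis = cfs.getSnapshotCreationTime(snapshotName); // epoch millis
    int creationSec = (int) (creationMillis / 1000);                 // epoch seconds
    int gcBefore = cfs.gcBefore(creationSec); // ~ creationSec - gc_grace_seconds

    // every replica validating against the same snapshot now purges against the
    // same cutoff, which is the point of the comment above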
@@ -1084,7 +1067,7 @@ public class CompactionManager implements CompactionManagerMBean
                 if (validator.gcBefore > 0)
                     gcBefore = validator.gcBefore;
                 else
-                    gcBefore = getDefaultGcBefore(cfs);
+                    gcBefore = getDefaultGcBefore(cfs, nowInSec);
             }
 
             // Create Merkle tree suitable to hold estimated partitions for given range.
@@ -1099,32 +1082,28 @@ public class CompactionManager implements CompactionManagerMBean
             MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 
             long start = System.nanoTime();
-            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range))
+            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range);
+                 ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
+                 CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
             {
-                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
-                Iterator<AbstractCompactedRow> iter = ci.iterator();
-                metrics.beginCompaction(ci);
-                try
+                // validate the CF as we iterate over it
+                validator.prepare(cfs, tree);
+                while (ci.hasNext())
                 {
-                    // validate the CF as we iterate over it
-                    validator.prepare(cfs, tree);
-                    while (iter.hasNext())
+                    if (ci.isStopRequested())
+                        throw new CompactionInterruptedException(ci.getCompactionInfo());
+                    try (UnfilteredRowIterator partition = ci.next())
                     {
-                        if (ci.isStopRequested())
-                            throw new CompactionInterruptedException(ci.getCompactionInfo());
-                        AbstractCompactedRow row = iter.next();
-                        validator.add(row);
+                        validator.add(partition);
                     }
-                    validator.complete();
                 }
-                finally
+                validator.complete();
+            }
+            finally
+            {
+                if (isSnapshotValidation)
                 {
-                    if (isSnapshotValidation)
-                    {
-                        cfs.clearSnapshot(snapshotName);
-                    }
-
-                    metrics.finishCompaction(ci);
+                    cfs.clearSnapshot(snapshotName);
                 }
             }
 
@@ -1209,45 +1188,38 @@ public class CompactionManager implements CompactionManagerMBean
         File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
         long repairedKeyCount = 0;
         long unrepairedKeyCount = 0;
+        int nowInSec = FBUtilities.nowInSeconds();
+
         CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
         try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
              SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
              AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals());
-             CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
+             CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
+             CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
         {
             int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
 
             repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
             unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
 
-            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
-            metrics.beginCompaction(ci);
-            try
+            while (ci.hasNext())
             {
-                @SuppressWarnings("resource")
-                CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-                while (iter.hasNext())
+                try (UnfilteredRowIterator partition = ci.next())
                 {
-                    @SuppressWarnings("resource")
-                    AbstractCompactedRow row = iter.next();
                     // if current range from sstable is repaired, save it into the new repaired sstable
-                    if (Range.isInRanges(row.key.getToken(), ranges))
+                    if (Range.isInRanges(partition.partitionKey().getToken(), ranges))
                     {
-                        repairedSSTableWriter.append(row);
+                        repairedSSTableWriter.append(partition);
                         repairedKeyCount++;
                     }
                     // otherwise save into the new 'non-repaired' table
                     else
                     {
-                        unRepairedSSTableWriter.append(row);
+                        unRepairedSSTableWriter.append(partition);
                         unrepairedKeyCount++;
                     }
                 }
             }
-            finally
-            {
-                metrics.finishCompaction(ci);
-            }
 
             List<SSTableReader> anticompactedSSTables = new ArrayList<>();
             // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method,
@@ -1342,19 +1314,18 @@ public class CompactionManager implements CompactionManagerMBean
         return executor.submit(runnable);
     }
 
-    public static int getDefaultGcBefore(ColumnFamilyStore cfs)
+    public static int getDefaultGcBefore(ColumnFamilyStore cfs, int nowInSec)
     {
         // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
         // add any GcGrace however since 2ndary indexes are local to a node.
-        return cfs.isIndex() ? (int) (System.currentTimeMillis() / 1000) : cfs.gcBefore(System.currentTimeMillis());
+        return cfs.isIndex() ? nowInSec : cfs.gcBefore(nowInSec);
     }
 
-    private static class ValidationCompactionIterable extends CompactionIterable
+    private static class ValidationCompactionIterator extends CompactionIterator
     {
-        @SuppressWarnings("resource")
-        public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ISSTableScanner> scanners, int gcBefore)
+        public ValidationCompactionIterator(List<ISSTableScanner> scanners, ValidationCompactionController controller, int nowInSec, CompactionMetrics metrics)
         {
-            super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
+            super(OperationType.VALIDATION, scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics);
         }
     }
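[Editor's sketch] With the clock injected as nowInSec, callers compute it once and pass it to every sub-task started at the same instant, so they all agree on the purge horizon. A worked example, assuming cfs.gcBefore(nowInSec) is nowInSec minus the table's gc_grace_seconds:

    int nowInSec = FBUtilities.nowInSeconds(); // e.g. 1435660000

    // regular table, default gc_grace_seconds of 864000 (10 days):
    //   getDefaultGcBefore(cfs, nowInSec) == nowInSec - 864000
    // secondary index table (local to the node, no grace period needed):
    //   getDefaultGcBefore(cfs, nowInSec) == nowInSec
    int gcBefore = CompactionManager.getDefaultGcBefore(cfs, nowInSec);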
 
@@ -1518,36 +1489,6 @@ public class CompactionManager implements CompactionManagerMBean
         return metrics.completedTasks.getValue();
     }
 
-    private static class CleanupInfo extends CompactionInfo.Holder
-    {
-        private final SSTableReader sstable;
-        private final ISSTableScanner scanner;
-        private final UUID cleanupCompactionId;
-
-        public CleanupInfo(SSTableReader sstable, ISSTableScanner scanner)
-        {
-            this.sstable = sstable;
-            this.scanner = scanner;
-            cleanupCompactionId = UUIDGen.getTimeUUID();
-        }
-
-        public CompactionInfo getCompactionInfo()
-        {
-            try
-            {
-                return new CompactionInfo(sstable.metadata,
-                                          OperationType.CLEANUP,
-                                          scanner.getCurrentPosition(),
-                                          scanner.getLengthInBytes(),
-                                          cleanupCompactionId);
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException();
-            }
-        }
-    }
-
     public void stopCompaction(String type)
     {
         OperationType operation = OperationType.valueOf(type);

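[Editor's sketch] A recurring theme in this file: the manual metrics.beginCompaction/finishCompaction bracketing (and the ad-hoc CleanupInfo holder deleted above) goes away because the metrics object is handed to CompactionIterator, which can register itself for exactly its own lifetime. The resulting call shape, as a sketch (consume() is a stand-in, not a real method; the iterator is assumed to deregister from the metrics on close):

    try (CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP,
                                                        Collections.singletonList(scanner),
                                                        controller, nowInSec,
                                                        UUIDGen.getTimeUUID(), metrics))
    {
        while (ci.hasNext())
        {
            if (ci.isStopRequested())
                throw new CompactionInterruptedException(ci.getCompactionInfo());

            try (UnfilteredRowIterator partition = ci.next())
            {
                consume(partition); // stand-in for append / validate / cleanup
            }
        }
    }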
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index dd6261c..cfe28e8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -339,11 +339,11 @@ public class CompactionStrategyManager implements INotificationConsumer
      * @param range
      * @return
      */
+    @SuppressWarnings("resource")
     public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         List<SSTableReader> repairedSSTables = new ArrayList<>();
         List<SSTableReader> unrepairedSSTables = new ArrayList<>();
-
         for (SSTableReader sstable : sstables)
         {
             if (sstable.isRepaired())
@@ -352,24 +352,24 @@ public class CompactionStrategyManager implements INotificationConsumer
                 unrepairedSSTables.add(sstable);
         }
 
-        List<ISSTableScanner> scanners = new ArrayList<>();
 
-        if (!repairedSSTables.isEmpty())
-            scanners.addAll(repaired.getScanners(repairedSSTables, range).scanners);
-        if (!unrepairedSSTables.isEmpty())
-            scanners.addAll(unrepaired.getScanners(unrepairedSSTables, range).scanners);
+        AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
+        AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
 
+        List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
+        scanners.addAll(repairedScanners.scanners);
+        scanners.addAll(unrepairedScanners.scanners);
         return new AbstractCompactionStrategy.ScannerList(scanners);
     }
 
-    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
+    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
     {
-        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
+        return getScanners(sstables, null);
     }
 
-    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
+    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
     {
-        return getScanners(sstables, null);
+        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
     }
 
     public long getMaxSSTableBytes()

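[Editor's sketch] The reordering above materializes both sub-lists before merging, so the combined ArrayList can be sized exactly. The @SuppressWarnings("resource") is needed because the two intermediate ScannerList wrappers are never closed themselves; the scanners they carry all end up in the merged list, which the caller closes. Typical caller shape (process() is illustrative):

    try (AbstractCompactionStrategy.ScannerList scanners = manager.getScanners(sstables))
    {
        for (ISSTableScanner scanner : scanners.scanners)
        {
            while (scanner.hasNext())
                process(scanner.next()); // illustrative consumer
        }
    } // closing the merged list closes every underlying scanner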
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 7dbeb44..911b4af 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorSt
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -142,70 +143,63 @@ public class CompactionTask extends AbstractCompactionTask
         logger.info("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg);
 
         long start = System.nanoTime();
-
         long totalKeysWritten = 0;
-
         long estimatedKeys = 0;
         try (CompactionController controller = getCompactionController(transaction.originals()))
         {
             Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
 
-            SSTableFormat.Type sstableFormat = getFormatType(transaction.originals());
-
             List<SSTableReader> newSStables;
-            AbstractCompactionIterable ci;
+
+            long[] mergedRowCounts;
 
             // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
             // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
             // See CASSANDRA-8019 and CASSANDRA-8399
+            int nowInSec = FBUtilities.nowInSeconds();
             try (Refs<SSTableReader> refs = Refs.ref(actuallyCompact);
-                 AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+                 AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact);
+                 CompactionIterator ci = new CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, taskId))
             {
+                if (collector != null)
+                    collector.beginCompaction(ci);
+                long lastCheckObsoletion = start;
+
+                if (!controller.cfs.getCompactionStrategyManager().isActive)
+                    throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
-                try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
                 {
-                    if (collector != null)
-                        collector.beginCompaction(ci);
-                    long lastCheckObsoletion = start;
+                    estimatedKeys = writer.estimatedKeys();
+                    while (ci.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                    if (!controller.cfs.getCompactionStrategyManager().isActive)
-                        throw new CompactionInterruptedException(ci.getCompactionInfo());
+                        if (writer.append(ci.next()))
+                            totalKeysWritten++;
 
-                    try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
-                    {
-                        estimatedKeys = writer.estimatedKeys();
-                        while (iter.hasNext())
+                        if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
                         {
-                            if (ci.isStopRequested())
-                                throw new CompactionInterruptedException(ci.getCompactionInfo());
-
-                            try (AbstractCompactedRow row = iter.next())
-                            {
-                                if (writer.append(row))
-                                    totalKeysWritten++;
-
-                                if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
-                                {
-                                    controller.maybeRefreshOverlaps();
-                                    lastCheckObsoletion = System.nanoTime();
-                                }
-                            }
+                            controller.maybeRefreshOverlaps();
+                            lastCheckObsoletion = System.nanoTime();
                         }
-
-                        // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-                        newSStables = writer.finish();
                     }
-                    finally
-                    {
-                        // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-                        // (in replaceCompactedSSTables)
-                        if (taskId != null)
-                            SystemKeyspace.finishCompaction(taskId);
 
-                        if (collector != null)
-                            collector.finishCompaction(ci);
-                    }
+                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                    newSStables = writer.finish();
+                }
+                finally
+                {
+                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                    // (in replaceCompactedSSTables)
+                    if (taskId != null)
+                        SystemKeyspace.finishCompaction(taskId);
+
+                    if (collector != null)
+                        collector.finishCompaction(ci);
+
+                    mergedRowCounts = ci.getMergedRowCounts();
                 }
             }
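[Editor's sketch] The once-a-minute maybeRefreshOverlaps() check survives the flattening of the nested try blocks. The generic shape of that time-sliced check inside a hot loop, as a sketch with illustrative names:

    long lastCheck = System.nanoTime();
    while (work.hasNext())
    {
        handle(work.next()); // the per-partition writer.append(...) above

        if (System.nanoTime() - lastCheck > TimeUnit.MINUTES.toNanos(1L))
        {
            refresh(); // e.g. controller.maybeRefreshOverlaps()
            lastCheck = System.nanoTime();
        }
    }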
 
@@ -221,7 +215,7 @@ public class CompactionTask extends AbstractCompactionTask
 
             double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
             long totalSourceRows = 0;
-            String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
+            String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
             logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
                                       taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
             logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
@@ -242,14 +236,13 @@ public class CompactionTask extends AbstractCompactionTask
 
     }
 
-    public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, AbstractCompactionIterable ci, long startSize, long endSize)
+    public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)
     {
-        long[] counts = ci.getMergedRowCounts();
-        StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+        StringBuilder mergeSummary = new StringBuilder(mergedRowCounts.length * 10);
         Map<Integer, Long> mergedRows = new HashMap<>();
-        for (int i = 0; i < counts.length; i++)
+        for (int i = 0; i < mergedRowCounts.length; i++)
         {
-            long count = counts[i];
+            long count = mergedRowCounts[i];
             if (count == 0)
                 continue;
 
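[Editor's sketch] mergedRowCounts is indexed by merge cardinality: counts[i] is the number of output partitions built by merging i+1 input versions. A worked example under that convention:

    long[] mergedRowCounts = { 1000, 40, 2 };
    // 1000 partitions existed in a single sstable, 40 were merged from two
    // sstables, 2 from three; the summary reads roughly "1:1000, 2:40, 3:2"
    // and the total source partitions are 1000*1 + 40*2 + 2*3 = 1086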
@@ -305,13 +298,4 @@ public class CompactionTask extends AbstractCompactionTask
         }
         return max;
     }
-
-    public static SSTableFormat.Type getFormatType(Collection<SSTableReader> sstables)
-    {
-        if (sstables.isEmpty() || !SSTableFormat.enableSSTableDevelopmentTestMode)
-            return DatabaseDescriptor.getSSTableFormat();
-
-        //Allows us to test compaction of non-default formats
-        return sstables.iterator().next().descriptor.formatType;
-    }
 }

