cassandra-commits mailing list archives

From slebre...@apache.org
Subject [31/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
Date Tue, 30 Jun 2015 10:47:55 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
deleted file mode 100644
index 93505ae..0000000
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.compaction;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterators;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
-import org.apache.cassandra.io.sstable.ColumnNameHelper;
-import org.apache.cassandra.io.sstable.ColumnStats;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.SequentialWriter;
-import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.StreamingHistogram;
-import org.apache.cassandra.utils.Throwables;
-
-/**
- * LazilyCompactedRow only computes the row bloom filter and column index in memory
- * (at construction time); it does this by reading one column at a time from each
- * of the rows being compacted, and merging them as it does so.  So the most we have
- * in memory at a time is the bloom filter, the index, and one column from each
- * pre-compaction row.
- */
-public class LazilyCompactedRow extends AbstractCompactedRow
-{
-    protected final List<? extends OnDiskAtomIterator> rows;
-    protected final CompactionController controller;
-    protected boolean hasCalculatedMaxPurgeableTimestamp = false;
-    protected long maxPurgeableTimestamp;
-    protected final ColumnFamily emptyColumnFamily;
-    protected ColumnStats columnStats;
-    protected boolean closed;
-    protected ColumnIndex.Builder indexBuilder;
-    protected final SecondaryIndexManager.Updater indexer;
-    protected final Reducer reducer;
-    protected final Iterator<OnDiskAtom> merger;
-    protected DeletionTime maxRowTombstone;
-
-    public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
-    {
-        super(rows.get(0).getKey());
-        this.rows = rows;
-        this.controller = controller;
-        indexer = controller.cfs.indexManager.gcUpdaterFor(key);
-
-        // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp.  This may be
-        // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
-        maxRowTombstone = DeletionTime.LIVE;
-        for (OnDiskAtomIterator row : rows)
-        {
-            DeletionTime rowTombstone = row.getColumnFamily().deletionInfo().getTopLevelDeletion();
-            if (maxRowTombstone.compareTo(rowTombstone) < 0)
-                maxRowTombstone = rowTombstone;
-        }
-
-        emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
-        emptyColumnFamily.delete(maxRowTombstone);
-        if (!maxRowTombstone.isLive() && maxRowTombstone.markedForDeleteAt < getMaxPurgeableTimestamp())
-            emptyColumnFamily.purgeTombstones(controller.gcBefore);
-
-        reducer = new Reducer();
-        merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator(), reducer), Predicates.notNull());
-    }
-
-    /**
-     * Tombstones with a timestamp before this can be purged.  This is the minimum timestamp for any sstable
-     * containing `key` outside of the set of sstables involved in this compaction.
-     */
-    private long getMaxPurgeableTimestamp()
-    {
-        if (!hasCalculatedMaxPurgeableTimestamp)
-        {
-            hasCalculatedMaxPurgeableTimestamp = true;
-            maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);
-        }
-        return maxPurgeableTimestamp;
-    }
-
-    private static void removeDeleted(ColumnFamily cf, boolean shouldPurge, DecoratedKey key, CompactionController controller)
-    {
-        // We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that
-        // are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this
-        // without purging tombstones.
-        int overriddenGCBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
-        ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.gcUpdaterFor(key));
-    }
-
-    public RowIndexEntry write(long currentPosition, SequentialWriter dataFile) throws IOException
-    {
-        assert !closed;
-
-        DataOutputPlus out = dataFile.stream;
-
-        ColumnIndex columnsIndex;
-        try
-        {
-            indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.getKey(), out);
-            columnsIndex = indexBuilder.buildForCompaction(merger);
-
-            // if there aren't any columns or tombstones, return null
-            if (columnsIndex.columnsIndex.isEmpty() && !emptyColumnFamily.isMarkedForDelete())
-                return null;
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        // reach into the reducer (created during iteration) to get column count, size, max column timestamp
-        columnStats = new ColumnStats(reducer.columns,
-                                      reducer.minTimestampTracker.get(),
-                                      Math.max(emptyColumnFamily.deletionInfo().maxTimestamp(), reducer.maxTimestampTracker.get()),
-                                      reducer.maxDeletionTimeTracker.get(),
-                                      reducer.tombstones,
-                                      reducer.minColumnNameSeen,
-                                      reducer.maxColumnNameSeen,
-                                      reducer.hasLegacyCounterShards);
-
-        // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone
-        indexBuilder.maybeWriteEmptyRowHeader();
-
-        out.writeShort(BigTableWriter.END_OF_ROW);
-
-        close();
-
-        return RowIndexEntry.create(currentPosition, emptyColumnFamily.deletionInfo().getTopLevelDeletion(), columnsIndex);
-    }
-
-    public void update(MessageDigest digest)
-    {
-        assert !closed;
-
-        // no special case for rows.size == 1: we're actually skipping some bytes here, so just
-        // blindly updating everything wouldn't be correct
-        try (DataOutputBuffer out = new DataOutputBuffer())
-        {
-            // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
-            indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.getKey(), out);
-
-            DeletionTime.serializer.serialize(emptyColumnFamily.deletionInfo().getTopLevelDeletion(), out);
-
-            // do not update the digest in case of missing or purged row-level tombstones, see CASSANDRA-8979
-            // - the digest for non-empty rows needs to be updated with the deletion in any case, to match the digest of versions before the patch
-            // - empty rows must not update the digest when the delete status is LIVE, to avoid mismatches with non-existing rows;
-            //   this does, however, introduce a digest mismatch with versions before the patch (which would update the digest in any case)
-            if (merger.hasNext() || emptyColumnFamily.deletionInfo().getTopLevelDeletion() != DeletionTime.LIVE)
-            {
-                digest.update(out.getData(), 0, out.getLength());
-            }
-        }
-        catch (IOException e)
-        {
-            throw new AssertionError(e);
-        }
-
-        while (merger.hasNext())
-            merger.next().updateDigest(digest);
-        close();
-    }
-
-    public ColumnStats columnStats()
-    {
-        return columnStats;
-    }
-
-    public void close()
-    {
-        Throwable accumulate = null;
-        for (OnDiskAtomIterator row : rows)
-        {
-            try
-            {
-                row.close();
-            }
-            catch (IOException e)
-            {
-                accumulate = Throwables.merge(accumulate, e);
-            }
-        }
-        closed = true;
-        Throwables.maybeFail(accumulate);
-    }
-
-    protected class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
-    {
-        // all columns reduced together will have the same name, so there will only be one column
-        // in the container; we just want to leverage the conflict resolution code from CF.
-        // (Note that we add the row tombstone in getReduced.)
-        ColumnFamily container = ArrayBackedSortedColumns.factory.create(emptyColumnFamily.metadata());
-
-        // tombstone reference; will be reconciled w/ column during getReduced.  Note that the top-level (row) tombstone
-        // is held by LCR.deletionInfo.
-        public RangeTombstone tombstone;
-
-        public int columns = 0;
-        // if the row tombstone is 'live' we need to set timestamp to MAX_VALUE to be able to overwrite it later
-        // markedForDeleteAt is MIN_VALUE for 'live' row tombstones (which we use to default maxTimestampSeen)
-
-        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
-        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
-        // we need to set MIN_VALUE if we are 'live' since we want to overwrite it later;
-        // we are bound to have either a RangeTombstone or standard cells, which will set this properly:
-        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
-
-        public StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
-        public List<ByteBuffer> minColumnNameSeen = Collections.emptyList();
-        public List<ByteBuffer> maxColumnNameSeen = Collections.emptyList();
-        public boolean hasLegacyCounterShards = false;
-
-        public Reducer()
-        {
-            minTimestampTracker.update(maxRowTombstone.isLive() ? Long.MAX_VALUE : maxRowTombstone.markedForDeleteAt);
-            maxTimestampTracker.update(maxRowTombstone.markedForDeleteAt);
-            maxDeletionTimeTracker.update(maxRowTombstone.isLive() ? Integer.MIN_VALUE : maxRowTombstone.localDeletionTime);
-        }
-
-        /**
-         * Called once per version of a cell that we need to merge, after which getReduced() is called.  In other words,
-         * this will be called one or more times with cells that share the same column name.
-         */
-        public void reduce(OnDiskAtom current)
-        {
-            if (current instanceof RangeTombstone)
-            {
-                if (tombstone == null || current.timestamp() >= tombstone.timestamp())
-                    tombstone = (RangeTombstone)current;
-            }
-            else
-            {
-                Cell cell = (Cell) current;
-                container.addColumn(cell);
-
-                // skip the index-update checks if there is no indexing needed since they are a bit expensive
-                if (indexer == SecondaryIndexManager.nullUpdater)
-                    return;
-
-                if (cell.isLive() && !container.getColumn(cell.name()).equals(cell))
-                    indexer.remove(cell);
-            }
-        }
-
-        /**
-         * Called after reduce() has been called for each cell sharing the same name.
-         */
-        protected OnDiskAtom getReduced()
-        {
-            if (tombstone != null)
-            {
-                RangeTombstone t = tombstone;
-                tombstone = null;
-
-                if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore))
-                {
-                    indexBuilder.tombstoneTracker().update(t, true);
-                    return null;
-                }
-                else
-                {
-                    tombstones.update(t.getLocalDeletionTime());
-                    minTimestampTracker.update(t.timestamp());
-                    maxTimestampTracker.update(t.timestamp());
-                    maxDeletionTimeTracker.update(t.getLocalDeletionTime());
-                    minColumnNameSeen = ColumnNameHelper.minComponents(minColumnNameSeen, t.min, controller.cfs.metadata.comparator);
-                    maxColumnNameSeen = ColumnNameHelper.maxComponents(maxColumnNameSeen, t.max, controller.cfs.metadata.comparator);
-                    return t;
-                }
-            }
-            else
-            {
-                // when we clear() the container, it removes the deletion info, so this needs to be reset each time
-                container.delete(maxRowTombstone);
-                Iterator<Cell> iter = container.iterator();
-                Cell c = iter.next();
-                boolean shouldPurge = c.getLocalDeletionTime() < Integer.MAX_VALUE && c.timestamp() < getMaxPurgeableTimestamp();
-                removeDeleted(container, shouldPurge, key, controller);
-                iter = container.iterator();
-                if (!iter.hasNext())
-                {
-                    // don't call clear() because that resets the deletion time. See CASSANDRA-7808.
-                    container = ArrayBackedSortedColumns.factory.create(emptyColumnFamily.metadata());
-                    return null;
-                }
-
-                int localDeletionTime = container.deletionInfo().getTopLevelDeletion().localDeletionTime;
-                if (localDeletionTime < Integer.MAX_VALUE)
-                    tombstones.update(localDeletionTime);
-
-                Cell reduced = iter.next();
-                container = ArrayBackedSortedColumns.factory.create(emptyColumnFamily.metadata());
-
-                // removeDeleted has only checked the top-level CF deletion times,
-                // not the range tombstones. For those we use the columnIndexer tombstone tracker.
-                if (indexBuilder.tombstoneTracker().isDeleted(reduced))
-                {
-                    // We skip this column so it won't be passed to the tracker by the index builder, so pass it now to
-                    // make sure we still discard potentially unneeded RTs as soon as possible.
-                    indexBuilder.tombstoneTracker().update(reduced, false);
-                    indexer.remove(reduced);
-                    return null;
-                }
-
-                columns++;
-                minTimestampTracker.update(reduced.timestamp());
-                maxTimestampTracker.update(reduced.timestamp());
-                maxDeletionTimeTracker.update(reduced.getLocalDeletionTime());
-                minColumnNameSeen = ColumnNameHelper.minComponents(minColumnNameSeen, reduced.name(), controller.cfs.metadata.comparator);
-                maxColumnNameSeen = ColumnNameHelper.maxComponents(maxColumnNameSeen, reduced.name(), controller.cfs.metadata.comparator);
-
-                int deletionTime = reduced.getLocalDeletionTime();
-                if (deletionTime < Integer.MAX_VALUE)
-                    tombstones.update(deletionTime);
-
-                if (reduced instanceof CounterCell)
-                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) reduced).hasLegacyShards();
-
-                return reduced;
-            }
-        }
-    }
-}
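
For reference, the deleted class is essentially a k-way merge with a reduce step: the merger holds at most one atom per input row, and runs of atoms sharing a column name are reconciled into a single output, which is what keeps the memory footprint bounded as the javadoc describes. Below is a minimal standalone sketch of that shape, using only the JDK (none of Cassandra's MergeIterator/Reducer types) and summing duplicate keys where Cassandra would reconcile cells:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public final class MergeReduceSketch
{
    // One entry per input: the iterator plus its current head element.
    private static final class Source
    {
        final Iterator<long[]> iter; // long[]{key, value} pairs, sorted by key
        long[] head;

        Source(Iterator<long[]> iter)
        {
            this.iter = iter;
            this.head = iter.hasNext() ? iter.next() : null;
        }
    }

    // Merges sorted inputs, reducing (here: summing) values that share a key.
    public static List<long[]> mergeSum(List<Iterator<long[]>> inputs)
    {
        PriorityQueue<Source> queue = new PriorityQueue<>(Comparator.comparingLong(s -> s.head[0]));
        for (Iterator<long[]> in : inputs)
        {
            Source s = new Source(in);
            if (s.head != null)
                queue.add(s);
        }

        List<long[]> out = new ArrayList<>();
        while (!queue.isEmpty())
        {
            long key = queue.peek().head[0];
            long sum = 0; // the "reduce" step; LazilyCompactedRow reconciles cells instead
            while (!queue.isEmpty() && queue.peek().head[0] == key)
            {
                Source s = queue.poll();
                sum += s.head[1];
                s.head = s.iter.hasNext() ? s.iter.next() : null;
                if (s.head != null)
                    queue.add(s); // re-insert with its next element; still one head per input
            }
            out.add(new long[]{ key, sum });
        }
        return out;
    }
}

As in LazilyCompactedRow, the queue never holds more than one pending element per input, regardless of how large the inputs are.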

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 9eb58ff..dcdfd7f 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.io.IOException;
 import java.util.*;
 
 
@@ -26,17 +25,18 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.*;
 import com.google.common.primitives.Doubles;
 
-import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class LeveledCompactionStrategy extends AbstractCompactionStrategy
 {
@@ -244,7 +244,11 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
                     // Create a LeveledScanner that only opens one sstable at a time, in sorted order
                     List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
                     if (!intersecting.isEmpty())
-                        scanners.add(new LeveledScanner(intersecting, range));
+                    {
+                        @SuppressWarnings("resource") // The ScannerList will be in charge of closing (and we close properly on errors)
+                        ISSTableScanner scanner = new LeveledScanner(intersecting, range);
+                        scanners.add(scanner);
+                    }
                 }
             }
         }
@@ -284,7 +288,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
 
     // Lazily creates SSTableBoundedScanner for sstables that are assumed to be from the
     // same level (i.e. non-overlapping) - see #4142
-    private static class LeveledScanner extends AbstractIterator<OnDiskAtomIterator> implements ISSTableScanner
+    private static class LeveledScanner extends AbstractIterator<UnfilteredRowIterator> implements ISSTableScanner
     {
         private final Range<Token> range;
         private final List<SSTableReader> sstables;
@@ -332,36 +336,34 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
             return filtered;
         }
 
-        protected OnDiskAtomIterator computeNext()
+        public boolean isForThrift()
+        {
+            return false;
+        }
+
+        protected UnfilteredRowIterator computeNext()
         {
             if (currentScanner == null)
                 return endOfData();
 
-            try
+            while (true)
             {
-                while (true)
-                {
-                    if (currentScanner.hasNext())
-                        return currentScanner.next();
+                if (currentScanner.hasNext())
+                    return currentScanner.next();
 
-                    positionOffset += currentScanner.getLengthInBytes();
-                    currentScanner.close();
-                    if (!sstableIterator.hasNext())
-                    {
-                        // reset to null so getCurrentPosition does not return wrong value
-                        currentScanner = null;
-                        return endOfData();
-                    }
-                    currentScanner = sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter());
+                positionOffset += currentScanner.getLengthInBytes();
+                currentScanner.close();
+                if (!sstableIterator.hasNext())
+                {
+                    // reset to null so getCurrentPosition does not return wrong value
+                    currentScanner = null;
+                    return endOfData();
                 }
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
+                currentScanner = sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter());
             }
         }
 
-        public void close() throws IOException
+        public void close()
         {
             if (currentScanner != null)
                 currentScanner.close();
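
The rewritten computeNext() no longer wraps IOException (getScanner() and close() stopped declaring it in this refactor), but the control flow is unchanged: drain one resource-backed iterator, close it, and only then open the next. A hedged sketch of that "chained scanner" pattern using Guava's AbstractIterator, where CloseableSource is a stand-in rather than a Cassandra type:

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.function.Supplier;

import com.google.common.collect.AbstractIterator;

// Stand-in for ISSTableScanner: an iterator that owns an on-disk resource.
interface CloseableSource<T> extends Iterator<T>, Closeable {}

final class ChainedScanner<T> extends AbstractIterator<T>
{
    private final Iterator<Supplier<CloseableSource<T>>> sources; // one factory per sstable
    private CloseableSource<T> current;

    ChainedScanner(Iterator<Supplier<CloseableSource<T>>> sources)
    {
        this.sources = sources;
        this.current = sources.hasNext() ? sources.next().get() : null;
    }

    @Override
    protected T computeNext()
    {
        if (current == null)
            return endOfData();

        while (true)
        {
            if (current.hasNext())
                return current.next(); // keep draining the currently open scanner

            try
            {
                current.close(); // finished: release it before opening the next one
            }
            catch (IOException e)
            {
                throw new UncheckedIOException(e);
            }

            if (!sources.hasNext())
            {
                current = null; // as in the diff: reset so position queries cannot hit a closed scanner
                return endOfData();
            }
            current = sources.next().get(); // open the next source only now
        }
    }
}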

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 0763316..0cee370 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -27,13 +27,14 @@ import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -63,7 +64,7 @@ public class LeveledManifest
     private final ColumnFamilyStore cfs;
     @VisibleForTesting
     protected final List<SSTableReader>[] generations;
-    private final RowPosition[] lastCompactedKeys;
+    private final PartitionPosition[] lastCompactedKeys;
     private final long maxSSTableSizeInBytes;
     private final SizeTieredCompactionStrategyOptions options;
     private final int [] compactionCounter;
@@ -75,7 +76,7 @@ public class LeveledManifest
         this.options = options;
 
         generations = new List[MAX_LEVEL_COUNT];
-        lastCompactedKeys = new RowPosition[MAX_LEVEL_COUNT];
+        lastCompactedKeys = new PartitionPosition[MAX_LEVEL_COUNT];
         for (int i = 0; i < generations.length; i++)
         {
             generations[i] = new ArrayList<>();
@@ -402,8 +403,8 @@ public class LeveledManifest
                     // say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2
                     // this means that we will not create overlap in L2 if we add an sstable
                     // contained within 0 -> 33 to the compaction
-                    RowPosition max = null;
-                    RowPosition min = null;
+                    PartitionPosition max = null;
+                    PartitionPosition min = null;
                     for (SSTableReader candidate : candidates)
                     {
                         if (min == null || candidate.first.compareTo(min) < 0)
@@ -414,10 +415,10 @@ public class LeveledManifest
                     if (min == null || max == null || min.equals(max)) // single partition sstables - we cannot include a high level sstable.
                         return candidates;
                     Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
-                    Range<RowPosition> boundaries = new Range<>(min, max);
+                    Range<PartitionPosition> boundaries = new Range<>(min, max);
                     for (SSTableReader sstable : getLevel(i))
                     {
-                        Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last);
+                        Range<PartitionPosition> r = new Range<PartitionPosition>(sstable.first, sstable.last);
                         if (boundaries.contains(r) && !compacting.contains(sstable))
                         {
                             logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
@@ -546,8 +547,8 @@ public class LeveledManifest
         {
             Set<SSTableReader> compactingL0 = getCompacting(0);
 
-            RowPosition lastCompactingKey = null;
-            RowPosition firstCompactingKey = null;
+            PartitionPosition lastCompactingKey = null;
+            PartitionPosition firstCompactingKey = null;
             for (SSTableReader candidate : compactingL0)
             {
                 if (firstCompactingKey == null || candidate.first.compareTo(firstCompactingKey) < 0)
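
The 0->30 / 0->12 / 12->33 example in the hunk above is worth making concrete: the overall [min, max] bounds of the chosen candidates are computed first, and a non-compacting higher-level sstable may join the compaction only if its own [first, last] interval lies entirely inside those bounds, since a fully covered sstable cannot introduce overlap in the target level. A small worked sketch with a hypothetical Interval type (not Cassandra's Range/Bounds):

import java.util.ArrayList;
import java.util.List;

final class Interval
{
    final long first, last; // inclusive key bounds, e.g. 0..30

    Interval(long first, long last)
    {
        this.first = first;
        this.last = last;
    }

    boolean contains(Interval other)
    {
        return first <= other.first && other.last <= last;
    }
}

final class CandidateBounds
{
    static List<Interval> widen(List<Interval> candidates, List<Interval> higherLevel)
    {
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
        for (Interval c : candidates)
        {
            min = Math.min(min, c.first);
            max = Math.max(max, c.last);
        }
        Interval bounds = new Interval(min, max); // 0..33 for {0..30, 0..12, 12..33}

        List<Interval> result = new ArrayList<>(candidates);
        for (Interval s : higherLevel)
            if (bounds.contains(s))
                result.add(s); // safe: fully covered by keys we already compact
        return result;
    }
}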

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 10952e7..562d681 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -26,6 +26,8 @@ import com.google.common.base.Throwables;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -33,6 +35,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.UUIDGen;
@@ -45,7 +48,6 @@ public class Scrubber implements Closeable
     private final File destination;
     private final boolean skipCorrupted;
 
-    private final CompactionController controller;
     private final boolean isCommutative;
     private final boolean isIndex;
     private final boolean checkData;
@@ -72,14 +74,14 @@ public class Scrubber implements Closeable
 
     private final OutputHandler outputHandler;
 
-    private static final Comparator<Row> rowComparator = new Comparator<Row>()
+    private static final Comparator<Partition> partitionComparator = new Comparator<Partition>()
     {
-         public int compare(Row r1, Row r2)
+         public int compare(Partition r1, Partition r2)
          {
-             return r1.key.compareTo(r2.key);
+             return r1.partitionKey().compareTo(r2.partitionKey());
          }
     };
-    private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
+    private final SortedSet<Partition> outOfOrder = new TreeSet<>(partitionComparator);
 
     public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException
     {
@@ -95,7 +97,9 @@ public class Scrubber implements Closeable
         this.outputHandler = outputHandler;
         this.skipCorrupted = skipCorrupted;
         this.isOffline = isOffline;
-        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
+                                                                                                        sstable.descriptor.version,
+                                                                                                        sstable.header);
 
         List<SSTableReader> toScrub = Collections.singletonList(sstable);
 
@@ -104,10 +108,6 @@ public class Scrubber implements Closeable
         if (destination == null)
             throw new IOException("disk full");
 
-        // If we run scrub offline, we should never purge tombstones, as we cannot know if other sstables have data that the tombstones delete.
-        this.controller = isOffline
-                        ? new ScrubController(cfs)
-                        : new CompactionController(cfs, Collections.singleton(sstable), CompactionManager.getDefaultGcBefore(cfs));
         this.isCommutative = cfs.metadata.isCounter();
         this.isIndex = cfs.isIndex();
         this.checkData = checkData && !this.isIndex; //LocalByPartitionerType does not support validation
@@ -127,15 +127,21 @@ public class Scrubber implements Closeable
         this.nextRowPositionFromIndex = 0;
     }
 
+    private UnfilteredRowIterator withValidation(UnfilteredRowIterator iter, String filename)
+    {
+        return checkData ? UnfilteredRowIterators.withValidation(iter, filename) : iter;
+    }
+
     public void scrub()
     {
         outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
+        int nowInSec = FBUtilities.nowInSeconds();
         try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline);)
         {
             nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
             {
                 // throw away variable so we don't have a side effect in the assert
-                long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
+                long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile).position;
                 assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
             }
 
@@ -188,7 +194,8 @@ public class Scrubber implements Closeable
                     if (currentIndexKey != null && !key.getKey().equals(currentIndexKey))
                     {
                         throw new IOError(new IOException(String.format("Key from data file (%s) does not match key from index file (%s)",
-                                ByteBufferUtil.bytesToHex(key.getKey()), ByteBufferUtil.bytesToHex(currentIndexKey))));
+                                //ByteBufferUtil.bytesToHex(key.getKey()), ByteBufferUtil.bytesToHex(currentIndexKey))));
+                                "_too big_", ByteBufferUtil.bytesToHex(currentIndexKey))));
                     }
 
                     if (dataSizeFromIndex > dataFile.length())
@@ -197,20 +204,19 @@ public class Scrubber implements Closeable
                     if (dataStart != dataStartFromIndex)
                         outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataStartFromIndex));
 
-                    SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, checkData);
-
-                    if (prevKey != null && prevKey.compareTo(key) > 0)
+                    try (UnfilteredRowIterator iterator = withValidation(new SSTableIdentityIterator(sstable, dataFile, key), dataFile.getPath()))
                     {
-                        saveOutOfOrderRow(prevKey, key, atoms);
-                        continue;
-                    }
+                        if (prevKey != null && prevKey.compareTo(key) > 0)
+                        {
+                            saveOutOfOrderRow(prevKey, key, iterator);
+                            continue;
+                        }
 
-                    @SuppressWarnings("resource")
-                    AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
-                    if (writer.tryAppend(compactedRow) == null)
-                        emptyRows++;
-                    else
-                        goodRows++;
+                        if (writer.tryAppend(iterator) == null)
+                            emptyRows++;
+                        else
+                            goodRows++;
+                    }
 
                     prevKey = key;
                 }
@@ -229,20 +235,20 @@ public class Scrubber implements Closeable
                         {
                             dataFile.seek(dataStartFromIndex);
 
-                            SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, checkData);
-                            if (prevKey != null && prevKey.compareTo(key) > 0)
+                            try (UnfilteredRowIterator iterator = withValidation(new SSTableIdentityIterator(sstable, dataFile, key), dataFile.getPath()))
                             {
-                                saveOutOfOrderRow(prevKey, key, atoms);
-                                continue;
+                                if (prevKey != null && prevKey.compareTo(key) > 0)
+                                {
+                                    saveOutOfOrderRow(prevKey, key, iterator);
+                                    continue;
+                                }
+
+                                if (writer.tryAppend(iterator) == null)
+                                    emptyRows++;
+                                else
+                                    goodRows++;
                             }
 
-                            @SuppressWarnings("resource")
-                            AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
-                            if (writer.tryAppend(compactedRow) == null)
-                                emptyRows++;
-                            else
-                                goodRows++;
-
                             prevKey = key;
                         }
                         catch (Throwable th2)
@@ -267,18 +273,18 @@ public class Scrubber implements Closeable
                 }
             }
 
-            if (!outOfOrderRows.isEmpty())
+            if (!outOfOrder.isEmpty())
             {
                 // out of order rows, but no bad rows found - we can keep our repairedAt time
                 long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
                 try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);)
                 {
-                    for (Row row : outOfOrderRows)
-                        inOrderWriter.append(row.key, row.cf);
+                    for (Partition partition : outOfOrder)
+                        inOrderWriter.append(partition.unfilteredIterator());
                     newInOrderSstable = inOrderWriter.finish(-1, sstable.maxDataAge, true);
                 }
                 transaction.update(newInOrderSstable, false);
-                outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
+                outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrder.size(), sstable, newInOrderSstable));
             }
 
             // finish obsoletes the old sstable
@@ -290,10 +296,6 @@ public class Scrubber implements Closeable
         {
             throw Throwables.propagate(e);
         }
-        finally
-        {
-            controller.close();
-        }
 
         if (newSstable == null)
         {
@@ -318,8 +320,8 @@ public class Scrubber implements Closeable
         {
             nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
             nextRowPositionFromIndex = indexFile.isEOF()
-                    ? dataFile.length()
-                    : rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
+                                     ? dataFile.length()
+                                     : rowIndexEntrySerializer.deserialize(indexFile).position;
         }
         catch (Throwable th)
         {
@@ -350,19 +352,11 @@ public class Scrubber implements Closeable
         }
     }
 
-    private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
+    private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, UnfilteredRowIterator iterator)
     {
         // TODO bitch if the row is too large?  if it is there's not much we can do ...
         outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
-        // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
-        // and there's no sense in failing on mis-sorted cells when a TreeMap could save us
-        ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false);
-        while (atoms.hasNext())
-        {
-            OnDiskAtom atom = atoms.next();
-            cf.addAtom(atom);
-        }
-        outOfOrderRows.add(new Row(key, cf));
+        outOfOrder.add(ArrayBackedPartition.create(iterator));
     }
 
     public SSTableReader getNewSSTable()
@@ -442,20 +436,6 @@ public class Scrubber implements Closeable
         }
     }
 
-    private static class ScrubController extends CompactionController
-    {
-        public ScrubController(ColumnFamilyStore cfs)
-        {
-            super(cfs, Integer.MAX_VALUE);
-        }
-
-        @Override
-        public long maxPurgeableTimestamp(DecoratedKey key)
-        {
-            return Long.MIN_VALUE;
-        }
-    }
-
     @VisibleForTesting
     public ScrubResult scrubWithResult()
     {
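
Two patterns recur in the Scrubber changes: partitions are now consumed inside try-with-resources blocks, and out-of-order partitions are parked in a TreeSet ordered by partition key, then replayed in sorted order into a second sstable once the scan completes. A sketch of the second pattern with a stand-in Partition record (not Cassandra's Partition interface):

import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;

final class OutOfOrderBuffer
{
    record Partition(long key, String payload) {}

    private final SortedSet<Partition> outOfOrder =
        new TreeSet<>(Comparator.comparingLong(Partition::key));

    private long prevKey = Long.MIN_VALUE;

    // Returns true if the partition was written in-line, false if it was parked.
    boolean offer(Partition p, Consumer<Partition> writer)
    {
        if (p.key() < prevKey)
        {
            outOfOrder.add(p); // out of order: defer it and keep scanning
            return false;
        }
        prevKey = p.key();
        writer.accept(p);
        return true;
    }

    // Flush deferred partitions, now in key order, to a second writer.
    void flush(Consumer<Partition> inOrderWriter)
    {
        outOfOrder.forEach(inOrderWriter);
    }
}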

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index a0cce24..e3764c8 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -21,16 +21,18 @@ import java.io.File;
 import java.util.*;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -78,24 +80,27 @@ public class Upgrader
                 sstableMetadataCollector.addAncestor(i);
         }
         sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
-        return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
+        return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)),
+                                    estimatedRows,
+                                    repairedAt,
+                                    cfs.metadata,
+                                    cfs.partitioner,
+                                    sstableMetadataCollector,
+                                    SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)));
     }
 
     public void upgrade()
     {
         outputHandler.output("Upgrading " + sstable);
 
+        int nowInSec = FBUtilities.nowInSeconds();
         try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true);
              AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals());
-             CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator())
+             CompactionIterator iter = new CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())
-            {
-                @SuppressWarnings("resource")
-                AbstractCompactedRow row = iter.next();
-                writer.append(row);
-            }
+                writer.append(iter.next());
 
             writer.finish();
             outputHandler.output("Upgrade of " + sstable + " complete.");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 0177819..90a97a0 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -18,8 +18,9 @@
 package org.apache.cassandra.db.compaction;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
+
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
@@ -31,7 +32,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -71,7 +72,7 @@ public class Verifier implements Closeable
         this.cfs = cfs;
         this.sstable = sstable;
         this.outputHandler = outputHandler;
-        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata, sstable.descriptor.version, sstable.header);
 
         this.controller = new VerifyController(cfs);
 
@@ -102,7 +103,7 @@ public class Verifier implements Closeable
             }
             else
             {
-                outputHandler.output("Data digest missing, assuming extended verification of disk atoms");
+                outputHandler.output("Data digest missing, assuming extended verification of disk values");
                 extended = true;
             }
         }
@@ -119,14 +120,14 @@ public class Verifier implements Closeable
         if ( !extended )
             return;
 
-        outputHandler.output("Extended Verify requested, proceeding to inspect atoms");
+        outputHandler.output("Extended Verify requested, proceeding to inspect values");
 
 
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
             {
-                long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
+                long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile).position;
                 if (firstRowPositionFromIndex != 0)
                     markAndThrow();
             }
@@ -160,7 +161,7 @@ public class Verifier implements Closeable
                     nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
                     nextRowPositionFromIndex = indexFile.isEOF()
                                              ? dataFile.length()
-                                             : rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
+                                             : rowIndexEntrySerializer.deserialize(indexFile).position;
                 }
                 catch (Throwable th)
                 {
@@ -185,7 +186,10 @@ public class Verifier implements Closeable
                         markAndThrow();
 
                     //mimic the scrub read path
-                    new SSTableIdentityIterator(sstable, dataFile, key, true);
+                    try (UnfilteredRowIterator iterator = new SSTableIdentityIterator(sstable, dataFile, key))
+                    {
+                    }
+
                     if ( (prevKey != null && prevKey.compareTo(key) > 0) || !key.getKey().equals(currentIndexKey) || dataStart != dataStartFromIndex )
                         markAndThrow();
                     
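The empty try-with-resources block added to the Verifier is deliberate: constructing the SSTableIdentityIterator performs the validation as a side effect, and the block exists only to guarantee close(). Sketched generically, with the Supplier standing in for the iterator's constructor:

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

final class ValidateOnly
{
    static void validate(Supplier<? extends Closeable> open) throws IOException
    {
        try (Closeable reader = open.get())
        {
            // intentionally empty: construction already validated the data,
            // and the try block just guarantees the resource is closed
        }
    }
}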

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 20c96d6..610592f 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -23,7 +23,7 @@ import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.SSTableRewriter;
@@ -55,11 +55,11 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     }
 
     /**
-     * Writes a row in an implementation specific way
-     * @param row the row to append
-     * @return true if the row was written, false otherwise
+     * Writes a partition in an implementation specific way
+     * @param partition the partition to append
+     * @return true if the partition was written, false otherwise
      */
-    public abstract boolean append(AbstractCompactedRow row);
+    public abstract boolean append(UnfilteredRowIterator partition);
 
     @Override
     protected Throwable doAbort(Throwable accumulate)
@@ -117,4 +117,4 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
 
         return directory;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 7d88458..8fc7bec 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -25,7 +25,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -54,14 +55,15 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     cfs.partitioner,
-                                                    new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0));
+                                                    new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0),
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
         sstableWriter.switchWriter(writer);
     }
 
     @Override
-    public boolean append(AbstractCompactedRow row)
+    public boolean append(UnfilteredRowIterator partition)
     {
-        return sstableWriter.append(row) != null;
+        return sstableWriter.append(partition) != null;
     }
 
     @Override
@@ -69,4 +71,4 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
     {
         return estimatedTotalKeys;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 95d7a0c..5328fa5 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -25,7 +25,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -68,16 +69,17 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     cfs.partitioner,
-                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors));
+                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
         sstableWriter.switchWriter(writer);
     }
 
     @Override
     @SuppressWarnings("resource")
-    public boolean append(AbstractCompactedRow row)
+    public boolean append(UnfilteredRowIterator partition)
     {
         long posBefore = sstableWriter.currentWriter().getOnDiskFilePointer();
-        RowIndexEntry rie = sstableWriter.append(row);
+        RowIndexEntry rie = sstableWriter.append(partition);
         totalWrittenInLevel += sstableWriter.currentWriter().getOnDiskFilePointer() - posBefore;
         partitionsWritten++;
         if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
@@ -95,7 +97,8 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                                                         minRepairedAt,
                                                         cfs.metadata,
                                                         cfs.partitioner,
-                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors));
+                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
+                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
             sstableWriter.switchWriter(writer);
             partitionsWritten = 0;
             sstablesWritten++;
@@ -103,4 +106,4 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
         return rie != null;
 
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index d30a612..4832fd5 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -22,7 +22,8 @@ import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -57,14 +58,15 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     cfs.partitioner,
-                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, level));
+                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
         sstableWriter.switchWriter(writer);
     }
 
     @Override
-    public boolean append(AbstractCompactedRow row)
+    public boolean append(UnfilteredRowIterator partition)
     {
-        RowIndexEntry rie = sstableWriter.append(row);
+        RowIndexEntry rie = sstableWriter.append(partition);
         if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
         {
             File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
@@ -74,7 +76,8 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
                                                                 minRepairedAt,
                                                                 cfs.metadata,
                                                                 cfs.partitioner,
-                                                                new MetadataCollector(allSSTables, cfs.metadata.comparator, level));
+                                                                new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
+                                                                SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
 
             sstableWriter.switchWriter(writer);
         }
@@ -86,4 +89,4 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
     {
         return estimatedTotalKeys;
     }
-}
\ No newline at end of file
+}
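
MaxSSTableSizeWriter picks up the same two changes at both of its writer-creation sites. The new constructor argument deserves a note: the header is computed once per output sstable from the table metadata and the non-expired inputs, so rows can be serialized against shared per-sstable statistics. As a toy illustration of the idea only — the real SerializationHeader also records columns, types and encoding stats, and the delta-encoding benefit shown here is an assumption for the example — consider encoding timestamps against a per-sstable minimum:

// Toy illustration (assumed benefit, not the real header): record the minimum
// timestamp across the inputs once, then store each cell's timestamp as a
// small, varint-friendly delta against it.
class ToyHeader
{
    final long minTimestamp;

    ToyHeader(long[] inputMinTimestamps)
    {
        long min = Long.MAX_VALUE;
        for (long ts : inputMinTimestamps)
            min = Math.min(min, ts);
        this.minTimestamp = min;
    }

    long encode(long timestamp) { return timestamp - minTimestamp; }
    long decode(long delta)     { return delta + minTimestamp; }
}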

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 9ff1325..ba85eef 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -26,7 +26,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -90,16 +91,17 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
                                                                             minRepairedAt,
                                                                             cfs.metadata,
                                                                             cfs.partitioner,
-                                                                            new MetadataCollector(allSSTables, cfs.metadata.comparator, 0));
+                                                                            new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
+                                                                            SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
 
         sstableWriter.switchWriter(writer);
         logger.debug("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
     }
 
     @Override
-    public boolean append(AbstractCompactedRow row)
+    public boolean append(UnfilteredRowIterator partition)
     {
-        RowIndexEntry rie = sstableWriter.append(row);
+        RowIndexEntry rie = sstableWriter.append(partition);
         if (sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect
         {
             currentRatioIndex++;
@@ -112,10 +114,11 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
                                                                                 minRepairedAt,
                                                                                 cfs.metadata,
                                                                                 cfs.partitioner,
-                                                                                new MetadataCollector(allSSTables, cfs.metadata.comparator, 0));
+                                                                                new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
+                                                                                SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
             sstableWriter.switchWriter(writer);
             logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
         }
         return rie != null;
     }
-}
\ No newline at end of file
+}
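
SplittingSizeTieredCompactionWriter varies only the switch condition: rather than one fixed maxSSTableSize, it steps through an array of ratios so the output is split into progressively smaller sstables, and, as its in-line comment warns, an underestimate of the key count simply spills into the last sstable. The per-output byte budgets are plain arithmetic; a hypothetical helper (field names differ from the real class):

class RatioBudgets
{
    // Given the total expected output size and one ratio per output sstable,
    // each writer switches once it has written its share of the total.
    static long[] budgets(long totalSize, double[] ratios)
    {
        long[] out = new long[ratios.length];
        for (int i = 0; i < ratios.length; i++)
            out[i] = (long) (totalSize * ratios[i]);
        return out;
    }

    public static void main(String[] args)
    {
        // e.g. a 1 GB compaction split 50% / 25% / 25%
        long[] b = budgets(1_000_000_000L, new double[]{ 0.5, 0.25, 0.25 });
        System.out.println(java.util.Arrays.toString(b)); // [500000000, 250000000, 250000000]
    }
}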

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/AbstractCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
deleted file mode 100644
index a982280..0000000
--- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.composites;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.NativeCell;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.marshal.AbstractCompositeType;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
-
-public abstract class AbstractCType implements CType
-{
-    static final Comparator<Cell> rightNativeCell = new Comparator<Cell>()
-    {
-        public int compare(Cell o1, Cell o2)
-        {
-            return -((NativeCell) o2).compareTo(o1.name());
-        }
-    };
-
-    static final Comparator<Cell> neitherNativeCell = new Comparator<Cell>()
-    {
-        public int compare(Cell o1, Cell o2)
-        {
-            return compareUnsigned(o1.name(), o2.name());
-        }
-    };
-
-    // only one or the other of these will ever be used
-    static final Comparator<Object> asymmetricRightNativeCell = new Comparator<Object>()
-    {
-        public int compare(Object o1, Object o2)
-        {
-            return -((NativeCell) o2).compareTo((Composite) o1);
-        }
-    };
-
-    static final Comparator<Object> asymmetricNeitherNativeCell = new Comparator<Object>()
-    {
-        public int compare(Object o1, Object o2)
-        {
-            return compareUnsigned((Composite) o1, ((Cell) o2).name());
-        }
-    };
-
-    private final Comparator<Composite> reverseComparator;
-    private final Comparator<IndexInfo> indexComparator;
-    private final Comparator<IndexInfo> indexReverseComparator;
-
-    private final Serializer serializer;
-
-    private final IVersionedSerializer<ColumnSlice> sliceSerializer;
-    private final IVersionedSerializer<SliceQueryFilter> sliceQueryFilterSerializer;
-    private final DeletionInfo.Serializer deletionInfoSerializer;
-    private final RangeTombstone.Serializer rangeTombstoneSerializer;
-
-    protected final boolean isByteOrderComparable;
-
-    protected AbstractCType(boolean isByteOrderComparable)
-    {
-        reverseComparator = new Comparator<Composite>()
-        {
-            public int compare(Composite c1, Composite c2)
-            {
-                return AbstractCType.this.compare(c2, c1);
-            }
-        };
-        indexComparator = new Comparator<IndexInfo>()
-        {
-            public int compare(IndexInfo o1, IndexInfo o2)
-            {
-                return AbstractCType.this.compare(o1.lastName, o2.lastName);
-            }
-        };
-        indexReverseComparator = new Comparator<IndexInfo>()
-        {
-            public int compare(IndexInfo o1, IndexInfo o2)
-            {
-                return AbstractCType.this.compare(o1.firstName, o2.firstName);
-            }
-        };
-
-        serializer = new Serializer(this);
-
-        sliceSerializer = new ColumnSlice.Serializer(this);
-        sliceQueryFilterSerializer = new SliceQueryFilter.Serializer(this);
-        deletionInfoSerializer = new DeletionInfo.Serializer(this);
-        rangeTombstoneSerializer = new RangeTombstone.Serializer(this);
-        this.isByteOrderComparable = isByteOrderComparable;
-    }
-
-    protected static boolean isByteOrderComparable(Iterable<AbstractType<?>> types)
-    {
-        boolean isByteOrderComparable = true;
-        for (AbstractType<?> type : types)
-            isByteOrderComparable &= type.isByteOrderComparable();
-        return isByteOrderComparable;
-    }
-
-    static int compareUnsigned(Composite c1, Composite c2)
-    {
-        if (c1.isStatic() != c2.isStatic())
-        {
-            // Static sorts before non-static no matter what, except for empty, which
-            // always sorts first
-            if (c1.isEmpty())
-                return c2.isEmpty() ? 0 : -1;
-            if (c2.isEmpty())
-                return 1;
-            return c1.isStatic() ? -1 : 1;
-        }
-
-        int s1 = c1.size();
-        int s2 = c2.size();
-        int minSize = Math.min(s1, s2);
-
-        for (int i = 0; i < minSize; i++)
-        {
-            int cmp = ByteBufferUtil.compareUnsigned(c1.get(i), c2.get(i));
-            if (cmp != 0)
-                return cmp;
-        }
-
-        if (s1 == s2)
-            return c1.eoc().compareTo(c2.eoc());
-        return s1 < s2 ? c1.eoc().prefixComparisonResult : -c2.eoc().prefixComparisonResult;
-    }
-
-    public int compare(Composite c1, Composite c2)
-    {
-        if (c1.isStatic() != c2.isStatic())
-        {
-            // Static sorts before non-static no matter what, except for empty, which
-            // always sorts first
-            if (c1.isEmpty())
-                return c2.isEmpty() ? 0 : -1;
-            if (c2.isEmpty())
-                return 1;
-            return c1.isStatic() ? -1 : 1;
-        }
-
-        int s1 = c1.size();
-        int s2 = c2.size();
-        int minSize = Math.min(s1, s2);
-
-        for (int i = 0; i < minSize; i++)
-        {
-            int cmp = isByteOrderComparable
-                      ? ByteBufferUtil.compareUnsigned(c1.get(i), c2.get(i))
-                      : subtype(i).compare(c1.get(i), c2.get(i));
-            if (cmp != 0)
-                return cmp;
-        }
-
-        if (s1 == s2)
-            return c1.eoc().compareTo(c2.eoc());
-        return s1 < s2 ? c1.eoc().prefixComparisonResult : -c2.eoc().prefixComparisonResult;
-    }
-
-    protected Comparator<Cell> getByteOrderColumnComparator(boolean isRightNative)
-    {
-        if (isRightNative)
-            return rightNativeCell;
-        return neitherNativeCell;
-    }
-
-    protected Comparator<Object> getByteOrderAsymmetricColumnComparator(boolean isRightNative)
-    {
-        if (isRightNative)
-            return asymmetricRightNativeCell;
-        return asymmetricNeitherNativeCell;
-    }
-
-    public void validate(Composite name)
-    {
-        ByteBuffer previous = null;
-        for (int i = 0; i < name.size(); i++)
-        {
-            AbstractType<?> comparator = subtype(i);
-            ByteBuffer value = name.get(i);
-            comparator.validateCollectionMember(value, previous);
-            previous = value;
-        }
-    }
-
-    public boolean isCompatibleWith(CType previous)
-    {
-        if (this == previous)
-            return true;
-
-        // Extending with new components is fine, shrinking is not
-        if (size() < previous.size())
-            return false;
-
-        for (int i = 0; i < previous.size(); i++)
-        {
-            AbstractType<?> tprev = previous.subtype(i);
-            AbstractType<?> tnew = subtype(i);
-            if (!tnew.isCompatibleWith(tprev))
-                return false;
-        }
-        return true;
-    }
-
-    public String getString(Composite c)
-    {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < c.size(); i++)
-        {
-            if (i > 0)
-                sb.append(":");
-            sb.append(AbstractCompositeType.escape(subtype(i).getString(c.get(i))));
-        }
-        switch (c.eoc())
-        {
-            case START:
-                sb.append(":_");
-                break;
-            case END:
-                sb.append(":!");
-                break;
-        }
-        return sb.toString();
-    }
-
-    public Composite make(Object... components)
-    {
-        if (components.length > size())
-            throw new IllegalArgumentException("Too many components, max is " + size());
-
-        CBuilder builder = builder();
-        for (int i = 0; i < components.length; i++)
-        {
-            Object obj = components[i];
-            if (obj instanceof ByteBuffer)
-                builder.add((ByteBuffer)obj);
-            else
-                builder.add(obj);
-        }
-        return builder.build();
-    }
-
-    public CType.Serializer serializer()
-    {
-        return serializer;
-    }
-
-    public Comparator<Composite> reverseComparator()
-    {
-        return reverseComparator;
-    }
-
-    public Comparator<IndexInfo> indexComparator()
-    {
-        return indexComparator;
-    }
-
-    public Comparator<IndexInfo> indexReverseComparator()
-    {
-        return indexReverseComparator;
-    }
-
-    public IVersionedSerializer<ColumnSlice> sliceSerializer()
-    {
-        return sliceSerializer;
-    }
-
-    public IVersionedSerializer<SliceQueryFilter> sliceQueryFilterSerializer()
-    {
-        return sliceQueryFilterSerializer;
-    }
-
-    public DeletionInfo.Serializer deletionInfoSerializer()
-    {
-        return deletionInfoSerializer;
-    }
-
-    public RangeTombstone.Serializer rangeTombstoneSerializer()
-    {
-        return rangeTombstoneSerializer;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o)
-            return true;
-
-        if (o == null)
-            return false;
-
-        if (!getClass().equals(o.getClass()))
-            return false;
-
-        CType c = (CType)o;
-        if (size() != c.size())
-            return false;
-
-        for (int i = 0; i < size(); i++)
-        {
-            if (!subtype(i).equals(c.subtype(i)))
-                return false;
-        }
-        return true;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int h = 31;
-        for (int i = 0; i < size(); i++)
-            h += subtype(i).hashCode();
-        return h + getClass().hashCode();
-    }
-
-    @Override
-    public String toString()
-    {
-        return asAbstractType().toString();
-    }
-
-    protected static ByteBuffer sliceBytes(ByteBuffer bb, int offs, int length)
-    {
-        ByteBuffer copy = bb.duplicate();
-        copy.position(offs);
-        copy.limit(offs + length);
-        return copy;
-    }
-
-    protected static void checkRemaining(ByteBuffer bb, int offs, int length)
-    {
-        if (offs + length > bb.limit())
-            throw new IllegalArgumentException("Not enough bytes");
-    }
-
-    private static class Serializer implements CType.Serializer
-    {
-        private final CType type;
-
-        public Serializer(CType type)
-        {
-            this.type = type;
-        }
-
-        public void serialize(Composite c, DataOutputPlus out) throws IOException
-        {
-            ByteBufferUtil.writeWithShortLength(c.toByteBuffer(), out);
-        }
-
-        public Composite deserialize(DataInput in) throws IOException
-        {
-            return type.fromByteBuffer(ByteBufferUtil.readWithShortLength(in));
-        }
-
-        public long serializedSize(Composite c, TypeSizes type)
-        {
-            return type.sizeofWithShortLength(c.toByteBuffer());
-        }
-
-        public void skip(DataInput in) throws IOException
-        {
-            ByteBufferUtil.skipShortLength(in);
-        }
-    }
-}
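
From here on the commit deletes the pre-8099 composite machinery outright. The heart of the removed AbstractCType is its comparison: static composites sort first, components are compared pairwise (byte-order when every subtype allows it, otherwise via each subtype's comparator), equal-length composites fall back to the end-of-component (EOC) byte, and a strict prefix is ordered against its extensions by the prefix's own EOC. Stripped of the Composite interface, the logic reduces to roughly the following, with byte[][] standing in for Composite and an int EOC of -1 (START), 0 (NONE) or +1 (END):

class CompositeCompare
{
    static int compare(byte[][] c1, int eoc1, byte[][] c2, int eoc2)
    {
        int minSize = Math.min(c1.length, c2.length);
        for (int i = 0; i < minSize; i++)
        {
            int cmp = compareUnsignedBytes(c1[i], c2[i]);
            if (cmp != 0)
                return cmp;
        }
        if (c1.length == c2.length)
            return Integer.compare(eoc1, eoc2);
        // strict prefix: ordered by the prefix's EOC, negated when c2 is the
        // prefix (a slice start sorts before its extensions, an end after them)
        return c1.length < c2.length ? (eoc1 <= 0 ? -1 : 1) : (eoc2 <= 0 ? 1 : -1);
    }

    static int compareUnsignedBytes(byte[] a, byte[] b)
    {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++)
        {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0)
                return cmp;
        }
        return Integer.compare(a.length, b.length);
    }
}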

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java b/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java
deleted file mode 100644
index c62f890..0000000
--- a/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.composites;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.AbstractIterator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.CQL3Row;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CollectionType;
-import org.apache.cassandra.db.marshal.ColumnToCollectionType;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public abstract class AbstractCellNameType extends AbstractCType implements CellNameType
-{
-    final Comparator<Cell> columnComparator;
-    private final Comparator<Cell> columnReverseComparator;
-    final Comparator<Object> asymmetricComparator;
-    private final Comparator<OnDiskAtom> onDiskAtomComparator;
-
-    private final ISerializer<CellName> cellSerializer;
-    private final ColumnSerializer columnSerializer;
-    private final OnDiskAtom.Serializer onDiskAtomSerializer;
-    private final IVersionedSerializer<NamesQueryFilter> namesQueryFilterSerializer;
-    private final IVersionedSerializer<IDiskAtomFilter> diskAtomFilterSerializer;
-
-    protected AbstractCellNameType(boolean isByteOrderComparable)
-    {
-        super(isByteOrderComparable);
-        columnComparator = new Comparator<Cell>()
-        {
-            public int compare(Cell c1, Cell c2)
-            {
-                return AbstractCellNameType.this.compare(c1.name(), c2.name());
-            }
-        };
-        asymmetricComparator = new Comparator<Object>()
-        {
-            public int compare(Object c1, Object c2)
-            {
-                return AbstractCellNameType.this.compare((Composite) c1, ((Cell) c2).name());
-            }
-        };
-        columnReverseComparator = new Comparator<Cell>()
-        {
-            public int compare(Cell c1, Cell c2)
-            {
-                return AbstractCellNameType.this.compare(c2.name(), c1.name());
-            }
-        };
-        onDiskAtomComparator = new Comparator<OnDiskAtom>()
-        {
-            public int compare(OnDiskAtom c1, OnDiskAtom c2)
-            {
-                int comp = AbstractCellNameType.this.compare(c1.name(), c2.name());
-                if (comp != 0)
-                    return comp;
-
-                if (c1 instanceof RangeTombstone)
-                {
-                    if (c2 instanceof RangeTombstone)
-                    {
-                        RangeTombstone t1 = (RangeTombstone)c1;
-                        RangeTombstone t2 = (RangeTombstone)c2;
-                        int comp2 = AbstractCellNameType.this.compare(t1.max, t2.max);
-                        return comp2 == 0 ? t1.data.compareTo(t2.data) : comp2;
-                    }
-                    else
-                    {
-                        return -1;
-                    }
-                }
-                else
-                {
-                    return c2 instanceof RangeTombstone ? 1 : 0;
-                }
-            }
-        };
-
-        // A trivial wrapper over the composite serializer
-        cellSerializer = new ISerializer<CellName>()
-        {
-            public void serialize(CellName c, DataOutputPlus out) throws IOException
-            {
-                serializer().serialize(c, out);
-            }
-
-            public CellName deserialize(DataInput in) throws IOException
-            {
-                Composite ct = serializer().deserialize(in);
-                if (ct.isEmpty())
-                    throw ColumnSerializer.CorruptColumnException.create(in, ByteBufferUtil.EMPTY_BYTE_BUFFER);
-
-                assert ct instanceof CellName : ct;
-                return (CellName)ct;
-            }
-
-            public long serializedSize(CellName c, TypeSizes type)
-            {
-                return serializer().serializedSize(c, type);
-            }
-        };
-        columnSerializer = new ColumnSerializer(this);
-        onDiskAtomSerializer = new OnDiskAtom.Serializer(this);
-        namesQueryFilterSerializer = new NamesQueryFilter.Serializer(this);
-        diskAtomFilterSerializer = new IDiskAtomFilter.Serializer(this);
-    }
-
-    public final Comparator<Cell> columnComparator(boolean isRightNative)
-    {
-        if (!isByteOrderComparable)
-            return columnComparator;
-        return getByteOrderColumnComparator(isRightNative);
-    }
-
-    public final Comparator<Object> asymmetricColumnComparator(boolean isRightNative)
-    {
-        if (!isByteOrderComparable)
-            return asymmetricComparator;
-        return getByteOrderAsymmetricColumnComparator(isRightNative);
-    }
-
-    public Comparator<Cell> columnReverseComparator()
-    {
-        return columnReverseComparator;
-    }
-
-    public Comparator<OnDiskAtom> onDiskAtomComparator()
-    {
-        return onDiskAtomComparator;
-    }
-
-    public ISerializer<CellName> cellSerializer()
-    {
-        return cellSerializer;
-    }
-
-    public ColumnSerializer columnSerializer()
-    {
-        return columnSerializer;
-    }
-
-    public OnDiskAtom.Serializer onDiskAtomSerializer()
-    {
-        return onDiskAtomSerializer;
-    }
-
-    public IVersionedSerializer<NamesQueryFilter> namesQueryFilterSerializer()
-    {
-        return namesQueryFilterSerializer;
-    }
-
-    public IVersionedSerializer<IDiskAtomFilter> diskAtomFilterSerializer()
-    {
-        return diskAtomFilterSerializer;
-    }
-
-    public CellName cellFromByteBuffer(ByteBuffer bytes)
-    {
-        // we're not guaranteed to get a CellName back from fromByteBuffer(), so it's on the caller to guarantee this
-        return (CellName)fromByteBuffer(bytes);
-    }
-
-    public CellName create(Composite prefix, ColumnDefinition column, ByteBuffer collectionElement)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public CellName rowMarker(Composite prefix)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public Composite staticPrefix()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean hasCollections()
-    {
-        return false;
-    }
-
-    public boolean supportCollections()
-    {
-        return false;
-    }
-
-    public ColumnToCollectionType collectionType()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public CellNameType addOrUpdateCollection(ColumnIdentifier columnName, CollectionType newCollection)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Composite make(Object... components)
-    {
-        return components.length == size() ? makeCellName(components) : super.make(components);
-    }
-
-    public CellName makeCellName(Object... components)
-    {
-        ByteBuffer[] rawComponents = new ByteBuffer[components.length];
-        for (int i = 0; i < components.length; i++)
-        {
-            Object c = components[i];
-            if (c instanceof ByteBuffer)
-            {
-                rawComponents[i] = (ByteBuffer)c;
-            }
-            else
-            {
-                AbstractType<?> type = subtype(i);
-                // If it's a collection type, we need to find the right collection and use the key comparator (since we're building a cell name)
-                if (type instanceof ColumnToCollectionType)
-                {
-                    assert i > 0;
-                    type = ((ColumnToCollectionType)type).defined.get(rawComponents[i-1]).nameComparator();
-                }
-                rawComponents[i] = ((AbstractType)type).decompose(c);
-            }
-        }
-        return makeCellName(rawComponents);
-    }
-
-    protected abstract CellName makeCellName(ByteBuffer[] components);
-
-    protected static CQL3Row.Builder makeDenseCQL3RowBuilder(final long now)
-    {
-        return new CQL3Row.Builder()
-        {
-            public CQL3Row.RowIterator group(Iterator<Cell> cells)
-            {
-                return new DenseRowIterator(cells, now);
-            }
-        };
-    }
-
-    private static class DenseRowIterator extends AbstractIterator<CQL3Row> implements CQL3Row.RowIterator
-    {
-        private final Iterator<Cell> cells;
-        private final long now;
-
-        public DenseRowIterator(Iterator<Cell> cells, long now)
-        {
-            this.cells = cells;
-            this.now = now;
-        }
-
-        public CQL3Row getStaticRow()
-        {
-            // There can't be static columns in dense tables
-            return null;
-        }
-
-        protected CQL3Row computeNext()
-        {
-            while (cells.hasNext())
-            {
-                final Cell cell = cells.next();
-                if (!cell.isLive(now))
-                    continue;
-
-                return new CQL3Row()
-                {
-                    public ByteBuffer getClusteringColumn(int i)
-                    {
-                        return cell.name().get(i);
-                    }
-
-                    public Cell getColumn(ColumnIdentifier name)
-                    {
-                        return cell;
-                    }
-
-                    public List<Cell> getMultiCellColumn(ColumnIdentifier name)
-                    {
-                        return null;
-                    }
-                };
-            }
-            return endOfData();
-        }
-    }
-
-    protected static CQL3Row.Builder makeSparseCQL3RowBuilder(final CFMetaData cfMetaData, final CellNameType type, final long now)
-    {
-        return new CQL3Row.Builder()
-        {
-            public CQL3Row.RowIterator group(Iterator<Cell> cells)
-            {
-                return new SparseRowIterator(cfMetaData, type, cells, now);
-            }
-        };
-    }
-
-    private static class SparseRowIterator extends AbstractIterator<CQL3Row> implements CQL3Row.RowIterator
-    {
-        private final CFMetaData cfMetaData;
-        private final CellNameType type;
-        private final Iterator<Cell> cells;
-        private final long now;
-        private final CQL3Row staticRow;
-
-        private Cell nextCell;
-        private CellName previous;
-        private CQL3RowOfSparse currentRow;
-
-        public SparseRowIterator(CFMetaData cfMetaData, CellNameType type, Iterator<Cell> cells, long now)
-        {
-            this.cfMetaData = cfMetaData;
-            this.type = type;
-            this.cells = cells;
-            this.now = now;
-            this.staticRow = hasNextCell() && nextCell.name().isStatic()
-                           ? computeNext()
-                           : null;
-        }
-
-        public CQL3Row getStaticRow()
-        {
-            return staticRow;
-        }
-
-        private boolean hasNextCell()
-        {
-            if (nextCell != null)
-                return true;
-
-            while (cells.hasNext())
-            {
-                Cell cell = cells.next();
-                if (!cell.isLive(now))
-                    continue;
-
-                nextCell = cell;
-                return true;
-            }
-            return false;
-        }
-
-        protected CQL3Row computeNext()
-        {
-            while (hasNextCell())
-            {
-                CQL3Row toReturn = null;
-                CellName current = nextCell.name();
-                if (currentRow == null || !current.isSameCQL3RowAs(type, previous))
-                {
-                    toReturn = currentRow;
-                    currentRow = new CQL3RowOfSparse(cfMetaData, current);
-                }
-                currentRow.add(nextCell);
-                nextCell = null;
-                previous = current;
-
-                if (toReturn != null)
-                    return toReturn;
-            }
-            if (currentRow != null)
-            {
-                CQL3Row toReturn = currentRow;
-                currentRow = null;
-                return toReturn;
-            }
-            return endOfData();
-        }
-    }
-
-    private static class CQL3RowOfSparse implements CQL3Row
-    {
-        private final CFMetaData cfMetaData;
-        private final CellName cell;
-        private Map<ColumnIdentifier, Cell> columns;
-        private Map<ColumnIdentifier, List<Cell>> collections;
-
-        CQL3RowOfSparse(CFMetaData metadata, CellName cell)
-        {
-            this.cfMetaData = metadata;
-            this.cell = cell;
-        }
-
-        public ByteBuffer getClusteringColumn(int i)
-        {
-            return cell.get(i);
-        }
-
-        void add(Cell cell)
-        {
-            CellName cellName = cell.name();
-            ColumnIdentifier columnName =  cellName.cql3ColumnName(cfMetaData);
-            if (cellName.isCollectionCell())
-            {
-                if (collections == null)
-                    collections = new HashMap<>();
-
-                List<Cell> values = collections.get(columnName);
-                if (values == null)
-                {
-                    values = new ArrayList<Cell>();
-                    collections.put(columnName, values);
-                }
-                values.add(cell);
-            }
-            else
-            {
-                if (columns == null)
-                    columns = new HashMap<>();
-                columns.put(columnName, cell);
-            }
-        }
-
-        public Cell getColumn(ColumnIdentifier name)
-        {
-            return columns == null ? null : columns.get(name);
-        }
-
-        public List<Cell> getMultiCellColumn(ColumnIdentifier name)
-        {
-            return collections == null ? null : collections.get(name);
-        }
-    }
-}
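
AbstractCellNameType, also deleted, is where the old engine turned a comparator-ordered stream of cells back into CQL rows: SparseRowIterator skips dead cells and starts a new CQL3Row whenever isSameCQL3RowAs() reports a clustering change — precisely the job the UnfilteredRowIterator-based code paths earlier in this message take over. The grouping itself is a single pass; a minimal restatement with toy types (Cell here is an illustrative class, not the removed org.apache.cassandra API):

import java.util.ArrayList;
import java.util.List;

class RowGrouping
{
    static class Cell
    {
        final String clustering, column, value;
        Cell(String clustering, String column, String value)
        {
            this.clustering = clustering;
            this.column = column;
            this.value = value;
        }
    }

    // One pass over comparator-ordered cells; a new row starts whenever the
    // clustering prefix changes (the role of isSameCQL3RowAs above).
    static List<List<Cell>> group(List<Cell> cells)
    {
        List<List<Cell>> rows = new ArrayList<>();
        List<Cell> current = null;
        String previous = null;
        for (Cell cell : cells)
        {
            if (current == null || !cell.clustering.equals(previous))
            {
                current = new ArrayList<>();
                rows.add(current);
            }
            current.add(cell);
            previous = cell.clustering;
        }
        return rows;
    }
}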

