cassandra-commits mailing list archives

From: slebre...@apache.org
Subject: [3/4] Add support for range tombstones
Date: Fri, 18 May 2012 18:35:10 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 402deaa..9680c95 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -32,8 +32,6 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.IntervalTree.Interval;
-import org.apache.cassandra.utils.IntervalTree.IntervalTree;
 import org.apache.cassandra.utils.Throttle;
 
 /**
@@ -44,7 +42,7 @@ public class CompactionController
     private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);
 
     private final ColumnFamilyStore cfs;
-    private final IntervalTree<SSTableReader> overlappingTree;
+    private final DataTracker.SSTableIntervalTree overlappingTree;
 
     public final int gcBefore;
     public final int mergeShardBefore;
@@ -93,7 +91,7 @@ public class CompactionController
      */
     public boolean shouldPurge(DecoratedKey key)
     {
-        List<SSTableReader> filteredSSTables = overlappingTree.search(new Interval(key, key));
+        List<SSTableReader> filteredSSTables = overlappingTree.search(key);
         for (SSTableReader sstable : filteredSSTables)
         {
             if (sstable.getBloomFilter().isPresent(key.key))

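A note on the shouldPurge() change above: the explicit, degenerate Interval(key, key) lookup is replaced by a point-query overload on the new DataTracker.SSTableIntervalTree, but the purge rule it feeds is unchanged: a row may only be dropped during compaction if no sstable outside the compaction set might still contain it. A minimal, self-contained sketch of that rule, using stand-in types rather than Cassandra's API (the Set plays the role of the bloom filter; a real filter may false-positive, which only makes purging more conservative):

    import java.util.*;

    public class ShouldPurgeDemo
    {
        static class SSTable
        {
            final int minKey, maxKey;
            final Set<Integer> keys; // stand-in for the bloom filter
            SSTable(int min, int max, Set<Integer> keys) { minKey = min; maxKey = max; this.keys = keys; }
            boolean mightContain(int key) { return keys.contains(key); }
        }

        static boolean shouldPurge(List<SSTable> overlapping, int key)
        {
            for (SSTable s : overlapping)
                if (s.minKey <= key && key <= s.maxKey && s.mightContain(key))
                    return false; // another sstable may still hold this row
            return true;
        }

        public static void main(String[] args)
        {
            List<SSTable> others = Arrays.asList(
                new SSTable(0, 10, new HashSet<Integer>(Arrays.asList(3, 7))));
            System.out.println(shouldPurge(others, 7)); // false: sstable 0..10 may contain it
            System.out.println(shouldPurge(others, 5)); // true
        }
    }
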
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index e37480c..2b168ec 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -25,7 +25,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.CloseableIterator;
@@ -37,9 +37,9 @@ public class CompactionIterable extends AbstractCompactionIterable
 
     private long row;
 
-    private static final Comparator<IColumnIterator> comparator = new Comparator<IColumnIterator>()
+    private static final Comparator<OnDiskAtomIterator> comparator = new Comparator<OnDiskAtomIterator>()
     {
-        public int compare(IColumnIterator i1, IColumnIterator i2)
+        public int compare(OnDiskAtomIterator i1, OnDiskAtomIterator i2)
         {
             return i1.getKey().compareTo(i2.getKey());
         }
@@ -61,11 +61,11 @@ public class CompactionIterable extends AbstractCompactionIterable
         return this.getCompactionInfo().toString();
     }
 
-    protected class Reducer extends MergeIterator.Reducer<IColumnIterator, AbstractCompactedRow>
+    protected class Reducer extends MergeIterator.Reducer<OnDiskAtomIterator, AbstractCompactedRow>
     {
         protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
 
-        public void reduce(IColumnIterator current)
+        public void reduce(OnDiskAtomIterator current)
         {
             rows.add((SSTableIdentityIterator) current);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index ebe1254..16d90d5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -482,7 +482,7 @@ public class CompactionManager implements CompactionManagerMBean
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
             {
                 // throw away variable so we don't have a side effect in the assert
-                long firstRowPositionFromIndex = RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor).position;
+                long firstRowPositionFromIndex = RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position;
                 assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
             }
 
@@ -502,7 +502,7 @@ public class CompactionManager implements CompactionManagerMBean
                 try
                 {
                     key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
-                    dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
+                    dataSize = sstable.descriptor.version.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
                     if (logger.isDebugEnabled())
                         logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
                 }
@@ -519,7 +519,7 @@ public class CompactionManager implements CompactionManagerMBean
                     nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
                     nextRowPositionFromIndex = indexFile.isEOF()
                                              ? dataFile.length()
-                                             : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor).position;
+                                             : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position;
                 }
                 catch (Throwable th)
                 {
@@ -531,7 +531,7 @@ public class CompactionManager implements CompactionManagerMBean
                 long dataStart = dataFile.getFilePointer();
                 long dataStartFromIndex = currentIndexKey == null
                                         ? -1
-                                        : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
+                                        : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.version.hasIntRowSize ? 4 : 8);
                 long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
                 assert currentIndexKey != null || indexFile.isEOF();
                 if (logger.isDebugEnabled() && currentIndexKey != null)
@@ -733,15 +733,15 @@ public class CompactionManager implements CompactionManagerMBean
 
                             while (row.hasNext())
                             {
-                                IColumn column = row.next();
+                                OnDiskAtom column = row.next();
                                 if (column instanceof CounterColumn)
                                     renewer.maybeRenew((CounterColumn) column);
-                                if (indexedColumns.contains(column.name()))
+                                if (column instanceof IColumn && indexedColumns.contains(column.name()))
                                 {
                                     if (indexedColumnsInRow == null)
                                         indexedColumnsInRow = new ArrayList<IColumn>();
 
-                                    indexedColumnsInRow.add(column);
+                                    indexedColumnsInRow.add((IColumn)column);
                                 }
                             }
 

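One detail of the scrub hunks above that is easy to miss: the expected data offset recomputed from the index is rowStart plus a 2-byte short length prefix for the key, plus the key bytes, plus the row-size field, which is 4 bytes in pre-"d" sstables (hasIntRowSize) and 8 bytes afterwards. A worked example of that arithmetic (hypothetical helper, not project code):

    public class OffsetDemo
    {
        // Mirrors: rowStart + 2 + currentIndexKey.remaining() + (hasIntRowSize ? 4 : 8)
        static long dataStartFromIndex(long rowStart, int keyLength, boolean hasIntRowSize)
        {
            return rowStart + 2 + keyLength + (hasIntRowSize ? 4 : 8);
        }

        public static void main(String[] args)
        {
            System.out.println(dataStartFromIndex(0, 5, false)); // 15: 2-byte length + 5 key bytes + 8-byte long size
            System.out.println(dataStartFromIndex(0, 5, true));  // 11: old formats used a 4-byte int size
        }
    }
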
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java b/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
index 0b1a263..ebee3ed 100644
--- a/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
+++ b/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
@@ -19,14 +19,14 @@
 
 package org.apache.cassandra.db.compaction;
 
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.utils.CloseableIterator;
 
 /**
  * An ICompactionScanner is an abstraction allowing multiple SSTableScanners to be
  * chained together under the hood.  See LeveledCompactionStrategy.getScanners.
  */
-public interface ICompactionScanner extends CloseableIterator<IColumnIterator>
+public interface ICompactionScanner extends CloseableIterator<OnDiskAtomIterator>
 {
     public long getLengthInBytes();
     public long getCurrentPosition();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 2f1f47f..122326e 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -33,12 +33,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.MergeIterator;
 
 /**
@@ -52,7 +50,7 @@ import org.apache.cassandra.utils.MergeIterator;
  * rows to write the merged columns or update the hash, again with at most one column
  * from each row deserialized at a time.
  */
-public class LazilyCompactedRow extends AbstractCompactedRow implements IIterableColumns
+public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable<OnDiskAtom>
 {
     private static Logger logger = LoggerFactory.getLogger(LazilyCompactedRow.class);
 
@@ -64,7 +62,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
     private final ColumnStats columnStats;
     private long columnSerializedSize;
     private boolean closed;
-    private final ColumnIndex columnsIndex;
+    private ColumnIndex.Builder indexBuilder;
+    private ColumnIndex columnsIndex;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends ICountableColumnIterator> rows)
     {
@@ -73,7 +72,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
         this.controller = controller;
         this.shouldPurge = controller.shouldPurge(key);
 
-        for (IColumnIterator row : rows)
+        for (OnDiskAtomIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
 
@@ -83,7 +82,15 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
                 emptyColumnFamily.delete(cf);
         }
 
-        this.columnsIndex = new ColumnIndex.Builder(emptyColumnFamily.getComparator(), key.key, getEstimatedColumnCount()).build(this);
+        try
+        {
+            indexAndWrite(null);
+        }
+        catch (IOException e)
+        {
+            // Since we don't write on disk this time, we shouldn't get an IOException
+            throw new AssertionError();
+        }
         // reach into the reducer used during iteration to get column count, size, max column timestamp
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
         columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns, reducer == null ? Long.MIN_VALUE : reducer.maxTimestampSeen,
@@ -93,12 +100,18 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
         reducer = null;
     }
 
+    private void indexAndWrite(DataOutput out) throws IOException
+    {
+        this.indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, getEstimatedColumnCount(), out);
+        this.columnsIndex = indexBuilder.build(this);
+    }
+
     public long write(DataOutput out) throws IOException
     {
         assert !closed;
 
         DataOutputBuffer clockOut = new DataOutputBuffer();
-        ColumnFamily.serializer.serializeCFInfo(emptyColumnFamily, clockOut);
+        DeletionInfo.serializer().serializeForSSTable(emptyColumnFamily.deletionInfo(), clockOut);
 
         long dataSize = clockOut.getLength() + columnSerializedSize;
         if (logger.isDebugEnabled())
@@ -106,15 +119,12 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
         assert dataSize > 0;
         out.writeLong(dataSize);
         out.write(clockOut.getData(), 0, clockOut.getLength());
-        out.writeInt(columnStats.columnCount);
+        out.writeInt(indexBuilder.writtenAtomCount());
+
+        // We rebuild the column index redundantly, but we have to because range tombstone markers depend
+        // on indexing. If we're ever able to remove the two-phase compaction, we'll avoid that.
+        indexAndWrite(out);
 
-        IColumnSerializer columnSerializer = emptyColumnFamily.getColumnSerializer();
-        Iterator<IColumn> iter = iterator();
-        while (iter.hasNext())
-        {
-            IColumn column = iter.next();
-            columnSerializer.serialize(column, out);
-        }
         long secondPassColumnSize = reducer == null ? 0 : reducer.serializedSize;
         assert secondPassColumnSize == columnSerializedSize
                : "originally calculated column size of " + columnSerializedSize + " but now it is " + secondPassColumnSize;
@@ -133,7 +143,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
 
         try
         {
-            ColumnFamily.serializer.serializeCFInfo(emptyColumnFamily, out);
+            DeletionInfo.serializer().serializeForSSTable(emptyColumnFamily.deletionInfo(), out);
             out.writeInt(columnStats.columnCount);
             digest.update(out.getData(), 0, out.getLength());
         }
@@ -142,7 +152,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
             throw new IOError(e);
         }
 
-        Iterator<IColumn> iter = iterator();
+        Iterator<OnDiskAtom> iter = iterator();
         while (iter.hasNext())
         {
             iter.next().updateDigest(digest);
@@ -171,12 +181,12 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
         return emptyColumnFamily.getComparator();
     }
 
-    public Iterator<IColumn> iterator()
+    public Iterator<OnDiskAtom> iterator()
     {
         for (ICountableColumnIterator row : rows)
             row.reset();
         reducer = new Reducer();
-        Iterator<IColumn> iter = MergeIterator.get(rows, getComparator().columnComparator, reducer);
+        Iterator<OnDiskAtom> iter = MergeIterator.get(rows, getComparator().onDiskAtomComparator, reducer);
         return Iterators.filter(iter, Predicates.notNull());
     }
 
@@ -187,7 +197,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
 
     private void close()
     {
-        for (IColumnIterator row : rows)
+        for (OnDiskAtomIterator row : rows)
         {
             try
             {
@@ -214,39 +224,67 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
         return columnsIndex;
     }
 
-    private class Reducer extends MergeIterator.Reducer<IColumn, IColumn>
+    private class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
     {
         ColumnFamily container = emptyColumnFamily.cloneMeShallow();
+        RangeTombstone tombstone;
         long serializedSize = 4; // int for column count
         int columns = 0;
         long maxTimestampSeen = Long.MIN_VALUE;
         StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
 
-        public void reduce(IColumn current)
+        public void reduce(OnDiskAtom current)
         {
-            container.addColumn(current);
+            if (current instanceof RangeTombstone)
+                tombstone = (RangeTombstone)current;
+            else
+                container.addColumn((IColumn)current);
         }
 
-        protected IColumn getReduced()
+        protected OnDiskAtom getReduced()
         {
-            ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
-            if (purged == null || !purged.iterator().hasNext())
+            if (tombstone != null)
             {
-                container.clear();
-                return null;
+                RangeTombstone t = tombstone;
+                tombstone = null;
+
+                if (t.data.isGcAble(controller.gcBefore))
+                {
+                    return null;
+                }
+                else
+                {
+                    serializedSize += t.serializedSizeForSSTable();
+                    return t;
+                }
             }
-            IColumn reduced = purged.iterator().next();
-            container.clear();
-
-            serializedSize += reduced.serializedSize(TypeSizes.NATIVE);
-            columns++;
-            maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp());
-            int deletionTime = reduced.getLocalDeletionTime();
-            if (deletionTime < Integer.MAX_VALUE)
+            else
             {
-                tombstones.update(deletionTime);
+                ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
+                if (purged == null || !purged.iterator().hasNext())
+                {
+                    container.clear();
+                    return null;
+                }
+                IColumn reduced = purged.iterator().next();
+                container.clear();
+
+                // PrecompactedRow.removeDeletedAndOldShards has only checked the top-level CF deletion times,
+                // not the range tombstones. For those we use the columnIndexer tombstone tracker.
+                // Note that this doesn't work for super columns.
+                if (indexBuilder.tombstoneTracker().isDeleted(reduced))
+                    return null;
+
+                serializedSize += reduced.serializedSizeForSSTable();
+                columns++;
+                maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp());
+                int deletionTime = reduced.getLocalDeletionTime();
+                if (deletionTime < Integer.MAX_VALUE)
+                {
+                    tombstones.update(deletionTime);
+                }
+                return reduced;
             }
-            return reduced;
         }
     }
 }

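The indexAndWrite(null) call in the constructor and indexAndWrite(out) in write() above are the two passes the in-code comment apologizes for: pass one runs the merge with no output, purely to learn the serialized size and build the column index that the range tombstone markers depend on; pass two repeats the identical merge while actually writing. A minimal sketch of the pattern, with invented names:

    import java.io.*;

    public class TwoPassDemo
    {
        // One merge pass: returns the serialized size, and writes only if out != null.
        static long pass(long[] values, DataOutput out) throws IOException
        {
            long size = 0;
            for (long v : values)
            {
                size += 8; // serialized size of one long
                if (out != null)
                    out.writeLong(v);
            }
            return size;
        }

        public static void main(String[] args) throws IOException
        {
            long[] row = { 1L, 2L, 3L };
            long expected = pass(row, null);              // pass 1: size/index only
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            pass(row, new DataOutputStream(bytes));       // pass 2: the real write
            System.out.println(bytes.size() == expected); // true: both passes agree
        }
    }
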
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 454a1bb..3e124de 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -199,7 +199,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
 
 // Lazily creates SSTableBoundedScanner for sstables that are assumed to be from the
 // same level (e.g. non-overlapping) - see #4142
-    private static class LeveledScanner extends AbstractIterator<IColumnIterator> implements ICompactionScanner
+    private static class LeveledScanner extends AbstractIterator<OnDiskAtomIterator> implements ICompactionScanner
     {
         private final Range<Token> range;
         private final List<SSTableReader> sstables;
@@ -222,7 +222,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
             totalLength = length;
         }
 
-        protected IColumnIterator computeNext()
+        protected OnDiskAtomIterator computeNext()
         {
             try
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index e3aa69e..2222391 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -257,7 +257,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                 return iter.hasNext();
             }
 
-            public IColumn next()
+            public OnDiskAtom next()
             {
                 return iter.next();
             }
@@ -377,7 +377,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
             return wrapped.hasNext();
         }
 
-        public IColumn next()
+        public OnDiskAtom next()
         {
             return wrapped.next();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 34451ec..b7c6883 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -27,12 +27,7 @@ import org.apache.cassandra.io.sstable.ColumnStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ColumnIndex;
-import org.apache.cassandra.db.CounterColumn;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.HeapAllocator;
@@ -45,6 +40,7 @@ public class PrecompactedRow extends AbstractCompactedRow
     private static final Logger logger = LoggerFactory.getLogger(PrecompactedRow.class);
 
     private final ColumnFamily compactedCf;
+    private ColumnIndex columnIndex;
 
     /** it is caller's responsibility to call removeDeleted + removeOldShards from the cf before calling this constructor */
     public PrecompactedRow(DecoratedKey key, ColumnFamily cf)
@@ -132,9 +128,15 @@ public class PrecompactedRow extends AbstractCompactedRow
     {
         assert compactedCf != null;
         DataOutputBuffer buffer = new DataOutputBuffer();
-        ColumnFamily.serializer.serializeForSSTable(compactedCf, buffer);
-        int dataSize = buffer.getLength();
-        out.writeLong(buffer.getLength());
+        ColumnIndex.Builder builder = new ColumnIndex.Builder(compactedCf, key.key, compactedCf.getColumnCount(), buffer);
+        columnIndex = builder.build(compactedCf);
+
+        TypeSizes typeSizes = TypeSizes.NATIVE;
+        long delSize = DeletionTime.serializer.serializedSize(compactedCf.deletionInfo().getTopLevelDeletion(), typeSizes);
+        long dataSize = buffer.getLength() + delSize + typeSizes.sizeof(0);
+        out.writeLong(dataSize);
+        DeletionInfo.serializer().serializeForSSTable(compactedCf.deletionInfo(), out);
+        out.writeInt(builder.writtenAtomCount());
         out.write(buffer.getData(), 0, buffer.getLength());
         return dataSize;
     }
@@ -145,7 +147,7 @@ public class PrecompactedRow extends AbstractCompactedRow
         DataOutputBuffer buffer = new DataOutputBuffer();
         try
         {
-            ColumnFamily.serializer.serializeCFInfo(compactedCf, buffer);
+            DeletionInfo.serializer().serializeForSSTable(compactedCf.deletionInfo(), buffer);
             buffer.writeInt(compactedCf.getColumnCount());
             digest.update(buffer.getData(), 0, buffer.getLength());
         }
@@ -187,6 +189,6 @@ public class PrecompactedRow extends AbstractCompactedRow
      */
     public ColumnIndex index()
     {
-        return new ColumnIndex.Builder(compactedCf.getComparator(), key.key, compactedCf.getColumnCount()).build(compactedCf);
+        return columnIndex;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/filter/AbstractColumnIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/AbstractColumnIterator.java b/src/java/org/apache/cassandra/db/filter/AbstractColumnIterator.java
index 4392adb..19c64e0 100644
--- a/src/java/org/apache/cassandra/db/filter/AbstractColumnIterator.java
+++ b/src/java/org/apache/cassandra/db/filter/AbstractColumnIterator.java
@@ -19,9 +19,9 @@ package org.apache.cassandra.db.filter;
 
 import java.io.IOException;
 
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 
-public abstract class AbstractColumnIterator implements IColumnIterator
+public abstract class AbstractColumnIterator implements OnDiskAtomIterator
 {
     public void close() throws IOException
     {}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index f578c34..58e800f 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -250,8 +250,8 @@ public abstract class ExtendedFilter
             if (initialFilter == originalFilter)
                 return data;
             ColumnFamily pruned = data.cloneMeShallow();
-            IColumnIterator iter = originalFilter.getMemtableColumnIterator(data, null);
-            originalFilter.collectReducedColumns(pruned, iter, cfs.gcBefore());
+            OnDiskAtomIterator iter = originalFilter.getMemtableColumnIterator(data, null);
+            originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore());
             return pruned;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/filter/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IFilter.java b/src/java/org/apache/cassandra/db/filter/IFilter.java
index 7c1ffb1..06e6489 100644
--- a/src/java/org/apache/cassandra/db/filter/IFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IFilter.java
@@ -19,10 +19,8 @@ package org.apache.cassandra.db.filter;
 
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
-
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
@@ -40,7 +38,7 @@ public interface IFilter
      * returns an iterator that returns columns from the given memtable
      * matching the Filter criteria in sorted order.
      */
-    public abstract IColumnIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key);
+    public abstract OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key);
 
     /**
      * Get an iterator that returns columns from the given SSTable using the opened file
@@ -49,13 +47,13 @@ public interface IFilter
      * @param file Already opened file data input, saves us opening another one
      * @param key The key of the row we are about to iterate over
      */
-    public abstract IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry);
+    public abstract OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry);
 
     /**
      * returns an iterator that returns columns from the given SSTable
      * matching the Filter criteria in sorted order.
      */
-    public abstract IColumnIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key);
+    public abstract OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key);
 
     /**
      * collects columns from reducedColumns into returnCF.  Termination is determined

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index a2c995f..f376caa 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -20,13 +20,12 @@ package org.apache.cassandra.db.filter;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
 import java.util.SortedSet;
 
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.columniterator.SSTableNamesIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -47,17 +46,17 @@ public class NamesQueryFilter implements IFilter
         this(FBUtilities.singleton(column));
     }
 
-    public IColumnIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+    public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
     {
         return Memtable.getNamesIterator(key, cf, this);
     }
 
-    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
+    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
     {
         return new SSTableNamesIterator(sstable, key, columns);
     }
 
-    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
+    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
     {
         return new SSTableNamesIterator(sstable, file, key, columns, indexEntry);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index 1c923a4..b81ea69 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db.filter;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
@@ -24,7 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -51,7 +52,7 @@ public class QueryFilter
         superFilter = path.superColumnName == null ? null : new NamesQueryFilter(path.superColumnName);
     }
 
-    public IColumnIterator getMemtableColumnIterator(Memtable memtable)
+    public OnDiskAtomIterator getMemtableColumnIterator(Memtable memtable)
     {
         ColumnFamily cf = memtable.getColumnFamily(key);
         if (cf == null)
@@ -59,7 +60,7 @@ public class QueryFilter
         return getMemtableColumnIterator(cf, key);
     }
 
-    public IColumnIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+    public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
     {
         assert cf != null;
         if (path.superColumnName == null)
@@ -68,24 +69,33 @@ public class QueryFilter
     }
 
     // TODO move gcBefore into a field
-    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable)
+    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable)
     {
         if (path.superColumnName == null)
             return filter.getSSTableColumnIterator(sstable, key);
         return superFilter.getSSTableColumnIterator(sstable, key);
     }
 
-    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
+    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
     {
         if (path.superColumnName == null)
             return filter.getSSTableColumnIterator(sstable, file, key, indexEntry);
         return superFilter.getSSTableColumnIterator(sstable, file, key, indexEntry);
     }
 
+    public void collateOnDiskAtom(final ColumnFamily returnCF, List<? extends CloseableIterator<OnDiskAtom>> toCollate, final int gcBefore)
+    {
+        List<CloseableIterator<IColumn>> filteredIterators = new ArrayList<CloseableIterator<IColumn>>(toCollate.size());
+        for (CloseableIterator<OnDiskAtom> iter : toCollate)
+            filteredIterators.add(gatherTombstones(returnCF, iter));
+        collateColumns(returnCF, filteredIterators, gcBefore);
+    }
+
     // TODO move gcBefore into a field
     public void collateColumns(final ColumnFamily returnCF, List<? extends CloseableIterator<IColumn>> toCollate, final int gcBefore)
     {
         IFilter topLevelFilter = (superFilter == null ? filter : superFilter);
+
         Comparator<IColumn> fcomp = topLevelFilter.getColumnComparator(returnCF.getComparator());
         // define a 'reduced' iterator that merges columns w/ the same name, which
         // greatly simplifies computing liveColumns in the presence of tombstones.
@@ -118,12 +128,10 @@ public class QueryFilter
                     // filterSuperColumn only looks at immediate parent (the supercolumn) when determining if a subcolumn
                     // is still live, i.e., not shadowed by the parent's tombstone.  so, bump it up temporarily to the tombstone
                     // time of the cf, if that is greater.
-                    long deletedAt = c.getMarkedForDeleteAt();
-                    if (returnCF.getMarkedForDeleteAt() > deletedAt)
-                        ((SuperColumn) c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt());
-
+                    DeletionInfo delInfo = ((SuperColumn) c).deletionInfo();
+                    ((SuperColumn) c).delete(returnCF.deletionInfo());
                     c = filter.filterSuperColumn((SuperColumn) c, gcBefore);
-                    ((SuperColumn) c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be
+                    ((SuperColumn) c).setDeletionInfo(delInfo); // reset sc tombstone time to what it should be
                 }
                 curCF.clear();
 
@@ -135,6 +143,66 @@ public class QueryFilter
         topLevelFilter.collectReducedColumns(returnCF, reduced, gcBefore);
     }
 
+    /**
+     * Given an iterator of on-disk atoms, returns an iterator that filters out the range
+     * tombstone markers, adding them to {@code returnCF}, and returns only the normal columns.
+     */
+    public static CloseableIterator<IColumn> gatherTombstones(final ColumnFamily returnCF, final CloseableIterator<OnDiskAtom> iter)
+    {
+        return new CloseableIterator<IColumn>()
+        {
+            private IColumn next;
+
+            public boolean hasNext()
+            {
+                if (next != null)
+                    return true;
+
+                getNext();
+                return next != null;
+            }
+
+            public IColumn next()
+            {
+                if (next == null)
+                    getNext();
+
+                assert next != null;
+                IColumn toReturn = next;
+                next = null;
+                return toReturn;
+            }
+
+            private void getNext()
+            {
+                while (iter.hasNext())
+                {
+                    OnDiskAtom atom = iter.next();
+
+                    if (atom instanceof IColumn)
+                    {
+                        next = (IColumn)atom;
+                        break;
+                    }
+                    else
+                    {
+                        returnCF.addAtom(atom);
+                    }
+                }
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void close() throws IOException
+            {
+                iter.close();
+            }
+        };
+    }
+
     public String getColumnFamilyName()
     {
         return path.columnFamilyName;
@@ -147,7 +215,7 @@ public class QueryFilter
         // (since otherwise, the only thing repair cares about is the container tombstone)
         long maxChange = column.mostRecentNonGCableChangeAt(gcBefore);
         return (column.getLocalDeletionTime() >= gcBefore || maxChange > column.getMarkedForDeleteAt()) // (1)
-               && (!container.isMarkedForDelete() || maxChange > container.getMarkedForDeleteAt()); // (2)
+               && (!container.deletionInfo().isDeleted(column.name(), maxChange)); // (2)
     }
 
     /**

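gatherTombstones() above is the bridge between the new mixed OnDiskAtom streams and the existing IColumn-based collation: it lazily pulls from the underlying iterator, diverts range tombstone markers into the return ColumnFamily as a side effect (returnCF.addAtom), and hands only regular columns to the caller. A stripped-down analog of that shape, with stand-in types:

    import java.util.*;

    public class GatherDemo
    {
        interface Atom {}
        static class Column implements Atom {}
        static class Marker implements Atom {}

        // Exposes only Columns; Markers are collected into 'sink' as they are met.
        static Iterator<Column> gather(final List<Marker> sink, final Iterator<Atom> iter)
        {
            return new Iterator<Column>()
            {
                private Column next;

                public boolean hasNext()
                {
                    while (next == null && iter.hasNext())
                    {
                        Atom atom = iter.next();
                        if (atom instanceof Column)
                            next = (Column) atom;
                        else
                            sink.add((Marker) atom); // side effect, like returnCF.addAtom()
                    }
                    return next != null;
                }

                public Column next()
                {
                    if (!hasNext())
                        throw new NoSuchElementException();
                    Column c = next;
                    next = null;
                    return c;
                }

                public void remove() { throw new UnsupportedOperationException(); }
            };
        }

        public static void main(String[] args)
        {
            List<Atom> atoms = Arrays.<Atom>asList(new Column(), new Marker(), new Column());
            List<Marker> sink = new ArrayList<Marker>();
            Iterator<Column> columns = gather(sink, atoms.iterator());
            int n = 0;
            while (columns.hasNext()) { columns.next(); n++; }
            System.out.println(n + " columns, " + sink.size() + " marker"); // 2 columns, 1 marker
        }
    }
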
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 9901130..5517aa0 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -29,7 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.columniterator.SSTableSliceIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -52,17 +52,17 @@ public class SliceQueryFilter implements IFilter
         this.count = count;
     }
 
-    public IColumnIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+    public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
     {
         return Memtable.getSliceIterator(key, cf, this);
     }
 
-    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
+    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
     {
         return new SSTableSliceIterator(sstable, key, start, finish, reversed);
     }
 
-    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
+    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
     {
         return new SSTableSliceIterator(sstable, file, key, start, finish, reversed, indexEntry);
     }
@@ -126,8 +126,7 @@ public class SliceQueryFilter implements IFilter
 
             // only count live columns towards the `count` criteria
             if (column.isLive()
-                && (!container.isMarkedForDelete()
-                    || column.mostRecentLiveChangeAt() > container.getMarkedForDeleteAt()))
+                && (!container.deletionInfo().isDeleted(column)))
             {
                 liveColumns++;
             }

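The SliceQueryFilter hunk above shows the recurring simplification in this patch: liveness can no longer be decided against a single container-level markedForDeleteAt, because a column may also be shadowed by a range tombstone covering its name, so the check is delegated to DeletionInfo. A hedged sketch of what such an isDeleted() has to answer (stand-in types, not the real DeletionInfo):

    import java.util.*;

    public class IsDeletedDemo
    {
        static class Range
        {
            final String min, max;
            final long markedForDeleteAt;
            Range(String min, String max, long ts) { this.min = min; this.max = max; this.markedForDeleteAt = ts; }
        }

        static class DeletionInfo
        {
            long topLevelMarkedForDeleteAt = Long.MIN_VALUE;
            List<Range> ranges = new ArrayList<Range>();

            boolean isDeleted(String name, long timestamp)
            {
                if (timestamp <= topLevelMarkedForDeleteAt)
                    return true; // shadowed by the row-level tombstone
                for (Range r : ranges)
                    if (r.min.compareTo(name) <= 0 && name.compareTo(r.max) <= 0 && timestamp <= r.markedForDeleteAt)
                        return true; // shadowed by a range tombstone
                return false;
            }
        }

        public static void main(String[] args)
        {
            DeletionInfo info = new DeletionInfo();
            info.ranges.add(new Range("b", "d", 100L));
            System.out.println(info.isDeleted("c", 50L)); // true: inside [b, d], older than the tombstone
            System.out.println(info.isDeleted("e", 50L)); // false: outside every range
        }
    }
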
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 654d756..ab7ced3 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -24,6 +24,8 @@ import java.util.Map;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RangeTombstone;
 import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
 
 /**
@@ -40,6 +42,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
     public final Comparator<IndexInfo> indexReverseComparator;
     public final Comparator<IColumn> columnComparator;
     public final Comparator<IColumn> columnReverseComparator;
+    public final Comparator<OnDiskAtom> onDiskAtomComparator;
     public final Comparator<ByteBuffer> reverseComparator;
 
     protected AbstractType()
@@ -72,6 +75,41 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
                 return AbstractType.this.compare(c2.name(), c1.name());
             }
         };
+        onDiskAtomComparator = new Comparator<OnDiskAtom>()
+        {
+            public int compare(OnDiskAtom c1, OnDiskAtom c2)
+            {
+                int comp = AbstractType.this.compare(c1.name(), c2.name());
+                if (comp != 0)
+                    return comp;
+
+                if (c1 instanceof RangeTombstone)
+                {
+                    if (c2 instanceof RangeTombstone)
+                    {
+                        RangeTombstone t1 = (RangeTombstone)c1;
+                        RangeTombstone t2 = (RangeTombstone)c2;
+                        int comp2 = AbstractType.this.compare(t1.max, t2.max);
+                        if (comp2 == 0)
+                            return t1.data.compareTo(t2.data);
+                        else
+                            return comp2;
+                    }
+                    else
+                    {
+                        return -1;
+                    }
+                }
+                else if (c2 instanceof RangeTombstone)
+                {
+                    return 1;
+                }
+                else
+                {
+                    return 0;
+                }
+            }
+        };
         reverseComparator = new Comparator<ByteBuffer>()
         {
             public int compare(ByteBuffer o1, ByteBuffer o2)

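The new onDiskAtomComparator above preserves the existing by-name order and only has to break ties when a range tombstone marker shares a name with a column (or another marker): markers sort first, so a consumer sees a deletion before any atom it may shadow, and two markers with the same start fall back to comparing their max bound and deletion data. A toy demonstration of the marker-before-column tie-break, with stand-in types:

    import java.util.*;

    public class AtomOrderDemo
    {
        interface Atom { String name(); }
        static class Column implements Atom
        {
            final String n;
            Column(String n) { this.n = n; }
            public String name() { return n; }
        }
        static class Marker implements Atom
        {
            final String n;
            Marker(String n) { this.n = n; }
            public String name() { return n; }
        }

        static final Comparator<Atom> CMP = new Comparator<Atom>()
        {
            public int compare(Atom a, Atom b)
            {
                int c = a.name().compareTo(b.name());
                if (c != 0)
                    return c;
                boolean am = a instanceof Marker, bm = b instanceof Marker;
                return am == bm ? 0 : (am ? -1 : 1); // on a name tie, markers first
            }
        };

        public static void main(String[] args)
        {
            List<Atom> atoms = new ArrayList<Atom>(
                Arrays.<Atom>asList(new Column("a"), new Marker("a")));
            Collections.sort(atoms, CMP);
            System.out.println(atoms.get(0) instanceof Marker); // true
        }
    }
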
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/IColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/IColumnSerializer.java b/src/java/org/apache/cassandra/io/IColumnSerializer.java
index 6063b66..5802140 100644
--- a/src/java/org/apache/cassandra/io/IColumnSerializer.java
+++ b/src/java/org/apache/cassandra/io/IColumnSerializer.java
@@ -42,3 +42,4 @@ public interface IColumnSerializer extends ISerializer<IColumn>
 
     public IColumn deserialize(DataInput in, Flag flag, int expireBefore) throws IOException;
 }
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/ISSTableSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ISSTableSerializer.java b/src/java/org/apache/cassandra/io/ISSTableSerializer.java
new file mode 100644
index 0000000..c2cefc6
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/ISSTableSerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.io.sstable.Descriptor;
+
+public interface ISSTableSerializer<T>
+{
+    /**
+     * Serialize the specified type into the specified DataOutput
+     * instance in the format suited for SSTables.
+     * @param t type that needs to be serialized
+     * @param dos DataOutput into which serialization needs to happen.
+     * @throws java.io.IOException
+     */
+    public void serializeForSSTable(T t, DataOutput dos) throws IOException;
+
+    /**
+     * Deserialize from the specified DataInput instance, in the format
+     * suited for SSTables.
+     * @param dis DataInput from which deserialization needs to happen.
+     * @param version the version of the sstable we're reading from
+     * @throws IOException
+     * @return the type that was deserialized
+     */
+    public T deserializeFromSSTable(DataInput dis, Descriptor.Version version) throws IOException;
+}

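The point of the new ISSTableSerializer interface is that deserialization takes the sstable version, so a single serializer can read every historical on-disk layout while always writing the current one. A toy implementation of the same contract (simplified types; the version threshold mirrors hasIntRowSize from the Descriptor change below):

    import java.io.*;

    public class VersionedSerializerDemo
    {
        interface VersionedSSTableSerializer<T>
        {
            void serializeForSSTable(T t, DataOutput out) throws IOException;
            T deserializeFromSSTable(DataInput in, String version) throws IOException;
        }

        static class RowSizeSerializer implements VersionedSSTableSerializer<Long>
        {
            public void serializeForSSTable(Long size, DataOutput out) throws IOException
            {
                out.writeLong(size); // the current format always writes a long
            }

            public Long deserializeFromSSTable(DataInput in, String version) throws IOException
            {
                // pre-"d" sstables stored the row size as an int (cf. hasIntRowSize)
                return version.compareTo("d") < 0 ? (long) in.readInt() : in.readLong();
            }
        }

        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new RowSizeSerializer().serializeForSSTable(42L, new DataOutputStream(bytes));
            DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            System.out.println(new RowSizeSerializer().deserializeFromSSTable(in, "ia")); // 42
        }
    }
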
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index d1bb7b6..31ac072 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -44,53 +44,143 @@ public class Descriptor
     // we always incremented the major version.  In particular, versions g and h are
     // forwards-compatible with version f, so if the above convention had been followed,
     // we would have labeled them fb and fc.
-    public static final String LEGACY_VERSION = "a"; // "pre-history"
-    // b (0.7.0): added version to sstable filenames
-    // c (0.7.0): bloom filter component computes hashes over raw key bytes instead of strings
-    // d (0.7.0): row size in data component becomes a long instead of int
-    // e (0.7.0): stores undecorated keys in data and index components
-    // f (0.7.0): switched bloom filter implementations in data component
-    // g (0.8): tracks flushed-at context in metadata component
-    // h (1.0): tracks max client timestamp in metadata component
-    // hb (1.0.3): records compression ratio in metadata component
-    // hc (1.0.4): records partitioner in metadata component
-    // hd (1.0.10): includes row tombstones in maxtimestamp
-    // ia (1.2.0): column indexes are promoted to the index file
-    //             records estimated histogram of deletion times in tombstones
-    //             bloom filter (keys and columns) upgraded to Murmur3
-    public static final String CURRENT_VERSION = "ia";
+    public static class Version
+    {
+        // This needs to be at the beginning for initialization's sake
+        private static final String current_version = "ia";
+
+        public static final Version LEGACY = new Version("a"); // "pre-history"
+        // b (0.7.0): added version to sstable filenames
+        // c (0.7.0): bloom filter component computes hashes over raw key bytes instead of strings
+        // d (0.7.0): row size in data component becomes a long instead of int
+        // e (0.7.0): stores undecorated keys in data and index components
+        // f (0.7.0): switched bloom filter implementations in data component
+        // g (0.8): tracks flushed-at context in metadata component
+        // h (1.0): tracks max client timestamp in metadata component
+        // hb (1.0.3): records compression ratio in metadata component
+        // hc (1.0.4): records partitioner in metadata component
+        // hd (1.0.10): includes row tombstones in maxtimestamp
+        // ia (1.2.0): column indexes are promoted to the index file
+        //             records estimated histogram of deletion times in tombstones
+        //             bloom filter (keys and columns) upgraded to Murmur3
+
+        public static final Version CURRENT = new Version(current_version);
+
+        private final String version;
+
+        public final boolean hasStringsInBloomFilter;
+        public final boolean hasIntRowSize;
+        public final boolean hasEncodedKeys;
+        public final boolean isLatestVersion;
+        public final boolean metadataIncludesReplayPosition;
+        public final boolean tracksMaxTimestamp;
+        public final boolean hasCompressionRatio;
+        public final boolean hasPartitioner;
+        public final boolean tracksTombstones;
+        public final boolean hasPromotedIndexes;
+        public final FilterFactory.Type filterType;
+
+        public Version(String version)
+        {
+            this.version = version;
+            hasStringsInBloomFilter = version.compareTo("c") < 0;
+            hasIntRowSize = version.compareTo("d") < 0;
+            hasEncodedKeys = version.compareTo("e") < 0;
+            metadataIncludesReplayPosition = version.compareTo("g") >= 0;
+            tracksMaxTimestamp = version.compareTo("hd") >= 0;
+            hasCompressionRatio = version.compareTo("hb") >= 0;
+            hasPartitioner = version.compareTo("hc") >= 0;
+            tracksTombstones = version.compareTo("ia") >= 0;
+            hasPromotedIndexes = version.compareTo("ia") >= 0;
+            isLatestVersion = version.compareTo(current_version) == 0;
+            if (version.compareTo("f") < 0)
+                filterType = FilterFactory.Type.SHA;
+            else if (version.compareTo("ia") < 0)
+                filterType = FilterFactory.Type.MURMUR2;
+            else
+                filterType = FilterFactory.Type.MURMUR3;
+        }
+
+        /**
+         * @param ver SSTable version
+         * @return True if the given version string matches the format.
+         * @see #version
+         */
+        static boolean validate(String ver)
+        {
+            return ver != null && ver.matches("[a-z]+");
+        }
+
+        public boolean isCompatible()
+        {
+            return version.charAt(0) <= CURRENT.version.charAt(0);
+        }
+
+        public boolean isStreamCompatible()
+        {
+            // we could add compatibility for earlier versions with the new single-pass streaming
+            // (see SSTableWriter.appendFromStream) but versions earlier than 0.7.1 don't have the
+            // MessagingService version awareness anyway so there's no point.
+            return isCompatible() && version.charAt(0) >= 'i';
+        }
+
+        /**
+         * Versions [h..hc] contained a timestamp value that was computed incorrectly, ignoring row tombstones.
+         * containsTimestamp returns true if there is a timestamp value in the metadata file; to know if it
+         * actually contains a *correct* timestamp, see tracksMaxTimestamp.
+         */
+        public boolean containsTimestamp()
+        {
+            return version.compareTo("h") >= 0;
+        }
+
+        @Override
+        public String toString()
+        {
+            return version;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (o == this)
+                return true;
+            if (!(o instanceof Version))
+                return false;
+            return version.equals(((Version)o).version);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return version.hashCode();
+        }
+    }
 
     public final File directory;
     /** version has the following format: <code>[a-z]+</code> */
-    public final String version;
+    public final Version version;
     public final String ksname;
     public final String cfname;
     public final int generation;
     public final boolean temporary;
     private final int hashCode;
 
-    public final boolean hasStringsInBloomFilter;
-    public final boolean hasIntRowSize;
-    public final boolean hasEncodedKeys;
-    public final boolean isLatestVersion;
-    public final boolean metadataIncludesReplayPosition;
-    public final boolean tracksMaxTimestamp;
-    public final boolean hasCompressionRatio;
-    public final boolean hasPartitioner;
-    public final boolean tracksTombstones;
-    public final boolean hasPromotedIndexes;
-    public final FilterFactory.Type filterType;
-
     /**
      * A descriptor that assumes CURRENT_VERSION.
      */
     public Descriptor(File directory, String ksname, String cfname, int generation, boolean temp)
     {
-        this(CURRENT_VERSION, directory, ksname, cfname, generation, temp);
+        this(Version.CURRENT, directory, ksname, cfname, generation, temp);
     }
 
     public Descriptor(String version, File directory, String ksname, String cfname, int generation, boolean temp)
     {
+        this(new Version(version), directory, ksname, cfname, generation, temp);
+    }
+
+    public Descriptor(Version version, File directory, String ksname, String cfname, int generation, boolean temp)
+    {
         assert version != null && directory != null && ksname != null && cfname != null;
         this.version = version;
         this.directory = directory;
@@ -99,23 +189,6 @@ public class Descriptor
         this.generation = generation;
         temporary = temp;
         hashCode = Objects.hashCode(directory, generation, ksname, cfname);
-
-        hasStringsInBloomFilter = version.compareTo("c") < 0;
-        hasIntRowSize = version.compareTo("d") < 0;
-        hasEncodedKeys = version.compareTo("e") < 0;
-        metadataIncludesReplayPosition = version.compareTo("g") >= 0;
-        tracksMaxTimestamp = version.compareTo("hd") >= 0;
-        hasCompressionRatio = version.compareTo("hb") >= 0;
-        hasPartitioner = version.compareTo("hc") >= 0;
-        tracksTombstones = version.compareTo("ia") >= 0;
-        hasPromotedIndexes = version.compareTo("ia") >= 0;
-        isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
-        if (version.compareTo("f") < 0)
-            filterType = FilterFactory.Type.SHA;
-        else if (version.compareTo("ia") < 0)
-            filterType = FilterFactory.Type.MURMUR2;
-        else
-            filterType = FilterFactory.Type.MURMUR3;
     }
 
     public String filenameFor(Component component)
@@ -131,7 +204,7 @@ public class Descriptor
         buff.append(cfname).append(separator);
         if (temporary)
             buff.append(SSTable.TEMPFILE_MARKER).append(separator);
-        if (!LEGACY_VERSION.equals(version))
+        if (!Version.LEGACY.equals(version))
             buff.append(version).append(separator);
         buff.append(generation);
         return buff.toString();
@@ -185,10 +258,10 @@ public class Descriptor
         }
 
         // optional version string
-        String version = LEGACY_VERSION;
-        if (versionValidate(nexttok))
+        Version version = Version.LEGACY;
+        if (Version.validate(nexttok))
         {
-            version = nexttok;
+            version = new Version(nexttok);
             nexttok = st.nextToken();
         }
         int generation = Integer.parseInt(nexttok);
@@ -209,21 +282,11 @@ public class Descriptor
     }
 
     /**
-     * @param ver SSTable version
-     * @return True if the given version string matches the format.
-     * @see #version
-     */
-    static boolean versionValidate(String ver)
-    {
-        return ver != null && ver.matches("[a-z]+");
-    }
-
-    /**
      * @return true if the current Cassandra version can read the given sstable version
      */
     public boolean isCompatible()
     {
-        return version.charAt(0) <= CURRENT_VERSION.charAt(0);
+        return version.isCompatible();
     }
 
     /**
@@ -234,20 +297,7 @@ public class Descriptor
      */
     public boolean isStreamCompatible()
     {
-        // we could add compatibility for earlier versions with the new single-pass streaming
-        // (see SSTableWriter.appendFromStream) but versions earlier than 0.7.1 don't have the
-        // MessagingService version awareness anyway so there's no point.
-        return isCompatible() && version.charAt(0) >= 'i';
-    }
-
-    /**
-     * Versions [h..hc] contained a timestamp value that was computed incorrectly, ignoring row tombstones.
-     * containsTimestamp returns true if there is a timestamp value in the metadata file; to know if it
-     * actually contains a *correct* timestamp, see tracksMaxTimestamp.
-     */
-    public boolean containsTimestamp()
-    {
-        return version.compareTo("h") >= 0;
+        return version.isStreamCompatible();
     }
 
     @Override

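A note on the Version class introduced above: every capability flag is derived from plain lexicographic comparison of the version string ("hb" < "hc" < "ia" < "ib"), so a single compareTo threshold gates each feature. A minimal standalone sketch of that pattern (the class below is illustrative only, not part of the patch):

    // Illustrative sketch of version-gated capability flags, mirroring the
    // shape of Descriptor.Version; not part of the patch itself.
    public class VersionFlagsSketch
    {
        public final String version;            // must match [a-z]+
        public final boolean tracksTombstones;  // range tombstone data present?

        public VersionFlagsSketch(String version)
        {
            assert version.matches("[a-z]+");
            this.version = version;
            // String.compareTo is lexicographic, hence "hc" < "ia": any version
            // at or after "ia" carries the new range tombstone information
            this.tracksTombstones = version.compareTo("ia") >= 0;
        }

        public static void main(String[] args)
        {
            System.out.println(new VersionFlagsSketch("hc").tracksTombstones); // false
            System.out.println(new VersionFlagsSketch("ib").tracksTombstones); // true
        }
    }
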
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 46a454f..e0cb0f3 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -55,7 +55,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
             if (in.isEOF())
                 return endOfData();
             DecoratedKey key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, ByteBufferUtil.readWithShortLength(in));
-            RowIndexEntry.serializer.skip(in, desc); // skip remainder of the entry
+            RowIndexEntry.serializer.skip(in, desc.version); // skip remainder of the entry
             return key;
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index a5148eb..ad1a104 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -223,7 +223,7 @@ public abstract class SSTable
         while (ifile.getFilePointer() < BYTES_CAP && keys < SAMPLES_CAP)
         {
             ByteBufferUtil.skipShortLength(ifile);
-            RowIndexEntry.serializer.skip(ifile, descriptor);
+            RowIndexEntry.serializer.skip(ifile, descriptor.version);
             keys++;
         }
         assert keys > 0 && ifile.getFilePointer() > 0 && ifile.length() > 0 : "Unexpected empty index file: " + ifile;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
index 22cce77..36886db 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
@@ -22,7 +22,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -73,15 +73,15 @@ public class SSTableBoundedScanner extends SSTableScanner
     public boolean hasNext()
     {
         if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : new BoundedKeyScanningIterator();
+            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : new BoundedKeyScanningIterator();
         return iterator.hasNext();
     }
 
     @Override
-    public IColumnIterator next()
+    public OnDiskAtomIterator next()
     {
         if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : new BoundedKeyScanningIterator();
+            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : new BoundedKeyScanningIterator();
         return iterator.next();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index a7574ba..67f95d7 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -44,6 +44,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
     private final int columnCount;
     private final long columnPosition;
 
+    private final OnDiskAtom.Serializer atomSerializer;
+    private final Descriptor.Version dataVersion;
+
     private final BytesReadTracker inputWithTracker; // tracks bytes read
 
     // Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
@@ -102,6 +105,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         this.expireBefore = (int)(System.currentTimeMillis() / 1000);
         this.flag = flag;
         this.validateColumns = checkData;
+        this.dataVersion = sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version;
 
         try
         {
@@ -112,11 +116,11 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
                 if (dataStart + dataSize > file.length())
                     throw new IOException(String.format("dataSize of %s starting at %s would be larger than file %s length %s",
                                           dataSize, dataStart, file.getPath(), file.length()));
-                if (checkData && !sstable.descriptor.hasPromotedIndexes)
+                if (checkData && !sstable.descriptor.version.hasPromotedIndexes)
                 {
                     try
                     {
-                        IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.filterType);
+                        IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.version.filterType);
                     }
                     catch (Exception e)
                     {
@@ -139,13 +143,14 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
                 }
             }
 
-            if (sstable != null && !sstable.descriptor.hasPromotedIndexes)
+            if (sstable != null && !sstable.descriptor.version.hasPromotedIndexes)
             {
                 IndexHelper.skipBloomFilter(inputWithTracker);
                 IndexHelper.skipIndex(inputWithTracker);
             }
             columnFamily = ColumnFamily.create(metadata);
-            ColumnFamily.serializer.deserializeFromSSTableNoColumns(columnFamily, inputWithTracker);
+            columnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(inputWithTracker, dataVersion));
+            atomSerializer = columnFamily.getOnDiskSerializer();
             columnCount = inputWithTracker.readInt();
 
             columnPosition = dataStart + inputWithTracker.getBytesRead();
@@ -173,14 +178,14 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         return inputWithTracker.getBytesRead() < dataSize;
     }
 
-    public IColumn next()
+    public OnDiskAtom next()
     {
         try
         {
-            IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, flag, expireBefore);
+            OnDiskAtom atom = atomSerializer.deserializeFromSSTable(inputWithTracker, flag, expireBefore, dataVersion);
             if (validateColumns)
-                column.validateFields(columnFamily.metadata());
-            return column;
+                atom.validateFields(columnFamily.metadata());
+            return atom;
         }
         catch (IOException e)
         {
@@ -232,7 +237,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         assert inputWithTracker.getBytesRead() == headerSize();
         ColumnFamily cf = columnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory(), false);
         // since we already read column count, just pass that value and continue deserialization
-        ColumnFamily.serializer.deserializeColumns(inputWithTracker, cf, columnCount, flag);
+        columnFamily.serializer.deserializeColumnsFromSSTable(inputWithTracker, cf, columnCount, flag, expireBefore, dataVersion);
         if (validateColumns)
         {
             try

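The upshot for the read path: a row now begins with its row-level deletion info, followed by the atom count and then the atoms themselves, where an atom may be a regular column or a range tombstone marker. A hedged sketch of that read order over a raw DataInput (readAtom below is a hypothetical stand-in for OnDiskAtom.Serializer.deserializeFromSSTable):

    import java.io.DataInput;
    import java.io.IOException;

    // Sketch of the per-row read order used above; readAtom() is a
    // hypothetical stand-in for OnDiskAtom.Serializer.deserializeFromSSTable.
    public class RowReadOrderSketch
    {
        static void readRow(DataInput in) throws IOException
        {
            // row-level deletion info, in the order the pre-"ia" code read it:
            int localDeletionTime = in.readInt();
            long markedForDeleteAt = in.readLong();

            int atomCount = in.readInt();
            for (int i = 0; i < atomCount; i++)
                readAtom(in);
        }

        static void readAtom(DataInput in) throws IOException
        {
            in.readUTF(); // placeholder: real atoms are columns or range tombstone markers
        }
    }
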
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
index 8099f90..a0bc2cc 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -250,17 +250,17 @@ public class SSTableMetadata
         {
             EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(dis);
             EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(dis);
-            ReplayPosition replayPosition = desc.metadataIncludesReplayPosition
+            ReplayPosition replayPosition = desc.version.metadataIncludesReplayPosition
                                           ? ReplayPosition.serializer.deserialize(dis)
                                           : ReplayPosition.NONE;
-            long maxTimestamp = desc.containsTimestamp() ? dis.readLong() : Long.MIN_VALUE;
-            if (!desc.tracksMaxTimestamp) // see javadoc to Descriptor.containsTimestamp
+            long maxTimestamp = desc.version.containsTimestamp() ? dis.readLong() : Long.MIN_VALUE;
+            if (!desc.version.tracksMaxTimestamp) // see javadoc to Descriptor.containsTimestamp
                 maxTimestamp = Long.MIN_VALUE;
-            double compressionRatio = desc.hasCompressionRatio
+            double compressionRatio = desc.version.hasCompressionRatio
                                     ? dis.readDouble()
                                    : Double.MIN_VALUE;
-            String partitioner = desc.hasPartitioner ? dis.readUTF() : null;
-            StreamingHistogram tombstoneHistogram = desc.tracksTombstones
+            String partitioner = desc.version.hasPartitioner ? dis.readUTF() : null;
+            StreamingHistogram tombstoneHistogram = desc.version.tracksTombstones
                                                    ? StreamingHistogram.serializer.deserialize(dis)
                                                    : defaultTombstoneDropTimeHistogram();
             return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp, compressionRatio, partitioner, tombstoneHistogram);

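One subtlety above is the maxTimestamp dance: versions in [h..hc] wrote a timestamp but computed it incorrectly (ignoring row tombstones), so the bytes must still be consumed to stay aligned in the stream even though the value is discarded. The same logic in isolation (version here is the raw string, standing in for desc.version):

    import java.io.DataInput;
    import java.io.IOException;

    // Isolated sketch of the containsTimestamp / tracksMaxTimestamp split.
    public class MaxTimestampSketch
    {
        static long readMaxTimestamp(DataInput dis, String version) throws IOException
        {
            boolean containsTimestamp = version.compareTo("h") >= 0;   // a value was written
            boolean tracksMaxTimestamp = version.compareTo("hd") >= 0; // ...and it is correct

            // always read the bytes when present, so later fields stay aligned
            long maxTimestamp = containsTimestamp ? dis.readLong() : Long.MIN_VALUE;
            if (!tracksMaxTimestamp) // [h..hc]: present but wrong, so discard
                maxTimestamp = Long.MIN_VALUE;
            return maxTimestamp;
        }
    }
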
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a97f0b0..7783751 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -176,7 +176,7 @@ public class SSTableReader extends SSTable
         sstable.setTrackedBy(tracker);
 
         // versions before 'c' encoded keys as utf-16 before hashing to the filter
-        if (descriptor.hasStringsInBloomFilter)
+        if (descriptor.version.hasStringsInBloomFilter)
         {
             sstable.load(true, savedKeys);
         }
@@ -317,7 +317,7 @@ public class SSTableReader extends SSTable
         try
         {
             stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
-            bf = FilterFactory.deserialize(stream, descriptor.filterType);
+            bf = FilterFactory.deserialize(stream, descriptor.version.filterType);
         }
         finally
         {
@@ -361,7 +361,7 @@ public class SSTableReader extends SSTable
             while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize)
             {
                 ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
-                RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor);
+                RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version);
                 DecoratedKey decoratedKey = decodeKey(partitioner, descriptor, key);
                 if(null == first)
                     first = decoratedKey;
@@ -506,7 +506,7 @@ public class SSTableReader extends SSTable
 
     public long getBloomFilterSerializedSize()
     {
-        return FilterFactory.serializedSize(bf, descriptor.filterType);
+        return FilterFactory.serializedSize(bf, descriptor.version.filterType);
     }
 
     /**
@@ -743,7 +743,7 @@ public class SSTableReader extends SSTable
                     int v = op.apply(comparison);
                     if (v == 0)
                     {
-                        RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(input, descriptor);
+                        RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(input, descriptor.version);
                         if (comparison == 0 && keyCache != null && keyCache.getCapacity() > 0)
                         {
                             assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
@@ -761,7 +761,7 @@ public class SSTableReader extends SSTable
                             bloomFilterTracker.addFalsePositive();
                         return null;
                     }
-                    RowIndexEntry.serializer.skip(input, descriptor);
+                    RowIndexEntry.serializer.skip(input, descriptor.version);
                 }
             }
             catch (IOException e)
@@ -916,7 +916,7 @@ public class SSTableReader extends SSTable
 
     public static long readRowSize(DataInput in, Descriptor d) throws IOException
     {
-        if (d.hasIntRowSize)
+        if (d.version.hasIntRowSize)
             return in.readInt();
         return in.readLong();
     }
@@ -936,7 +936,7 @@ public class SSTableReader extends SSTable
      */
     public static DecoratedKey decodeKey(IPartitioner p, Descriptor d, ByteBuffer bytes)
     {
-        if (d.hasEncodedKeys)
+        if (d.version.hasEncodedKeys)
             return p.convertFromDiskFormat(bytes);
         return p.decorateKey(bytes);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 820c0bb..94e8522 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.FileUtils;
@@ -42,9 +42,9 @@ public class SSTableScanner implements ICompactionScanner
     protected final RandomAccessReader dfile;
     protected final RandomAccessReader ifile;
     public final SSTableReader sstable;
-    private IColumnIterator row;
+    private OnDiskAtomIterator row;
     protected boolean exhausted = false;
-    protected Iterator<IColumnIterator> iterator;
+    protected Iterator<OnDiskAtomIterator> iterator;
     private final QueryFilter filter;
 
     /**
@@ -118,7 +118,7 @@ public class SSTableScanner implements ICompactionScanner
                 }
                 else
                 {
-                    RowIndexEntry.serializer.skip(ifile, sstable.descriptor);
+                    RowIndexEntry.serializer.skip(ifile, sstable.descriptor.version);
                 }
             }
             exhausted = true;
@@ -155,14 +155,14 @@ public class SSTableScanner implements ICompactionScanner
     public boolean hasNext()
     {
         if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : createIterator();
+            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : createIterator();
         return iterator.hasNext();
     }
 
-    public IColumnIterator next()
+    public OnDiskAtomIterator next()
     {
         if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : createIterator();
+            iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : createIterator();
         return iterator.next();
     }
 
@@ -171,12 +171,12 @@ public class SSTableScanner implements ICompactionScanner
         throw new UnsupportedOperationException();
     }
 
-    private Iterator<IColumnIterator> createIterator()
+    private Iterator<OnDiskAtomIterator> createIterator()
     {
         return filter == null ? new KeyScanningIterator() : new FilteredKeyScanningIterator();
     }
 
-    protected class KeyScanningIterator implements Iterator<IColumnIterator>
+    protected class KeyScanningIterator implements Iterator<OnDiskAtomIterator>
     {
         protected long finishedAt;
 
@@ -195,7 +195,7 @@ public class SSTableScanner implements ICompactionScanner
             }
         }
 
-        public IColumnIterator next()
+        public OnDiskAtomIterator next()
         {
             try
             {
@@ -231,7 +231,7 @@ public class SSTableScanner implements ICompactionScanner
         }
     }
 
-    protected class FilteredKeyScanningIterator implements Iterator<IColumnIterator>
+    protected class FilteredKeyScanningIterator implements Iterator<OnDiskAtomIterator>
     {
         protected DecoratedKey nextKey;
         protected RowIndexEntry nextEntry;
@@ -251,7 +251,7 @@ public class SSTableScanner implements ICompactionScanner
             }
         }
 
-        public IColumnIterator next()
+        public OnDiskAtomIterator next()
         {
             try
             {
@@ -261,7 +261,7 @@ public class SSTableScanner implements ICompactionScanner
                 if (row == null)
                 {
                     currentKey = sstable.decodeKey(ByteBufferUtil.readWithShortLength(ifile));
-                    currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor);
+                    currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
                 }
                 else
                 {
@@ -277,7 +277,7 @@ public class SSTableScanner implements ICompactionScanner
                 else
                 {
                     nextKey = sstable.decodeKey(ByteBufferUtil.readWithShortLength(ifile));
-                    nextEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor);
+                    nextEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
                 }
 
                 assert !dfile.isEOF();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index b2f0c8e..43cd42c 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.net.MessagingService;
 
 /**
  * A SSTable writer that doesn't assume rows are in sorted order.
@@ -88,7 +89,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
 
     protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
     {
-        currentSize += key.key.remaining() + ColumnFamily.serializer.serializedSize(columnFamily, TypeSizes.NATIVE) * 1.2;
+        currentSize += key.key.remaining() + ColumnFamily.serializer.serializedSize(columnFamily, MessagingService.current_version) * 1.2;
 
         if (currentSize > bufferSize)
             sync();
@@ -107,7 +108,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         {
             // We are reusing a CF whose size we have already counted. Since it is easier to re-add the CF's
             // full size on the next writeRow call than to compute the delta, we just subtract its size until then
-            currentSize -= currentKey.key.remaining() + ColumnFamily.serializer.serializedSize(previous, TypeSizes.NATIVE) * 1.2;
+            currentSize -= currentKey.key.remaining() + ColumnFamily.serializer.serializedSize(previous, MessagingService.current_version) * 1.2;
         }
         return previous;
     }

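The writer's flushing is driven purely by a running size estimate: each writeRow adds the row's serialized size padded by a factor of 1.2, and a sync is forced once the estimate crosses bufferSize; a reused CF has its counted size subtracted, since the next writeRow re-adds the full amount anyway. A toy version of that accounting (estimateSize stands in for ColumnFamily.serializer.serializedSize):

    import java.util.ArrayList;
    import java.util.List;

    // Toy sketch of estimate-driven flushing; estimateSize() stands in for
    // ColumnFamily.serializer.serializedSize(cf, MessagingService.current_version).
    public class SizeTriggeredBufferSketch
    {
        private final long bufferSize;
        private long currentSize;
        private final List<byte[]> rows = new ArrayList<byte[]>();

        SizeTriggeredBufferSketch(long bufferSize)
        {
            this.bufferSize = bufferSize;
        }

        void writeRow(byte[] key, byte[] row)
        {
            // the 1.2 factor pads the estimate against serialization overhead
            currentSize += key.length + (long) (estimateSize(row) * 1.2);
            rows.add(row);
            if (currentSize > bufferSize)
                sync();
        }

        private long estimateSize(byte[] row)
        {
            return row.length; // stand-in for the real serialized-size computation
        }

        private void sync()
        {
            System.out.println("flushing " + rows.size() + " rows (~" + currentSize + " bytes)");
            rows.clear();
            currentSize = 0;
        }
    }
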
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 8fb78ba..4219f64 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -170,13 +170,22 @@ public class SSTableWriter extends SSTable
         long startPosition = beforeAppend(decoratedKey);
         ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
 
-        // build column index
-        // TODO: build and serialization could be merged
-        ColumnIndex index = new ColumnIndex.Builder(cf.getComparator(), decoratedKey.key, cf.getColumnCount()).build(cf);
+        // Since the columnIndex may insert RangeTombstone markers, computing
+        // the size of the data up front is tricky.
+        DataOutputBuffer buffer = new DataOutputBuffer();
 
-        // write out row size + data
-        dataFile.stream.writeLong(ColumnFamily.serializer.serializedSizeForSSTable(cf, typeSizes));
-        ColumnFamily.serializer.serializeForSSTable(cf, dataFile.stream);
+        // build column index and write out the atoms
+        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, decoratedKey.key, cf.getColumnCount(), buffer);
+        ColumnIndex index = builder.build(cf);
+
+        TypeSizes typeSizes = TypeSizes.NATIVE;
+        long delSize = DeletionTime.serializer.serializedSize(cf.deletionInfo().getTopLevelDeletion(), typeSizes);
+        dataFile.stream.writeLong(buffer.getLength() + delSize + typeSizes.sizeof(0));
+
+        // Write deletion info + column count
+        DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dataFile.stream);
+        dataFile.stream.writeInt(builder.writtenAtomCount());
+        dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
 
         afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
 
@@ -193,11 +202,8 @@ public class SSTableWriter extends SSTable
         dataFile.stream.writeLong(dataSize);
 
         // cf data
-        int lct = in.readInt();
-        long mfda = in.readLong();
-        DeletionInfo deletionInfo = new DeletionInfo(mfda, lct);
-        dataFile.stream.writeInt(lct);
-        dataFile.stream.writeLong(mfda);
+        DeletionInfo deletionInfo = DeletionInfo.serializer().deserializeFromSSTable(in, descriptor.version);
+        DeletionInfo.serializer().serializeForSSTable(deletionInfo, dataFile.stream);
 
         // column size
         int columnCount = in.readInt();
@@ -207,20 +213,22 @@ public class SSTableWriter extends SSTable
         long maxTimestamp = Long.MIN_VALUE;
         StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
         ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory());
-        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf.getComparator(), key.key, columnCount);
-        IColumnSerializer columnSerializer = cf.getColumnSerializer();
+        cf.delete(deletionInfo);
+
+        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, columnCount, dataFile.stream);
+        OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer();
         for (int i = 0; i < columnCount; i++)
         {
             // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
             // data size received, so we must reserialize the exact same data
-            IColumn column = columnSerializer.deserialize(in, IColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE);
-            if (column instanceof CounterColumn)
+            OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, IColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
+            if (atom instanceof CounterColumn)
             {
-                column = ((CounterColumn) column).markDeltaToBeCleared();
+                atom = ((CounterColumn) atom).markDeltaToBeCleared();
             }
-            else if (column instanceof SuperColumn)
+            else if (atom instanceof SuperColumn)
             {
-                SuperColumn sc = (SuperColumn) column;
+                SuperColumn sc = (SuperColumn) atom;
                 for (IColumn subColumn : sc.getSubColumns())
                 {
                     if (subColumn instanceof CounterColumn)
@@ -231,14 +239,13 @@ public class SSTableWriter extends SSTable
                 }
             }
 
-            int deletionTime = column.getLocalDeletionTime();
+            int deletionTime = atom.getLocalDeletionTime();
             if (deletionTime < Integer.MAX_VALUE)
             {
                 tombstones.update(deletionTime);
             }
-            maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp());
-            cf.getColumnSerializer().serialize(column, dataFile.stream);
-            columnIndexer.add(column);
+            maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
+            columnIndexer.add(atom); // This writes the atom to disk too
         }
 
         assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
@@ -416,7 +423,7 @@ public class SSTableWriter extends SSTable
             // bloom filter
             FileOutputStream fos = new FileOutputStream(descriptor.filenameFor(SSTable.COMPONENT_FILTER));
             DataOutputStream stream = new DataOutputStream(fos);
-            FilterFactory.serialize(bf, stream, descriptor.filterType);
+            FilterFactory.serialize(bf, stream, descriptor.version.filterType);
             stream.flush();
             fos.getFD().sync();
             stream.close();

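The central trick in append() above: since ColumnIndex.Builder may inject RangeTombstone markers that do not exist in the in-memory ColumnFamily, the row size cannot be known before serialization, so the atoms are first written into a side buffer and the size prefix is derived from it. A stripped-down sketch of that ordering (writeAtoms is a hypothetical stand-in for the builder, and the deletion-info sentinels below mean "not deleted"):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Stripped-down sketch of buffer-first row sizing; writeAtoms() is a
    // hypothetical stand-in for ColumnIndex.Builder.build(cf).
    public class BufferFirstSizingSketch
    {
        static int writeAtoms(DataOutput out) throws IOException
        {
            out.writeUTF("column");           // placeholder column
            out.writeUTF("range-tombstone");  // marker injected during the build
            return 2;                         // atoms actually written
        }

        public static void main(String[] args) throws IOException
        {
            // 1. serialize the atoms into a side buffer to learn their exact size
            ByteArrayOutputStream atoms = new ByteArrayOutputStream();
            int atomCount = writeAtoms(new DataOutputStream(atoms));

            ByteArrayOutputStream file = new ByteArrayOutputStream();
            DataOutputStream dataFile = new DataOutputStream(file);

            // 2. row size = deletion info (int + long) + atom count (int) + atom bytes
            long delSize = 4 + 8;
            dataFile.writeLong(delSize + 4 + atoms.size());

            // 3. deletion info, atom count, then the buffered atoms
            dataFile.writeInt(Integer.MAX_VALUE); // localDeletionTime: live
            dataFile.writeLong(Long.MIN_VALUE);   // markedForDeleteAt: live
            dataFile.writeInt(atomCount);
            atoms.writeTo(dataFile);

            System.out.println("row bytes on disk: " + file.size());
        }
    }
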
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/streaming/StreamIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java
index 9f12aff..eb00e20 100644
--- a/src/java/org/apache/cassandra/streaming/StreamIn.java
+++ b/src/java/org/apache/cassandra/streaming/StreamIn.java
@@ -74,12 +74,12 @@ public class StreamIn
         Descriptor remotedesc = remote.desc;
         if (!remotedesc.isStreamCompatible())
             throw new UnsupportedOperationException(String.format("SSTable %s is not compatible with current version %s",
-                                                                  remote.getFilename(), Descriptor.CURRENT_VERSION));
+                                                                  remote.getFilename(), Descriptor.Version.CURRENT));
 
         // new local sstable
         Table table = Table.open(remotedesc.ksname);
         ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
-        Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size, Descriptor.CURRENT_VERSION));
+        Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size, Descriptor.Version.CURRENT));
 
         return new PendingFile(localdesc, remote);
     }

