cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [1/4] git commit: Optimized data structure for range tombstones
Date Fri, 12 Jul 2013 14:41:37 GMT
Updated Branches:
  refs/heads/trunk daff1fce0 -> 88f65a182


Optimized data structure for range tombstones

patch by slebresne; reviewed by frousseau for CASSANDRA-5677


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6e81f816
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6e81f816
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6e81f816

Branch: refs/heads/trunk
Commit: 6e81f816c730de1798fede6073b0908bf49ee5c0
Parents: 39a38e6
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Fri Jun 28 18:02:37 2013 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Fri Jul 12 16:36:52 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/AbstractColumnContainer.java   |   7 +-
 .../db/AbstractThreadUnsafeSortedColumns.java   |  11 +-
 .../cassandra/db/ArrayBackedSortedColumns.java  |   1 +
 .../cassandra/db/AtomicSortedColumns.java       |  32 +-
 .../org/apache/cassandra/db/ColumnFamily.java   |   4 +-
 .../cassandra/db/ColumnFamilySerializer.java    |   2 +-
 .../org/apache/cassandra/db/DeletionInfo.java   | 227 +++---
 .../org/apache/cassandra/db/ISortedColumns.java |   2 +
 .../apache/cassandra/db/RangeTombstoneList.java | 689 +++++++++++++++++++
 .../db/TreeMapBackedSortedColumns.java          |   1 +
 .../db/columniterator/IndexedSliceReader.java   |   6 +-
 .../db/columniterator/SSTableNamesIterator.java |   2 +-
 .../db/columniterator/SimpleSliceReader.java    |   4 +-
 .../db/compaction/LazilyCompactedRow.java       |   4 +-
 .../db/compaction/PrecompactedRow.java          |   4 +-
 .../io/sstable/SSTableIdentityIterator.java     |   2 +-
 .../cassandra/io/sstable/SSTableWriter.java     |  14 +-
 .../apache/cassandra/tools/SSTableExport.java   |   4 +-
 .../apache/cassandra/tools/SSTableImport.java   |   3 -
 test/unit/org/apache/cassandra/Util.java        |   2 +-
 .../cassandra/db/RangeTombstoneListTest.java    | 269 ++++++++
 .../cassandra/io/LazilyCompactedRowTest.java    |   4 +-
 23 files changed, 1113 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e1e3154..50a9075 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
  * Don't keep ancestor information in memory (CASSANDRA-5342)
  * cqlsh: fix handling of semicolons inside BATCH queries (CASSANDRA-5697)
  * Expose native protocol server status in nodetool info (CASSANDRA-5735)
+ * Fix pathetic performance of range tombstones (CASSANDRA-5677)
 
 
 1.2.6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
index 3bb8590..76e1547 100644
--- a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
+++ b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
@@ -56,6 +56,11 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter
         delete(new DeletionInfo(deletionTime));
     }
 
+    public void delete(RangeTombstone tombstone)
+    {
+        columns.delete(tombstone);
+    }
+
     // Contrarily to delete(), this will use the provided info even if those
     // are older that the current ones. Used for SuperColumn in QueryFilter.
     // delete() is probably the right method in all other cases.
@@ -197,7 +202,7 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter
     public boolean hasIrrelevantData(int gcBefore)
     {
         // Do we have gcable deletion infos?
-        if (!deletionInfo().purge(gcBefore).equals(deletionInfo()))
+        if (deletionInfo().hasIrrelevantData(gcBefore))
             return true;
 
         // Do we have colums that are either deleted by the container or gcable tombstone?

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
index a4e5eb7..8a497ba 100644
--- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
@@ -27,7 +27,7 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn
 
     public AbstractThreadUnsafeSortedColumns()
     {
-        deletionInfo = DeletionInfo.LIVE;
+        deletionInfo = DeletionInfo.live();
     }
 
     public DeletionInfo getDeletionInfo()
@@ -37,7 +37,12 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn
 
     public void delete(DeletionInfo newInfo)
     {
-        deletionInfo = deletionInfo.add(newInfo);
+        deletionInfo.add(newInfo);
+    }
+
+    public void delete(RangeTombstone tombstone)
+    {
+        deletionInfo.add(tombstone, getComparator());
     }
 
     public void setDeletionInfo(DeletionInfo newInfo)
@@ -47,7 +52,7 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn
 
     public void maybeResetDeletionTimes(int gcBefore)
     {
-        deletionInfo = deletionInfo.purge(gcBefore);
+        deletionInfo.purge(gcBefore);
     }
 
     public void retainAll(ISortedColumns columns)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index 8d813a3..d9e93d6 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -298,6 +298,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
 
     public void clear()
     {
+        setDeletionInfo(DeletionInfo.live());
         columns.clear();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index bdb2168..9803544 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -104,14 +104,22 @@ public class AtomicSortedColumns implements ISortedColumns
         return ref.get().deletionInfo;
     }
 
+    public void delete(RangeTombstone tombstone)
+    {
+        delete(new DeletionInfo(tombstone, getComparator()));
+    }
+
     public void delete(DeletionInfo info)
     {
+        if (info.isLive())
+            return;
+
         // Keeping deletion info for max markedForDeleteAt value
         while (true)
         {
             Holder current = ref.get();
-            DeletionInfo newDelInfo = current.deletionInfo.add(info);
-            if (newDelInfo == current.deletionInfo || ref.compareAndSet(current, current.with(newDelInfo)))
+            DeletionInfo newDelInfo = current.deletionInfo.copy().add(info);
+            if (ref.compareAndSet(current, current.with(newDelInfo)))
                 break;
         }
     }
@@ -126,8 +134,12 @@ public class AtomicSortedColumns implements ISortedColumns
         while (true)
         {
             Holder current = ref.get();
-            DeletionInfo purgedInfo = current.deletionInfo.purge(gcBefore);
-            if (purgedInfo == current.deletionInfo || ref.compareAndSet(current, current.with(DeletionInfo.LIVE)))
+            if (!current.deletionInfo.hasIrrelevantData(gcBefore))
+                break;
+
+            DeletionInfo purgedInfo = current.deletionInfo.copy();
+            purgedInfo.purge(gcBefore);
+            if (ref.compareAndSet(current, current.with(purgedInfo)))
                 break;
         }
     }
@@ -182,7 +194,7 @@ public class AtomicSortedColumns implements ISortedColumns
         {
             sizeDelta = 0;
             current = ref.get();
-            DeletionInfo newDelInfo = current.deletionInfo.add(cm.getDeletionInfo());
+            DeletionInfo newDelInfo = current.deletionInfo.copy().add(cm.getDeletionInfo());
             modified = new Holder(current.map.clone(), newDelInfo);
 
             for (IColumn column : cm.getSortedColumns())
@@ -297,17 +309,21 @@ public class AtomicSortedColumns implements ISortedColumns
 
     private static class Holder
     {
+        // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class,
+        // so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
+        private static final DeletionInfo LIVE = DeletionInfo.live();
+
         final SnapTreeMap<ByteBuffer, IColumn> map;
         final DeletionInfo deletionInfo;
 
         Holder(AbstractType<?> comparator)
         {
-            this(new SnapTreeMap<ByteBuffer, IColumn>(comparator), DeletionInfo.LIVE);
+            this(new SnapTreeMap<ByteBuffer, IColumn>(comparator), LIVE);
         }
 
         Holder(SortedMap<ByteBuffer, IColumn> columns)
         {
-            this(new SnapTreeMap<ByteBuffer, IColumn>(columns), DeletionInfo.LIVE);
+            this(new SnapTreeMap<ByteBuffer, IColumn>(columns), LIVE);
         }
 
         Holder(SnapTreeMap<ByteBuffer, IColumn> map, DeletionInfo deletionInfo)
@@ -335,7 +351,7 @@ public class AtomicSortedColumns implements ISortedColumns
         // afterwards.
         Holder clear()
         {
-            return new Holder(new SnapTreeMap<ByteBuffer, IColumn>(map.comparator()), deletionInfo);
+            return new Holder(new SnapTreeMap<ByteBuffer, IColumn>(map.comparator()), LIVE);
         }
 
         long addColumn(IColumn column, Allocator allocator, SecondaryIndexManager.Updater indexer)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 6164900..02a23f8 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -221,7 +221,7 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
         else
         {
             assert atom instanceof RangeTombstone;
-            delete(new DeletionInfo((RangeTombstone)atom, getComparator()));
+            delete((RangeTombstone)atom);
         }
     }
 
@@ -385,7 +385,7 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
 
     public ColumnStats getColumnStats()
     {
-        long minTimestampSeen = deletionInfo() == DeletionInfo.LIVE ? Long.MAX_VALUE : deletionInfo().minTimestamp();
+        long minTimestampSeen = deletionInfo().isLive() ? Long.MAX_VALUE : deletionInfo().minTimestamp();
         long maxTimestampSeen = deletionInfo().maxTimestamp();
         StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index bac1e8b..2cb1dab 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -151,7 +151,7 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
 
     public void deserializeFromSSTable(DataInput dis, ColumnFamily cf, IColumnSerializer.Flag flag, Descriptor.Version version) throws IOException
     {
-        cf.delete(DeletionInfo.serializer().deserializeFromSSTable(dis, version));
+        cf.delete(DeletionTime.serializer.deserialize(dis));
         int size = dis.readInt();
         int expireBefore = (int) (System.currentTimeMillis() / 1000);
         deserializeColumnsFromSSTable(dis, cf, size, flag, expireBefore, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 9cd4545..e486eeb 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -26,6 +26,7 @@ import java.util.*;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ISerializer;
@@ -43,36 +44,38 @@ public class DeletionInfo
 
     // We don't have way to represent the full interval of keys (Interval don't support the minimum token as the right bound),
     // so we keep the topLevel deletion info separatly. This also slightly optimize the case of full row deletion which is rather common.
-    private final DeletionTime topLevel;
-    private final IntervalTree<ByteBuffer, DeletionTime, RangeTombstone> ranges;
-
-    public static final DeletionInfo LIVE = new DeletionInfo(DeletionTime.LIVE, IntervalTree.<ByteBuffer, DeletionTime, RangeTombstone>emptyTree());
+    private DeletionTime topLevel;
+    private RangeTombstoneList ranges; // null if no range tombstones (to save an allocation since it's a common case).
 
     public DeletionInfo(long markedForDeleteAt, int localDeletionTime)
     {
         // Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE
         // (see CASSANDRA-3872)
-        this(new DeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime),
-             IntervalTree.<ByteBuffer, DeletionTime, RangeTombstone>emptyTree());
+        this(new DeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime));
+    }
+
+    public DeletionInfo(DeletionTime topLevel)
+    {
+        this(topLevel, null);
     }
 
     public DeletionInfo(ByteBuffer start, ByteBuffer end, Comparator<ByteBuffer> comparator, long markedForDeleteAt, int localDeletionTime)
     {
-        this(new RangeTombstone(start, end, new DeletionTime(markedForDeleteAt, localDeletionTime)), comparator);
+        this(DeletionTime.LIVE, new RangeTombstoneList(comparator, 1));
+        ranges.add(start, end, markedForDeleteAt, localDeletionTime);
     }
 
     public DeletionInfo(RangeTombstone rangeTombstone, Comparator<ByteBuffer> comparator)
     {
-        this(DeletionTime.LIVE, IntervalTree.build(Collections.<RangeTombstone>singletonList(rangeTombstone), comparator));
-        assert comparator != null;
+        this(rangeTombstone.min, rangeTombstone.max, comparator, rangeTombstone.data.markedForDeleteAt, rangeTombstone.data.localDeletionTime);
     }
 
-    public DeletionInfo(DeletionTime topLevel)
+    public static DeletionInfo live()
     {
-        this(topLevel, IntervalTree.<ByteBuffer, DeletionTime, RangeTombstone>emptyTree());
+        return new DeletionInfo(DeletionTime.LIVE);
     }
 
-    private DeletionInfo(DeletionTime topLevel, IntervalTree<ByteBuffer, DeletionTime, RangeTombstone> ranges)
+    private DeletionInfo(DeletionTime topLevel, RangeTombstoneList ranges)
     {
         this.topLevel = topLevel;
         this.ranges = ranges;
@@ -83,6 +86,11 @@ public class DeletionInfo
         return serializer;
     }
 
+    public DeletionInfo copy()
+    {
+        return new DeletionInfo(topLevel, ranges == null ? null : ranges.copy());
+    }
+
     /**
      * Returns whether this DeletionInfo is live, that is deletes no columns.
      */
@@ -90,7 +98,7 @@ public class DeletionInfo
     {
         return topLevel.markedForDeleteAt == Long.MIN_VALUE
             && topLevel.localDeletionTime == Integer.MAX_VALUE
-            && ranges.isEmpty();
+            && (ranges == null || ranges.isEmpty());
     }
 
     /**
@@ -107,91 +115,78 @@ public class DeletionInfo
 
     public boolean isDeleted(ByteBuffer name, long timestamp)
     {
+        // We do rely on this test: if topLevel.markedForDeleteAt is MIN_VALUE, we should not
+        // consider the column deleted even if timestamp=MIN_VALUE, otherwise this break QueryFilter.isRelevant
         if (isLive())
             return false;
         if (timestamp <= topLevel.markedForDeleteAt)
             return true;
 
-        for (DeletionTime d : ranges.search(name))
-        {
-            if (timestamp <= d.markedForDeleteAt)
-                return true;
-        }
-        return false;
+        return ranges != null && ranges.isDeleted(name, timestamp);
     }
 
     /**
-     * Return a new DeletionInfo correspond to purging every tombstones that
-     * are older than {@code gcbefore}.
+     * Purge every tombstones that are older than {@code gcbefore}.
      *
      * @param gcBefore timestamp (in seconds) before which tombstones should
      * be purged
-     * @return a new DeletionInfo with the purged info remove. Should return
-     * DeletionInfo.LIVE if no tombstones remain.
      */
-    public DeletionInfo purge(int gcBefore)
+    public void purge(int gcBefore)
     {
-        if (ranges.isEmpty())
+        topLevel = topLevel.localDeletionTime < gcBefore ? DeletionTime.LIVE : topLevel;
+
+        if (ranges != null)
         {
-            return topLevel.localDeletionTime < gcBefore ? LIVE : this;
-        }
-        else
-        {
-            // We rebuild a new intervalTree that contains only non expired range tombstones
-            List<RangeTombstone> nonExpired = new ArrayList<RangeTombstone>();
-            for (RangeTombstone range : ranges)
-            {
-                if (range.data.localDeletionTime >= gcBefore)
-                    nonExpired.add(range);
-            }
-            IntervalTree<ByteBuffer, DeletionTime, RangeTombstone> newRanges = nonExpired.size() == ranges.intervalCount()
-                                                                             ? ranges
-                                                                             : IntervalTree.build(nonExpired, ranges.comparator());
-            return topLevel.localDeletionTime < gcBefore
-                 ? new DeletionInfo(DeletionTime.LIVE, newRanges)
-                 : new DeletionInfo(topLevel, newRanges);
+            ranges.purge(gcBefore);
+            if (ranges.isEmpty())
+                ranges = null;
         }
     }
 
+    public boolean hasIrrelevantData(int gcBefore)
+    {
+        if (topLevel.localDeletionTime < gcBefore)
+            return true;
+
+        return ranges != null && ranges.hasIrrelevantData(gcBefore);
+    }
+
+    public void add(DeletionTime newInfo)
+    {
+        if (topLevel.markedForDeleteAt < newInfo.markedForDeleteAt)
+            topLevel = newInfo;
+    }
+
+    public void add(RangeTombstone tombstone, Comparator<ByteBuffer> comparator)
+    {
+        if (ranges == null)
+            ranges = new RangeTombstoneList(comparator, 1);
+
+        ranges.add(tombstone);
+    }
+
     /**
-     * Returns a new DeletionInfo containing of this plus the provided {@code
-     * newInfo}.
+     * Adds the provided deletion infos to the current ones.
+     *
+     * @return this object.
      */
     public DeletionInfo add(DeletionInfo newInfo)
     {
-        if (ranges.isEmpty())
-        {
-            return topLevel.markedForDeleteAt < newInfo.topLevel.markedForDeleteAt
-                 ? newInfo
-                 : newInfo.ranges.isEmpty() ? this : new DeletionInfo(topLevel, newInfo.ranges);
-        }
-        else
-        {
-            if (newInfo.ranges.isEmpty())
-            {
-                return topLevel.markedForDeleteAt < newInfo.topLevel.markedForDeleteAt
-                     ? new DeletionInfo(newInfo.topLevel, ranges)
-                     : this;
-            }
-            else
-            {
-                // Need to merge both ranges
-                Set<RangeTombstone> merged = new HashSet<RangeTombstone>();
-                Iterables.addAll(merged, Iterables.concat(ranges, newInfo.ranges));
-                return new DeletionInfo(topLevel.markedForDeleteAt < newInfo.topLevel.markedForDeleteAt ? newInfo.topLevel : topLevel,
-                                        IntervalTree.build(merged, ranges.comparator()));
-            }
-        }
+        add(newInfo.topLevel);
+
+        if (ranges == null)
+            ranges = newInfo.ranges == null ? null : newInfo.ranges.copy();
+        else if (newInfo.ranges != null)
+            ranges.addAll(newInfo.ranges);
+
+        return this;
     }
 
     public long minTimestamp()
     {
-        long minTimestamp = topLevel.markedForDeleteAt;
-        for (RangeTombstone i : ranges)
-        {
-            minTimestamp = Math.min(minTimestamp, i.data.markedForDeleteAt);
-        }
-        return minTimestamp;
+        return ranges == null
+             ? topLevel.markedForDeleteAt
+             : Math.min(topLevel.markedForDeleteAt, ranges.minMarkedAt());
     }
 
     /**
@@ -199,12 +194,9 @@ public class DeletionInfo
      */
     public long maxTimestamp()
     {
-        long maxTimestamp = topLevel.markedForDeleteAt;
-        for (RangeTombstone i : ranges)
-        {
-            maxTimestamp = Math.max(maxTimestamp, i.data.markedForDeleteAt);
-        }
-        return maxTimestamp;
+        return ranges == null
+             ? topLevel.markedForDeleteAt
+             : Math.max(topLevel.markedForDeleteAt, ranges.maxMarkedAt());
     }
 
     public DeletionTime getTopLevelDeletion()
@@ -212,26 +204,22 @@ public class DeletionInfo
         return topLevel;
     }
 
+    // Use sparingly, not the most efficient thing
     public Iterator<RangeTombstone> rangeIterator()
     {
-        return ranges.iterator();
+        return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator();
     }
 
     public int dataSize()
     {
         int size = TypeSizes.NATIVE.sizeof(topLevel.markedForDeleteAt);
-        for (RangeTombstone r : ranges)
-        {
-            size += r.min.remaining() + r.max.remaining();
-            size += TypeSizes.NATIVE.sizeof(r.data.markedForDeleteAt);
-        }
-        return size;
+        return size + (ranges == null ? 0 : ranges.dataSize());
     }
 
     @Override
     public String toString()
     {
-        if (ranges.isEmpty())
+        if (ranges == null || ranges.isEmpty())
             return String.format("{%s}", topLevel);
         else
             return String.format("{%s, ranges=%s}", topLevel, rangesAsString());
@@ -243,8 +231,10 @@ public class DeletionInfo
         StringBuilder sb = new StringBuilder();
         AbstractType at = (AbstractType)ranges.comparator();
         assert at != null;
-        for (RangeTombstone i : ranges)
+        Iterator<RangeTombstone> iter = rangeIterator();
+        while (iter.hasNext())
         {
+            RangeTombstone i = iter.next();
             sb.append("[");
             sb.append(at.getString(i.min)).append("-");
             sb.append(at.getString(i.max)).append(", ");
@@ -260,7 +250,7 @@ public class DeletionInfo
         if(!(o instanceof DeletionInfo))
             return false;
         DeletionInfo that = (DeletionInfo)o;
-        return topLevel.equals(that.topLevel) && ranges.equals(that.ranges);
+        return topLevel.equals(that.topLevel) && Objects.equal(ranges, that.ranges);
     }
 
     @Override
@@ -269,63 +259,23 @@ public class DeletionInfo
         return Objects.hashCode(topLevel, ranges);
     }
 
-    public static class Serializer implements IVersionedSerializer<DeletionInfo>, ISSTableSerializer<DeletionInfo>
+    public static class Serializer implements IVersionedSerializer<DeletionInfo>
     {
-        private final static ISerializer<ByteBuffer> bbSerializer = new ISerializer<ByteBuffer>()
-        {
-            public void serialize(ByteBuffer bb, DataOutput dos) throws IOException
-            {
-                ByteBufferUtil.writeWithShortLength(bb, dos);
-            }
-
-            public ByteBuffer deserialize(DataInput dis) throws IOException
-            {
-                return ByteBufferUtil.readWithShortLength(dis);
-            }
-
-            public long serializedSize(ByteBuffer bb, TypeSizes typeSizes)
-            {
-                int bbSize = bb.remaining();
-                return typeSizes.sizeof((short)bbSize) + bbSize;
-            }
-        };
-
-        private final static IntervalTree.Serializer<ByteBuffer, DeletionTime, RangeTombstone> itSerializer;
-        static
-        {
-            try
-            {
-                Constructor<RangeTombstone> constructor = RangeTombstone.class.getConstructor(ByteBuffer.class, ByteBuffer.class, DeletionTime.class);
-                itSerializer = IntervalTree.serializer(bbSerializer, DeletionTime.serializer, constructor);
-            }
-            catch (NoSuchMethodException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
         public void serialize(DeletionInfo info, DataOutput out, int version) throws IOException
         {
             DeletionTime.serializer.serialize(info.topLevel, out);
-            // Pre-1.2 version don't know about range tombstones and thus users should upgrade all
-            // nodes before using them. If they didn't, better fail early that propagating bad info
             if (version < MessagingService.VERSION_12)
             {
-                if (!info.ranges.isEmpty())
+                if (info.ranges != null && !info.ranges.isEmpty())
                     throw new RuntimeException("Cannot send range tombstone to pre-1.2 node. You should upgrade all node to Cassandra 1.2+ before using range tombstone.");
                 // Otherwise we're done
             }
             else
             {
-                itSerializer.serialize(info.ranges, out, version);
+                RangeTombstoneList.serializer.serialize(info.ranges, out, version);
             }
         }
 
-        public void serializeForSSTable(DeletionInfo info, DataOutput out) throws IOException
-        {
-            DeletionTime.serializer.serialize(info.topLevel, out);
-        }
-
         /*
          * Range tombstones internally depend on the column family serializer, but it is not serialized.
          * Thus deserialize(DataInput, int, Comparator<ByteBuffer>) should be used instead of this method.
@@ -339,26 +289,21 @@ public class DeletionInfo
         {
             assert comparator != null;
             DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
+
             if (version < MessagingService.VERSION_12)
-                return new DeletionInfo(topLevel, IntervalTree.<ByteBuffer, DeletionTime, RangeTombstone>emptyTree());
+                return new DeletionInfo(topLevel, null);
 
-            IntervalTree<ByteBuffer, DeletionTime, RangeTombstone> ranges = itSerializer.deserialize(in, version, comparator);
+            RangeTombstoneList ranges = RangeTombstoneList.serializer.deserialize(in, version, comparator);
             return new DeletionInfo(topLevel, ranges);
         }
 
-        public DeletionInfo deserializeFromSSTable(DataInput in, Descriptor.Version version) throws IOException
-        {
-            DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
-            return new DeletionInfo(topLevel, IntervalTree.<ByteBuffer, DeletionTime, RangeTombstone>emptyTree());
-        }
-
         public long serializedSize(DeletionInfo info, TypeSizes typeSizes, int version)
         {
             long size = DeletionTime.serializer.serializedSize(info.topLevel, typeSizes);
             if (version < MessagingService.VERSION_12)
                 return size;
 
-            return size + itSerializer.serializedSize(info.ranges, typeSizes, version);
+            return size + RangeTombstoneList.serializer.serializedSize(info.ranges, typeSizes, version);
         }
 
         public long serializedSize(DeletionInfo info, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/ISortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ISortedColumns.java b/src/java/org/apache/cassandra/db/ISortedColumns.java
index 5ccba19..4316d52 100644
--- a/src/java/org/apache/cassandra/db/ISortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ISortedColumns.java
@@ -54,6 +54,8 @@ public interface ISortedColumns extends IIterableColumns
     public void setDeletionInfo(DeletionInfo info);
 
     public void delete(DeletionInfo info);
+    public void delete(RangeTombstone tombstone);
+
     public void maybeResetDeletionTimes(int gcBefore);
     public void retainAll(ISortedColumns columns);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
new file mode 100644
index 0000000..0a761cd
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -0,0 +1,689 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data structure holding the range tombstones of a ColumnFamily.
+ * <p>
+ * This is essentially a sorted list of non-overlapping (tombstone) ranges.
+ * <p>
+ * A range tombstone has 4 elements: the start and end of the range covered,
+ * and the deletion infos (markedAt timestamp and local deletion time). The
+ * markedAt timestamp is what define the priority of 2 overlapping tombstones.
+ * That is, given 2 tombstones [0, 10]@t1 and [5, 15]@t2, then if t2 > t1 (and
+ * are the tombstones markedAt values), the 2nd tombstone take precedence over
+ * the first one on [5, 10]. If such tombstones are added to a RangeTombstoneList,
+ * the range tombstone list will store them as [[0, 5]@t1, [5, 15]@t2].
+ * <p>
+ * The only use of the local deletion time is to know when a given tombstone can
+ * be purged, which will be done by the purge() method.
+ */
+public class RangeTombstoneList implements Iterable<RangeTombstone>
+{
+    private static final Logger logger = LoggerFactory.getLogger(RangeTombstoneList.class);
+
+    public static final Serializer serializer = new Serializer();
+
+    private final Comparator<ByteBuffer> comparator;
+
+    // Note: we don't want to use a List for the markedAts and delTimes to avoid boxing. We could
+    // use a List for starts and ends, but having arrays everywhere is almost simpler.
+    private ByteBuffer[] starts;
+    private ByteBuffer[] ends;
+    private long[] markedAts;
+    private int[] delTimes;
+
+    private int size;
+
+    private RangeTombstoneList(Comparator<ByteBuffer> comparator, ByteBuffer[] starts, ByteBuffer[] ends, long[] markedAts, int[] delTimes, int size)
+    {
+        assert starts.length == ends.length && starts.length == markedAts.length && starts.length == delTimes.length;
+        this.comparator = comparator;
+        this.starts = starts;
+        this.ends = ends;
+        this.markedAts = markedAts;
+        this.delTimes = delTimes;
+        this.size = size;
+    }
+
+    public RangeTombstoneList(Comparator<ByteBuffer> comparator, int capacity)
+    {
+        this(comparator, new ByteBuffer[capacity], new ByteBuffer[capacity], new long[capacity], new int[capacity], 0);
+    }
+
+    public boolean isEmpty()
+    {
+        return size == 0;
+    }
+
+    public int size()
+    {
+        return size;
+    }
+
+    public Comparator<ByteBuffer> comparator()
+    {
+        return comparator;
+    }
+
+    public RangeTombstoneList copy()
+    {
+        return new RangeTombstoneList(comparator,
+                                      Arrays.copyOf(starts, size),
+                                      Arrays.copyOf(ends, size),
+                                      Arrays.copyOf(markedAts, size),
+                                      Arrays.copyOf(delTimes, size),
+                                      size);
+    }
+
+    public void add(RangeTombstone tombstone)
+    {
+        add(tombstone.min, tombstone.max, tombstone.data.markedForDeleteAt, tombstone.data.localDeletionTime);
+    }
+
+    /**
+     * Adds a new range tombstone.
+     *
+     * This method will be faster if the new tombstone sort after all the currently existing ones (this is a common use case), 
+     * but it doesn't assume it.
+     */
+    public void add(ByteBuffer start, ByteBuffer end, long markedAt, int delTime)
+    {
+        if (isEmpty())
+        {
+            addInternal(0, start, end, markedAt, delTime);
+            return;
+        }
+
+        int c = comparator.compare(starts[size-1], start);
+
+        // Fast path if we add in sorted order
+        if (c <= 0)
+        {
+            // Note that we may still overlap the last range
+            insertFrom(size-1, start, end, markedAt, delTime);
+        }
+        else
+        {
+            int pos = Arrays.binarySearch(starts, 0, size, start, comparator);
+            if (pos >= 0)
+                insertFrom(pos, start, end, markedAt, delTime);
+            else
+                // Insertion point (-pos-1) is such start < start[-pos-1], so we should insert from the previous
+                insertFrom(-pos-2, start, end, markedAt, delTime);
+        }
+    }
+
+    /**
+     * Adds all the range tombstones of {@code tombstones} to this RangeTombstoneList.
+     */
+    public void addAll(RangeTombstoneList tombstones)
+    {
+        if (tombstones.isEmpty())
+            return;
+
+        if (isEmpty())
+        {
+            copyArrays(tombstones, this);
+            return;
+        }
+
+        /*
+         * We basically have 2 techniques we can use here: either we repeatedly call add() on tombstones values,
+         * or we do a merge of both (sorted) lists. If this lists is bigger enough than the one we add, the
+         * calling add() will be faster, otherwise it's merging that will be faster.
+         *
+         * Let's note that during memtables updates, it might not be uncommon that a new update has only a few range
+         * tombstones, while the CF we're adding it to (the on in the memtable) has many. In that case, using add() is
+         * likely going to be faster.
+         *
+         * In other cases however, like when diffing responses from multiple nodes, the tombstone lists we "merge" will
+         * be likely sized, so using add() might be a bit inefficient.
+         *
+         * Roughly speaking (this ignore the fact that updating an element is not exactly constant but that's not a big
+         * deal), if n is the size of this list and m is tombstones size, merging is O(n+m) while using add() is O(m*log(n)).
+         *
+         * But let's not crank up a logarithm computation for that. Long story short, merging will be a bad choice only
+         * if this list size is lot bigger that the other one, so let's keep it simple.
+         */
+        if (size > 10 * tombstones.size)
+        {
+            for (int i = 0; i < tombstones.size; i++)
+                add(tombstones.starts[i], tombstones.ends[i], tombstones.markedAts[i], tombstones.delTimes[i]);
+        }
+        else
+        {
+            int i = 0;
+            int j = 0;
+            while (i < size && j < tombstones.size)
+            {
+                if (comparator.compare(tombstones.starts[j], starts[i]) < 0)
+                {
+                    insertFrom(i-1, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]);
+                    j++;
+                }
+                else
+                {
+                    i++;
+                }
+            }
+            for (; j < tombstones.size; j++)
+                insertFrom((i++) - 1, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]);
+        }
+    }
+
+    /**
+     * Returns whether the given name/timestamp pair is deleted by one of the tombstone
+     * of this RangeTombstoneList.
+     */
+    public boolean isDeleted(ByteBuffer name, long timestamp)
+    {
+        int idx = searchInternal(name);
+        return idx >= 0 && markedAts[idx] >= timestamp;
+    }
+
+    /**
+     * Returns the DeletionTime for the tombstone overlapping {@code name} (there can't be more than one),
+     * or null if {@code name} is not covered by any tombstone.
+     */
+    public DeletionTime search(ByteBuffer name)
+    {
+        int idx = searchInternal(name);
+        return idx < 0 ? null : new DeletionTime(markedAts[idx], delTimes[idx]);
+    }
+
+    private int searchInternal(ByteBuffer name)
+    {
+        if (isEmpty())
+            return -1;
+
+        int pos = Arrays.binarySearch(starts, 0, size, name, comparator);
+        if (pos >= 0)
+        {
+            // We're exactly on an interval start. The one subtility is that we need to check if
+            // the previous is not equal to us and doesn't have a higher marked at
+            if (pos > 0 && comparator.compare(name, ends[pos-1]) == 0 && markedAts[pos-1] > markedAts[pos])
+                return pos-1;
+            else
+                return pos;
+        }
+        else
+        {
+            // We potentially intersect the range before our "insertion point"
+            int idx = -pos-2;
+            if (idx < 0)
+                return -1;
+
+            return comparator.compare(name, ends[idx]) <= 0 ? idx : -1;
+        }
+    }
+
+    public int dataSize()
+    {
+        int dataSize = TypeSizes.NATIVE.sizeof(size);
+        for (int i = 0; i < size; i++)
+        {
+            dataSize += starts[i].remaining() + ends[i].remaining();
+            dataSize += TypeSizes.NATIVE.sizeof(markedAts[i]);
+            dataSize += TypeSizes.NATIVE.sizeof(delTimes[i]);
+        }
+        return dataSize;
+    }
+
+    public long minMarkedAt()
+    {
+        long min = Long.MAX_VALUE;
+        for (int i = 0; i < size; i++)
+            min = Math.min(min, markedAts[i]);
+        return min;
+    }
+
+    public long maxMarkedAt()
+    {
+        long max = Long.MIN_VALUE;
+        for (int i = 0; i < size; i++)
+            max = Math.max(max, markedAts[i]);
+        return max;
+    }
+
+    /**
+     * Removes all range tombstones whose local deletion time is older than gcBefore.
+     */
+    public void purge(int gcBefore)
+    {
+        int j = 0;
+        for (int i = 0; i < size; i++)
+        {
+            if (delTimes[i] >= gcBefore)
+                setInternal(j++, starts[i], ends[i], markedAts[i], delTimes[i]);
+        }
+        size = j;
+    }
+
+    /**
+     * Returns whether {@code purge(gcBefore)} would remove something or not.
+     */
+    public boolean hasIrrelevantData(int gcBefore)
+    {
+        for (int i = 0; i < size; i++)
+        {
+            if (delTimes[i] < gcBefore)
+                return true;
+        }
+        return false;
+    }
+
+    public Iterator<RangeTombstone> iterator()
+    {
+        return new AbstractIterator<RangeTombstone>()
+        {
+            private int idx;
+
+            protected RangeTombstone computeNext()
+            {
+                if (idx >= size)
+                    return endOfData();
+
+                RangeTombstone t = new RangeTombstone(starts[idx], ends[idx], markedAts[idx], delTimes[idx]);
+                idx++;
+                return t;
+            }
+        };
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if(!(o instanceof RangeTombstoneList))
+            return false;
+        RangeTombstoneList that = (RangeTombstoneList)o;
+        if (size != that.size)
+            return false;
+
+        for (int i = 0; i < size; i++)
+        {
+            if (!starts[i].equals(that.starts[i]))
+                return false;
+            if (!ends[i].equals(that.ends[i]))
+                return false;
+            if (markedAts[i] != that.markedAts[i])
+                return false;
+            if (delTimes[i] != that.delTimes[i])
+                return false;
+        }
+        return true;
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        int result = size;
+        for (int i = 0; i < size; i++)
+        {
+            result += starts[i].hashCode() + ends[i].hashCode();
+            result += (int)(markedAts[i] ^ (markedAts[i] >>> 32));
+            result += delTimes[i];
+        }
+        return result;
+    }
+
+    private static void copyArrays(RangeTombstoneList src, RangeTombstoneList dst)
+    {
+        dst.grow(src.size);
+        System.arraycopy(src.starts, 0, dst.starts, 0, src.size);
+        System.arraycopy(src.ends, 0, dst.ends, 0, src.size);
+        System.arraycopy(src.markedAts, 0, dst.markedAts, 0, src.size);
+        System.arraycopy(src.delTimes, 0, dst.delTimes, 0, src.size);
+        dst.size = src.size;
+    }
+
+    /*
+     * Inserts a new element whose start should be inserted at index i. This method
+     * assumes that:
+     *   - starts[i] <= start.
+     *   - start < starts[i+1] or there is no i+1 element.
+     */
+    private void insertFrom(int i, ByteBuffer start, ByteBuffer end, long markedAt, int delTime)
+    {
+        if (i < 0)
+        {
+            insertAfter(i, start, end, markedAt, delTime);
+            return;
+        }
+
+        /*
+         * We have elt(i) = [s_i, e_i]@t_i and want to insert X = [s, e]@t, knowing that s_i < s < s_i+1.
+         * We can have 3 cases:
+         *  - s <= e_i && e <= e_i: we're fully contained in i.
+         *  - s <= e_i && e > e_i: we rewrite X to X1=[s, e_i]@t + X2=[e_i, e]@t. X1 is fully contained
+         *             in i and X2 is the insertAfter() case for i.
+         *  - s > e_i: we're in the insertAfter() case for i.
+         */
+        if (comparator.compare(start, ends[i]) <= 0)
+        {
+            if (comparator.compare(end, ends[i]) <= 0)
+            {
+                update(i, start, end, markedAt, delTime);
+            }
+            else
+            {
+                insertAfter(i, ends[i], end, markedAt, delTime);
+                update(i, start, ends[i], markedAt, delTime);
+            }
+        }
+        else
+        {
+            insertAfter(i, start, end, markedAt, delTime);
+        }
+    }
+
+    /*
+     * Inserts a new element knowing that the new element start strictly after
+     * the one at index i, i.e that:
+     *   - ends[i] <= start (or i == -1)
+     */
+    private void insertAfter(int i, ByteBuffer start, ByteBuffer end, long markedAt, int delTime)
+    {
+        if (i == size - 1)
+        {
+            addInternal(i+1, start, end, markedAt, delTime);
+            return;
+        }
+
+        /*
+         * We have the following intervals:
+         *           i            i+1
+         *   ..., [s1, e1]@t1, [s2, e2]@t2, ...
+         *
+         * And we want to insert X = [s, e]@t, knowing that e1 <= s.
+         * We can have 2 cases:
+         *  - s < s2: we rewrite X to X1=[s, s2]@t + X2=[s2, e]@t. X2 meet the weakInsertFrom() condition
+         *            for i+1, and X1 is a new element between i and i+1.
+         *  - s2 <= s: we're in the weakInsertFrom() case for i+1.
+         */
+        if (comparator.compare(start, starts[i+1]) < 0)
+        {
+            /*
+             * If it happens the new element is fully before the current one, we insert it and
+             * we're done
+             */
+            if (comparator.compare(end, starts[i+1]) <= 0)
+            {
+                addInternal(i+1, start, end, markedAt, delTime);
+                return;
+            }
+
+            weakInsertFrom(i+1, starts[i+1], end, markedAt, delTime);
+            addInternal(i+1, start, starts[i+1], markedAt, delTime);
+        }
+        else
+        {
+            weakInsertFrom(i+1, start, end, markedAt, delTime);
+        }
+    }
+
+    /*
+     * Weak version of insertFrom that only assumes the new element starts after index i,
+     * but without knowing about the 2nd condition, i.e. this only assume that:
+     *   - starts[i] <= start
+     */
+    private void weakInsertFrom(int i, ByteBuffer start, ByteBuffer end, long markedAt, int delTime)
+    {
+        /*
+         * Either start is before the next element start, and we're in fact in the insertFrom()
+         * case, or it's not and it's an weakInsertFrom for the next index.
+         */
+        if (i == size - 1 || comparator.compare(start, starts[i+1]) < 0)
+            insertFrom(i, start, end, markedAt, delTime);
+        else
+            weakInsertFrom(i+1, start, end, markedAt, delTime);
+    }
+
+    /*
+     * Update index i with new element, assuming that new element is contained in the element i,
+     * i.e that:
+     *   - starts[i] <= s
+     *   - e <= end[i]
+     */
+    private void update(int i, ByteBuffer start, ByteBuffer end, long markedAt, int delTime)
+    {
+        /*
+         * If the new markedAt is lower than the one of i, we can ignore the
+         * new element, otherwise we split the current element.
+         */
+        if (markedAts[i] < markedAt)
+        {
+            if (comparator.compare(ends[i], end) != 0)
+                addInternal(i+1, end, ends[i], markedAts[i], delTimes[i]);
+
+            if (comparator.compare(starts[i], start) == 0)
+            {
+                markedAts[i] = markedAt;
+                delTimes[i] = delTime;
+                ends[i] = end;
+            }
+            else
+            {
+                addInternal(i+1, start, end, markedAt, delTime);
+                ends[i] = start;
+            }
+        }
+    }
+
+    private int capacity()
+    {
+        return starts.length;
+    }
+
+    /*
+     * Adds the new tombstone at index i, growing and/or moving elements to make room for it.
+     */
+    private void addInternal(int i, ByteBuffer start, ByteBuffer end, long markedAt, int delTime)
+    {
+        assert i >= 0;
+
+        if (size == capacity())
+            growToFree(i);
+        else if (i < size)
+            moveElements(i);
+
+        setInternal(i, start, end, markedAt, delTime);
+        size++;
+    }
+
+    /*
+     * Grow the arrays, leaving index i "free" in the process.
+     */
+    private void growToFree(int i)
+    {
+        int newLength = (capacity() * 3) / 2 + 1;
+        grow(i, newLength);
+    }
+
+    /*
+     * Grow the arrays to match newLength capacity.
+     */
+    private void grow(int newLength)
+    {
+        if (capacity() < newLength)
+            grow(-1, newLength);
+    }
+
+    private void grow(int i, int newLength)
+    {
+        starts = grow(starts, size, newLength, i);
+        ends = grow(ends, size, newLength, i);
+        markedAts = grow(markedAts, size, newLength, i);
+        delTimes = grow(delTimes, size, newLength, i);
+    }
+
+    private static ByteBuffer[] grow(ByteBuffer[] a, int size, int newLength, int i)
+    {
+        if (i < 0 || i >= size)
+            return Arrays.copyOf(a, newLength);
+
+        ByteBuffer[] newA = new ByteBuffer[newLength];
+        System.arraycopy(a, 0, newA, 0, i);
+        System.arraycopy(a, i, newA, i+1, size - i);
+        return newA;
+    }
+
+    private static long[] grow(long[] a, int size, int newLength, int i)
+    {
+        if (i < 0 || i >= size)
+            return Arrays.copyOf(a, newLength);
+
+        long[] newA = new long[newLength];
+        System.arraycopy(a, 0, newA, 0, i);
+        System.arraycopy(a, i, newA, i+1, size - i);
+        return newA;
+    }
+
+    private static int[] grow(int[] a, int size, int newLength, int i)
+    {
+        if (i < 0 || i >= size)
+            return Arrays.copyOf(a, newLength);
+
+        int[] newA = new int[newLength];
+        System.arraycopy(a, 0, newA, 0, i);
+        System.arraycopy(a, i, newA, i+1, size - i);
+        return newA;
+    }
+
+    /*
+     * Move elements so that index i is "free", assuming the arrays have at least one free slot at the end.
+     */
+    private void moveElements(int i)
+    {
+        if (i >= size)
+            return;
+
+        System.arraycopy(starts, i, starts, i+1, size - i);
+        System.arraycopy(ends, i, ends, i+1, size - i);
+        System.arraycopy(markedAts, i, markedAts, i+1, size - i);
+        System.arraycopy(delTimes, i, delTimes, i+1, size - i);
+    }
+
+    private void setInternal(int i, ByteBuffer start, ByteBuffer end, long markedAt, int delTime)
+    {
+        starts[i] = start;
+        ends[i] = end;
+        markedAts[i] = markedAt;
+        delTimes[i] = delTime;
+    }
+
+    public static class Serializer implements IVersionedSerializer<RangeTombstoneList>
+    {
+        private Serializer() {}
+
+        public void serialize(RangeTombstoneList tombstones, DataOutput out, int version) throws IOException
+        {
+            if (tombstones == null)
+            {
+                out.writeInt(0);
+                return;
+            }
+
+            out.writeInt(tombstones.size);
+            for (int i = 0; i < tombstones.size; i++)
+            {
+                ByteBufferUtil.writeWithShortLength(tombstones.starts[i], out);
+                ByteBufferUtil.writeWithShortLength(tombstones.ends[i], out);
+                out.writeInt(tombstones.delTimes[i]);
+                out.writeLong(tombstones.markedAts[i]);
+            }
+        }
+
+        /*
+         * RangeTombstoneList depends on the column family comparator, but it is not serialized.
+         * Thus deserialize(DataInput, int, Comparator<ByteBuffer>) should be used instead of this method.
+         */
+        public RangeTombstoneList deserialize(DataInput in, int version) throws IOException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public RangeTombstoneList deserialize(DataInput in, int version, Comparator<ByteBuffer> comparator) throws IOException
+        {
+            int size = in.readInt();
+            if (size == 0)
+                return null;
+
+            RangeTombstoneList tombstones = new RangeTombstoneList(comparator, size);
+
+            for (int i = 0; i < size; i++)
+            {
+                ByteBuffer start = ByteBufferUtil.readWithShortLength(in);
+                ByteBuffer end = ByteBufferUtil.readWithShortLength(in);
+                int delTime =  in.readInt();
+                long markedAt = in.readLong();
+
+                /*
+                 * The previous implementation of this wasn't guaranteeing that range were not overlapping (only
+                 * that they were sorted by starting bound). So we can't just use setInternal (but we will be
+                 * able to in 2.0). Not a big deal though, add() is pretty fast, especially given that the
+                 * start is sorted.
+                 */
+                tombstones.add(start, end, markedAt, delTime);
+            }
+            return tombstones;
+        }
+
+        public long serializedSize(RangeTombstoneList tombstones, TypeSizes typeSizes, int version)
+        {
+            if (tombstones == null)
+                return typeSizes.sizeof(0);
+
+            long size = typeSizes.sizeof(tombstones.size);
+            for (int i = 0; i < tombstones.size; i++)
+            {
+                int startSize = tombstones.starts[i].remaining();
+                size += typeSizes.sizeof((short)startSize) + startSize;
+                int endSize = tombstones.ends[i].remaining();
+                size += typeSizes.sizeof((short)endSize) + endSize;
+                size += typeSizes.sizeof(tombstones.delTimes[i]);
+                size += typeSizes.sizeof(tombstones.markedAts[i]);
+            }
+            return size;
+        }
+
+        public long serializedSize(RangeTombstoneList tombstones, int version)
+        {
+            return serializedSize(tombstones, TypeSizes.NATIVE, version);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
index c4ec52f..9c70181 100644
--- a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
@@ -182,6 +182,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
 
     public void clear()
     {
+        setDeletionInfo(DeletionInfo.live());
         map.clear();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index 21eb48b..bc8c7ca 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -27,7 +27,7 @@ import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.OnDiskAtom;
 import org.apache.cassandra.db.RangeTombstone;
 import org.apache.cassandra.db.RowIndexEntry;
@@ -90,7 +90,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
                 {
                     setToRowStart(sstable, indexEntry, input);
                     this.emptyColumnFamily = ColumnFamily.create(sstable.metadata);
-                    emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version));
+                    emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
                     fetcher = new SimpleBlockFetcher();
                 }
                 else
@@ -106,7 +106,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
                 IndexHelper.skipBloomFilter(file);
                 this.indexes = IndexHelper.deserializeIndex(file);
                 this.emptyColumnFamily = ColumnFamily.create(sstable.metadata);
-                emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version));
+                emptyColumnFamily.delete(indexEntry.deletionTime());
                 fetcher = indexes.isEmpty()
                         ? new SimpleBlockFetcher()
                         : new IndexedBlockFetcher(file.getFilePointer() + 4); // We still have the column count to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index 32c0190..df28c46 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -137,7 +137,7 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
             try
             {
                 cf = ColumnFamily.create(sstable.metadata);
-                cf.delete(DeletionInfo.serializer().deserializeFromSSTable(file, sstable.descriptor.version));
+                cf.delete(DeletionTime.serializer.deserialize(file));
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
index 19a73df..43b31fa 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
@@ -24,7 +24,7 @@ import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.OnDiskAtom;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -82,7 +82,7 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
             }
 
             emptyColumnFamily = ColumnFamily.create(sstable.metadata);
-            emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version));
+            emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
             atomSerializer = emptyColumnFamily.getOnDiskSerializer();
             columns = file.readInt();
             mark = file.mark();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 1433add..9575d41 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -116,7 +116,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         assert !closed;
 
         DataOutputBuffer clockOut = new DataOutputBuffer();
-        DeletionInfo.serializer().serializeForSSTable(emptyColumnFamily.deletionInfo(), clockOut);
+        DeletionTime.serializer.serialize(emptyColumnFamily.deletionInfo().getTopLevelDeletion(), clockOut);
 
         long dataSize = clockOut.getLength() + columnSerializedSize;
         if (logger.isDebugEnabled())
@@ -148,7 +148,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
 
         try
         {
-            DeletionInfo.serializer().serializeForSSTable(emptyColumnFamily.deletionInfo(), out);
+            DeletionTime.serializer.serialize(emptyColumnFamily.deletionInfo().getTopLevelDeletion(), out);
             out.writeInt(columnStats.columnCount);
             digest.update(out.getData(), 0, out.getLength());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index b1f639e..7cf6261 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -170,7 +170,7 @@ public class PrecompactedRow extends AbstractCompactedRow
         long delSize = DeletionTime.serializer.serializedSize(compactedCf.deletionInfo().getTopLevelDeletion(), typeSizes);
         long dataSize = buffer.getLength() + delSize + typeSizes.sizeof(0);
         out.writeLong(dataSize);
-        DeletionInfo.serializer().serializeForSSTable(compactedCf.deletionInfo(), out);
+        DeletionTime.serializer.serialize(compactedCf.deletionInfo().getTopLevelDeletion(), out);
         out.writeInt(builder.writtenAtomCount());
         out.write(buffer.getData(), 0, buffer.getLength());
         return dataSize;
@@ -182,7 +182,7 @@ public class PrecompactedRow extends AbstractCompactedRow
         DataOutputBuffer buffer = new DataOutputBuffer();
         try
         {
-            DeletionInfo.serializer().serializeForSSTable(compactedCf.deletionInfo(), buffer);
+            DeletionTime.serializer.serialize(compactedCf.deletionInfo().getTopLevelDeletion(), buffer);
             buffer.writeInt(compactedCf.getColumnCount());
             digest.update(buffer.getData(), 0, buffer.getLength());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 07d78e4..381fdb9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -155,7 +155,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
                 IndexHelper.skipIndex(inputWithTracker);
             }
             columnFamily = ColumnFamily.create(metadata);
-            columnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(inputWithTracker, dataVersion));
+            columnFamily.delete(DeletionTime.serializer.deserialize(inputWithTracker));
             atomSerializer = columnFamily.getOnDiskSerializer();
             columnCount = inputWithTracker.readInt();
             columnPosition = dataStart + inputWithTracker.getBytesRead();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 54e154c..48b26d5 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -134,7 +134,7 @@ public class SSTableWriter extends SSTable
         return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
     }
 
-    private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionInfo delInfo, ColumnIndex index)
+    private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionTime delInfo, ColumnIndex index)
     {
         lastWrittenKey = decoratedKey;
         last = lastWrittenKey;
@@ -144,7 +144,7 @@ public class SSTableWriter extends SSTable
         if (logger.isTraceEnabled())
             logger.trace("wrote " + decoratedKey + " at " + dataPosition);
         // range tombstones are part of the Atoms we write as the row contents, so RIE only gets row-level tombstones
-        RowIndexEntry entry = RowIndexEntry.create(dataPosition, delInfo.getTopLevelDeletion(), index);
+        RowIndexEntry entry = RowIndexEntry.create(dataPosition, delInfo, index);
         iwriter.append(decoratedKey, entry);
         dbuilder.addPotentialBoundary(dataPosition);
         return entry;
@@ -166,7 +166,7 @@ public class SSTableWriter extends SSTable
             throw new FSWriteError(e, dataFile.getPath());
         }
         sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
-        return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
+        return afterAppend(row.key, currentPosition, row.deletionInfo().getTopLevelDeletion(), row.index());
     }
 
     public void append(DecoratedKey decoratedKey, ColumnFamily cf)
@@ -189,10 +189,10 @@ public class SSTableWriter extends SSTable
             dataFile.stream.writeLong(buffer.getLength() + delSize + typeSizes.sizeof(0));
 
             // Write deletion infos + column count
-            DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dataFile.stream);
+            DeletionTime.serializer.serialize(cf.deletionInfo().getTopLevelDeletion(), dataFile.stream);
             dataFile.stream.writeInt(builder.writtenAtomCount());
             dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
-            afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
+            afterAppend(decoratedKey, startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
         }
         catch (IOException e)
         {
@@ -221,12 +221,12 @@ public class SSTableWriter extends SSTable
             throw new FSWriteError(e, dataFile.getPath());
         }
 
-        DeletionInfo deletionInfo = DeletionInfo.serializer().deserializeFromSSTable(in, descriptor.version);
+        DeletionTime deletionInfo = DeletionTime.serializer.deserialize(in);
         int columnCount = in.readInt();
 
         try
         {
-            DeletionInfo.serializer().serializeForSSTable(deletionInfo, dataFile.stream);
+            DeletionTime.serializer.serialize(deletionInfo, dataFile.stream);
             dataFile.stream.writeInt(columnCount);
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index fc64da6..d3fca4c 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -101,7 +101,7 @@ public class SSTableExport
         if (columnContainer instanceof ColumnFamily)
         {
             ColumnFamily columnFamily = (ColumnFamily) columnContainer;
-            if (!columnFamily.deletionInfo().equals(DeletionInfo.LIVE))
+            if (!columnFamily.deletionInfo().isLive())
             {
                 // begin meta
                 writeKey(out, "metadata");
@@ -116,7 +116,7 @@ public class SSTableExport
             SuperColumn superColumn = (SuperColumn) columnContainer;
             DeletionInfo deletionInfo = new DeletionInfo(superColumn.getMarkedForDeleteAt(),
                     superColumn.getLocalDeletionTime());
-            if (!deletionInfo.equals(DeletionInfo.LIVE))
+            if (!deletionInfo.isLive())
             {
                 writeKey(out, "metadata");
                 writeDeletionInfo(out, deletionInfo.getTopLevelDeletion());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index 74a86ca..71b1b60 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -359,9 +359,6 @@ public class SSTableImport
             writer.append(row.getKey(), columnFamily);
             columnFamily.clear();
 
-            // ready the column family for the next row since we might have read deletionInfo metadata
-            columnFamily.delete(DeletionInfo.LIVE);
-
             importedKeys++;
 
             long current = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index b2414ce..0f4dbe0 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -292,7 +292,7 @@ public class Util
         {
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(baos);
-            DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dos);
+            DeletionTime.serializer.serialize(cf.deletionInfo().getTopLevelDeletion(), dos);
             dos.writeInt(cf.getColumnCount());
             new ColumnIndex.Builder(cf, ByteBufferUtil.EMPTY_BYTE_BUFFER, dos).build(cf);
             return ByteBuffer.wrap(baos.toByteArray());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
new file mode 100644
index 0000000..39f9e6c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
@@ -0,0 +1,269 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class RangeTombstoneListTest
+{
+    private static final Comparator<ByteBuffer> cmp = IntegerType.instance;
+
+    @Test
+    public void sortedAdditionTest()
+    {
+        sortedAdditionTest(0);
+        sortedAdditionTest(10);
+    }
+
+    private void sortedAdditionTest(int initialCapacity)
+    {
+        RangeTombstoneList l = new RangeTombstoneList(cmp, initialCapacity);
+        RangeTombstone rt1 = rt(1, 5, 3);
+        RangeTombstone rt2 = rt(7, 10, 2);
+        RangeTombstone rt3 = rt(10, 13, 1);
+
+        l.add(rt1);
+        l.add(rt2);
+        l.add(rt3);
+
+        Iterator<RangeTombstone> iter = l.iterator();
+        assertRT(rt1, iter.next());
+        assertRT(rt2, iter.next());
+        assertRT(rt3, iter.next());
+
+        assert !iter.hasNext();
+    }
+
+    @Test
+    public void nonSortedAdditionTest()
+    {
+        nonSortedAdditionTest(0);
+        nonSortedAdditionTest(10);
+    }
+
+    private void nonSortedAdditionTest(int initialCapacity)
+    {
+        RangeTombstoneList l = new RangeTombstoneList(cmp, initialCapacity);
+        RangeTombstone rt1 = rt(1, 5, 3);
+        RangeTombstone rt2 = rt(7, 10, 2);
+        RangeTombstone rt3 = rt(10, 13, 1);
+
+        l.add(rt2);
+        l.add(rt1);
+        l.add(rt3);
+
+        Iterator<RangeTombstone> iter = l.iterator();
+        assertRT(rt1, iter.next());
+        assertRT(rt2, iter.next());
+        assertRT(rt3, iter.next());
+
+        assert !iter.hasNext();
+    }
+
+    @Test
+    public void overlappingAdditionTest()
+    {
+        overlappingAdditionTest(0);
+        overlappingAdditionTest(10);
+    }
+
+    private void overlappingAdditionTest(int initialCapacity)
+    {
+        RangeTombstoneList l = new RangeTombstoneList(cmp, initialCapacity);
+
+        l.add(rt(4, 10, 3));
+        l.add(rt(1, 7, 2));
+        l.add(rt(8, 13, 4));
+        l.add(rt(0, 15, 1));
+
+        Iterator<RangeTombstone> iter = l.iterator();
+        assertRT(rt(0, 1, 1), iter.next());
+        assertRT(rt(1, 4, 2), iter.next());
+        assertRT(rt(4, 8, 3), iter.next());
+        assertRT(rt(8, 10, 4), iter.next());
+        assertRT(rt(10, 13, 4), iter.next());
+        assertRT(rt(13, 15, 1), iter.next());
+        assert !iter.hasNext();
+
+        RangeTombstoneList l2 = new RangeTombstoneList(cmp, initialCapacity);
+        l.add(rt(4, 10, 12L));
+        l.add(rt(0, 8, 25L));
+
+        assertEquals(25L, l.search(b(8)).markedForDeleteAt);
+    }
+
+    @Test
+    public void overlappingSearchTest()
+    {
+    }
+
+    @Test
+    public void simpleOverlapTest()
+    {
+        RangeTombstoneList l1 = new RangeTombstoneList(cmp, 0);
+        l1.add(rt(0, 10, 3));
+        l1.add(rt(3, 7, 5));
+
+        Iterator<RangeTombstone> iter1 = l1.iterator();
+        assertRT(rt(0, 3, 3), iter1.next());
+        assertRT(rt(3, 7, 5), iter1.next());
+        assertRT(rt(7, 10, 3), iter1.next());
+        assert !iter1.hasNext();
+
+        RangeTombstoneList l2 = new RangeTombstoneList(cmp, 0);
+        l2.add(rt(0, 10, 3));
+        l2.add(rt(3, 7, 2));
+
+        Iterator<RangeTombstone> iter2 = l2.iterator();
+        assertRT(rt(0, 10, 3), iter2.next());
+        assert !iter2.hasNext();
+    }
+
+    @Test
+    public void searchTest()
+    {
+        RangeTombstoneList l = new RangeTombstoneList(cmp, 0);
+        l.add(rt(0, 4, 5));
+        l.add(rt(4, 6, 2));
+        l.add(rt(9, 12, 1));
+        l.add(rt(14, 15, 3));
+        l.add(rt(15, 17, 6));
+
+        assertEquals(null, l.search(b(-1)));
+
+        assertEquals(5, l.search(b(0)).markedForDeleteAt);
+        assertEquals(5, l.search(b(3)).markedForDeleteAt);
+        assertEquals(5, l.search(b(4)).markedForDeleteAt);
+
+        assertEquals(2, l.search(b(5)).markedForDeleteAt);
+
+        assertEquals(null, l.search(b(7)));
+
+        assertEquals(3, l.search(b(14)).markedForDeleteAt);
+
+        assertEquals(6, l.search(b(15)).markedForDeleteAt);
+        assertEquals(null, l.search(b(18)));
+    }
+
+    @Test
+    public void addAllTest()
+    {
+        //addAllTest(false);
+        addAllTest(true);
+    }
+
+    private void addAllTest(boolean doMerge)
+    {
+        RangeTombstoneList l1 = new RangeTombstoneList(cmp, 0);
+        l1.add(rt(0, 4, 5));
+        l1.add(rt(6, 10, 2));
+        l1.add(rt(15, 17, 1));
+
+        RangeTombstoneList l2 = new RangeTombstoneList(cmp, 0);
+        l2.add(rt(3, 5, 7));
+        l2.add(rt(7, 8, 3));
+        l2.add(rt(8, 12, 1));
+        l2.add(rt(14, 17, 4));
+
+        l1.addAll(l2);
+
+        Iterator<RangeTombstone> iter = l1.iterator();
+        assertRT(rt(0, 3, 5), iter.next());
+        assertRT(rt(3, 4, 7), iter.next());
+        assertRT(rt(4, 5, 7), iter.next());
+        assertRT(rt(6, 7, 2), iter.next());
+        assertRT(rt(7, 8, 3), iter.next());
+        assertRT(rt(8, 10, 2), iter.next());
+        assertRT(rt(10, 12, 1), iter.next());
+        assertRT(rt(14, 15, 4), iter.next());
+        assertRT(rt(15, 17, 4), iter.next());
+
+        assert !iter.hasNext();
+    }
+
+    @Test
+    public void purgetTest()
+    {
+        RangeTombstoneList l = new RangeTombstoneList(cmp, 0);
+        l.add(rt(0, 4, 5, 110));
+        l.add(rt(4, 6, 2, 98));
+        l.add(rt(9, 12, 1, 200));
+        l.add(rt(14, 15, 3, 3));
+        l.add(rt(15, 17, 6, 45));
+
+        l.purge(100);
+
+        Iterator<RangeTombstone> iter = l.iterator();
+        assertRT(rt(0, 4, 5, 110), iter.next());
+        assertRT(rt(9, 12, 1, 200), iter.next());
+
+        assert !iter.hasNext();
+    }
+
+    @Test
+    public void minMaxTest()
+    {
+        RangeTombstoneList l = new RangeTombstoneList(cmp, 0);
+        l.add(rt(0, 4, 5, 110));
+        l.add(rt(4, 6, 2, 98));
+        l.add(rt(9, 12, 1, 200));
+        l.add(rt(14, 15, 3, 3));
+        l.add(rt(15, 17, 6, 45));
+
+        assertEquals(1, l.minMarkedAt());
+        assertEquals(6, l.maxMarkedAt());
+    }
+
+    private static void assertRT(RangeTombstone expected, RangeTombstone actual)
+    {
+        assertEquals(String.format("Expected %s but got %s", toString(expected), toString(actual)), expected, actual);
+    }
+
+    private static String toString(RangeTombstone rt)
+    {
+        return String.format("[%d, %d]@%d", i(rt.min), i(rt.max), rt.data.markedForDeleteAt);
+    }
+
+    private static ByteBuffer b(int i)
+    {
+        return ByteBufferUtil.bytes(i);
+    }
+
+    private static int i(ByteBuffer bb)
+    {
+        return ByteBufferUtil.toInt(bb);
+    }
+
+    private static RangeTombstone rt(int start, int end, long tstamp)
+    {
+        return rt(start, end, tstamp, 0);
+    }
+
+    private static RangeTombstone rt(int start, int end, long tstamp, int delTime)
+    {
+        return new RangeTombstone(b(start), b(end), tstamp, delTime);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e81f816/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
index 8355686..7f9ca18 100644
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
@@ -119,8 +119,8 @@ public class LazilyCompactedRowTest extends SchemaLoader
             // cf metadata
             ColumnFamily cf1 = ColumnFamily.create(cfs.metadata);
             ColumnFamily cf2 = ColumnFamily.create(cfs.metadata);
-            cf1.delete(DeletionInfo.serializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT));
-            cf2.delete(DeletionInfo.serializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT));
+            cf1.delete(DeletionTime.serializer.deserialize(in1));
+            cf2.delete(DeletionTime.serializer.deserialize(in2));
             assert cf1.deletionInfo().equals(cf2.deletionInfo());
             // columns
             int columns = in1.readInt();


Mime
View raw message