cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [4/4] Add support for range tombstones
Date Fri, 18 May 2012 18:35:10 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 5b9e7e0..5ba3f6f 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -94,7 +94,7 @@ public class SSTableExport
      * @param comparator columns comparator
      * @param cfMetaData Column Family metadata (to get validator)
      */
-    private static void serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType<?> comparator, CFMetaData cfMetaData)
+    private static void serializeColumns(Iterator<OnDiskAtom> columns, PrintStream out, AbstractType<?> comparator, CFMetaData cfMetaData)
     {
         while (columns.hasNext())
         {
@@ -105,6 +105,37 @@ public class SSTableExport
         }
     }
 
+    private static void serializeIColumns(Iterator<IColumn> columns, PrintStream out, AbstractType<?> comparator, CFMetaData cfMetaData)
+    {
+        while (columns.hasNext())
+        {
+            writeJSON(out, serializeColumn(columns.next(), comparator, cfMetaData));
+
+            if (columns.hasNext())
+                out.print(", ");
+        }
+    }
+
+    private static List<Object> serializeColumn(OnDiskAtom column, AbstractType<?> comparator, CFMetaData cfMetaData)
+    {
+        if (column instanceof IColumn)
+        {
+            return serializeColumn((IColumn)column, comparator, cfMetaData);
+        }
+        else
+        {
+            assert column instanceof RangeTombstone;
+            RangeTombstone rt = (RangeTombstone)column;
+            ArrayList<Object> serializedColumn = new ArrayList<Object>();
+            serializedColumn.add(comparator.getString(rt.min));
+            serializedColumn.add(comparator.getString(rt.max));
+            serializedColumn.add(rt.data.markedForDeleteAt);
+            serializedColumn.add("t");
+            serializedColumn.add(rt.data.localDeletionTime);
+            return serializedColumn;
+        }
+    }
+
     /**
      * Serialize a given column to the JSON format
      *
@@ -172,8 +203,9 @@ public class SSTableExport
         {
             while (row.hasNext())
             {
-                IColumn column = row.next();
-
+                OnDiskAtom scol = row.next();
+                assert scol instanceof IColumn;
+                IColumn column = (IColumn)scol;
                 writeKey(out, comparator.getString(column.name()));
                 out.print("{");
                 writeKey(out, "deletedAt");
@@ -181,7 +213,7 @@ public class SSTableExport
                 out.print(", ");
                 writeKey(out, "subColumns");
                 out.print("[");
-                serializeColumns(column.getSubColumns().iterator(), out, columnFamily.getSubComparator(), cfMetaData);
+                serializeIColumns(column.getSubColumns().iterator(), out, columnFamily.getSubComparator(), cfMetaData);
                 out.print("]");
                 out.print("}");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index 414585b..bae198e 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -132,6 +132,10 @@ public class SSTableImport
                         {
                             timestampOfLastDelete = (long) ((Integer) fields.get(4));
                         }
+                        else if (isRangeTombstone())
+                        {
+                            localExpirationTime = (Integer) fields.get(4);
+                        }
                     }
                 }
 
@@ -155,6 +159,11 @@ public class SSTableImport
             return kind.equals("c");
         }
 
+        public boolean isRangeTombstone()
+        {
+            return kind.equals("t");
+        }
+
         public ByteBuffer getName()
         {
             return name.duplicate();
@@ -200,6 +209,10 @@ public class SSTableImport
             {
                 cfamily.addTombstone(path, col.getValue(), col.timestamp);
             }
+            else if (col.isRangeTombstone())
+            {
+                cfamily.addAtom(new RangeTombstone(col.getName(), col.getValue(), col.timestamp, col.localExpirationTime));
+            }
             else
             {
                 cfamily.addColumn(path, col.getValue(), col.timestamp);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/utils/Interval.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Interval.java b/src/java/org/apache/cassandra/utils/Interval.java
new file mode 100644
index 0000000..c74f153
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Interval.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import com.google.common.base.Objects;
+
+public class Interval<C, D>
+{
+    public final C min;
+    public final C max;
+    public final D data;
+
+    public Interval(C min, C max, D data)
+    {
+        this.min = min;
+        this.max = max;
+        this.data = data;
+    }
+
+    public static <C, D> Interval<C, D> create(C min, C max)
+    {
+        return create(min, max, null);
+    }
+
+    public static <C, D> Interval<C, D> create(C min, C max, D data)
+    {
+        return new Interval(min, max, data);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("[%s, %s]%s", min, max, data == null ? "" : (String.format("(%s)", data)));
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        return Objects.hashCode(min, max, data);
+    }
+
+    @Override
+    public final boolean equals(Object o)
+    {
+        if(!(o instanceof Interval))
+            return false;
+
+        Interval that = (Interval)o;
+        // handles nulls properly
+        return Objects.equal(min, that.min) && Objects.equal(max, that.max) && Objects.equal(data, that.data);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/utils/IntervalTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IntervalTree.java b/src/java/org/apache/cassandra/utils/IntervalTree.java
new file mode 100644
index 0000000..7ccc75f
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/IntervalTree.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.IVersionedSerializer;
+
+public class IntervalTree<C, D, I extends Interval<C, D>> implements Iterable<I>
+{
+    private static final Logger logger = LoggerFactory.getLogger(IntervalTree.class);
+
+    @SuppressWarnings("unchecked")
+    private static final IntervalTree EMPTY_TREE = new IntervalTree(null, null);
+
+    private final IntervalNode head;
+    private final int count;
+    private final Comparator<C> comparator;
+
+    final Ordering<I> minOrdering;
+    final Ordering<I> maxOrdering;
+
+    protected IntervalTree(Collection<I> intervals, Comparator<C> comparator)
+    {
+        this.comparator = comparator;
+
+        final IntervalTree it = this;
+        this.minOrdering = new Ordering<I>()
+        {
+            public int compare(I interval1, I interval2)
+            {
+                return it.comparePoints(interval1.min, interval2.min);
+            }
+        };
+        this.maxOrdering = new Ordering<I>()
+        {
+            public int compare(I interval1, I interval2)
+            {
+                return it.comparePoints(interval1.max, interval2.max);
+            }
+        };
+
+        this.head = intervals == null || intervals.isEmpty() ? null : new IntervalNode(intervals);
+        this.count = intervals == null ? 0 : intervals.size();
+    }
+
+    public static <C, D, I extends Interval<C, D>> IntervalTree<C, D, I> build(Collection<I> intervals, Comparator<C> comparator)
+    {
+        if (intervals == null || intervals.isEmpty())
+            return emptyTree();
+
+        return new IntervalTree<C, D, I>(intervals, comparator);
+    }
+
+    public static <C extends Comparable<C>, D, I extends Interval<C, D>> IntervalTree<C, D, I> build(Collection<I> intervals)
+    {
+        if (intervals == null || intervals.isEmpty())
+            return emptyTree();
+
+        return new IntervalTree<C, D, I>(intervals, null);
+    }
+
+    public static <C, D, I extends Interval<C, D>> Serializer<C, D, I> serializer(ISerializer<C> pointSerializer, ISerializer<D> dataSerializer, Constructor<I> constructor)
+    {
+        return new Serializer(pointSerializer, dataSerializer, constructor);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <C, D, I extends Interval<C, D>> IntervalTree<C, D, I> emptyTree()
+    {
+        return (IntervalTree<C, D, I>)EMPTY_TREE;
+    }
+
+    public Comparator<C> comparator()
+    {
+        return comparator;
+    }
+
+    public int intervalCount()
+    {
+        return count;
+    }
+
+    public boolean isEmpty()
+    {
+        return head == null;
+    }
+
+    public C max()
+    {
+        if (head == null)
+            throw new IllegalStateException();
+
+        return head.high;
+    }
+
+    public C min()
+    {
+        if (head == null)
+            throw new IllegalStateException();
+
+        return head.low;
+    }
+
+    public List<D> search(Interval<C, D> searchInterval)
+    {
+        if (head == null)
+            return Collections.<D>emptyList();
+
+        List<D> results = new ArrayList<D>();
+        head.searchInternal(searchInterval, results);
+        return results;
+    }
+
+    public List<D> search(C point)
+    {
+        return search(Interval.<C, D>create(point, point, null));
+    }
+
+    public Iterator<I> iterator()
+    {
+        if (head == null)
+            return Iterators.<I>emptyIterator();
+
+        return new TreeIterator(head);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "<" + Joiner.on(", ").join(this) + ">";
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if(!(o instanceof IntervalTree))
+            return false;
+        IntervalTree that = (IntervalTree)o;
+        return Iterators.elementsEqual(iterator(), that.iterator());
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        int result = 0; // comparator is left out: equals() ignores it, and it is null for natural-ordering and empty trees
+        for (Interval<C, D> interval : this)
+            result = 31 * result + interval.hashCode(); // order-sensitive, like the element-wise comparison done by equals()
+        return result;
+    }
+
+    private int comparePoints(C point1, C point2)
+    {
+        if (comparator != null)
+        {
+            return comparator.compare(point1, point2);
+        }
+        else
+        {
+            assert point1 instanceof Comparable;
+            assert point2 instanceof Comparable;
+            return ((Comparable<C>)point1).compareTo(point2);
+        }
+    }
+
+    private boolean encloses(Interval<C, D> enclosing, Interval<C, D> enclosed)
+    {
+        return comparePoints(enclosing.min, enclosed.min) <= 0
+            && comparePoints(enclosing.max, enclosed.max) >= 0;
+    }
+
+    private boolean contains(Interval<C, D> interval, C point)
+    {
+        return comparePoints(interval.min, point) <= 0
+            && comparePoints(interval.max, point) >= 0;
+    }
+
+    private boolean intersects(Interval<C, D> interval1, Interval<C, D> interval2)
+    { // closed intervals overlap iff each starts no later than the other ends; this also covers one strictly enclosing the other
+        return comparePoints(interval1.min, interval2.max) <= 0 && comparePoints(interval2.min, interval1.max) <= 0;
+    }
+
+    private class IntervalNode
+    {
+        final C center;
+        final C low;
+        final C high;
+
+        final List<I> intersectsLeft;
+        final List<I> intersectsRight;
+
+        final IntervalNode left;
+        final IntervalNode right;
+
+        public IntervalNode(Collection<I> toBisect)
+        {
+            assert !toBisect.isEmpty();
+            logger.debug("Creating IntervalNode from {}", toBisect);
+
+            // Building IntervalTree with one interval will be a reasonably
+            // common case for range tombstones, so it's worth optimizing
+            if (toBisect.size() == 1)
+            {
+                I interval = toBisect.iterator().next();
+                low = interval.min;
+                center = interval.max;
+                high = interval.max;
+                List<I> l = Collections.singletonList(interval);
+                intersectsLeft = l;
+                intersectsRight = l;
+                left = null;
+                right = null;
+            }
+            else
+            {
+                // Find min, median and max among all interval endpoints
+                List<C> allEndpoints = new ArrayList<C>(toBisect.size() * 2);
+                for (I interval : toBisect)
+                {
+                    assert (comparator == null ? ((Comparable)interval.min).compareTo(interval.max)
+                                               : comparator.compare(interval.min, interval.max)) <= 0 : "Interval min > max";
+                    allEndpoints.add(interval.min);
+                    allEndpoints.add(interval.max);
+                }
+                if (comparator != null) // endpoints must be sorted with the same ordering comparePoints() uses
+                    Collections.sort(allEndpoints, comparator);
+                else
+                    Collections.sort((List<Comparable>)allEndpoints); // no comparator: fall back to natural ordering
+
+                low = allEndpoints.get(0);
+                center = allEndpoints.get(toBisect.size()); // middle of the 2 * size sorted endpoints
+                high = allEndpoints.get(allEndpoints.size() - 1);
+
+                // Separate interval in intersecting center, left of center and right of center
+                List<I> intersects = new ArrayList<I>();
+                List<I> leftSegment = new ArrayList<I>();
+                List<I> rightSegment = new ArrayList<I>();
+
+                for (I candidate : toBisect)
+                {
+                    if (comparePoints(candidate.max, center) < 0)
+                        leftSegment.add(candidate);
+                    else if (comparePoints(candidate.min, center) > 0)
+                        rightSegment.add(candidate);
+                    else
+                        intersects.add(candidate);
+                }
+
+                intersectsLeft = minOrdering.sortedCopy(intersects); // increasing min
+                intersectsRight = maxOrdering.reverse().sortedCopy(intersects); // decreasing max
+                left = leftSegment.isEmpty() ? null : new IntervalNode(leftSegment);
+                right = rightSegment.isEmpty() ? null : new IntervalNode(rightSegment);
+
+                assert (intersects.size() + leftSegment.size() + rightSegment.size()) == toBisect.size() :
+                        "intersects (" + String.valueOf(intersects.size()) +
+                        ") + leftSegment (" + String.valueOf(leftSegment.size()) +
+                        ") + rightSegment (" + String.valueOf(rightSegment.size()) +
+                        ") != toBisect (" + String.valueOf(toBisect.size()) + ")";
+            }
+        }
+
+        void searchInternal(Interval<C, D> searchInterval, List<D> results)
+        {
+            if (contains(searchInterval, center))
+            {
+                // Every interval stored in this node spans center and so overlaps searchInterval: add them all,
+                // then search left and right for further overlapping intervals
+                for (Interval<C, D> interval : intersectsLeft)
+                    results.add(interval.data);
+
+                if (left != null)
+                    left.searchInternal(searchInterval, results);
+                if (right != null)
+                    right.searchInternal(searchInterval, results);
+            }
+            else if (comparePoints(center, searchInterval.min) < 0)
+            {
+                // center < searchInterval.min: add intervals i of intersectsRight (ordered by decreasing max)
+                // as long as i.max >= searchInterval.min, then search right
+                for (Interval<C, D> interval : intersectsRight)
+                {
+                    if (comparePoints(interval.max, searchInterval.min) >= 0)
+                        results.add(interval.data);
+                    else
+                        break;
+                }
+                if (right != null)
+                    right.searchInternal(searchInterval, results);
+            }
+            else
+            {
+                assert comparePoints(center, searchInterval.max) > 0;
+                // center > searchInterval.max: add intervals i of intersectsLeft (ordered by increasing min)
+                // as long as i.min <= searchInterval.max, then search left
+                for (Interval<C, D> interval : intersectsLeft)
+                {
+                    if (comparePoints(interval.min, searchInterval.max) <= 0)
+                        results.add(interval.data);
+                    else
+                        break;
+                }
+                if (left != null)
+                    left.searchInternal(searchInterval, results);
+            }
+        }
+    }
+
+    private class TreeIterator extends AbstractIterator<I>
+    {
+        private final Deque<IntervalNode> stack = new ArrayDeque<IntervalNode>();
+        private Iterator<I> current;
+
+        TreeIterator(IntervalNode node)
+        {
+            super();
+            gotoMinOf(node);
+        }
+
+        protected I computeNext()
+        {
+            if (current != null && current.hasNext())
+                return current.next();
+
+            IntervalNode node = stack.pollFirst();
+            if (node == null)
+                return endOfData();
+
+            current = node.intersectsLeft.iterator();
+
+            // We know this is the smallest node not returned yet, but before doing
+            // its parent, we must do every node on its right.
+            gotoMinOf(node.right);
+
+            return computeNext();
+        }
+
+        private void gotoMinOf(IntervalNode node)
+        {
+            while (node != null)
+            {
+                stack.offerFirst(node);
+                node = node.left;
+            }
+
+        }
+    }
+
+    public static class Serializer<C, D, I extends Interval<C, D>> implements IVersionedSerializer<IntervalTree<C, D, I>>
+    {
+        private final ISerializer<C> pointSerializer;
+        private final ISerializer<D> dataSerializer;
+        private final Constructor<I> constructor;
+
+        private Serializer(ISerializer<C> pointSerializer, ISerializer<D> dataSerializer, Constructor<I> constructor)
+        {
+            this.pointSerializer = pointSerializer;
+            this.dataSerializer = dataSerializer;
+            this.constructor = constructor;
+        }
+
+        public void serialize(IntervalTree<C, D, I> it, DataOutput dos, int version) throws IOException
+        {
+            dos.writeInt(it.count);
+            for (Interval<C, D> interval : it)
+            {
+                pointSerializer.serialize(interval.min, dos);
+                pointSerializer.serialize(interval.max, dos);
+                dataSerializer.serialize(interval.data, dos);
+            }
+        }
+
+        /**
+         * Deserialize an IntervalTree whose keys use the natural ordering.
+         * Use deserialize(DataInput, int, Comparator) instead if the interval
+         * tree is to use a custom comparator, as the comparator is *not*
+         * serialized.
+         */
+        public IntervalTree<C, D, I> deserialize(DataInput dis, int version) throws IOException
+        {
+            return deserialize(dis, version, null);
+        }
+
+        public IntervalTree<C, D, I> deserialize(DataInput dis, int version, Comparator<C> comparator) throws IOException
+        {
+            try
+            {
+                int count = dis.readInt();
+                List<Interval<C, D>> intervals = new ArrayList<Interval<C, D>>(count);
+                for (int i = 0; i < count; i++)
+                {
+                    C min = pointSerializer.deserialize(dis);
+                    C max = pointSerializer.deserialize(dis);
+                    D data = dataSerializer.deserialize(dis);
+                    intervals.add(constructor.newInstance(min, max, data));
+                }
+                return new IntervalTree(intervals, comparator);
+            }
+            catch (InstantiationException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (InvocationTargetException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (IllegalAccessException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public long serializedSize(IntervalTree<C, D, I> it, TypeSizes typeSizes, int version)
+        {
+            long size = typeSizes.sizeof(0);
+            for (Interval<C, D> interval : it)
+            {
+                size += pointSerializer.serializedSize(interval.min, typeSizes);
+                size += pointSerializer.serializedSize(interval.max, typeSizes);
+                size += dataSerializer.serializedSize(interval.data, typeSizes);
+            }
+            return size;
+        }
+
+        public long serializedSize(IntervalTree<C, D, I> it, int version)
+        {
+            return serializedSize(it, TypeSizes.NATIVE, version);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/utils/IntervalTree/Interval.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IntervalTree/Interval.java b/src/java/org/apache/cassandra/utils/IntervalTree/Interval.java
deleted file mode 100644
index 1f65be5..0000000
--- a/src/java/org/apache/cassandra/utils/IntervalTree/Interval.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils.IntervalTree;
-
-import com.google.common.collect.Ordering;
-
-public class Interval<T>
-{
-    public final Comparable min;
-    public final Comparable max;
-    public final T Data;
-
-    public Interval(Comparable min, Comparable max)
-    {
-        this.min = min;
-        this.max = max;
-        this.Data = null;
-    }
-
-    public Interval(Comparable min, Comparable max, T data)
-    {
-        this.min = min;
-        this.max = max;
-        this.Data = data;
-    }
-
-    public boolean encloses(Interval interval)
-    {
-        return (this.min.compareTo(interval.min) <= 0
-                && this.max.compareTo(interval.max) >= 0);
-    }
-
-    public boolean contains(Comparable point)
-    {
-        return (this.min.compareTo(point) <= 0
-                && this.max.compareTo(point) >= 0);
-    }
-
-    public boolean intersects(Interval interval)
-    {
-        return this.contains(interval.min) || this.contains(interval.max);
-    }
-
-
-    public static final Ordering<Interval> minOrdering = new Ordering<Interval>()
-    {
-        public int compare(Interval interval, Interval interval1)
-        {
-            return interval.min.compareTo(interval1.min);
-        }
-    };
-
-    public static final Ordering<Interval> maxOrdering = new Ordering<Interval>()
-    {
-        public int compare(Interval interval, Interval interval1)
-        {
-            return interval.max.compareTo(interval1.max);
-        }
-    };
-
-    public String toString()
-    {
-        return String.format("Interval(%s, %s)", min, max);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/utils/IntervalTree/IntervalNode.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IntervalTree/IntervalNode.java b/src/java/org/apache/cassandra/utils/IntervalTree/IntervalNode.java
deleted file mode 100644
index 1c8049d..0000000
--- a/src/java/org/apache/cassandra/utils/IntervalTree/IntervalNode.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils.IntervalTree;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class IntervalNode
-{
-    private static final Logger logger = LoggerFactory.getLogger(IntervalNode.class);
-
-    Comparable v_pt;
-    Comparable v_min;
-    Comparable v_max;
-    List<Interval> intersects_left;
-    List<Interval> intersects_right;
-    IntervalNode left = null;
-    IntervalNode right = null;
-
-    public IntervalNode(List<Interval> toBisect)
-    {
-        logger.debug("Creating IntervalNode from {}", toBisect);
-
-        if (toBisect.size() > 0)
-        {
-            findMinMedianMax(toBisect);
-            List<Interval> intersects = getIntersectingIntervals(toBisect);
-            intersects_left = Interval.minOrdering.sortedCopy(intersects);
-            intersects_right = Interval.maxOrdering.reverse().sortedCopy(intersects);
-            //if i.max < v_pt then it goes to the left subtree
-            List<Interval> leftSegment = getLeftIntervals(toBisect);
-            List<Interval> rightSegment = getRightIntervals(toBisect);
-            assert (intersects.size() + leftSegment.size() + rightSegment.size()) == toBisect.size() :
-                    "intersects (" + String.valueOf(intersects.size()) +
-                    ") + leftSegment (" + String.valueOf(leftSegment.size()) +
-                    ") + rightSegment (" + String.valueOf(rightSegment.size()) +
-                    ") != toBisect (" + String.valueOf(toBisect.size()) + ")";
-            if (leftSegment.size() > 0)
-                this.left = new IntervalNode(leftSegment);
-            if (rightSegment.size() > 0)
-                this.right = new IntervalNode(rightSegment);
-        }
-    }
-
-    public List<Interval> getLeftIntervals(List<Interval> candidates)
-    {
-        List<Interval> retval = new ArrayList<Interval>();
-        for (Interval candidate : candidates)
-        {
-            if (candidate.max.compareTo(v_pt) < 0)
-                retval.add(candidate);
-        }
-        return retval;
-    }
-
-    public List<Interval> getRightIntervals(List<Interval> candidates)
-    {
-        List<Interval> retval = new ArrayList<Interval>();
-        for (Interval candidate : candidates)
-        {
-            if (candidate.min.compareTo(v_pt) > 0)
-                retval.add(candidate);
-        }
-        return retval;
-    }
-
-    public List<Interval> getIntersectingIntervals(List<Interval> candidates)
-    {
-        List<Interval> retval = new ArrayList<Interval>();
-        for (Interval candidate : candidates)
-        {
-            if (candidate.min.compareTo(v_pt) <= 0
-                && candidate.max.compareTo(v_pt) >= 0)
-                retval.add(candidate);
-        }
-        return retval;
-    }
-
-    public void findMinMedianMax(List<Interval> intervals)
-    {
-        if (intervals.size() > 0)
-        {
-            List<Comparable> allEndpoints = new ArrayList<Comparable>(intervals.size() * 2);
-
-            for (Interval interval : intervals)
-            {
-                assert interval.min.compareTo(interval.max) <= 0 : "Interval min > max";
-                allEndpoints.add(interval.min);
-                allEndpoints.add(interval.max);
-            }
-            Collections.sort(allEndpoints);
-            v_pt = allEndpoints.get(intervals.size());
-            v_min = allEndpoints.get(0);
-            v_max = allEndpoints.get(allEndpoints.size() - 1);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/src/java/org/apache/cassandra/utils/IntervalTree/IntervalTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IntervalTree/IntervalTree.java b/src/java/org/apache/cassandra/utils/IntervalTree/IntervalTree.java
deleted file mode 100644
index ae73c26..0000000
--- a/src/java/org/apache/cassandra/utils/IntervalTree/IntervalTree.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils.IntervalTree;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-public class IntervalTree<T>
-{
-    private final IntervalNode head;
-
-    public IntervalTree()
-    {
-        head = null;
-    }
-
-    public IntervalTree(List<Interval> intervals)
-    {
-        head = new IntervalNode(intervals);
-    }
-
-    public Comparable max()
-    {
-        return head.v_max;
-    }
-
-    public Comparable min()
-    {
-        return head.v_min;
-    }
-
-    public List<T> search(Interval<T> searchInterval)
-    {
-        List<T> results = new ArrayList<T>();
-        searchInternal(head, searchInterval, results);
-        return results;
-    }
-
-    protected void searchInternal(IntervalNode node, Interval<T> searchInterval, List<T> results)
-    {
-        if (null == head)
-            return;
-        if (null == node || node.v_pt == null)
-            return;
-        //if searchInterval.contains(node.v_pt)
-        //then add every interval contained in this node to the result set then search left and right for further
-        //overlapping intervals
-        if (searchInterval.contains(node.v_pt))
-        {
-            for (Interval<T> interval : node.intersects_left)
-            {
-                results.add(interval.Data);
-            }
-
-            searchInternal(node.left, searchInterval, results);
-            searchInternal(node.right, searchInterval, results);
-            return;
-        }
-
-        //if v.pt < searchInterval.left
-        //add intervals in v with v[i].right >= searchInterval.left
-        //L contains no overlaps
-        //R May
-        if (node.v_pt.compareTo(searchInterval.min) < 0)
-        {
-            for (Interval<T> interval : node.intersects_right)
-            {
-                if (interval.max.compareTo(searchInterval.min) >= 0)
-                {
-                    results.add(interval.Data);
-                }
-                else break;
-            }
-            searchInternal(node.right, searchInterval, results);
-            return;
-        }
-
-        //if v.pt > searchInterval.right
-        //add intervals in v with [i].left <= searchInterval.right
-        //R contains no overlaps
-        //L May
-        if (node.v_pt.compareTo(searchInterval.max) > 0)
-        {
-            for (Interval<T> interval : node.intersects_left)
-            {
-                if (interval.min.compareTo(searchInterval.max) <= 0)
-                {
-                    results.add(interval.Data);
-                }
-                else break;
-            }
-            searchInternal(node.left, searchInterval, results);
-            return;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 2dd0e36..bc67e3a 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -276,9 +276,18 @@ public class Util
 
     public static ByteBuffer serializeForSSTable(ColumnFamily cf)
     {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        cf.serializer.serializeForSSTable(cf, dos);
-        return ByteBuffer.wrap(baos.toByteArray());
+        try
+        {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(baos);
+            DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dos);
+            dos.writeInt(cf.getColumnCount());
+            new ColumnIndex.Builder(cf, ByteBufferUtil.EMPTY_BYTE_BUFFER, cf.getColumnCount(), dos).build(cf);
+            return ByteBuffer.wrap(baos.toByteArray());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index f6f3001..e7fdead 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -42,7 +42,7 @@ public class CacheProviderTest extends SchemaLoader
     private String tableName = "Keyspace1";
     private String cfName = "Standard1";
 
-    private void simpleCase(ColumnFamily cf, ICache<String, ColumnFamily> cache)
+    private void simpleCase(ColumnFamily cf, ICache<String, IRowCacheEntry> cache)
     {
         cache.put(key1, cf);
         assert cache.get(key1) != null;
@@ -56,14 +56,15 @@ public class CacheProviderTest extends SchemaLoader
         assertEquals(CAPACITY, cache.size());
     }
 
-    private void assertDigests(ColumnFamily one, ColumnFamily two)
+    private void assertDigests(IRowCacheEntry one, ColumnFamily two)
     {
         // CF does not implement .equals
-        assert ColumnFamily.digest(one).equals(ColumnFamily.digest(two));
+        assert one instanceof ColumnFamily;
+        assert ColumnFamily.digest((ColumnFamily)one).equals(ColumnFamily.digest(two));
     }
 
     // TODO this isn't terribly useful
-    private void concurrentCase(final ColumnFamily cf, final ICache<String, ColumnFamily> cache) throws InterruptedException
+    private void concurrentCase(final ColumnFamily cf, final ICache<String, IRowCacheEntry> cache) throws InterruptedException
     {
         Runnable runable = new Runnable()
         {
@@ -102,7 +103,7 @@ public class CacheProviderTest extends SchemaLoader
     @Test
     public void testHeapCache() throws InterruptedException
     {
-        ICache<String, ColumnFamily> cache = ConcurrentLinkedHashCache.create(CAPACITY);
+        ICache<String, IRowCacheEntry> cache = ConcurrentLinkedHashCache.create(CAPACITY);
         ColumnFamily cf = createCF();
         simpleCase(cf, cache);
         concurrentCase(cf, cache);
@@ -111,7 +112,7 @@ public class CacheProviderTest extends SchemaLoader
     @Test
     public void testSerializingCache() throws InterruptedException
     {
-        ICache<String, ColumnFamily> cache = new SerializingCache<String, ColumnFamily>(CAPACITY, false, ColumnFamily.serializer);
+        ICache<String, IRowCacheEntry> cache = new SerializingCache<String, IRowCacheEntry>(CAPACITY, false, new SerializingCacheProvider.RowCacheSerializer());
         ColumnFamily cf = createCF();
         simpleCase(cf, cache);
         concurrentCase(cf, cache);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/config/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java
index b32f2dd..2da61f9 100644
--- a/test/unit/org/apache/cassandra/config/DefsTest.java
+++ b/test/unit/org/apache/cassandra/config/DefsTest.java
@@ -222,7 +222,7 @@ public class DefsTest extends SchemaLoader
         ColumnFamilyStore store = Table.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
         assert store != null;
         store.forceBlockingFlush();
-        store.getFlushPath(1024, Descriptor.CURRENT_VERSION);
+        store.getFlushPath(1024, Descriptor.Version.CURRENT);
         assert store.directories.sstableLister().list().size() > 0;
 
         MigrationManager.announceColumnFamilyDrop(ks.name, cfm.cfName);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
index 675b1cf..1b41958 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.net.MessagingService;
 import static org.apache.cassandra.Util.column;
 import static org.junit.Assert.assertEquals;
 
@@ -39,6 +40,8 @@ import org.apache.cassandra.utils.HeapAllocator;
 
 public class ColumnFamilyTest extends SchemaLoader
 {
+    static int version = MessagingService.current_version;
+
     // TODO test SuperColumns more
 
     @Test
@@ -49,10 +52,10 @@ public class ColumnFamilyTest extends SchemaLoader
         cf = ColumnFamily.create("Keyspace1", "Standard1");
         cf.addColumn(column("C", "v", 1));
         DataOutputBuffer bufOut = new DataOutputBuffer();
-        ColumnFamily.serializer.serialize(cf, bufOut);
+        ColumnFamily.serializer.serialize(cf, bufOut, version);
 
         ByteArrayInputStream bufIn = new ByteArrayInputStream(bufOut.getData(), 0, bufOut.getLength());
-        cf = ColumnFamily.serializer.deserialize(new DataInputStream(bufIn));
+        cf = ColumnFamily.serializer.deserialize(new DataInputStream(bufIn), version);
         assert cf != null;
         assert cf.metadata().cfName.equals("Standard1");
         assert cf.getSortedColumns().size() == 1;
@@ -76,11 +79,11 @@ public class ColumnFamilyTest extends SchemaLoader
         {
             cf.addColumn(column(cName, map.get(cName), 314));
         }
-        ColumnFamily.serializer.serialize(cf, bufOut);
+        ColumnFamily.serializer.serialize(cf, bufOut, version);
 
         // verify
         ByteArrayInputStream bufIn = new ByteArrayInputStream(bufOut.getData(), 0, bufOut.getLength());
-        cf = ColumnFamily.serializer.deserialize(new DataInputStream(bufIn));
+        cf = ColumnFamily.serializer.deserialize(new DataInputStream(bufIn), version);
         for (String cName : map.navigableKeySet())
         {
             ByteBuffer val = cf.getColumn(ByteBufferUtil.bytes(cName)).value();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/db/RowIterationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIterationTest.java b/test/unit/org/apache/cassandra/db/RowIterationTest.java
index 99e54ff..26f11c8 100644
--- a/test/unit/org/apache/cassandra/db/RowIterationTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIterationTest.java
@@ -72,7 +72,7 @@ public class RowIterationTest extends SchemaLoader
         RowMutation rm = new RowMutation(TABLE1, key.key);
         rm.delete(new QueryPath(CF_NAME, null, null), 0);
         rm.add(new QueryPath(CF_NAME, null, ByteBufferUtil.bytes("c")), ByteBufferUtil.bytes("values"), 0L);
-        int tstamp1 = rm.getColumnFamilies().iterator().next().getLocalDeletionTime();
+        DeletionInfo delInfo1 = rm.getColumnFamilies().iterator().next().deletionInfo();
         rm.apply();
         store.forceBlockingFlush();
 
@@ -80,13 +80,13 @@ public class RowIterationTest extends SchemaLoader
         rm = new RowMutation(TABLE1, key.key);
         rm.delete(new QueryPath(CF_NAME, null, null), 1);
         rm.add(new QueryPath(CF_NAME, null, ByteBufferUtil.bytes("c")), ByteBufferUtil.bytes("values"), 1L);
-        int tstamp2 = rm.getColumnFamilies().iterator().next().getLocalDeletionTime();
+        DeletionInfo delInfo2 = rm.getColumnFamilies().iterator().next().deletionInfo();
+        assert delInfo2.getTopLevelDeletion().markedForDeleteAt == 1L;
         rm.apply();
         store.forceBlockingFlush();
 
         ColumnFamily cf = Util.getRangeSlice(store).iterator().next().cf;
-        assert cf.getMarkedForDeleteAt() == 1L;
-        assert cf.getLocalDeletionTime() == tstamp2;
+        assert cf.deletionInfo().equals(delInfo2);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index b3bc516..c176abf 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -39,11 +39,12 @@ public class RowTest extends SchemaLoader
         cf1.addColumn(column("one", "onev", 0));
 
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
-        cf2.delete(0, 0);
+        DeletionInfo delInfo = new DeletionInfo(0, 0);
+        cf2.delete(delInfo);
 
         ColumnFamily cfDiff = cf1.diff(cf2);
         assertEquals(cfDiff.getColumnCount(), 0);
-        assertEquals(cfDiff.getMarkedForDeleteAt(), 0);
+        assertEquals(cfDiff.deletionInfo(), delInfo);
     }
 
     @Test
@@ -53,11 +54,12 @@ public class RowTest extends SchemaLoader
         sc1.addColumn(column("subcolumn", "A", 0));
 
         SuperColumn sc2 = new SuperColumn(ByteBufferUtil.bytes("one"), AsciiType.instance);
-        sc2.delete(0, 0);
+        DeletionInfo delInfo = new DeletionInfo(0, 0);
+        sc2.delete(delInfo);
 
         SuperColumn scDiff = (SuperColumn)sc1.diff(sc2);
         assertEquals(scDiff.getSubColumns().size(), 0);
-        assertEquals(scDiff.getMarkedForDeleteAt(), 0);
+        assertEquals(scDiff.deletionInfo(), delInfo);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 7cc431d..0d86ec9 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -136,7 +136,7 @@ public class ScrubTest extends SchemaLoader
         RowMutation rm;
         rm = new RowMutation(TABLE, ByteBufferUtil.bytes(1));
         ColumnFamily cf = ColumnFamily.create(TABLE, CF3);
-        cf.delete(0, 1); // expired tombstone
+        cf.delete(new DeletionInfo(0, 1)); // expired tombstone
         rm.add(cf);
         rm.applyUnsafe();
         cfs.forceBlockingFlush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/CompactSerializerTest.java b/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
index c3aa220..f32d553 100644
--- a/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
@@ -79,6 +79,8 @@ public class CompactSerializerTest extends SchemaLoader
         expectedClassNames.add("LeafSerializer");
         expectedClassNames.add("MerkleTreeSerializer");
         expectedClassNames.add("UUIDSerializer");
+        expectedClassNames.add("Serializer");
+        expectedClassNames.add("ColumnFamilySerializer");
 
         discoveredClassNames = new ArrayList<String>();
         String cp = System.getProperty("java.class.path");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
index ab1b9e3..2c56e0b 100644
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -120,17 +121,16 @@ public class LazilyCompactedRowTest extends SchemaLoader
             // cf metadata
             ColumnFamily cf1 = ColumnFamily.create(cfs.metadata);
             ColumnFamily cf2 = ColumnFamily.create(cfs.metadata);
-            ColumnFamily.serializer.deserializeFromSSTableNoColumns(cf1, in1);
-            ColumnFamily.serializer.deserializeFromSSTableNoColumns(cf2, in2);
-            assert cf1.getLocalDeletionTime() == cf2.getLocalDeletionTime();
-            assert cf1.getMarkedForDeleteAt() == cf2.getMarkedForDeleteAt();
+            cf1.delete(DeletionInfo.serializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT));
+            cf2.delete(DeletionInfo.serializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT));
+            assert cf1.deletionInfo().equals(cf2.deletionInfo());
             // columns
             int columns = in1.readInt();
             assert columns == in2.readInt();
             for (int i = 0; i < columns; i++)
             {
-                IColumn c1 = cf1.getColumnSerializer().deserialize(in1);
-                IColumn c2 = cf2.getColumnSerializer().deserialize(in2);
+                IColumn c1 = (IColumn)cf1.getOnDiskSerializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT);
+                IColumn c2 = (IColumn)cf2.getOnDiskSerializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT);
                 assert c1.equals(c2) : c1.getString(cfs.metadata.comparator) + " != " + c2.getString(cfs.metadata.comparator);
             }
             // that should be everything

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
index ee40501..545a9ec 100644
--- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
@@ -31,8 +31,8 @@ public class DescriptorTest
     {
         Descriptor descriptor = Descriptor.fromFilename("Keyspace1-userActionUtilsKey-9-Data.db");
 
-        assert descriptor.version.equals(Descriptor.LEGACY_VERSION);
-        assert descriptor.filterType == FilterFactory.Type.SHA;
+        assert descriptor.version.equals(Descriptor.Version.LEGACY);
+        assert descriptor.version.filterType == FilterFactory.Type.SHA;
     }
 
     @Test
@@ -40,27 +40,27 @@ public class DescriptorTest
     {
         // letter only
         Descriptor desc = Descriptor.fromFilename("Keyspace1-Standard1-h-1-Data.db");
-        assert "h".equals(desc.version);
+        assert "h".equals(desc.version.toString());
 
         // multiple letters
         desc = Descriptor.fromFilename("Keyspace1-Standard1-ha-1-Data.db");
-        assert "ha".equals(desc.version);
+        assert "ha".equals(desc.version.toString());
 
         // hypothetical two-letter g version
         desc = Descriptor.fromFilename("Keyspace1-Standard1-gz-1-Data.db");
-        assert "gz".equals(desc.version);
-        assert !desc.tracksMaxTimestamp;
+        assert "gz".equals(desc.version.toString());
+        assert !desc.version.tracksMaxTimestamp;
     }
 
     @Test
     public void testMurmurBloomFilter()
     {
         Descriptor desc = Descriptor.fromFilename("Keyspace1-Standard1-hz-1-Data.db");
-        assertEquals("hz", desc.version);
-        assertEquals(desc.filterType, FilterFactory.Type.MURMUR2);
+        assertEquals("hz", desc.version.toString());
+        assertEquals(desc.version.filterType, FilterFactory.Type.MURMUR2);
 
         desc = Descriptor.fromFilename("Keyspace1-Standard1-ia-1-Data.db");
-        assertEquals("ia", desc.version);
-        assertEquals(desc.filterType, FilterFactory.Type.MURMUR3);
+        assertEquals("ia", desc.version.toString());
+        assertEquals(desc.version.filterType, FilterFactory.Type.MURMUR3);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index ac1e497..a31ebb6 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -89,7 +89,7 @@ public class LegacySSTableTest extends SchemaLoader
     public void testVersions() throws Throwable
     {
         for (File version : LEGACY_SSTABLE_ROOT.listFiles())
-            if (Descriptor.versionValidate(version.getName()))
+            if (Descriptor.Version.validate(version.getName()))
                 testVersion(version.getName());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
index a549352..562be07 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
@@ -59,7 +59,7 @@ public class SSTableMetadataSerializerTest
 
         ByteArrayInputStream byteInput = new ByteArrayInputStream(byteOutput.toByteArray());
         DataInputStream dis = new DataInputStream(byteInput);
-        Descriptor desc = new Descriptor(Descriptor.CURRENT_VERSION, new File("."), "", "", 0, false);
+        Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, new File("."), "", "", 0, false);
         SSTableMetadata stats = SSTableMetadata.serializer.deserialize(dis, desc);
 
         assert stats.estimatedRowSize.equals(originalMetadata.estimatedRowSize);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 72239dd..d78d839 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -30,8 +30,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
 import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import org.apache.cassandra.Util;
@@ -48,7 +49,7 @@ public class SSTableUtils
     public static ColumnFamily createCF(long mfda, int ldt, IColumn... cols)
     {
         ColumnFamily cf = ColumnFamily.create(TABLENAME, CFNAME);
-        cf.delete(ldt, mfda);
+        cf.delete(new DeletionInfo(mfda, ldt));
         for (IColumn col : cols)
             cf.addColumn(col);
         return cf;
@@ -81,15 +82,15 @@ public class SSTableUtils
         SSTableScanner srhs = rhs.getDirectScanner();
         while (slhs.hasNext())
         {
-            IColumnIterator ilhs = slhs.next();
+            OnDiskAtomIterator ilhs = slhs.next();
             assert srhs.hasNext() : "LHS contained more rows than RHS";
-            IColumnIterator irhs = srhs.next();
+            OnDiskAtomIterator irhs = srhs.next();
             assertContentEquals(ilhs, irhs);
         }
         assert !srhs.hasNext() : "RHS contained more rows than LHS";
     }
 
-    public static void assertContentEquals(IColumnIterator lhs, IColumnIterator rhs) throws IOException
+    public static void assertContentEquals(OnDiskAtomIterator lhs, OnDiskAtomIterator rhs) throws IOException
     {
         assertEquals(lhs.getKey(), rhs.getKey());
         // check metadata
@@ -103,14 +104,13 @@ public class SSTableUtils
         }
         else if (rcf == null)
             throw new AssertionError("RHS had no content for " + lhs.getKey());
-        assertEquals(lcf.getMarkedForDeleteAt(), rcf.getMarkedForDeleteAt());
-        assertEquals(lcf.getLocalDeletionTime(), rcf.getLocalDeletionTime());
+        assertEquals(lcf.deletionInfo(), rcf.deletionInfo());
         // iterate columns
         while (lhs.hasNext())
         {
-            IColumn clhs = lhs.next();
+            IColumn clhs = (IColumn)lhs.next();
             assert rhs.hasNext() : "LHS contained more columns than RHS for " + lhs.getKey();
-            IColumn crhs = rhs.next();
+            IColumn crhs = (IColumn)rhs.next();
 
             assertEquals("Mismatched columns for " + lhs.getKey(), clhs, crhs);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/service/RowResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RowResolverTest.java b/test/unit/org/apache/cassandra/service/RowResolverTest.java
index caacfa0..3c530f1 100644
--- a/test/unit/org/apache/cassandra/service/RowResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/RowResolverTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DeletionInfo;
 import org.apache.cassandra.db.SuperColumn;
 
 import static junit.framework.Assert.*;
@@ -104,25 +105,25 @@ public class RowResolverTest extends SchemaLoader
         cf1.addColumn(column("one", "A", 0));
 
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
-        cf2.delete((int) (System.currentTimeMillis() / 1000), 1);
+        cf2.delete(new DeletionInfo(1L, (int) (System.currentTimeMillis() / 1000)));
 
         ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
         // no columns in the cf
         assertColumns(resolved);
         assertTrue(resolved.isMarkedForDelete());
-        assertEquals(1, resolved.getMarkedForDeleteAt());
+        assertEquals(1, resolved.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 
         ColumnFamily scf1 = ColumnFamily.create("Keyspace1", "Super1");
         scf1.addColumn(superColumn(scf1, "super-foo", column("one", "A", 0)));
 
         ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1");
-        scf2.delete((int) (System.currentTimeMillis() / 1000), 1);
+        scf2.delete(new DeletionInfo(1L, (int) (System.currentTimeMillis() / 1000)));
 
         ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2));
         // no columns in the cf
         assertColumns(superResolved);
         assertTrue(superResolved.isMarkedForDelete());
-        assertEquals(1, superResolved.getMarkedForDeleteAt());
+        assertEquals(1, superResolved.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
     }
 
     @Test
@@ -131,17 +132,17 @@ public class RowResolverTest extends SchemaLoader
         // subcolumn is newer than a tombstone on its parent, but not newer than the row deletion
         ColumnFamily scf1 = ColumnFamily.create("Keyspace1", "Super1");
         SuperColumn sc = superColumn(scf1, "super-foo", column("one", "A", 1));
-        sc.delete((int) (System.currentTimeMillis() / 1000), 0);
+        sc.delete(new DeletionInfo(0L, (int) (System.currentTimeMillis() / 1000)));
         scf1.addColumn(sc);
 
         ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1");
-        scf2.delete((int) (System.currentTimeMillis() / 1000), 2);
+        scf2.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000)));
 
         ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2));
         // no columns in the cf
         assertColumns(superResolved);
         assertTrue(superResolved.isMarkedForDelete());
-        assertEquals(2, superResolved.getMarkedForDeleteAt());
+        assertEquals(2, superResolved.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
     }
 
     @Test
@@ -150,7 +151,7 @@ public class RowResolverTest extends SchemaLoader
         // deletes and columns with interleaved timestamp, with out of order return sequence
 
         ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
-        cf1.delete((int) (System.currentTimeMillis() / 1000), 0);
+        cf1.delete(new DeletionInfo(0L, (int) (System.currentTimeMillis() / 1000)));
 
         // these columns created after the previous deletion
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
@@ -162,18 +163,18 @@ public class RowResolverTest extends SchemaLoader
         cf3.addColumn(column("two", "B", 3));
 
         ColumnFamily cf4 = ColumnFamily.create("Keyspace1", "Standard1");
-        cf4.delete((int) (System.currentTimeMillis() / 1000), 2);
+        cf4.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000)));
 
         ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2, cf3, cf4));
         // will have deleted marker and one column
         assertColumns(resolved, "two");
         assertColumn(resolved, "two", "B", 3);
         assertTrue(resolved.isMarkedForDelete());
-        assertEquals(2, resolved.getMarkedForDeleteAt());
+        assertEquals(2, resolved.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 
 
         ColumnFamily scf1 = ColumnFamily.create("Keyspace1", "Super1");
-        scf1.delete((int) (System.currentTimeMillis() / 1000), 0);
+        scf1.delete(new DeletionInfo(0L, (int) (System.currentTimeMillis() / 1000)));
 
         // these columns created after the previous deletion
         ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1");
@@ -185,7 +186,7 @@ public class RowResolverTest extends SchemaLoader
         scf3.addColumn(superColumn(scf3, "super2", column("three", "A", 3), column("four", "A", 3)));
 
         ColumnFamily scf4 = ColumnFamily.create("Keyspace1", "Super1");
-        scf4.delete((int) (System.currentTimeMillis() / 1000), 2);
+        scf4.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000)));
 
         ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2, scf3, scf4));
         // will have deleted marker and two super cols
@@ -199,6 +200,6 @@ public class RowResolverTest extends SchemaLoader
         assertSubColumn(superResolved, "super2", "four", "A", 3);
 
         assertTrue(superResolved.isMarkedForDelete());
-        assertEquals(2, superResolved.getMarkedForDeleteAt());
+        assertEquals(2, superResolved.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
index 7aa43fe..ff76ef2 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.Collections;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ColumnFamily;
@@ -207,6 +208,7 @@ public class SSTableExportTest extends SchemaLoader
         reader = SSTableReader.open(Descriptor.fromFilename(tempSS2.getPath()));
         QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), new QueryPath("Standard1", null, null), ByteBufferUtil.bytes("name"));
         ColumnFamily cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
+        qf.collateOnDiskAtom(cf, Collections.singletonList(qf.getSSTableColumnIterator(reader)), Integer.MIN_VALUE);
         assertTrue(cf != null);
         assertTrue(cf.getColumn(ByteBufferUtil.bytes("name")).value().equals(hexToBytes("76616c")));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableImportTest.java b/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
index a092511..1b59696 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collections;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,7 +33,7 @@ import org.apache.cassandra.db.ExpiringColumn;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -60,9 +61,9 @@ public class SSTableImportTest extends SchemaLoader
         // Verify results
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
         QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), new QueryPath("Standard1"));
-        IColumnIterator iter = qf.getSSTableColumnIterator(reader);
+        OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
         ColumnFamily cf = iter.getColumnFamily();
-        while (iter.hasNext()) cf.addColumn(iter.next());
+        while (iter.hasNext()) cf.addAtom(iter.next());
         assert cf.getColumn(ByteBufferUtil.bytes("colAA")).value().equals(hexToBytes("76616c4141"));
         assert !(cf.getColumn(ByteBufferUtil.bytes("colAA")) instanceof DeletedColumn);
         IColumn expCol = cf.getColumn(ByteBufferUtil.bytes("colAC"));
@@ -89,9 +90,9 @@ public class SSTableImportTest extends SchemaLoader
         // Verify results
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
         QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), new QueryPath("Standard1"));
-        IColumnIterator iter = qf.getSSTableColumnIterator(reader);
+        OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
         ColumnFamily cf = iter.getColumnFamily();
-        while (iter.hasNext()) cf.addColumn(iter.next());
+        while (iter.hasNext()) cf.addAtom(iter.next());
         assert cf.getColumn(ByteBufferUtil.bytes("colAA")).value().equals(hexToBytes("76616c4141"));
         assert !(cf.getColumn(ByteBufferUtil.bytes("colAA")) instanceof DeletedColumn);
         IColumn expCol = cf.getColumn(ByteBufferUtil.bytes("colAC"));
@@ -111,6 +112,7 @@ public class SSTableImportTest extends SchemaLoader
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
         QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), new QueryPath("Super4", null, null), ByteBufferUtil.bytes("superA"));
         ColumnFamily cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
+        qf.collateOnDiskAtom(cf, Collections.singletonList(qf.getSSTableColumnIterator(reader)), Integer.MIN_VALUE);
         IColumn superCol = cf.getColumn(ByteBufferUtil.bytes("superA"));
         assert superCol != null;
         assert superCol.getSubColumns().size() > 0;
@@ -143,9 +145,9 @@ public class SSTableImportTest extends SchemaLoader
         // Verify results
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
         QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), new QueryPath("Counter1"));
-        IColumnIterator iter = qf.getSSTableColumnIterator(reader);
+        OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
         ColumnFamily cf = iter.getColumnFamily();
-        while (iter.hasNext()) cf.addColumn(iter.next());
+        while (iter.hasNext()) cf.addAtom(iter.next());
         IColumn c = cf.getColumn(ByteBufferUtil.bytes("colAA"));
         assert c instanceof CounterColumn: c;
         assert ((CounterColumn) c).total() == 42;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java b/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java
index f7ab26f..7fa4395 100644
--- a/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java
+++ b/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.vint.EncodedDataInputStream;
 import org.apache.cassandra.utils.vint.EncodedDataOutputStream;
 
@@ -41,6 +42,8 @@ public class EncodedStreamsTest extends SchemaLoader
     private String counterCFName = "Counter1";
     private String superCFName = "Super1";
 
+    private int version = MessagingService.current_version;
+
     @Test
     public void testStreams() throws IOException
     {
@@ -120,13 +123,13 @@ public class EncodedStreamsTest extends SchemaLoader
     {
         ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream();
         EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1);
-        ColumnFamily.serializer.serialize(createCF(), odos);
+        ColumnFamily.serializer.serialize(createCF(), odos, version);
 
         ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray());
         EncodedDataInputStream odis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1));
-        ColumnFamily cf = ColumnFamily.serializer.deserialize(odis);
+        ColumnFamily cf = ColumnFamily.serializer.deserialize(odis, version);
         Assert.assertEquals(cf, createCF());
-        Assert.assertEquals(byteArrayOStream1.size(), (int) ColumnFamily.serializer.serializedSize(cf, TypeSizes.VINT));
+        Assert.assertEquals(byteArrayOStream1.size(), (int) ColumnFamily.serializer.serializedSize(cf, TypeSizes.VINT, version));
     }
 
     @Test
@@ -134,13 +137,13 @@ public class EncodedStreamsTest extends SchemaLoader
     {
         ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream();
         EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1);
-        ColumnFamily.serializer.serialize(createCounterCF(), odos);
+        ColumnFamily.serializer.serialize(createCounterCF(), odos, version);
 
         ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray());
         EncodedDataInputStream odis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1));
-        ColumnFamily cf = ColumnFamily.serializer.deserialize(odis);
+        ColumnFamily cf = ColumnFamily.serializer.deserialize(odis, version);
         Assert.assertEquals(cf, createCounterCF());
-        Assert.assertEquals(byteArrayOStream1.size(), (int) ColumnFamily.serializer.serializedSize(cf, TypeSizes.VINT));
+        Assert.assertEquals(byteArrayOStream1.size(), (int) ColumnFamily.serializer.serializedSize(cf, TypeSizes.VINT, version));
     }
 
     @Test
@@ -148,13 +151,13 @@ public class EncodedStreamsTest extends SchemaLoader
     {
         ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream();
         EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1);
-        ColumnFamily.serializer.serialize(createSuperCF(), odos);
+        ColumnFamily.serializer.serialize(createSuperCF(), odos, version);
 
         ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray());
         EncodedDataInputStream odis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1));
-        ColumnFamily cf = ColumnFamily.serializer.deserialize(odis);
+        ColumnFamily cf = ColumnFamily.serializer.deserialize(odis, version);
         Assert.assertEquals(cf, createSuperCF());
-        Assert.assertEquals(byteArrayOStream1.size(), (int) ColumnFamily.serializer.serializedSize(cf, TypeSizes.VINT));
+        Assert.assertEquals(byteArrayOStream1.size(), (int) ColumnFamily.serializer.serializedSize(cf, TypeSizes.VINT, version));
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/utils/IntervalTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/IntervalTest.java b/test/unit/org/apache/cassandra/utils/IntervalTest.java
deleted file mode 100644
index 1141dd9..0000000
--- a/test/unit/org/apache/cassandra/utils/IntervalTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.cassandra.utils;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import junit.framework.TestCase;
-import org.apache.cassandra.utils.IntervalTree.Interval;
-import org.junit.Test;
-
-public class IntervalTest extends TestCase
-{
-    @Test
-    public void testEncloses() throws Exception
-    {
-        Interval interval = new Interval(0,5,null);
-        Interval interval1 = new Interval(0, 10, null);
-        Interval interval2 = new Interval(5,10,null);
-        Interval interval3 = new Interval(0, 11, null);
-
-
-        assertTrue(interval.encloses(interval));
-        assertTrue(interval1.encloses(interval));
-        assertFalse(interval.encloses(interval2));
-        assertTrue(interval1.encloses(interval2));
-        assertFalse(interval1.encloses(interval3));
-    }
-    @Test
-    public void testContains() throws Exception
-    {
-        Interval interval = new Interval(0, 5, null);
-        assertTrue(interval.contains(0));
-        assertTrue(interval.contains(5));
-        assertFalse(interval.contains(-1));
-        assertFalse(interval.contains(6));
-    }
-    @Test
-    public void testIntersects() throws Exception
-    {
-        Interval interval = new Interval(0,5,null);
-        Interval interval1 = new Interval(0, 10, null);
-        Interval interval2 = new Interval(5,10,null);
-        Interval interval3 = new Interval(0, 11, null);
-        Interval interval5 = new Interval(6,12,null);
-
-        assertTrue(interval.intersects(interval1));
-        assertTrue(interval.intersects(interval2));
-        assertTrue(interval.intersects(interval3));
-        assertFalse(interval.intersects(interval5));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d34917d/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
index cdf85dd..8ac13ec 100644
--- a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
@@ -22,73 +22,152 @@ package org.apache.cassandra.utils;
 
 
 import junit.framework.TestCase;
-import org.apache.cassandra.utils.IntervalTree.*;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.io.*;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.IVersionedSerializer;
 
 public class IntervalTreeTest extends TestCase
 {
     @Test
     public void testSearch() throws Exception
     {
-        List<Interval> intervals = new ArrayList<Interval>();
-
-        intervals.add(new Interval(-300, -200));
-        intervals.add(new Interval(-3, -2));
-        intervals.add(new Interval(1,2));
-        intervals.add(new Interval(3,6));
-        intervals.add(new Interval(2,4));
-        intervals.add(new Interval(5,7));
-        intervals.add(new Interval(1,3));
-        intervals.add(new Interval(4,6));
-        intervals.add(new Interval(8,9));
-        intervals.add(new Interval(15,20));
-        intervals.add(new Interval(40,50));
-        intervals.add(new Interval(49,60));
-
-
-        IntervalTree it = new IntervalTree(intervals);
-
-        assertEquals(3,it.search(new Interval(4,4)).size());
-
-        assertEquals(4, it.search(new Interval(4, 5)).size());
-
-        assertEquals(7, it.search(new Interval(-1,10)).size());
-
-        assertEquals(0, it.search(new Interval(-1,-1)).size());
-
-        assertEquals(5, it.search(new Interval(1,4)).size());
-
-        assertEquals(2, it.search(new Interval(0,1)).size());
-
-        assertEquals(0, it.search(new Interval(10,12)).size());
-
-        List<Interval> intervals2 = new ArrayList<Interval>();
+        List<Interval<Integer, Void>> intervals = new ArrayList<Interval<Integer, Void>>();
+
+        intervals.add(Interval.<Integer, Void>create(-300, -200));
+        intervals.add(Interval.<Integer, Void>create(-3, -2));
+        intervals.add(Interval.<Integer, Void>create(1,2));
+        intervals.add(Interval.<Integer, Void>create(3,6));
+        intervals.add(Interval.<Integer, Void>create(2,4));
+        intervals.add(Interval.<Integer, Void>create(5,7));
+        intervals.add(Interval.<Integer, Void>create(1,3));
+        intervals.add(Interval.<Integer, Void>create(4,6));
+        intervals.add(Interval.<Integer, Void>create(8,9));
+        intervals.add(Interval.<Integer, Void>create(15,20));
+        intervals.add(Interval.<Integer, Void>create(40,50));
+        intervals.add(Interval.<Integer, Void>create(49,60));
+
+
+        IntervalTree<Integer, Void, Interval<Integer, Void>> it = IntervalTree.build(intervals);
+
+        assertEquals(3, it.search(Interval.<Integer, Void>create(4,4)).size());
+        assertEquals(4, it.search(Interval.<Integer, Void>create(4, 5)).size());
+        assertEquals(7, it.search(Interval.<Integer, Void>create(-1,10)).size());
+        assertEquals(0, it.search(Interval.<Integer, Void>create(-1,-1)).size());
+        assertEquals(5, it.search(Interval.<Integer, Void>create(1,4)).size());
+        assertEquals(2, it.search(Interval.<Integer, Void>create(0,1)).size());
+        assertEquals(0, it.search(Interval.<Integer, Void>create(10,12)).size());
+
+        List<Interval<Integer, Void>> intervals2 = new ArrayList<Interval<Integer, Void>>();
 
         //stravinsky 1880-1971
-        intervals2.add(new Interval(1880, 1971));
+        intervals2.add(Interval.<Integer, Void>create(1880, 1971));
         //Schoenberg
-        intervals2.add(new Interval(1874, 1951));
+        intervals2.add(Interval.<Integer, Void>create(1874, 1951));
         //Grieg
-        intervals2.add(new Interval(1843, 1907));
+        intervals2.add(Interval.<Integer, Void>create(1843, 1907));
         //Schubert
-        intervals2.add(new Interval(1779, 1828));
+        intervals2.add(Interval.<Integer, Void>create(1779, 1828));
         //Mozart
-        intervals2.add(new Interval(1756, 1828));
+        intervals2.add(Interval.<Integer, Void>create(1756, 1828));
         //Schuetz
-        intervals2.add(new Interval(1585, 1672));
+        intervals2.add(Interval.<Integer, Void>create(1585, 1672));
 
-        IntervalTree it2 = new IntervalTree(intervals2);
+        IntervalTree<Integer, Void, Interval<Integer, Void>> it2 = IntervalTree.build(intervals2);
 
-        assertEquals(0, it2.search(new Interval(1829, 1842)).size());
+        assertEquals(0, it2.search(Interval.<Integer, Void>create(1829, 1842)).size());
 
-        List<Interval> intersection1 = it2.search(new Interval(1907, 1907));
+        List<Void> intersection1 = it2.search(Interval.<Integer, Void>create(1907, 1907));
         assertEquals(3, intersection1.size());
 
-        intersection1 = it2.search(new Interval(1780, 1790));
+        intersection1 = it2.search(Interval.<Integer, Void>create(1780, 1790));
         assertEquals(2, intersection1.size());
 
     }
+
+    @Test
+    public void testIteration()
+    {
+        List<Interval<Integer, Void>> intervals = new ArrayList<Interval<Integer, Void>>();
+
+        intervals.add(Interval.<Integer, Void>create(-300, -200));
+        intervals.add(Interval.<Integer, Void>create(-3, -2));
+        intervals.add(Interval.<Integer, Void>create(1,2));
+        intervals.add(Interval.<Integer, Void>create(3,6));
+        intervals.add(Interval.<Integer, Void>create(2,4));
+        intervals.add(Interval.<Integer, Void>create(5,7));
+        intervals.add(Interval.<Integer, Void>create(1,3));
+        intervals.add(Interval.<Integer, Void>create(4,6));
+        intervals.add(Interval.<Integer, Void>create(8,9));
+        intervals.add(Interval.<Integer, Void>create(15,20));
+        intervals.add(Interval.<Integer, Void>create(40,50));
+        intervals.add(Interval.<Integer, Void>create(49,60));
+
+        IntervalTree<Integer, Void, Interval<Integer, Void>> it = IntervalTree.build(intervals);
+
+        Collections.sort(intervals, it.minOrdering);
+
+        List<Interval<Integer, Void>> l = new ArrayList<Interval<Integer, Void>>();
+        for (Interval<Integer, Void> i : it)
+            l.add(i);
+
+        assertEquals(intervals, l);
+    }
+
+    @Test
+    public void testSerialization() throws Exception
+    {
+        List<Interval<Integer, String>> intervals = new ArrayList<Interval<Integer, String>>();
+
+        intervals.add(Interval.<Integer, String>create(-300, -200, "a"));
+        intervals.add(Interval.<Integer, String>create(-3, -2, "b"));
+        intervals.add(Interval.<Integer, String>create(1,2, "c"));
+        intervals.add(Interval.<Integer, String>create(1,3, "d"));
+        intervals.add(Interval.<Integer, String>create(2,4, "e"));
+        intervals.add(Interval.<Integer, String>create(3,6, "f"));
+        intervals.add(Interval.<Integer, String>create(4,6, "g"));
+        intervals.add(Interval.<Integer, String>create(5,7, "h"));
+        intervals.add(Interval.<Integer, String>create(8,9, "i"));
+        intervals.add(Interval.<Integer, String>create(15,20, "j"));
+        intervals.add(Interval.<Integer, String>create(40,50, "k"));
+        intervals.add(Interval.<Integer, String>create(49,60, "l"));
+
+        IntervalTree<Integer, String, Interval<Integer, String>> it = IntervalTree.build(intervals);
+
+        IVersionedSerializer<IntervalTree<Integer, String, Interval<Integer, String>>> serializer = IntervalTree.serializer(
+            new ISerializer<Integer>()
+            {
+                public void serialize(Integer i, DataOutput dos) throws IOException { dos.writeInt(i); }
+                public Integer deserialize(DataInput dis) throws IOException { return dis.readInt(); }
+                public long serializedSize(Integer i, TypeSizes ts) { return 4; }
+            },
+            new ISerializer<String>()
+            {
+                public void serialize(String v, DataOutput dos) throws IOException { dos.writeUTF(v); }
+                public String deserialize(DataInput dis) throws IOException { return dis.readUTF(); }
+                public long serializedSize(String v, TypeSizes ts) { return v.length(); }
+            },
+            Interval.class.getConstructor(Object.class, Object.class, Object.class)
+        );
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(baos);
+
+        serializer.serialize(it, out, 0);
+
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+
+        IntervalTree<Integer, String, Interval<Integer, String>> it2 = serializer.deserialize(in, 0);
+        List<Interval<Integer, String>> intervals2 = new ArrayList<Interval<Integer, String>>();
+        for (Interval<Integer, String> i : it2)
+            intervals2.add(i);
+
+        assertEquals(intervals, intervals2);
+    }
 }


Mime
View raw message