cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject cassandra git commit: Fix handling of range tombstones and add test for merging
Date Thu, 02 Jul 2015 16:03:49 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk f708c1e41 -> 7813dee0c


Fix handling of range tombstones and add test for merging

patch by blambov & slebresne; reviewed by blambov & slebresne for CASSANDRA-8099-followup


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7813dee0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7813dee0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7813dee0

Branch: refs/heads/trunk
Commit: 7813dee0cf7f9aa8107aba92b40b7caee79f3528
Parents: f708c1e
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Wed Jul 1 08:24:35 2015 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Thu Jul 2 18:02:23 2015 +0200

----------------------------------------------------------------------
 .../cassandra/db/ClusteringComparator.java      |   2 +-
 .../apache/cassandra/db/ClusteringPrefix.java   |  33 +-
 .../org/apache/cassandra/db/RangeTombstone.java |   5 +
 .../apache/cassandra/db/RangeTombstoneList.java |  17 +-
 .../db/rows/RangeTombstoneBoundMarker.java      |  14 +
 .../db/rows/RangeTombstoneBoundaryMarker.java   |  24 +-
 .../cassandra/db/rows/RangeTombstoneMarker.java |  73 +-
 .../db/rows/RowAndTombstoneMergeIterator.java   |   7 +-
 .../cassandra/db/RangeTombstoneListTest.java    |  35 +-
 .../rows/UnfilteredRowIteratorsMergeTest.java   | 679 +++++++++++++++++++
 10 files changed, 790 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/ClusteringComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java
index b0e8e5c..8b01d6f 100644
--- a/src/java/org/apache/cassandra/db/ClusteringComparator.java
+++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java
@@ -162,7 +162,7 @@ public class ClusteringComparator implements Comparator<Clusterable>
         if (s1 == s2)
             return ClusteringPrefix.Kind.compare(c1.kind(), c2.kind());
 
-        return s1 < s2 ? c1.kind().prefixComparisonResult : -c2.kind().prefixComparisonResult;
+        return s1 < s2 ? c1.kind().comparedToClustering : -c2.kind().comparedToClustering;
     }
 
     public int compare(Clustering c1, Clustering c2)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 36d91e7..3bc7ff8 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -56,26 +56,27 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab
     {
         // WARNING: the ordering of that enum matters because we use ordinal() in the serialization
 
-        EXCL_END_BOUND(0, -1),
-        INCL_START_BOUND(1, -1),
-        EXCL_END_INCL_START_BOUNDARY(1, -1),
-        STATIC_CLUSTERING(2, -1),
-        CLUSTERING(3, 0),
-        INCL_END_EXCL_START_BOUNDARY(4, -1),
-        INCL_END_BOUND(4, 1),
-        EXCL_START_BOUND(5, 1);
+        EXCL_END_BOUND              (0, -1),
+        INCL_START_BOUND            (0, -1),
+        EXCL_END_INCL_START_BOUNDARY(0, -1),
+        STATIC_CLUSTERING           (1, -1),
+        CLUSTERING                  (2,  0),
+        INCL_END_EXCL_START_BOUNDARY(3,  1),
+        INCL_END_BOUND              (3,  1),
+        EXCL_START_BOUND            (3,  1);
 
         private final int comparison;
 
-        // If clusterable c1 has this Kind and is a strict prefix of clusterable c2, then this
-        // is the result of compare(c1, c2). Basically, this is the same as comparing the kind of c1 to
-        // CLUSTERING.
-        public final int prefixComparisonResult;
+        /**
+         * Return the comparison of this kind to CLUSTERING.
+         * For bounds/boundaries, this basically tells us if we sort before or after our clustering values.
+         */
+        public final int comparedToClustering;
 
-        private Kind(int comparison, int prefixComparisonResult)
+        private Kind(int comparison, int comparedToClustering)
         {
             this.comparison = comparison;
-            this.prefixComparisonResult = prefixComparisonResult;
+            this.comparedToClustering = comparedToClustering;
         }
 
         /**
@@ -441,7 +442,7 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab
             for (int i = 0; i < bound.size(); i++)
             {
                 if (!hasComponent(i))
-                    return nextKind.prefixComparisonResult;
+                    return nextKind.comparedToClustering;
 
                 int cmp = comparator.compareComponent(i, nextValues[i], bound.get(i));
                 if (cmp != 0)
@@ -452,7 +453,7 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab
                 return nextKind.compareTo(bound.kind());
 
             // We know that we'll have exited already if nextSize < bound.size
-            return -bound.kind().prefixComparisonResult;
+            return -bound.kind().comparedToClustering;
         }
 
         private boolean hasComponent(int i) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index de21950..df60933 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -71,6 +71,11 @@ public class RangeTombstone
         return deletion;
     }
 
+    public String toString(ClusteringComparator comparator)
+    {
+        return slice.toString(comparator) + "@" + deletion;
+    }
+
     @Override
     public boolean equals(Object other)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index c377d10..64f0978 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Data structure holding the range tombstones of a ColumnFamily.
@@ -163,7 +164,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         int c = comparator.compare(ends[size-1], start);
 
         // Fast path if we add in sorted order
-        if (c < 0)
+        if (c <= 0)
         {
             addInternal(size, start, end, markedAt, delTime);
         }
@@ -171,7 +172,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         {
             // Note: insertFrom expect i to be the insertion point in term of interval ends
             int pos = Arrays.binarySearch(ends, 0, size, start, comparator);
-            insertFrom((pos >= 0 ? pos : -pos-1), start, end, markedAt, delTime);
+            insertFrom((pos >= 0 ? pos+1 : -pos-1), start, end, markedAt, delTime);
         }
         boundaryHeapSize += start.unsharedHeapSize() + end.unsharedHeapSize();
     }
@@ -504,12 +505,14 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
 
     /*
      * Inserts a new element starting at index i. This method assumes that:
-     *    ends[i-1] < start < ends[i]
-     * (note that we cannot have start == end since both will at least have a different bound "kind")
+     *    ends[i-1] <= start < ends[i]
+     * (note that start can be equal to ends[i-1] in the case where we have a boundary, e.g. when
+     * ends[i-1] is the exclusive end of X and start is the inclusive start of X).
      *
      * A RangeTombstoneList is a list of range [s_0, e_0]...[s_n, e_n] such that:
-     *   - s_i <= e_i
-     *   - e_i < s_i+1
+     *   - s_i is a start bound and e_i is an end bound
+     *   - s_i < e_i
+     *   - e_i <= s_i+1
      * Basically, range are non overlapping and in order.
      */
     private void insertFrom(int i, Slice.Bound start, Slice.Bound end, long markedAt, int delTime)
@@ -517,7 +520,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         while (i < size)
         {
             assert start.isStart() && end.isEnd();
-            assert i == 0 || comparator.compare(ends[i-1], start) < 0;
+            assert i == 0 || comparator.compare(ends[i-1], start) <= 0;
             assert comparator.compare(start, ends[i]) < 0;
 
             if (Slice.isEmpty(comparator, start, end))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index b5ac19b..8b52b0b 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -114,6 +114,20 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker
         return deletion;
     }
 
+    public boolean openIsInclusive(boolean reversed)
+    {
+        if (!isOpen(reversed))
+            throw new IllegalStateException();
+        return bound.isInclusive();
+    }
+
+    public boolean closeIsInclusive(boolean reversed)
+    {
+        if (isOpen(reversed))
+            throw new IllegalStateException();
+        return bound.isInclusive();
+    }
+
     public void copyTo(RangeTombstoneMarker.Writer writer)
     {
         copyBoundTo(writer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 1140d40..f17515d 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -87,6 +87,16 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
         return reversed ? endDeletion : startDeletion;
     }
 
+    public boolean openIsInclusive(boolean reversed)
+    {
+        return (bound.kind() == ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY) ^ reversed;
+    }
+
+    public boolean closeIsInclusive(boolean reversed)
+    {
+        return (bound.kind() == ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY) ^ reversed;
+    }
+
     public boolean isOpen(boolean reversed)
     {
         // A boundary always open one side
@@ -99,21 +109,9 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
         return true;
     }
 
-    public static boolean isBoundary(ClusteringComparator comparator, Slice.Bound close, Slice.Bound open)
-    {
-        if (!comparator.isOnSameClustering(close, open))
-            return false;
-
-        // If both bound are exclusive, then it's not a boundary, otherwise it is one.
-        // Note that most code should never call this with 2 inclusive bound: this would mean we had
-        // 2 RTs that were overlapping and RangeTombstoneList don't create that. However, old
-        // code was generating that so supporting this case helps dealing with backward compatibility.
-        return close.isInclusive() || open.isInclusive();
-    }
-
-    // Please note that isBoundary *must* have been called (and returned true) before this is called.
     public static RangeTombstoneBoundaryMarker makeBoundary(boolean reversed, Slice.Bound close, Slice.Bound open, DeletionTime closeDeletion, DeletionTime openDeletion)
     {
+        assert RangeTombstone.Bound.Kind.compare(close.kind(), open.kind()) == 0 : "Both bound don't form a boundary";
         boolean isExclusiveClose = close.isExclusive() || (close.isInclusive() && open.isInclusive() && openDeletion.supersedes(closeDeletion));
         return isExclusiveClose
              ? exclusiveCloseInclusiveOpen(reversed, close.getRawValues(), closeDeletion, openDeletion)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
index 1a506d5..380e6b0 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
@@ -41,6 +41,8 @@ public interface RangeTombstoneMarker extends Unfiltered
     public boolean isClose(boolean reversed);
     public DeletionTime openDeletionTime(boolean reversed);
     public DeletionTime closeDeletionTime(boolean reversed);
+    public boolean openIsInclusive(boolean reversed);
+    public boolean closeIsInclusive(boolean reversed);
 
     public interface Writer extends Slice.Bound.Writer
     {
@@ -121,33 +123,6 @@ public interface RangeTombstoneMarker extends Unfiltered
      */
     public static class Merger
     {
-        // Boundaries sorts like the bound that have their equivalent "inclusive" part and that's the main action we
-        // care about as far as merging goes. So MergedKind just group those as the same case, and tell us whether
-        // we're dealing with an open or a close (based on whether we're dealing with reversed iterators or not).
-        // Really this enum is just a convenience for merging.
-        private enum MergedKind
-        {
-            INCL_OPEN, EXCL_CLOSE, EXCL_OPEN, INCL_CLOSE;
-
-            public static MergedKind forBound(RangeTombstone.Bound bound, boolean reversed)
-            {
-                switch (bound.kind())
-                {
-                    case INCL_START_BOUND:
-                    case EXCL_END_INCL_START_BOUNDARY:
-                        return reversed ? INCL_CLOSE : INCL_OPEN;
-                    case EXCL_END_BOUND:
-                        return reversed ? EXCL_OPEN : EXCL_CLOSE;
-                    case EXCL_START_BOUND:
-                        return reversed ? EXCL_CLOSE : EXCL_OPEN;
-                    case INCL_END_EXCL_START_BOUNDARY:
-                    case INCL_END_BOUND:
-                        return reversed ? INCL_OPEN : INCL_CLOSE;
-                }
-                throw new AssertionError();
-            }
-        }
-
         private final CFMetaData metadata;
         private final UnfilteredRowIterators.MergeListener listener;
         private final DeletionTime partitionDeletion;
@@ -202,33 +177,29 @@ public interface RangeTombstoneMarker extends Unfiltered
             if (previousDeletionTimeInMerged.equals(newDeletionTimeInMerged))
                 return null;
 
-            ByteBuffer[] values = bound.getRawValues();
+            boolean isBeforeClustering = bound.kind().comparedToClustering < 0;
+            if (reversed)
+                isBeforeClustering = !isBeforeClustering;
 
+            ByteBuffer[] values = bound.getRawValues();
             RangeTombstoneMarker merged;
-            switch (MergedKind.forBound(bound, reversed))
+            if (previousDeletionTimeInMerged.isLive())
+            {
+                merged = isBeforeClustering
+                       ? RangeTombstoneBoundMarker.inclusiveOpen(reversed, values, newDeletionTimeInMerged)
+                       : RangeTombstoneBoundMarker.exclusiveOpen(reversed, values, newDeletionTimeInMerged);
+            }
+            else if (newDeletionTimeInMerged.isLive())
+            {
+                merged = isBeforeClustering
+                       ? RangeTombstoneBoundMarker.exclusiveClose(reversed, values, previousDeletionTimeInMerged)
+                       : RangeTombstoneBoundMarker.inclusiveClose(reversed, values, previousDeletionTimeInMerged);
+            }
+            else
             {
-                case INCL_OPEN:
-                    merged = previousDeletionTimeInMerged.isLive()
-                           ? RangeTombstoneBoundMarker.inclusiveOpen(reversed, values, newDeletionTimeInMerged)
-                           : RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
-                    break;
-                case EXCL_CLOSE:
-                    merged = newDeletionTimeInMerged.isLive()
-                           ? RangeTombstoneBoundMarker.exclusiveClose(reversed, values, previousDeletionTimeInMerged)
-                           : RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
-                    break;
-                case EXCL_OPEN:
-                    merged = previousDeletionTimeInMerged.isLive()
-                           ? RangeTombstoneBoundMarker.exclusiveOpen(reversed, values, newDeletionTimeInMerged)
-                           : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
-                    break;
-                case INCL_CLOSE:
-                    merged = newDeletionTimeInMerged.isLive()
-                           ? RangeTombstoneBoundMarker.inclusiveClose(reversed, values, previousDeletionTimeInMerged)
-                           : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
-                    break;
-                default:
-                    throw new AssertionError();
+                merged = isBeforeClustering
+                       ? RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged)
+                       : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
             }
 
             if (listener != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
index 51383a2..f923329 100644
--- a/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.*;
 
 public class RowAndTombstoneMergeIterator extends UnmodifiableIterator<Unfiltered> implements PeekingIterator<Unfiltered>
 {
-    private final ClusteringComparator clusteringComparator;
     private final Comparator<Clusterable> comparator;
     private final boolean reversed;
 
@@ -42,7 +41,6 @@ public class RowAndTombstoneMergeIterator extends UnmodifiableIterator<Unfiltere
 
     public RowAndTombstoneMergeIterator(ClusteringComparator comparator, boolean reversed)
     {
-        this.clusteringComparator = comparator;
         this.comparator = reversed ? comparator.reversed() : comparator;
         this.reversed = reversed;
     }
@@ -87,7 +85,8 @@ public class RowAndTombstoneMergeIterator extends UnmodifiableIterator<Unfiltere
             {
                 RangeTombstone rt = nextTombstone;
                 nextTombstone = tombstoneIter.hasNext() ? tombstoneIter.next() : null;
-                if (nextTombstone != null && RangeTombstoneBoundaryMarker.isBoundary(clusteringComparator, rt.deletedSlice().close(reversed), nextTombstone.deletedSlice().open(reversed)))
+                // An end and a start make a boundary if they sort similarly
+                if (nextTombstone != null && RangeTombstone.Bound.Kind.compare(rt.deletedSlice().close(reversed).kind(), nextTombstone.deletedSlice().open(reversed).kind()) == 0)
                 {
                     next = RangeTombstoneBoundaryMarker.makeBoundary(reversed,
                                                                      rt.deletedSlice().close(reversed),
@@ -113,7 +112,7 @@ public class RowAndTombstoneMergeIterator extends UnmodifiableIterator<Unfiltere
             {
                 RangeTombstone rt = nextTombstone;
                 nextTombstone = tombstoneIter.hasNext() ? tombstoneIter.next() : null;
-                if (nextTombstone != null && RangeTombstoneBoundaryMarker.isBoundary(clusteringComparator, rt.deletedSlice().close(reversed), nextTombstone.deletedSlice().open(reversed)))
+                if (nextTombstone != null && RangeTombstone.Bound.Kind.compare(rt.deletedSlice().close(reversed).kind(), nextTombstone.deletedSlice().open(reversed).kind()) == 0)
                 {
                     next = RangeTombstoneBoundaryMarker.makeBoundary(reversed,
                                                                      rt.deletedSlice().close(reversed),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
index 73b91ea..8a697f4 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
@@ -26,12 +26,12 @@ import static org.junit.Assert.*;
 
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class RangeTombstoneListTest
 {
-    private static final ClusteringComparator cmp = new ClusteringComparator(IntegerType.instance);
+    private static final ClusteringComparator cmp = new ClusteringComparator(Int32Type.instance);
 
     @Test
     public void sortedAdditionTest()
@@ -263,19 +263,35 @@ public class RangeTombstoneListTest
 
         int prevStart = -1;
         int prevEnd = 0;
+        boolean prevStartInclusive = false;
+        boolean prevEndInclusive = false;
         for (int i = 0; i < size; i++)
         {
             int nextStart = prevEnd + rand.nextInt(maxItDistance);
             int nextEnd = nextStart + rand.nextInt(maxItSize);
 
-            // We can have an interval [x, x], but not 2 consecutives ones for the same x
-            if (nextEnd == nextStart && prevEnd == prevStart && prevEnd == nextStart)
-                nextEnd += 1 + rand.nextInt(maxItDistance);
+            boolean startInclusive = rand.nextBoolean();
+            boolean endInclusive = rand.nextBoolean();
 
-            l.add(rt(nextStart, nextEnd, rand.nextInt(maxMarkedAt)));
+            // Now make sure we create meaningful ranges
+
+            if (prevEnd == nextStart)
+                startInclusive = !prevEndInclusive;
+
+            if (nextStart == nextEnd)
+            {
+                if (startInclusive)
+                    endInclusive = true;
+                else
+                    nextEnd += 1;
+            }
+
+            l.add(rt(nextStart, startInclusive, nextEnd, endInclusive, rand.nextInt(maxMarkedAt)));
 
             prevStart = nextStart;
             prevEnd = nextEnd;
+            prevStartInclusive = startInclusive;
+            prevEndInclusive = endInclusive;
         }
         return l;
     }
@@ -339,7 +355,7 @@ public class RangeTombstoneListTest
             Slice curr = iter.next().deletedSlice();
 
             assertFalse("Invalid empty slice " + curr.toString(cmp), curr.isEmpty(cmp));
-            assertTrue("Slice not in order or overlapping : " + prev.toString(cmp) + curr.toString(cmp), cmp.compare(prev.end(), curr.start()) < 0);
+            assertTrue("Slice not in order or overlapping : " + prev.toString(cmp) + curr.toString(cmp), cmp.compare(prev.end(), curr.start()) <= 0);
         }
     }
 
@@ -377,6 +393,11 @@ public class RangeTombstoneListTest
         return rt(start, end, tstamp, 0);
     }
 
+    private static RangeTombstone rt(int start, boolean startInclusive, int end, boolean endInclusive, long tstamp)
+    {
+        return new RangeTombstone(Slice.make(Slice.Bound.create(cmp, true, startInclusive, start), Slice.Bound.create(cmp, false, endInclusive, end)), new SimpleDeletionTime(tstamp, 0));
+    }
+
     private static RangeTombstone rt(int start, int end, long tstamp, int delTime)
     {
         return new RangeTombstone(Slice.make(Slice.Bound.inclusiveStartOf(bb(start)), Slice.Bound.inclusiveEndOf(bb(end))), new SimpleDeletionTime(tstamp, delTime));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
new file mode 100644
index 0000000..a607dca
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Slice.Bound;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.rows.Unfiltered.Kind;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SearchIterator;
+
+public class UnfilteredRowIteratorsMergeTest
+{
+    static DecoratedKey partitionKey = Util.dk("key"); // partition key handed to every Source iterator
+    static DeletionTime partitionLevelDeletion = DeletionTime.LIVE; // partition-level deletion handed to every Source iterator
+    static CFMetaData metadata = CFMetaData.Builder.create("UnfilteredRowIteratorsMergeTest", "Test").
+            addPartitionKey("key", AsciiType.instance).
+            addClusteringColumn("clustering", Int32Type.instance).
+            addRegularColumn("data", Int32Type.instance).
+            build(); // test table: ascii partition key, one int32 clustering column, one int32 regular column
+    static Comparator<Clusterable> comparator = new ClusteringComparator(Int32Type.instance); // used for order checks and binary searches over the generated lists
+    static int nowInSec = FBUtilities.nowInSeconds(); // captured once at class load; passed when building row liveness info
+
+    static final int RANGE = 3000; // generated positions are drawn from [0, RANGE); verifyEquivalent probes each of them
+    static final int DEL_RANGE = 100; // exclusive bound for random deletion times; also the default row timestamp in parse()
+    static final int ITERATORS = 15; // number of sources generated per seed
+    static final int ITEMS = 300; // positions drawn per source; values <= 20 switch on the debug printing
+
+    boolean reversed; // iteration order currently under test; set by testTombstoneMerge and read by the helpers
+
+    public UnfilteredRowIteratorsMergeTest() // intentionally empty: fixtures are static, 'reversed' is set per test run
+    {
+    }
+
+    /** Randomized merge check: forward order, all sources merged in a single call. */
+    @Test
+    public void testTombstoneMerge()
+    {
+        final boolean reverseOrder = false;
+        final boolean iterative = false;
+        testTombstoneMerge(reverseOrder, iterative);
+    }
+
+    /** Randomized merge check: reversed order, all sources merged in a single call. */
+    @Test
+    public void testTombstoneMergeReversed()
+    {
+        final boolean reverseOrder = true;
+        final boolean iterative = false;
+        testTombstoneMerge(reverseOrder, iterative);
+    }
+
+    /** Randomized merge check: forward order, sources folded into the result a few at a time. */
+    @Test
+    public void testTombstoneMergeIterative()
+    {
+        final boolean reverseOrder = false;
+        final boolean iterative = true;
+        testTombstoneMerge(reverseOrder, iterative);
+    }
+
+    /** Randomized merge check: reversed order, sources folded into the result a few at a time. */
+    @Test
+    public void testTombstoneMergeReversedIterative()
+    {
+        final boolean reverseOrder = true;
+        final boolean iterative = true;
+        testTombstoneMerge(reverseOrder, iterative);
+    }
+
+    @Test
+    public void testDuplicateRangeCase()
+    {
+        testForInput("67<=[98] [98]<=67",
+                     "66<=[11] [11]<71",
+                     "66<[13] [13]<67");
+    }
+
+    /**
+     * Runs the randomized merge check for seeds 1 through 100. For every seed, ITERATORS sources are
+     * generated, merged, and the result is checked for equivalence with the sources and for
+     * structural validity. When run reversed, the merged list is additionally flipped and
+     * re-validated in forward order.
+     *
+     * @param reversed   whether sources are generated and merged in reverse order
+     * @param iterations whether to merge a few sources at a time instead of all at once
+     */
+    @SuppressWarnings("unused")
+    public void testTombstoneMerge(boolean reversed, boolean iterations)
+    {
+        // Debug printing is only enabled for small configurations.
+        final boolean verbose = ITEMS <= 20;
+
+        for (int seed = 1; seed <= 100; ++seed)
+        {
+            // Reset on every pass: the reversed branch at the bottom clears the field.
+            this.reversed = reversed;
+            if (verbose)
+                System.out.println("\nSeed " + seed);
+
+            final Random r = new Random(seed);
+            Function<Integer, Integer> alwaysNegative = x -> -1;
+            Function<Integer, Integer> alwaysMax = x -> DEL_RANGE;
+            Function<Integer, Integer> randomTime = x -> r.nextInt(DEL_RANGE);
+            List<Function<Integer, Integer>> timeGenerators = ImmutableList.of(alwaysNegative, alwaysMax, randomTime);
+
+            if (verbose)
+                System.out.println("Merging");
+            List<List<Unfiltered>> sources = new ArrayList<>(ITERATORS);
+            while (sources.size() < ITERATORS)
+            {
+                Function<Integer, Integer> timeGenerator = timeGenerators.get(r.nextInt(timeGenerators.size()));
+                sources.add(generateSource(r, timeGenerator));
+            }
+            List<Unfiltered> merged = merge(sources, iterations);
+
+            if (verbose)
+            {
+                System.out.println("results in");
+                dumpList(merged);
+            }
+            verifyEquivalent(sources, merged);
+            verifyValid(merged);
+
+            if (!reversed)
+                continue;
+
+            // A valid reversed stream must also be valid when read forwards.
+            Collections.reverse(merged);
+            this.reversed = false;
+            verifyValid(merged);
+        }
+    }
+
+    /**
+     * Wraps each source list in a {@code Source} iterator, merges them and drains the merged
+     * iterator (through {@code safeIterator}) into a new list.
+     *
+     * @param sources    the lists to merge
+     * @param iterations forwarded to {@code mergeIterators}
+     * @return the merged content, in iteration order
+     */
+    private List<Unfiltered> merge(List<List<Unfiltered>> sources, boolean iterations)
+    {
+        List<UnfilteredRowIterator> iterators = new ArrayList<>(sources.size());
+        for (List<Unfiltered> source : sources)
+            iterators.add(new Source(source.iterator()));
+
+        UnfilteredRowIterator mergedIterator = safeIterator(mergeIterators(iterators, iterations));
+        List<Unfiltered> merged = new ArrayList<>();
+        while (mergedIterator.hasNext())
+            merged.add(mergedIterator.next());
+        return merged;
+    }
+
+    public UnfilteredRowIterator mergeIterators(List<UnfilteredRowIterator> us, boolean iterations)
+    {
+        int now = FBUtilities.nowInSeconds();
+        if (iterations)
+        {
+            UnfilteredRowIterator mi = us.get(0);
+            int i;
+            for (i = 1; i + 2 <= ITERATORS; i += 2)
+                mi = UnfilteredRowIterators.merge(ImmutableList.of(mi, us.get(i), us.get(i+1)), now);
+            if (i + 1 <= ITERATORS)
+                mi = UnfilteredRowIterators.merge(ImmutableList.of(mi, us.get(i)), now);
+            return mi;
+        }
+        else
+        {
+            return UnfilteredRowIterators.merge(us, now);
+        }
+    }
+
+    @SuppressWarnings("unused")
+    private List<Unfiltered> generateSource(Random r, Function<Integer, Integer> timeGenerator)
+    {
+        int[] positions = new int[ITEMS + 1];
+        for (int i=0; i<ITEMS; ++i)
+            positions[i] = r.nextInt(RANGE);
+        positions[ITEMS] = RANGE;
+        Arrays.sort(positions);
+
+        List<Unfiltered> content = new ArrayList<>(ITEMS);
+        int prev = -1;
+        for (int i=0; i<ITEMS; ++i)
+        {
+            int pos = positions[i];
+            int sz = positions[i + 1] - pos;
+            if (sz == 0 && pos == prev)
+                // Filter out more than two of the same position.
+                continue;
+            if (r.nextBoolean() || pos == prev)
+            {
+                int span;
+                boolean includesStart;
+                boolean includesEnd;
+                if (pos > prev)
+                {
+                    span = r.nextInt(sz + 1);
+                    includesStart = span > 0 ? r.nextBoolean() : true;
+                    includesEnd = span > 0 ? r.nextBoolean() : true;
+                }
+                else
+                {
+                    span = 1 + r.nextInt(sz);
+                    includesStart = false;
+                    includesEnd = r.nextBoolean();
+                }
+                int deltime = r.nextInt(DEL_RANGE);
+                DeletionTime dt = new SimpleDeletionTime(deltime, deltime);
+                content.add(new RangeTombstoneBoundMarker(boundFor(pos, true, includesStart), dt));
+                content.add(new RangeTombstoneBoundMarker(boundFor(pos + span, false, includesEnd), dt));
+                prev = pos + span - (includesEnd ? 0 : 1);
+            }
+            else
+            {
+                content.add(emptyRowAt(pos, timeGenerator));
+                prev = pos;
+            }
+        }
+
+        attachBoundaries(content);
+        if (reversed)
+        {
+            Collections.reverse(content);
+        }
+        verifyValid(content);
+        if (ITEMS <= 20)
+            dumpList(content);
+        return content;
+    }
+
+    /**
+     * Compacts {@code content} in place (forward order): whenever a close marker is immediately
+     * followed by an open marker whose bound is the inverse of the close bound, the two are replaced
+     * by a single boundary marker carrying both deletion times.
+     *
+     * @param content the list to rewrite; shrinks by one element per joined pair
+     */
+    static void attachBoundaries(List<Unfiltered> content)
+    {
+        int writeIdx = 0;
+        RangeTombstoneMarker previousMarker = null;
+        for (int readIdx = 0; readIdx < content.size(); ++readIdx)
+        {
+            Unfiltered item = content.get(readIdx);
+            RangeTombstoneMarker marker = null;
+            if (item.kind() == Kind.RANGE_TOMBSTONE_MARKER)
+                marker = (RangeTombstoneMarker) item;
+
+            boolean joinable = previousMarker != null
+                            && marker != null
+                            && previousMarker.isClose(false)
+                            && marker.isOpen(false)
+                            && previousMarker.clustering().invert().equals(marker.clustering());
+            if (joinable)
+            {
+                // Join. Prefer not to use merger to check its correctness.
+                RangeTombstone.Bound closeBound = previousMarker.clustering();
+                RangeTombstone.Bound.Kind boundaryKind = closeBound.isInclusive()
+                                                       ? RangeTombstone.Bound.Kind.INCL_END_EXCL_START_BOUNDARY
+                                                       : RangeTombstone.Bound.Kind.EXCL_END_INCL_START_BOUNDARY;
+                item = new RangeTombstoneBoundaryMarker(closeBound.withNewKind(boundaryKind),
+                                                        previousMarker.closeDeletionTime(false),
+                                                        marker.openDeletionTime(false));
+                // Step back so the boundary overwrites the close marker written on the previous pass.
+                --writeIdx;
+            }
+            content.set(writeIdx++, item);
+            previousMarker = marker;
+        }
+
+        // Drop the now-unused tail.
+        content.subList(writeIdx, content.size()).clear();
+    }
+
+    /**
+     * Asserts that {@code list} is a well-formed stream for the current iteration order: items are
+     * strictly ordered, a close marker follows an open marker with the same deletion time, an open
+     * marker never follows another open marker, and the stream does not end inside an open range.
+     * On failure the error and the offending list are printed before the error is rethrown.
+     */
+    void verifyValid(List<Unfiltered> list)
+    {
+        int reversedAsMultiplier = reversed ? -1 : 1;
+        try
+        {
+            RangeTombstoneMarker lastMarker = null;
+            Unfiltered lastItem = null;
+            for (Unfiltered item : list)
+            {
+                String orderMessage = "Order violation prev " + str(lastItem) + " curr " + str(item);
+                boolean ordered = lastItem == null || comparator.compare(lastItem, item) * reversedAsMultiplier < 0;
+                Assert.assertTrue(orderMessage, ordered);
+                lastItem = item;
+
+                // Rows only take part in the ordering check.
+                if (item.kind() != Kind.RANGE_TOMBSTONE_MARKER)
+                    continue;
+
+                RangeTombstoneMarker marker = (RangeTombstoneMarker) item;
+                if (lastMarker != null)
+                {
+                    if (marker.isClose(reversed))
+                    {
+                        Assert.assertTrue(str(item) + " follows another close marker " + str(lastMarker), lastMarker.isOpen(reversed));
+                        Assert.assertEquals("Deletion time mismatch for open " + str(lastMarker) + " and close " + str(item),
+                                            lastMarker.openDeletionTime(reversed),
+                                            marker.closeDeletionTime(reversed));
+                    }
+                    else
+                    {
+                        Assert.assertFalse(str(marker) + " follows another open marker " + str(lastMarker), lastMarker.isOpen(reversed));
+                    }
+                }
+                lastMarker = marker;
+            }
+
+            boolean endsOpen = lastMarker != null && lastMarker.isOpen(reversed);
+            Assert.assertFalse("Cannot end in open marker " + str(lastMarker), endsOpen);
+        }
+        catch (AssertionError e)
+        {
+            System.out.println(e);
+            dumpList(list);
+            throw e;
+        }
+    }
+
+    /**
+     * Checks, for every position in [0, RANGE), that the merged list yields the same deletion time
+     * as the sources combined. Where that deletion is LIVE, it also checks that the merged list
+     * has a row exactly when some source has one. On failure all sources and the merged list are
+     * printed before the error is rethrown.
+     */
+    void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> merged)
+    {
+        try
+        {
+            for (int position = 0; position < RANGE; ++position)
+            {
+                Clusterable c = clusteringFor(position);
+
+                // Fold the strongest deletion over all sources.
+                DeletionTime expected = DeletionTime.LIVE;
+                for (List<Unfiltered> source : sources)
+                    expected = deletionFor(c, source, expected);
+                Assert.assertEquals("Deletion time mismatch for position " + str(c), expected, deletionFor(c, merged));
+
+                if (expected != DeletionTime.LIVE)
+                    continue;
+
+                // Not deleted: the first source row found at this position (if any) must survive the merge.
+                Unfiltered sourceRow = null;
+                for (List<Unfiltered> source : sources)
+                {
+                    Unfiltered candidate = rowFor(c, source);
+                    if (candidate != null)
+                    {
+                        sourceRow = candidate;
+                        break;
+                    }
+                }
+                Unfiltered mergedRow = rowFor(c, merged);
+                Assert.assertEquals("Content mismatch for position " + str(c), str(sourceRow), str(mergedRow));
+            }
+        }
+        catch (AssertionError e)
+        {
+            System.out.println(e);
+            for (List<Unfiltered> source : sources)
+                dumpList(source);
+            System.out.println("merged");
+            dumpList(merged);
+            throw e;
+        }
+    }
+
+    /**
+     * Binary-searches {@code list} (sorted in the current iteration order) for an item comparing
+     * equal to {@code pointer}.
+     *
+     * @return the matching item, or {@code null} if there is none
+     */
+    private Unfiltered rowFor(Clusterable pointer, List<Unfiltered> list)
+    {
+        Comparator<Clusterable> order = reversed ? comparator.reversed() : comparator;
+        int index = Collections.binarySearch(list, pointer, order);
+        if (index < 0)
+            return null;
+        return list.get(index);
+    }
+
+    DeletionTime deletionFor(Clusterable pointer, List<Unfiltered> list)
+    {
+        return deletionFor(pointer, list, DeletionTime.LIVE);
+    }
+
+    /**
+     * Computes the deletion time applying to {@code pointer} in {@code list}, combined with
+     * {@code def}: the result is whichever of {@code def}, the deletion of a row found exactly at
+     * {@code pointer}, and the deletion of the range open at {@code pointer} supersedes the others.
+     *
+     * @param pointer position to look up
+     * @param list    content sorted in the current iteration order
+     * @param def     deletion time to start from
+     * @return the resulting deletion time ({@code def} itself when nothing supersedes it)
+     */
+    DeletionTime deletionFor(Clusterable pointer, List<Unfiltered> list, DeletionTime def)
+    {
+        if (list.isEmpty())
+            return def;
+
+        Comparator<Clusterable> order = reversed ? comparator.reversed() : comparator;
+        int found = Collections.binarySearch(list, pointer, order);
+
+        DeletionTime result = def;
+        int start;
+        if (found >= 0)
+        {
+            // Exact hit: the item at this index is cast to Row and its own deletion is considered.
+            Row row = (Row) list.get(found);
+            DeletionTime rowDeletion = row.deletion();
+            if (rowDeletion != null && rowDeletion.supersedes(result))
+                result = rowDeletion;
+            start = found;
+        }
+        else
+        {
+            // Miss: continue from the insertion point; past the end nothing can still be open.
+            start = -1 - found;
+            if (start >= list.size())
+                return result;
+        }
+
+        // Walk back to the nearest marker; it decides whether pointer sits inside an open range.
+        for (int i = start - 1; i >= 0; --i)
+        {
+            Unfiltered unfiltered = list.get(i);
+            if (unfiltered.kind() == Kind.ROW)
+                continue;
+
+            RangeTombstoneMarker lower = (RangeTombstoneMarker) unfiltered;
+            if (lower.isOpen(reversed))
+            {
+                DeletionTime open = lower.openDeletionTime(reversed);
+                if (open.supersedes(result))
+                    return open;
+            }
+            return result;
+        }
+        return result;
+    }
+
+    /** Creates a slice bound at {@code pos} that is a start or end bound, inclusive or exclusive, as requested. */
+    private static Bound boundFor(int pos, boolean start, boolean inclusive)
+    {
+        ByteBuffer encodedPos = Int32Type.instance.decompose(pos);
+        ByteBuffer[] values = { encodedPos };
+        return Bound.create(Bound.boundKind(start, inclusive), values);
+    }
+
+    /** Creates the single-column int32 clustering for position {@code i}. */
+    private static SimpleClustering clusteringFor(int i)
+    {
+        ByteBuffer encoded = Int32Type.instance.decompose(i);
+        return new SimpleClustering(encoded);
+    }
+
+    /**
+     * Creates a test row at clustering {@code pos} with no row deletion.
+     *
+     * @param pos           clustering position of the row
+     * @param timeGenerator applied to {@code pos} to obtain the timestamp of the row's liveness info
+     */
+    static Row emptyRowAt(int pos, Function<Integer, Integer> timeGenerator)
+    {
+        Clustering where = clusteringFor(pos);
+        int timestamp = timeGenerator.apply(pos);
+        LivenessInfo liveness = SimpleLivenessInfo.forUpdate(timestamp, 0, nowInSec, metadata);
+        return emptyRowAt(where, liveness, DeletionTime.LIVE);
+    }
+
+    /**
+     * Minimal cell for tests: a fixed column, value and liveness info. It is never a counter cell
+     * and has no cell path.
+     */
+    public static class TestCell extends AbstractCell
+    {
+        private final ColumnDefinition column;
+        private final ByteBuffer value;
+        private final LivenessInfo info;
+
+        /** Stores the column and value as given, and keeps {@code info.takeAlias()} as the liveness info. */
+        public TestCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info)
+        {
+            this.info = info.takeAlias();
+            this.value = value;
+            this.column = column;
+        }
+
+        // --- stored state ---
+
+        @Override
+        public ColumnDefinition column()
+        {
+            return this.column;
+        }
+
+        @Override
+        public ByteBuffer value()
+        {
+            return this.value;
+        }
+
+        @Override
+        public LivenessInfo livenessInfo()
+        {
+            return this.info;
+        }
+
+        // --- fixed answers ---
+
+        @Override
+        public boolean isCounterCell()
+        {
+            return false;
+        }
+
+        @Override
+        public CellPath path()
+        {
+            return null;
+        }
+    }
+
+    /**
+     * Creates a stub row for the tests. The row reports itself as empty and iterates over no cells,
+     * yet still answers {@code getCell}/{@code getCells} for the "data" column with one cell whose
+     * value is the first clustering component.
+     *
+     * @param clustering the row's clustering
+     * @param live       returned as the primary key liveness info and used for the "data" cell
+     * @param deletion   returned as the row deletion
+     */
+    static Row emptyRowAt(final Clustering clustering, final LivenessInfo live, final DeletionTime deletion)
+    {
+        final ColumnDefinition dataColumn = metadata.getColumnDefinition(new ColumnIdentifier("data", true));
+        final Cell dataCell = new TestCell(dataColumn, clustering.get(0), live);
+
+        return new AbstractRow()
+        {
+            // --- identity ---
+
+            @Override
+            public Kind kind()
+            {
+                return Unfiltered.Kind.ROW;
+            }
+
+            @Override
+            public Clustering clustering()
+            {
+                return clustering;
+            }
+
+            @Override
+            public Columns columns()
+            {
+                return Columns.of(dataColumn);
+            }
+
+            @Override
+            public Row takeAlias()
+            {
+                return this;
+            }
+
+            @Override
+            public String toString()
+            {
+                return Int32Type.instance.getString(clustering.get(0));
+            }
+
+            // --- liveness and deletion ---
+
+            @Override
+            public LivenessInfo primaryKeyLivenessInfo()
+            {
+                return live;
+            }
+
+            @Override
+            public DeletionTime deletion()
+            {
+                return deletion;
+            }
+
+            @Override
+            public boolean hasComplexDeletion()
+            {
+                return false;
+            }
+
+            @Override
+            public DeletionTime getDeletion(ColumnDefinition c)
+            {
+                return DeletionTime.LIVE;
+            }
+
+            // --- content ---
+
+            @Override
+            public boolean isEmpty()
+            {
+                return true;
+            }
+
+            @Override
+            public Cell getCell(ColumnDefinition c)
+            {
+                // Identity comparison, as the stub only knows the one definition it was built with.
+                if (c == dataColumn)
+                    return dataCell;
+                return null;
+            }
+
+            @Override
+            public Cell getCell(ColumnDefinition c, CellPath path)
+            {
+                return null;
+            }
+
+            @Override
+            public Iterator<Cell> getCells(ColumnDefinition c)
+            {
+                return Iterators.singletonIterator(dataCell);
+            }
+
+            @Override
+            public Iterator<Cell> iterator()
+            {
+                return Iterators.<Cell>emptyIterator();
+            }
+
+            @Override
+            public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+            {
+                // Always-exhausted search iterator.
+                return new SearchIterator<ColumnDefinition, ColumnData>()
+                {
+                    @Override
+                    public boolean hasNext()
+                    {
+                        return false;
+                    }
+
+                    @Override
+                    public ColumnData next(ColumnDefinition column)
+                    {
+                        return null;
+                    }
+                };
+            }
+        };
+
+    }
+
+    /** Prints the textual form of every item on one line of standard output, each followed by a space. */
+    private void dumpList(List<Unfiltered> list)
+    {
+        list.stream()
+            .map(this::str)
+            .forEach(text -> System.out.print(text + " "));
+        System.out.println();
+    }
+
+    /**
+     * Renders an item in the notation used by the test: a row is its int32 clustering value; a
+     * marker is that value with {@code [time]<} or {@code [time]<=} prepended for a close side
+     * and {@code <[time]} or {@code <=[time]} appended for an open side (relative to the current
+     * iteration order). {@code null} renders as "null".
+     */
+    private String str(Clusterable curr)
+    {
+        if (curr == null)
+            return "null";
+
+        String position = Int32Type.instance.getString(curr.clustering().get(0));
+        if (!(curr instanceof RangeTombstoneMarker))
+            return position;
+
+        RangeTombstoneMarker marker = (RangeTombstoneMarker) curr;
+        StringBuilder text = new StringBuilder();
+        if (marker.isClose(reversed))
+        {
+            text.append("[").append(marker.closeDeletionTime(reversed).markedForDeleteAt()).append("]");
+            text.append(marker.closeIsInclusive(reversed) ? "<=" : "<");
+        }
+        text.append(position);
+        if (marker.isOpen(reversed))
+        {
+            text.append(marker.openIsInclusive(reversed) ? "<=" : "<");
+            text.append("[").append(marker.openDeletionTime(reversed).markedForDeleteAt()).append("]");
+        }
+        return text.toString();
+    }
+
+    /**
+     * Row iterator over a pre-built sequence of unfiltereds, using the test's static metadata,
+     * partition key and partition-level deletion. It is an inner (non-static) class because it
+     * reads the enclosing test's {@code reversed} flag at construction time.
+     */
+    class Source extends AbstractUnfilteredRowIterator implements UnfilteredRowIterator
+    {
+        Iterator<Unfiltered> content;
+
+        /** @param content the items to return, already in the order this iterator should produce them */
+        protected Source(Iterator<Unfiltered> content)
+        {
+            // The statics are qualified with the outer class name, as in the original constructor call.
+            super(UnfilteredRowIteratorsMergeTest.metadata,
+                  UnfilteredRowIteratorsMergeTest.partitionKey,
+                  UnfilteredRowIteratorsMergeTest.partitionLevelDeletion,
+                  UnfilteredRowIteratorsMergeTest.metadata.partitionColumns(),
+                  null,
+                  reversed,
+                  RowStats.NO_STATS);
+            this.content = content;
+        }
+
+        @Override
+        protected Unfiltered computeNext()
+        {
+            if (content.hasNext())
+                return content.next();
+            return endOfData();
+        }
+    }
+
+    /** Returns a copy of {@code marker}, built by copying it into a fresh single-slot builder. */
+    static RangeTombstoneMarker safeMarker(RangeTombstoneMarker marker)
+    {
+        RangeTombstoneMarker.Builder copier = new RangeTombstoneMarker.Builder(1);
+        marker.copyTo(copier);
+        RangeTombstoneMarker copy = copier.build();
+        return copy;
+    }
+
+    /**
+     * Returns a new stub row with the same first clustering component, primary key liveness info
+     * and deletion as {@code row}.
+     */
+    private static Row safeRow(Row row)
+    {
+        Clustering clusteringCopy = new SimpleClustering(row.clustering().get(0));
+        return emptyRowAt(clusteringCopy, row.primaryKeyLivenessInfo(), row.deletion());
+    }
+    
+    /**
+     * Wraps {@code iterator} so that every returned item is a fresh copy (see {@code safeRow} and
+     * {@code safeMarker}) rather than the object handed out by the wrapped iterator.
+     * NOTE(review): presumably needed because the wrapped iterator may reuse returned objects - confirm.
+     */
+    public static UnfilteredRowIterator safeIterator(UnfilteredRowIterator iterator)
+    {
+        return new WrappingUnfilteredRowIterator(iterator)
+        {
+            @Override
+            public Unfiltered next()
+            {
+                Unfiltered original = super.next();
+                if (original.kind() == Unfiltered.Kind.ROW)
+                    return safeRow((Row) original);
+                return safeMarker((RangeTombstoneMarker) original);
+            }
+        };
+    }
+
+    /**
+     * Merges hand-written sources and verifies the result. Each input string is parsed, has its
+     * adjacent markers joined into boundaries, is printed and validated; the sources are then
+     * merged in a single call and the merged list is checked for equivalence and validity.
+     *
+     * @param inputs one string per source, in the notation accepted by {@code parse}
+     */
+    public void testForInput(String... inputs)
+    {
+        List<List<Unfiltered>> sources = new ArrayList<>();
+        for (String text : inputs)
+        {
+            List<Unfiltered> parsed = parse(text);
+            attachBoundaries(parsed);
+            dumpList(parsed);
+            verifyValid(parsed);
+            sources.add(parsed);
+        }
+
+        final boolean iterative = false;
+        List<Unfiltered> merged = merge(sources, iterative);
+        System.out.println("Merge to:");
+        dumpList(merged);
+
+        verifyEquivalent(sources, merged);
+        verifyValid(merged);
+        System.out.println();
+    }
+
+    List<Unfiltered> parse(String input)
+    {
+        String[] split = input.split(" ");
+        Pattern open = Pattern.compile("(\\d+)<(=)?\\[(\\d+)\\]");
+        Pattern close = Pattern.compile("\\[(\\d+)\\]<(=)?(\\d+)");
+        Pattern row = Pattern.compile("(\\d+)(\\[(\\d+)\\])?");
+        List<Unfiltered> out = new ArrayList<>(split.length);
+        for (String s : split)
+        {
+            Matcher m = open.matcher(s);
+            if (m.matches())
+            {
+                out.add(openMarker(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(3)), m.group(2) != null));
+                continue;
+            }
+            m = close.matcher(s);
+            if (m.matches())
+            {
+                out.add(closeMarker(Integer.parseInt(m.group(3)), Integer.parseInt(m.group(1)), m.group(2) != null));
+                continue;
+            }
+            m = row.matcher(s);
+            if (m.matches())
+            {
+                int live = m.group(3) != null ? Integer.parseInt(m.group(3)) : DEL_RANGE;
+                out.add(emptyRowAt(Integer.parseInt(m.group(1)), x -> live));
+                continue;
+            }
+            Assert.fail("Can't parse " + s);
+        }
+        return out;
+    }
+
+    private RangeTombstoneMarker openMarker(int pos, int delTime, boolean inclusive)
+    {
+        return marker(pos, delTime, true, inclusive);
+    }
+
+    private RangeTombstoneMarker closeMarker(int pos, int delTime, boolean inclusive)
+    {
+        return marker(pos, delTime, false, inclusive);
+    }
+
+    /**
+     * Creates a bound marker at {@code pos}. {@code delTime} is passed as both arguments of the
+     * marker's {@code SimpleDeletionTime}.
+     *
+     * @param isStart   {@code true} for a start bound, {@code false} for an end bound
+     * @param inclusive whether the bound includes {@code pos}
+     */
+    private RangeTombstoneMarker marker(int pos, int delTime, boolean isStart, boolean inclusive)
+    {
+        ByteBuffer[] values = { clusteringFor(pos).get(0) };
+        Bound bound = Bound.create(Bound.boundKind(isStart, inclusive), values);
+        DeletionTime deletion = new SimpleDeletionTime(delTime, delTime);
+        return new RangeTombstoneBoundMarker(bound, deletion);
+    }
+}


Mime
View raw message