Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A760F1852D for ; Thu, 2 Jul 2015 16:03:49 +0000 (UTC) Received: (qmail 25433 invoked by uid 500); 2 Jul 2015 16:03:49 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 25402 invoked by uid 500); 2 Jul 2015 16:03:49 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 25391 invoked by uid 99); 2 Jul 2015 16:03:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jul 2015 16:03:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 31CFDE0504; Thu, 2 Jul 2015 16:03:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: slebresne@apache.org To: commits@cassandra.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: cassandra git commit: Fix handling of range tombstones and add test for merging Date: Thu, 2 Jul 2015 16:03:49 +0000 (UTC) Repository: cassandra Updated Branches: refs/heads/trunk f708c1e41 -> 7813dee0c Fix handling of range tombstones and add test for merging patch by blambov & slebresne; reviewed by blambov & slebresne for CASSANDRA-8099-followup Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7813dee0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7813dee0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7813dee0 Branch: refs/heads/trunk Commit: 7813dee0cf7f9aa8107aba92b40b7caee79f3528 Parents: f708c1e Author: Sylvain Lebresne Authored: Wed Jul 1 08:24:35 2015 +0200 Committer: Sylvain Lebresne Committed: Thu Jul 2 18:02:23 2015 +0200 ---------------------------------------------------------------------- .../cassandra/db/ClusteringComparator.java | 2 +- .../apache/cassandra/db/ClusteringPrefix.java | 33 +- .../org/apache/cassandra/db/RangeTombstone.java | 5 + .../apache/cassandra/db/RangeTombstoneList.java | 17 +- .../db/rows/RangeTombstoneBoundMarker.java | 14 + .../db/rows/RangeTombstoneBoundaryMarker.java | 24 +- .../cassandra/db/rows/RangeTombstoneMarker.java | 73 +- .../db/rows/RowAndTombstoneMergeIterator.java | 7 +- .../cassandra/db/RangeTombstoneListTest.java | 35 +- .../rows/UnfilteredRowIteratorsMergeTest.java | 679 +++++++++++++++++++ 10 files changed, 790 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/ClusteringComparator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java index b0e8e5c..8b01d6f 100644 --- a/src/java/org/apache/cassandra/db/ClusteringComparator.java +++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java @@ -162,7 +162,7 @@ public class ClusteringComparator implements Comparator if (s1 == s2) return ClusteringPrefix.Kind.compare(c1.kind(), c2.kind()); - return s1 < s2 ? c1.kind().prefixComparisonResult : -c2.kind().prefixComparisonResult; + return s1 < s2 ? c1.kind().comparedToClustering : -c2.kind().comparedToClustering; } public int compare(Clustering c1, Clustering c2) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/ClusteringPrefix.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java index 36d91e7..3bc7ff8 100644 --- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java +++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java @@ -56,26 +56,27 @@ public interface ClusteringPrefix extends Aliasable, IMeasurab { // WARNING: the ordering of that enum matters because we use ordinal() in the serialization - EXCL_END_BOUND(0, -1), - INCL_START_BOUND(1, -1), - EXCL_END_INCL_START_BOUNDARY(1, -1), - STATIC_CLUSTERING(2, -1), - CLUSTERING(3, 0), - INCL_END_EXCL_START_BOUNDARY(4, -1), - INCL_END_BOUND(4, 1), - EXCL_START_BOUND(5, 1); + EXCL_END_BOUND (0, -1), + INCL_START_BOUND (0, -1), + EXCL_END_INCL_START_BOUNDARY(0, -1), + STATIC_CLUSTERING (1, -1), + CLUSTERING (2, 0), + INCL_END_EXCL_START_BOUNDARY(3, 1), + INCL_END_BOUND (3, 1), + EXCL_START_BOUND (3, 1); private final int comparison; - // If clusterable c1 has this Kind and is a strict prefix of clusterable c2, then this - // is the result of compare(c1, c2). Basically, this is the same as comparing the kind of c1 to - // CLUSTERING. - public final int prefixComparisonResult; + /** + * Return the comparison of this kind to CLUSTERING. + * For bounds/boundaries, this basically tells us if we sort before or after our clustering values. + */ + public final int comparedToClustering; - private Kind(int comparison, int prefixComparisonResult) + private Kind(int comparison, int comparedToClustering) { this.comparison = comparison; - this.prefixComparisonResult = prefixComparisonResult; + this.comparedToClustering = comparedToClustering; } /** @@ -441,7 +442,7 @@ public interface ClusteringPrefix extends Aliasable, IMeasurab for (int i = 0; i < bound.size(); i++) { if (!hasComponent(i)) - return nextKind.prefixComparisonResult; + return nextKind.comparedToClustering; int cmp = comparator.compareComponent(i, nextValues[i], bound.get(i)); if (cmp != 0) @@ -452,7 +453,7 @@ public interface ClusteringPrefix extends Aliasable, IMeasurab return nextKind.compareTo(bound.kind()); // We know that we'll have exited already if nextSize < bound.size - return -bound.kind().prefixComparisonResult; + return -bound.kind().comparedToClustering; } private boolean hasComponent(int i) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/RangeTombstone.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java index de21950..df60933 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstone.java +++ b/src/java/org/apache/cassandra/db/RangeTombstone.java @@ -71,6 +71,11 @@ public class RangeTombstone return deletion; } + public String toString(ClusteringComparator comparator) + { + return slice.toString(comparator) + "@" + deletion; + } + @Override public boolean equals(Object other) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/RangeTombstoneList.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java index c377d10..64f0978 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java +++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java @@ -34,6 +34,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.memory.AbstractAllocator; +import org.apache.cassandra.utils.ByteBufferUtil; /** * Data structure holding the range tombstones of a ColumnFamily. @@ -163,7 +164,7 @@ public class RangeTombstoneList implements Iterable, IMeasurable int c = comparator.compare(ends[size-1], start); // Fast path if we add in sorted order - if (c < 0) + if (c <= 0) { addInternal(size, start, end, markedAt, delTime); } @@ -171,7 +172,7 @@ public class RangeTombstoneList implements Iterable, IMeasurable { // Note: insertFrom expect i to be the insertion point in term of interval ends int pos = Arrays.binarySearch(ends, 0, size, start, comparator); - insertFrom((pos >= 0 ? pos : -pos-1), start, end, markedAt, delTime); + insertFrom((pos >= 0 ? pos+1 : -pos-1), start, end, markedAt, delTime); } boundaryHeapSize += start.unsharedHeapSize() + end.unsharedHeapSize(); } @@ -504,12 +505,14 @@ public class RangeTombstoneList implements Iterable, IMeasurable /* * Inserts a new element starting at index i. This method assumes that: - * ends[i-1] < start < ends[i] - * (note that we cannot have start == end since both will at least have a different bound "kind") + * ends[i-1] <= start < ends[i] + * (note that start can be equal to ends[i-1] in the case where we have a boundary, i.e. for instance + * ends[i-1] is the exclusive end of X and start is the inclusive start of X). * * A RangeTombstoneList is a list of range [s_0, e_0]...[s_n, e_n] such that: - * - s_i <= e_i - * - e_i < s_i+1 + * - s_i is a start bound and e_i is a end bound + * - s_i < e_i + * - e_i <= s_i+1 * Basically, range are non overlapping and in order. */ private void insertFrom(int i, Slice.Bound start, Slice.Bound end, long markedAt, int delTime) @@ -517,7 +520,7 @@ public class RangeTombstoneList implements Iterable, IMeasurable while (i < size) { assert start.isStart() && end.isEnd(); - assert i == 0 || comparator.compare(ends[i-1], start) < 0; + assert i == 0 || comparator.compare(ends[i-1], start) <= 0; assert comparator.compare(start, ends[i]) < 0; if (Slice.isEmpty(comparator, start, end)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java index b5ac19b..8b52b0b 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java @@ -114,6 +114,20 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker return deletion; } + public boolean openIsInclusive(boolean reversed) + { + if (!isOpen(reversed)) + throw new IllegalStateException(); + return bound.isInclusive(); + } + + public boolean closeIsInclusive(boolean reversed) + { + if (isOpen(reversed)) + throw new IllegalStateException(); + return bound.isInclusive(); + } + public void copyTo(RangeTombstoneMarker.Writer writer) { copyBoundTo(writer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java index 1140d40..f17515d 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java @@ -87,6 +87,16 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker return reversed ? endDeletion : startDeletion; } + public boolean openIsInclusive(boolean reversed) + { + return (bound.kind() == ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY) ^ reversed; + } + + public boolean closeIsInclusive(boolean reversed) + { + return (bound.kind() == ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY) ^ reversed; + } + public boolean isOpen(boolean reversed) { // A boundary always open one side @@ -99,21 +109,9 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker return true; } - public static boolean isBoundary(ClusteringComparator comparator, Slice.Bound close, Slice.Bound open) - { - if (!comparator.isOnSameClustering(close, open)) - return false; - - // If both bound are exclusive, then it's not a boundary, otherwise it is one. - // Note that most code should never call this with 2 inclusive bound: this would mean we had - // 2 RTs that were overlapping and RangeTombstoneList don't create that. However, old - // code was generating that so supporting this case helps dealing with backward compatibility. - return close.isInclusive() || open.isInclusive(); - } - - // Please note that isBoundary *must* have been called (and returned true) before this is called. public static RangeTombstoneBoundaryMarker makeBoundary(boolean reversed, Slice.Bound close, Slice.Bound open, DeletionTime closeDeletion, DeletionTime openDeletion) { + assert RangeTombstone.Bound.Kind.compare(close.kind(), open.kind()) == 0 : "Both bound don't form a boundary"; boolean isExclusiveClose = close.isExclusive() || (close.isInclusive() && open.isInclusive() && openDeletion.supersedes(closeDeletion)); return isExclusiveClose ? exclusiveCloseInclusiveOpen(reversed, close.getRawValues(), closeDeletion, openDeletion) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java index 1a506d5..380e6b0 100644 --- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java @@ -41,6 +41,8 @@ public interface RangeTombstoneMarker extends Unfiltered public boolean isClose(boolean reversed); public DeletionTime openDeletionTime(boolean reversed); public DeletionTime closeDeletionTime(boolean reversed); + public boolean openIsInclusive(boolean reversed); + public boolean closeIsInclusive(boolean reversed); public interface Writer extends Slice.Bound.Writer { @@ -121,33 +123,6 @@ public interface RangeTombstoneMarker extends Unfiltered */ public static class Merger { - // Boundaries sorts like the bound that have their equivalent "inclusive" part and that's the main action we - // care about as far as merging goes. So MergedKind just group those as the same case, and tell us whether - // we're dealing with an open or a close (based on whether we're dealing with reversed iterators or not). - // Really this enum is just a convenience for merging. - private enum MergedKind - { - INCL_OPEN, EXCL_CLOSE, EXCL_OPEN, INCL_CLOSE; - - public static MergedKind forBound(RangeTombstone.Bound bound, boolean reversed) - { - switch (bound.kind()) - { - case INCL_START_BOUND: - case EXCL_END_INCL_START_BOUNDARY: - return reversed ? INCL_CLOSE : INCL_OPEN; - case EXCL_END_BOUND: - return reversed ? EXCL_OPEN : EXCL_CLOSE; - case EXCL_START_BOUND: - return reversed ? EXCL_CLOSE : EXCL_OPEN; - case INCL_END_EXCL_START_BOUNDARY: - case INCL_END_BOUND: - return reversed ? INCL_OPEN : INCL_CLOSE; - } - throw new AssertionError(); - } - } - private final CFMetaData metadata; private final UnfilteredRowIterators.MergeListener listener; private final DeletionTime partitionDeletion; @@ -202,33 +177,29 @@ public interface RangeTombstoneMarker extends Unfiltered if (previousDeletionTimeInMerged.equals(newDeletionTimeInMerged)) return null; - ByteBuffer[] values = bound.getRawValues(); + boolean isBeforeClustering = bound.kind().comparedToClustering < 0; + if (reversed) + isBeforeClustering = !isBeforeClustering; + ByteBuffer[] values = bound.getRawValues(); RangeTombstoneMarker merged; - switch (MergedKind.forBound(bound, reversed)) + if (previousDeletionTimeInMerged.isLive()) + { + merged = isBeforeClustering + ? RangeTombstoneBoundMarker.inclusiveOpen(reversed, values, newDeletionTimeInMerged) + : RangeTombstoneBoundMarker.exclusiveOpen(reversed, values, newDeletionTimeInMerged); + } + else if (newDeletionTimeInMerged.isLive()) + { + merged = isBeforeClustering + ? RangeTombstoneBoundMarker.exclusiveClose(reversed, values, previousDeletionTimeInMerged) + : RangeTombstoneBoundMarker.inclusiveClose(reversed, values, previousDeletionTimeInMerged); + } + else { - case INCL_OPEN: - merged = previousDeletionTimeInMerged.isLive() - ? RangeTombstoneBoundMarker.inclusiveOpen(reversed, values, newDeletionTimeInMerged) - : RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged); - break; - case EXCL_CLOSE: - merged = newDeletionTimeInMerged.isLive() - ? RangeTombstoneBoundMarker.exclusiveClose(reversed, values, previousDeletionTimeInMerged) - : RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged); - break; - case EXCL_OPEN: - merged = previousDeletionTimeInMerged.isLive() - ? RangeTombstoneBoundMarker.exclusiveOpen(reversed, values, newDeletionTimeInMerged) - : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged); - break; - case INCL_CLOSE: - merged = newDeletionTimeInMerged.isLive() - ? RangeTombstoneBoundMarker.inclusiveClose(reversed, values, previousDeletionTimeInMerged) - : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged); - break; - default: - throw new AssertionError(); + merged = isBeforeClustering + ? RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged) + : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged); } if (listener != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7813dee0/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java index 51383a2..f923329 100644 --- a/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java +++ b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java @@ -27,7 +27,6 @@ import org.apache.cassandra.db.*; public class RowAndTombstoneMergeIterator extends UnmodifiableIterator implements PeekingIterator { - private final ClusteringComparator clusteringComparator; private final Comparator comparator; private final boolean reversed; @@ -42,7 +41,6 @@ public class RowAndTombstoneMergeIterator extends UnmodifiableIterator comparator = new ClusteringComparator(Int32Type.instance); + static int nowInSec = FBUtilities.nowInSeconds(); + + static final int RANGE = 3000; + static final int DEL_RANGE = 100; + static final int ITERATORS = 15; + static final int ITEMS = 300; + + boolean reversed; + + public UnfilteredRowIteratorsMergeTest() + { + } + + @Test + public void testTombstoneMerge() + { + testTombstoneMerge(false, false); + } + + @Test + public void testTombstoneMergeReversed() + { + testTombstoneMerge(true, false); + } + + @Test + public void testTombstoneMergeIterative() + { + testTombstoneMerge(false, true); + } + + @Test + public void testTombstoneMergeReversedIterative() + { + testTombstoneMerge(true, true); + } + + @Test + public void testDuplicateRangeCase() + { + testForInput("67<=[98] [98]<=67", + "66<=[11] [11]<71", + "66<[13] [13]<67"); + } + + @SuppressWarnings("unused") + public void testTombstoneMerge(boolean reversed, boolean iterations) + { + for (int seed = 1; seed <= 100; ++seed) + { + this.reversed = reversed; + if (ITEMS <= 20) + System.out.println("\nSeed " + seed); + + Random r = new Random(seed); + List> timeGenerators = ImmutableList.of( + x -> -1, + x -> DEL_RANGE, + x -> r.nextInt(DEL_RANGE) + ); + List> sources = new ArrayList<>(ITERATORS); + if (ITEMS <= 20) + System.out.println("Merging"); + for (int i=0; i merged = merge(sources, iterations); + + if (ITEMS <= 20) + System.out.println("results in"); + if (ITEMS <= 20) + dumpList(merged); + verifyEquivalent(sources, merged); + verifyValid(merged); + if (reversed) + { + Collections.reverse(merged); + this.reversed = false; + verifyValid(merged); + } + } + } + + private List merge(List> sources, boolean iterations) + { + List us = sources.stream().map(l -> new Source(l.iterator())).collect(Collectors.toList()); + List merged = new ArrayList<>(); + Iterators.addAll(merged, safeIterator(mergeIterators(us, iterations))); + return merged; + } + + public UnfilteredRowIterator mergeIterators(List us, boolean iterations) + { + int now = FBUtilities.nowInSeconds(); + if (iterations) + { + UnfilteredRowIterator mi = us.get(0); + int i; + for (i = 1; i + 2 <= ITERATORS; i += 2) + mi = UnfilteredRowIterators.merge(ImmutableList.of(mi, us.get(i), us.get(i+1)), now); + if (i + 1 <= ITERATORS) + mi = UnfilteredRowIterators.merge(ImmutableList.of(mi, us.get(i)), now); + return mi; + } + else + { + return UnfilteredRowIterators.merge(us, now); + } + } + + @SuppressWarnings("unused") + private List generateSource(Random r, Function timeGenerator) + { + int[] positions = new int[ITEMS + 1]; + for (int i=0; i content = new ArrayList<>(ITEMS); + int prev = -1; + for (int i=0; i prev) + { + span = r.nextInt(sz + 1); + includesStart = span > 0 ? r.nextBoolean() : true; + includesEnd = span > 0 ? r.nextBoolean() : true; + } + else + { + span = 1 + r.nextInt(sz); + includesStart = false; + includesEnd = r.nextBoolean(); + } + int deltime = r.nextInt(DEL_RANGE); + DeletionTime dt = new SimpleDeletionTime(deltime, deltime); + content.add(new RangeTombstoneBoundMarker(boundFor(pos, true, includesStart), dt)); + content.add(new RangeTombstoneBoundMarker(boundFor(pos + span, false, includesEnd), dt)); + prev = pos + span - (includesEnd ? 0 : 1); + } + else + { + content.add(emptyRowAt(pos, timeGenerator)); + prev = pos; + } + } + + attachBoundaries(content); + if (reversed) + { + Collections.reverse(content); + } + verifyValid(content); + if (ITEMS <= 20) + dumpList(content); + return content; + } + + static void attachBoundaries(List content) + { + int di = 0; + RangeTombstoneMarker prev = null; + for (int si = 0; si < content.size(); ++si) + { + Unfiltered currUnfiltered = content.get(si); + RangeTombstoneMarker curr = currUnfiltered.kind() == Kind.RANGE_TOMBSTONE_MARKER ? + (RangeTombstoneMarker) currUnfiltered : + null; + if (prev != null && curr != null && prev.isClose(false) && curr.isOpen(false) && prev.clustering().invert().equals(curr.clustering())) + { + // Join. Prefer not to use merger to check its correctness. + RangeTombstone.Bound b = prev.clustering(); + b = b.withNewKind(b.isInclusive() ? RangeTombstone.Bound.Kind.INCL_END_EXCL_START_BOUNDARY : RangeTombstone.Bound.Kind.EXCL_END_INCL_START_BOUNDARY); + prev = new RangeTombstoneBoundaryMarker(b, prev.closeDeletionTime(false), curr.openDeletionTime(false)); + currUnfiltered = prev; + --di; + } + content.set(di++, currUnfiltered); + prev = curr; + } + for (int pos = content.size() - 1; pos >= di; --pos) + content.remove(pos); + } + + void verifyValid(List list) + { + int reversedAsMultiplier = reversed ? -1 : 1; + try { + RangeTombstoneMarker prev = null; + Unfiltered prevUnfiltered = null; + for (Unfiltered unfiltered : list) + { + Assert.assertTrue("Order violation prev " + str(prevUnfiltered) + " curr " + str(unfiltered), + prevUnfiltered == null || comparator.compare(prevUnfiltered, unfiltered) * reversedAsMultiplier < 0); + prevUnfiltered = unfiltered; + + if (unfiltered.kind() == Kind.RANGE_TOMBSTONE_MARKER) + { + RangeTombstoneMarker curr = (RangeTombstoneMarker) unfiltered; + if (prev != null) + { + if (curr.isClose(reversed)) + { + Assert.assertTrue(str(unfiltered) + " follows another close marker " + str(prev), prev.isOpen(reversed)); + Assert.assertEquals("Deletion time mismatch for open " + str(prev) + " and close " + str(unfiltered), + prev.openDeletionTime(reversed), + curr.closeDeletionTime(reversed)); + } + else + Assert.assertFalse(str(curr) + " follows another open marker " + str(prev), prev.isOpen(reversed)); + } + + prev = curr; + } + } + Assert.assertFalse("Cannot end in open marker " + str(prev), prev != null && prev.isOpen(reversed)); + + } catch (AssertionError e) { + System.out.println(e); + dumpList(list); + throw e; + } + } + + void verifyEquivalent(List> sources, List merged) + { + try + { + for (int i=0; i source : sources) + { + dt = deletionFor(c, source, dt); + } + Assert.assertEquals("Deletion time mismatch for position " + str(c), dt, deletionFor(c, merged)); + if (dt == DeletionTime.LIVE) + { + Optional sourceOpt = sources.stream().map(source -> rowFor(c, source)).filter(x -> x != null).findAny(); + Unfiltered mergedRow = rowFor(c, merged); + Assert.assertEquals("Content mismatch for position " + str(c), str(sourceOpt.orElse(null)), str(mergedRow)); + } + } + } + catch (AssertionError e) + { + System.out.println(e); + for (List list : sources) + dumpList(list); + System.out.println("merged"); + dumpList(merged); + throw e; + } + } + + private Unfiltered rowFor(Clusterable pointer, List list) + { + int index = Collections.binarySearch(list, pointer, reversed ? comparator.reversed() : comparator); + return index >= 0 ? list.get(index) : null; + } + + DeletionTime deletionFor(Clusterable pointer, List list) + { + return deletionFor(pointer, list, DeletionTime.LIVE); + } + + DeletionTime deletionFor(Clusterable pointer, List list, DeletionTime def) + { + if (list.isEmpty()) + return def; + + int index = Collections.binarySearch(list, pointer, reversed ? comparator.reversed() : comparator); + if (index < 0) + index = -1 - index; + else + { + Row row = (Row) list.get(index); + if (row.deletion() != null && row.deletion().supersedes(def)) + def = row.deletion(); + } + + if (index >= list.size()) + return def; + + while (--index >= 0) + { + Unfiltered unfiltered = list.get(index); + if (unfiltered.kind() == Kind.ROW) + continue; + RangeTombstoneMarker lower = (RangeTombstoneMarker) unfiltered; + if (!lower.isOpen(reversed)) + return def; + return lower.openDeletionTime(reversed).supersedes(def) ? lower.openDeletionTime(reversed) : def; + } + return def; + } + + private static Bound boundFor(int pos, boolean start, boolean inclusive) + { + return Bound.create(Bound.boundKind(start, inclusive), new ByteBuffer[] {Int32Type.instance.decompose(pos)}); + } + + private static SimpleClustering clusteringFor(int i) + { + return new SimpleClustering(Int32Type.instance.decompose(i)); + } + + static Row emptyRowAt(int pos, Function timeGenerator) + { + final Clustering clustering = clusteringFor(pos); + final LivenessInfo live = SimpleLivenessInfo.forUpdate(timeGenerator.apply(pos), 0, nowInSec, metadata); + return emptyRowAt(clustering, live, DeletionTime.LIVE); + } + + public static class TestCell extends AbstractCell + { + private final ColumnDefinition column; + private final ByteBuffer value; + private final LivenessInfo info; + + public TestCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info) + { + this.column = column; + this.value = value; + this.info = info.takeAlias(); + } + + @Override + public ColumnDefinition column() + { + return column; + } + + @Override + public boolean isCounterCell() + { + return false; + } + + @Override + public ByteBuffer value() + { + return value; + } + + @Override + public LivenessInfo livenessInfo() + { + return info; + } + + @Override + public CellPath path() + { + return null; + } + } + + static Row emptyRowAt(final Clustering clustering, final LivenessInfo live, final DeletionTime deletion) + { + final ColumnDefinition columnDef = metadata.getColumnDefinition(new ColumnIdentifier("data", true)); + final Cell cell = new TestCell(columnDef, clustering.get(0), live); + + return new AbstractRow() + { + @Override + public Columns columns() + { + return Columns.of(columnDef); + } + + @Override + public LivenessInfo primaryKeyLivenessInfo() + { + return live; + } + + @Override + public DeletionTime deletion() + { + return deletion; + } + + @Override + public boolean isEmpty() + { + return true; + } + + @Override + public boolean hasComplexDeletion() + { + return false; + } + + @Override + public Clustering clustering() + { + return clustering; + } + + @Override + public Cell getCell(ColumnDefinition c) + { + return c == columnDef ? cell : null; + } + + @Override + public Cell getCell(ColumnDefinition c, CellPath path) + { + return null; + } + + @Override + public Iterator getCells(ColumnDefinition c) + { + return Iterators.singletonIterator(cell); + } + + @Override + public DeletionTime getDeletion(ColumnDefinition c) + { + return DeletionTime.LIVE; + } + + @Override + public Iterator iterator() + { + return Iterators.emptyIterator(); + } + + @Override + public SearchIterator searchIterator() + { + return new SearchIterator() + { + @Override + public boolean hasNext() + { + return false; + } + + @Override + public ColumnData next(ColumnDefinition column) + { + return null; + } + }; + } + + @Override + public Kind kind() + { + return Unfiltered.Kind.ROW; + } + + @Override + public Row takeAlias() + { + return this; + } + + @Override + public String toString() + { + return Int32Type.instance.getString(clustering.get(0)); + } + }; + + } + + private void dumpList(List list) + { + for (Unfiltered u : list) + System.out.print(str(u) + " "); + System.out.println(); + } + + private String str(Clusterable curr) + { + if (curr == null) + return "null"; + String val = Int32Type.instance.getString(curr.clustering().get(0)); + if (curr instanceof RangeTombstoneMarker) + { + RangeTombstoneMarker marker = (RangeTombstoneMarker) curr; + if (marker.isClose(reversed)) + val = "[" + marker.closeDeletionTime(reversed).markedForDeleteAt() + "]" + (marker.closeIsInclusive(reversed) ? "<=" : "<") + val; + if (marker.isOpen(reversed)) + val = val + (marker.openIsInclusive(reversed) ? "<=" : "<") + "[" + marker.openDeletionTime(reversed).markedForDeleteAt() + "]"; + } + return val; + } + + class Source extends AbstractUnfilteredRowIterator implements UnfilteredRowIterator + { + Iterator content; + + protected Source(Iterator content) + { + super(UnfilteredRowIteratorsMergeTest.metadata, + UnfilteredRowIteratorsMergeTest.partitionKey, + UnfilteredRowIteratorsMergeTest.partitionLevelDeletion, + UnfilteredRowIteratorsMergeTest.metadata.partitionColumns(), + null, + reversed, + RowStats.NO_STATS); + this.content = content; + } + + @Override + protected Unfiltered computeNext() + { + return content.hasNext() ? content.next() : endOfData(); + } + } + + static RangeTombstoneMarker safeMarker(RangeTombstoneMarker marker) + { + RangeTombstoneMarker.Builder writer = new RangeTombstoneMarker.Builder(1); + marker.copyTo(writer); + return writer.build(); + } + + private static Row safeRow(Row row) + { + return emptyRowAt(new SimpleClustering(row.clustering().get(0)), row.primaryKeyLivenessInfo(), row.deletion()); + } + + public static UnfilteredRowIterator safeIterator(UnfilteredRowIterator iterator) + { + return new WrappingUnfilteredRowIterator(iterator) + { + @Override + public Unfiltered next() + { + Unfiltered next = super.next(); + return next.kind() == Unfiltered.Kind.ROW + ? safeRow((Row) next) + : safeMarker((RangeTombstoneMarker) next); + } + }; + } + + public void testForInput(String... inputs) + { + List> sources = new ArrayList<>(); + for (String input : inputs) + { + List source = parse(input); + attachBoundaries(source); + dumpList(source); + verifyValid(source); + sources.add(source); + } + + List merged = merge(sources, false); + System.out.println("Merge to:"); + dumpList(merged); + verifyEquivalent(sources, merged); + verifyValid(merged); + System.out.println(); + } + + List parse(String input) + { + String[] split = input.split(" "); + Pattern open = Pattern.compile("(\\d+)<(=)?\\[(\\d+)\\]"); + Pattern close = Pattern.compile("\\[(\\d+)\\]<(=)?(\\d+)"); + Pattern row = Pattern.compile("(\\d+)(\\[(\\d+)\\])?"); + List out = new ArrayList<>(split.length); + for (String s : split) + { + Matcher m = open.matcher(s); + if (m.matches()) + { + out.add(openMarker(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(3)), m.group(2) != null)); + continue; + } + m = close.matcher(s); + if (m.matches()) + { + out.add(closeMarker(Integer.parseInt(m.group(3)), Integer.parseInt(m.group(1)), m.group(2) != null)); + continue; + } + m = row.matcher(s); + if (m.matches()) + { + int live = m.group(3) != null ? Integer.parseInt(m.group(3)) : DEL_RANGE; + out.add(emptyRowAt(Integer.parseInt(m.group(1)), x -> live)); + continue; + } + Assert.fail("Can't parse " + s); + } + return out; + } + + private RangeTombstoneMarker openMarker(int pos, int delTime, boolean inclusive) + { + return marker(pos, delTime, true, inclusive); + } + + private RangeTombstoneMarker closeMarker(int pos, int delTime, boolean inclusive) + { + return marker(pos, delTime, false, inclusive); + } + + private RangeTombstoneMarker marker(int pos, int delTime, boolean isStart, boolean inclusive) + { + return new RangeTombstoneBoundMarker(Bound.create(Bound.boundKind(isStart, inclusive), + new ByteBuffer[] {clusteringFor(pos).get(0)}), + new SimpleDeletionTime(delTime, delTime)); + } +}