cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject cassandra git commit: Introduce a more efficient MergeIterator
Date Thu, 16 Jul 2015 11:42:31 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 43c3ab445 -> c29001b94


Introduce a more efficient MergeIterator

The implementation uses a binary heap with a sorted initial section and
flags equality between parents and children. It performs advances by sinking
the top elements in the queue, which usually takes several times fewer
comparisons and swaps than poll + offer.

patch by branimir; reviewed by benedict for CASSANDRA-8915


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c29001b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c29001b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c29001b9

Branch: refs/heads/trunk
Commit: c29001b94df769fe428f414c042daffbb6dd209d
Parents: 43c3ab4
Author: Branimir Lambov <branimir.lambov@datastax.com>
Authored: Thu Jul 16 12:41:33 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Thu Jul 16 12:41:33 2015 +0100

----------------------------------------------------------------------
 .../apache/cassandra/utils/MergeIterator.java   | 305 +++++++-
 .../apache/cassandra/utils/btree/BTreeSet.java  |   2 +-
 .../utils/MergeIteratorComparisonTest.java      | 733 +++++++++++++++++++
 3 files changed, 1004 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c29001b9/src/java/org/apache/cassandra/utils/MergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MergeIterator.java b/src/java/org/apache/cassandra/utils/MergeIterator.java
index d0f116e..c903a0f 100644
--- a/src/java/org/apache/cassandra/utils/MergeIterator.java
+++ b/src/java/org/apache/cassandra/utils/MergeIterator.java
@@ -70,28 +70,87 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out>
implem
         reducer.close();
     }
 
-    /** A MergeIterator that consumes multiple input values per output value. */
-    private static final class ManyToOne<In,Out> extends MergeIterator<In,Out>
+    /**
+     * A MergeIterator that consumes multiple input values per output value.
+     *
+     * The most straightforward way to implement this is to use a {@code PriorityQueue} of
iterators, {@code poll} it to
+     * find the next item to consume, then {@code add} the iterator back after advancing.
This is not very efficient as
+     * {@code poll} and {@code add} in all cases require at least {@code log(size)} comparisons
(usually more than
+     * {@code 2*log(size)}) per consumed item, even if the input is suitable for fast iteration.
+     *
+     * The implementation below makes use of the fact that replacing the top element in a
binary heap can be done much
+     * more efficiently than separately removing it and placing it back, especially in the
cases where the top iterator
+     * is to be used again very soon (e.g. when there are large sections of the output where
only a limited number of
+     * input iterators overlap, which is normally the case in many practically useful situations,
e.g. levelled
+     * compaction). To further improve this particular scenario, we also use a short sorted
section at the start of the
+     * queue.
+     *
+     * The heap is laid out as this (for {@code SORTED_SECTION_SIZE == 2}):
+     *                 0
+     *                 |
+     *                 1
+     *                 |
+     *                 2
+     *               /   \
+     *              3     4
+     *             / \   / \
+     *             5 6   7 8
+     *            .. .. .. ..
+     * Where each line is a <= relationship.
+     *
+     * In the sorted section we can advance with a single comparison per level, while advancing
a level within the heap
+     * requires two (so that we can find the lighter element to pop up).
+     * The sorted section adds a constant overhead when data is uniformly distributed among
the iterators, but may up
+     * to halve the iteration time when one iterator is dominant over sections of the merged
data (as is the case with
+     * non-overlapping iterators).
+     *
+     * The iterator is further complicated by the need to avoid advancing the input iterators
until an output is
+     * actually requested. To achieve this {@code consume} walks the heap to find equal items
without advancing the
+     * iterators, and {@code advance} moves them and restores the heap structure before any
items can be consumed.
+     * 
+     * To avoid having to do additional comparisons in consume to identify the equal items,
we keep track of equality
+     * between children and their parents in the heap. More precisely, the lines in the diagram
above define the
+     * following relationship:
+     *   parent <= child && (parent == child) == child.equalParent
+     * We can track, make use of and update the equalParent field without any additional
comparisons.
+     *
+     * For more formal definitions and proof of correctness, see CASSANDRA-8915.
+     */
+    static final class ManyToOne<In,Out> extends MergeIterator<In,Out>
     {
-        // a queue for return: all candidates must be open and have at least one item
-        protected final PriorityQueue<Candidate<In>> queue;
-        // a stack of the last consumed candidates, so that we can lazily call 'advance()'
-        // TODO: if we had our own PriorityQueue implementation we could stash items
-        // at the end of its array, so we wouldn't need this storage
-        protected final ArrayDeque<Candidate<In>> candidates;
+        protected final Candidate<In>[] heap;
+
+        /** Number of non-exhausted iterators. */
+        int size;
+
+        /**
+         * One past the position of the deepest, right-most child that needs advancing before we can start
consuming (i.e. an exclusive upper bound on the positions that may need advancing).
+         * Because advancing changes the values of the items of each iterator, the parent-chain
from any position
+         * in this range that needs advancing is not in correct order. The trees rooted at
any position that does
+         * not need advancing, however, retain their prior-held binary heap property.
+         */
+        int needingAdvance;
+
+        /**
+         * The number of elements to keep in order before the binary heap starts, exclusive
of the top heap element.
+         */
+        static final int SORTED_SECTION_SIZE = 4;
+
         public ManyToOne(List<? extends Iterator<In>> iters, Comparator<?
super In> comp, Reducer<In, Out> reducer)
         {
             super(iters, reducer);
-            this.queue = new PriorityQueue<>(Math.max(1, iters.size()));
+
+            @SuppressWarnings("unchecked")
+            Candidate<In>[] heap = new Candidate[iters.size()];
+            this.heap = heap;
+            size = 0;
+
             for (int i = 0; i < iters.size(); i++)
             {
                 Candidate<In> candidate = new Candidate<>(i, iters.get(i), comp);
-                if (!candidate.advance())
-                    // was empty
-                    continue;
-                this.queue.add(candidate);
+                heap[size++] = candidate;
             }
-            this.candidates = new ArrayDeque<>(queue.size());
+            needingAdvance = size;
         }
 
         protected final Out computeNext()
@@ -100,30 +159,191 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out>
implem
             return consume();
         }
 
-        /** Consume values by sending them to the reducer while they are equal. */
-        protected final Out consume()
+        /**
+         * Advance all iterators that need to be advanced and place them into suitable positions
in the heap.
+         *
+         * By walking the iterators backwards we know that everything after the point being
processed already forms
+         * correctly ordered subheaps, thus we can build a subheap rooted at the current
position by only sinking down
+         * the newly advanced iterator. Because all parents of a consumed iterator are also
consumed there is no way
+         * that we can process one consumed iterator but skip over its parent.
+         *
+         * The procedure is the same as the one used for the initial building of a heap in
the heapsort algorithm and
+         * has a maximum number of comparisons {@code (2 * log(size) + SORTED_SECTION_SIZE
/ 2)} multiplied by the
+         * number of iterators whose items were consumed at the previous step, but is also
at most linear in the size of
+         * the heap if the number of consumed elements is high (as it is in the initial heap
construction). With non- or
+         * lightly-overlapping iterators the procedure finishes after just one (resp. a couple
of) comparisons.
+         */
+        private void advance()
         {
-            reducer.onKeyChange();
-            Candidate<In> candidate = queue.peek();
-            if (candidate == null)
-                return endOfData();
-            do
+            // Turn the set of candidates into a heap.
+            for (int i = needingAdvance - 1; i >= 0; --i)
             {
-                candidate = queue.poll();
-                candidates.push(candidate);
-                reducer.reduce(candidate.idx, candidate.item);
+                Candidate<In> candidate = heap[i];
+                /*
+                 *  needingAdvance is one past the maximum index (the deepest, right-most node) that
may need advancing.
+                 *  Since the equal items that were consumed at once may occur in sub-heap
"veins" of equality,
+                 *  not every position below this bound has necessarily been consumed. Positions that
were not consumed already form
+                 *  valid sub-heaps and can be skipped over entirely.
+                 */
+                if (candidate.needsAdvance())
+                    replaceAndSink(candidate.advance(), i);
             }
-            while (queue.peek() != null && queue.peek().compareTo(candidate) == 0);
+        }
+
+        /**
+         * Consume all items that sort like the current top of the heap. As we cannot advance
the iterators to let
+         * equivalent items pop up, we walk the heap to find them and mark them as needing
advance.
+         *
+         * This relies on the equalParent flag to avoid doing any comparisons.
+         */
+        private Out consume()
+        {
+            if (size == 0)
+                return endOfData();
+
+            reducer.onKeyChange();
+            assert !heap[0].equalParent;
+            reducer.reduce(heap[0].idx, heap[0].consume());
+            final int size = this.size;
+            final int sortedSectionSize = Math.min(size, SORTED_SECTION_SIZE);
+            int i;
+            consume: {
+                for (i = 1; i < sortedSectionSize; ++i)
+                {
+                    if (!heap[i].equalParent)
+                        break consume;
+                    reducer.reduce(heap[i].idx, heap[i].consume());
+                }
+                i = Math.max(i, consumeHeap(i) + 1);
+            }
+            needingAdvance = i;
             return reducer.getReduced();
         }
 
-        /** Advance and re-enqueue all items we consumed in the last iteration. */
-        protected final void advance()
+        /**
+         * Recursively consume all items equal to the top of the heap (i.e. linked to it by a chain of
equalParent flags) in the binary subheap rooted at position idx.
+         *
+         * @return the largest equal index found in this search.
+         */
+        private int consumeHeap(int idx)
         {
-            Candidate<In> candidate;
-            while ((candidate = candidates.pollFirst()) != null)
-                if (candidate.advance())
-                    queue.add(candidate);
+            if (idx >= size || !heap[idx].equalParent)
+                return -1;
+
+            reducer.reduce(heap[idx].idx, heap[idx].consume());
+            int nextIdx = (idx << 1) - (SORTED_SECTION_SIZE - 1);
+            return Math.max(idx, Math.max(consumeHeap(nextIdx), consumeHeap(nextIdx + 1)));
+        }
+
+        /**
+         * Replace an iterator in the heap with the given position and move it down the heap
until it finds its proper
+         * position, pulling lighter elements up the heap.
+         *
+         * Whenever an equality is found between two elements that form a new parent-child
relationship, the child's
+         * equalParent flag is set to true if the elements are equal.
+         */
+        private void replaceAndSink(Candidate<In> candidate, int currIdx)
+        {
+            if (candidate == null)
+            {
+                // Drop iterator by replacing it with the last one in the heap.
+                candidate = heap[--size];
+                heap[size] = null; // not necessary but helpful for debugging
+            }
+            // The new element will be top of its heap, at this point there is no parent
to be equal to.
+            candidate.equalParent = false;
+
+            final int size = this.size;
+            final int sortedSectionSize = Math.min(size - 1, SORTED_SECTION_SIZE);
+
+            int nextIdx;
+
+            // Advance within the sorted section, pulling up items lighter than candidate.
+            while ((nextIdx = currIdx + 1) <= sortedSectionSize)
+            {
+                if (!heap[nextIdx].equalParent) // if we were greater then an (or were the)
equal parent, we are >= the child
+                {
+                    int cmp = candidate.compareTo(heap[nextIdx]);
+                    if (cmp <= 0)
+                    {
+                        heap[nextIdx].equalParent = cmp == 0;
+                        heap[currIdx] = candidate;
+                        return;
+                    }
+                }
+
+                heap[currIdx] = heap[nextIdx];
+                currIdx = nextIdx;
+            }
+            // If size <= SORTED_SECTION_SIZE, nextIdx below will be no less than size,
+            // because currIdx == sortedSectionSize == size - 1 and nextIdx becomes
+            // ((size - 1) * 2) - (size - 1 - 1) == size.
+
+            // Advance in the binary heap, pulling up the lighter element from the two at
each level.
+            while ((nextIdx = (currIdx * 2) - (sortedSectionSize - 1)) + 1 < size)
+            {
+                if (!heap[nextIdx].equalParent)
+                {
+                    if (!heap[nextIdx + 1].equalParent)
+                    {
+                        // pick the smallest of the two children
+                        int siblingCmp = heap[nextIdx + 1].compareTo(heap[nextIdx]);
+                        if (siblingCmp < 0)
+                            ++nextIdx;
+
+                        // if we're smaller than this, we are done, and must only restore
the heap and equalParent properties
+                        int cmp = candidate.compareTo(heap[nextIdx]);
+                        if (cmp <= 0)
+                        {
+                            if (cmp == 0)
+                            {
+                                heap[nextIdx].equalParent = true;
+                                if (siblingCmp == 0) // siblingCmp == 0 => nextIdx is
the left child
+                                    heap[nextIdx + 1].equalParent = true;
+                            }
+
+                            heap[currIdx] = candidate;
+                            return;
+                        }
+
+                        if (siblingCmp == 0)
+                        {
+                            // siblingCmp == 0 => nextIdx is still the left child
+                            // if the two siblings were equal, and we are inserting something
greater, we will
+                            // pull up the left one; this means the right gets an equalParent
+                            heap[nextIdx + 1].equalParent = true;
+                        }
+                    }
+                    else
+                        ++nextIdx;  // descend down the path where we found the equal child
+                }
+
+                heap[currIdx] = heap[nextIdx];
+                currIdx = nextIdx;
+            }
+
+            // our loop guard ensures there are always two siblings to process; typically
when we exit the loop we will
+            // be well past the end of the heap and this next condition will match...
+            if (nextIdx >= size)
+            {
+                heap[currIdx] = candidate;
+                return;
+            }
+
+            // ... but sometimes we will have one last child to compare against, that has
no siblings
+            if (!heap[nextIdx].equalParent)
+            {
+                int cmp = candidate.compareTo(heap[nextIdx]);
+                if (cmp <= 0)
+                {
+                    heap[nextIdx].equalParent = cmp == 0;
+                    heap[currIdx] = candidate;
+                    return;
+                }
+            }
+
+            heap[currIdx] = heap[nextIdx];
+            heap[nextIdx] = candidate;
         }
     }
 
@@ -134,6 +354,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out>
implem
         private final Comparator<? super In> comp;
         private final int idx;
         private In item;
+        boolean equalParent;
 
         public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super
In> comp)
         {
@@ -142,19 +363,33 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out>
implem
             this.idx = idx;
         }
 
-        /** @return True if our iterator had an item, and it is now available */
-        protected boolean advance()
+        /** @return this if our iterator had an item, and it is now available, otherwise
null */
+        protected Candidate<In> advance()
         {
             if (!iter.hasNext())
-                return false;
+                return null;
             item = iter.next();
-            return true;
+            return this;
         }
 
         public int compareTo(Candidate<In> that)
         {
+            assert item != null && that.item != null;
             return comp.compare(this.item, that.item);
         }
+
+        public In consume()
+        {
+            In temp = item;
+            item = null;
+            assert temp != null;
+            return temp;
+        }
+
+        public boolean needsAdvance()
+        {
+            return item == null;
+        }
     }
 
     /** Accumulator that collects values of type A, and outputs a value of type B. */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c29001b9/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
index d1271fc..7646693 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
@@ -175,7 +175,7 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V>
     @Override
     public BTreeSet<V> descendingSet()
     {
-        return new BTreeRange<>(this.tree, this.comparator).descendingSet();
+        return new BTreeRange<V>(this.tree, this.comparator).descendingSet();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c29001b9/test/unit/org/apache/cassandra/utils/MergeIteratorComparisonTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/MergeIteratorComparisonTest.java b/test/unit/org/apache/cassandra/utils/MergeIteratorComparisonTest.java
new file mode 100644
index 0000000..6a4bd2b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/MergeIteratorComparisonTest.java
@@ -0,0 +1,733 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.utils.MergeIterator.Candidate;
+import org.apache.cassandra.utils.MergeIterator.Reducer;
+
+public class MergeIteratorComparisonTest
+{
+    private static class CountingComparator<T> implements Comparator<T>
+    {
+        final Comparator<T> wrapped;
+        int count = 0;
+
+        protected CountingComparator(Comparator<T> wrapped)
+        {
+            this.wrapped = wrapped;
+        }
+
+        public int compare(T o1, T o2)
+        {
+            count++;
+            return wrapped.compare(o1, o2);
+        }
+    }
+
+    static int ITERATOR_COUNT = 15;
+    static int LIST_LENGTH = 15000;
+    static boolean BENCHMARK = false;
+
+    @Test
+    public void testRandomInts()
+    {
+        System.out.println("testRandomInts");
+        final Random r = new Random();
+        Reducer<Integer, Counted<Integer>> reducer = new Counter<Integer>();
+
+        List<List<Integer>> lists = new NaturalListGenerator<Integer>(ITERATOR_COUNT,
LIST_LENGTH) {
+            @Override
+            public Integer next()
+            {
+                return r.nextInt(5 * LIST_LENGTH);
+            }
+        }.result;
+        testMergeIterator(reducer, lists);
+    }
+    
+    @Test
+    public void testNonOverlapInts()
+    {
+        System.out.println("testNonOverlapInts");
+        Reducer<Integer, Counted<Integer>> reducer = new Counter<Integer>();
+
+        List<List<Integer>> lists = new NaturalListGenerator<Integer>(ITERATOR_COUNT,
LIST_LENGTH) {
+            int next = 1;
+            @Override
+            public Integer next()
+            {
+                return next++;
+            }
+        }.result;
+        testMergeIterator(reducer, lists);
+    }
+
+    @Test
+    public void testCombinationInts()
+    {
+        System.out.println("testCombinationInts");
+        final Random r = new Random();
+        Reducer<Integer, Counted<Integer>> reducer = new Counter<Integer>();
+
+        List<List<Integer>> lists = new NaturalListGenerator<Integer>(ITERATOR_COUNT,
LIST_LENGTH) {
+            int next = 1;
+            @Override
+            public Integer next()
+            {
+                return r.nextBoolean() ? r.nextInt(5 * LIST_LENGTH) : next++;
+            }
+        }.result;
+        testMergeIterator(reducer, lists);
+    }
+
+    @Test
+    public void testLCSTotalOverlap()
+    {
+        testLCS(2, LIST_LENGTH / 100, 1f);
+        testLCS(3, LIST_LENGTH / 100, 1f);
+        testLCS(3, LIST_LENGTH / 100, 1f, 10, LIST_LENGTH);
+        testLCS(4, LIST_LENGTH / 100, 1f);
+        testLCS(4, LIST_LENGTH / 100, 1f, 10, LIST_LENGTH);
+    }
+
+    @Test
+    public void testLCSPartialOverlap()
+    {
+        testLCS(2, LIST_LENGTH / 100, 0.5f);
+        testLCS(3, LIST_LENGTH / 100, 0.5f);
+        testLCS(3, LIST_LENGTH / 100, 0.5f, 10, LIST_LENGTH);
+        testLCS(4, LIST_LENGTH / 100, 0.5f);
+        testLCS(4, LIST_LENGTH / 100, 0.5f, 10, LIST_LENGTH);
+    }
+
+    @Test
+    public void testLCSNoOverlap()
+    {
+        testLCS(2, LIST_LENGTH / 100, 0f);
+        testLCS(3, LIST_LENGTH / 100, 0f);
+        testLCS(3, LIST_LENGTH / 100, 0f, 10, LIST_LENGTH);
+        testLCS(4, LIST_LENGTH / 100, 0f);
+        testLCS(4, LIST_LENGTH / 100, 0f, 10, LIST_LENGTH);
+    }
+
+    public void testLCS(int levelCount, int levelMultiplier, float levelOverlap)
+    {
+        testLCS(levelCount, levelMultiplier, levelOverlap, 0, 0);
+    }
+    public void testLCS(int levelCount, int levelMultiplier, float levelOverlap, int countOfL0,
int sizeOfL0)
+    {
+        System.out.printf("testLCS(lc=%d,lm=%d,o=%.2f,L0=%d*%d)\n", levelCount, levelMultiplier,
levelOverlap, countOfL0, countOfL0 == 0 ? 0 : sizeOfL0 / countOfL0);
+        final Random r = new Random();
+        Reducer<Integer, Counted<Integer>> reducer = new Counter<Integer>();
+        List<List<Integer>> lists = new LCSGenerator<Integer>(Ordering.<Integer>natural(),
levelCount, levelMultiplier, levelOverlap) {
+            @Override
+            public Integer newItem()
+            {
+                return r.nextInt();
+            }
+        }.result;
+        if (sizeOfL0 > 0 && countOfL0 > 0)
+            lists.addAll(new NaturalListGenerator<Integer>(countOfL0, sizeOfL0 / countOfL0)
+            {
+                Integer next()
+                {
+                    return r.nextInt();
+                }
+            }.result);
+        testMergeIterator(reducer, lists);
+    }
+
+    @Test
+    public void testRandomStrings()
+    {
+        System.out.println("testRandomStrings");
+        final Random r = new Random();
+        Reducer<String, Counted<String>> reducer = new Counter<String>();
+
+        List<List<String>> lists = new NaturalListGenerator<String>(ITERATOR_COUNT,
LIST_LENGTH) {
+            @Override
+            public String next()
+            {
+                return "longish_prefix_" + r.nextInt(5 * LIST_LENGTH);
+            }
+        }.result;
+        testMergeIterator(reducer, lists);
+    }
+    
+    @Test
+    public void testNonOverlapStrings()
+    {
+        System.out.println("testNonOverlapStrings");
+        Reducer<String, Counted<String>> reducer = new Counter<String>();
+
+        List<List<String>> lists = new NaturalListGenerator<String>(ITERATOR_COUNT,
LIST_LENGTH) {
+            int next = 1;
+            @Override
+            public String next()
+            {
+                return "longish_prefix_" + next++;
+            }
+        }.result;
+        testMergeIterator(reducer, lists);
+    }
+
+    @Test
+    public void testCombinationStrings()
+    {
+        System.out.println("testCombinationStrings");
+        final Random r = new Random();
+        Reducer<String, Counted<String>> reducer = new Counter<String>();
+
+        List<List<String>> lists = new NaturalListGenerator<String>(ITERATOR_COUNT,
LIST_LENGTH) {
+            int next = 1;
+            public String next()
+            {
+                return "longish_prefix_" + (r.nextBoolean() ? r.nextInt(5 * LIST_LENGTH)
: next++);
+            }
+        }.result;
+        testMergeIterator(reducer, lists);
+    }
+
+    @Test
+    public void testTimeUuids()
+    {
+        System.out.println("testTimeUuids");
+        Reducer<UUID, Counted<UUID>> reducer = new Counter<UUID>();
+
+        List<List<UUID>> lists = new NaturalListGenerator<UUID>(ITERATOR_COUNT,
LIST_LENGTH) {
+            @Override
+            public UUID next()
+            {
+                return UUIDGen.getTimeUUID();
+            }
+        }.result;
+        testMergeIterator(reducer, lists);
+    }
+
+    @Test
+    public void testRandomUuids()
+    {
+        System.out.println("testRandomUuids");
+        Reducer<UUID, Counted<UUID>> reducer = new Counter<UUID>();
+
+        List<List<UUID>> lists = new NaturalListGenerator<UUID>(ITERATOR_COUNT,
LIST_LENGTH) {
+            @Override
+            public UUID next()
+            {
+                return UUID.randomUUID();
+            }
+        }.result;
+        testMergeIterator(reducer, lists);
+    }
+
+    @Test
+    public void testTimeUuidType()
+    {
+        System.out.println("testTimeUuidType");
+        final AbstractType<UUID> type = TimeUUIDType.instance;
+        Reducer<ByteBuffer, Counted<ByteBuffer>> reducer = new Counter<ByteBuffer>();
+
+        List<List<ByteBuffer>> lists = new SimpleListGenerator<ByteBuffer>(type,
ITERATOR_COUNT, LIST_LENGTH) {
+            @Override
+            public ByteBuffer next()
+            {
+                return type.decompose(UUIDGen.getTimeUUID());
+            }
+        }.result;
+        testMergeIterator(reducer, lists, type);
+    }
+
+    @Test
+    public void testUuidType()
+    {
+        System.out.println("testUuidType");
+        final AbstractType<UUID> type = UUIDType.instance;
+        Reducer<ByteBuffer, Counted<ByteBuffer>> reducer = new Counter<ByteBuffer>();
+
+        List<List<ByteBuffer>> lists = new SimpleListGenerator<ByteBuffer>(type,
ITERATOR_COUNT, LIST_LENGTH) {
+            @Override
+            public ByteBuffer next()
+            {
+                return type.decompose(UUIDGen.getTimeUUID());
+            }
+        }.result;
+        testMergeIterator(reducer, lists, type);
+    }
+
+    
+    @Test
+    public void testSets()
+    {
+        System.out.println("testSets");
+        final Random r = new Random();
+
+        Reducer<KeyedSet<Integer, UUID>, KeyedSet<Integer, UUID>> reducer
= new Union<Integer, UUID>();
+
+        List<List<KeyedSet<Integer, UUID>>> lists = new NaturalListGenerator<KeyedSet<Integer,
UUID>>(ITERATOR_COUNT, LIST_LENGTH) {
+            @Override
+            public KeyedSet<Integer, UUID> next()
+            {
+                return new KeyedSet<>(r.nextInt(5 * LIST_LENGTH), UUIDGen.getTimeUUID());
+            }
+        }.result;
+        testMergeIterator(reducer, lists);
+    }
+    /* */
+
+    @Test
+    public void testLimitedOverlapStrings2()
+    {
+        System.out.println("testLimitedOverlapStrings2");
+        Reducer<String, Counted<String>> reducer = new Counter<String>();
+
+        List<List<String>> lists = new NaturalListGenerator<String>(ITERATOR_COUNT,
LIST_LENGTH) {
+            int next = 0;
+            @Override
+            public String next()
+            {
+                ++next;
+                int list = next / LIST_LENGTH;
+                int id = next % LIST_LENGTH;
+                return "longish_prefix_" + (id + list * LIST_LENGTH / 2);
+            }
+        }.result;
+        testMergeIterator(reducer, lists);
+    }
+
+    @Test
+    public void testLimitedOverlapStrings3()
+    {
+        System.out.println("testLimitedOverlapStrings3");
+        Reducer<String, Counted<String>> reducer = new Counter<String>();
+
+        List<List<String>> lists = new NaturalListGenerator<String>(ITERATOR_COUNT,
LIST_LENGTH) {
+            int next = 0;
+            @Override
+            public String next()
+            {
+                ++next;
+                int list = next / LIST_LENGTH;
+                int id = next % LIST_LENGTH;
+                return "longish_prefix_" + (id + list * LIST_LENGTH / 3);
+            }
+        }.result;
+        testMergeIterator(reducer, lists);
+    }
+
+    private static abstract class ListGenerator<T>
+    {
+        abstract boolean hasMoreItems();
+        abstract boolean hasMoreLists();
+        abstract T next();
+
+        final Comparator<T> comparator;
+        final List<List<T>> result = Lists.newArrayList();
+
+        protected ListGenerator(Comparator<T> comparator)
+        {
+            this.comparator = comparator;
+        }
+
+        void build()
+        {
+            while (hasMoreLists())
+            {
+                List<T> l = Lists.newArrayList();
+                while (hasMoreItems())
+                    l.add(next());
+                Collections.sort(l, comparator);
+                result.add(l);
+            }
+        }
+    }
+
+    private static abstract class NaturalListGenerator<T extends Comparable<T>>
extends SimpleListGenerator<T>
+    {
+        private NaturalListGenerator(int listCount, int perListCount)
+        {
+            super(Ordering.natural(), listCount, perListCount);
+        }
+    }
+    private static abstract class SimpleListGenerator<T> extends ListGenerator<T>
+    {
+        final int listCount;
+        final int perListCount;
+
+        int listIdx = 0, itemIdx = 0;
+
+        private SimpleListGenerator(Comparator<T> comparator, int listCount, int perListCount)
+        {
+            super(comparator);
+            this.listCount = listCount;
+            this.perListCount = perListCount;
+            build();
+        }
+
+        public boolean hasMoreItems()
+        {
+            return itemIdx++ < perListCount;
+        }
+
+        public boolean hasMoreLists()
+        {
+            itemIdx = 0;
+            return listIdx++ < listCount;
+        }
+    }
+
+    /**
+     * Generates one sorted list per level, largest first: the list started by the i-th (0-based) call to
+     * {@link #hasMoreLists()} holds {@code l1Items * 10^(levelCount - i)} items. For every list after the
+     * first, its first {@code overlapItems} items ({@code levelOverlap} of the list size, truncated) are
+     * items picked at random from the lists generated so far; the remaining items come from
+     * {@link #newItem()}. Presumably modelled on leveled compaction, where levels overlap each other.
+     */
+    private static abstract class LCSGenerator<T> extends ListGenerator<T>
+    {
+        final int levelCount;
+        final int itemMultiplier;
+        final float levelOverlap;
+
+        int levelIdx, itemIdx;
+        int levelItems, overlapItems, runningTotalItems;
+        final Random random = new Random();
+
+        public LCSGenerator(Comparator<T> comparator, int levelCount, int l1Items, float levelOverlap)
+        {
+            super(comparator);
+            this.levelCount = levelCount;
+            this.itemMultiplier = l1Items;
+            this.levelOverlap = levelOverlap;
+            build();
+        }
+
+        public boolean hasMoreItems()
+        {
+            return itemIdx++ < levelItems;
+        }
+
+        public boolean hasMoreLists()
+        {
+            // Count the list completed since the previous call, so next() can pick from every earlier list.
+            if (result.size() > 0)
+                runningTotalItems += result.get(result.size() - 1).size();
+            itemIdx = 0;
+            levelItems = itemMultiplier * (int)Math.pow(10, levelCount - levelIdx);
+            overlapItems = levelIdx == 0 ? 0 : (int) (levelItems * levelOverlap);
+            return levelIdx++ < levelCount;
+        }
+
+        abstract T newItem();
+
+        T next()
+        {
+            // hasMoreItems() has already incremented itemIdx, so here it is the 1-based position of the item
+            // being produced; '<=' makes exactly overlapItems items of this list reuse earlier items.
+            if (itemIdx <= overlapItems)
+            {
+                int item = random.nextInt(runningTotalItems);
+                for (List<T> list : result)
+                {
+                    if (item < list.size()) return list.get(item);
+                    else item -= list.size();
+                }
+            }
+            return newItem();
+        }
+    }
+
+    /** Runs {@link #testMergeIterator(Reducer, List, Comparator)} using the elements' natural ordering. */
+    public <T extends Comparable<T>> void testMergeIterator(Reducer<T, ?> reducer, List<List<T>> lists)
+    {
+        Comparator<T> natural = Ordering.natural();
+        testMergeIterator(reducer, lists, natural);
+    }
+    /**
+     * Checks that {@link MergeIterator#get} yields exactly what the reference {@link MergeIteratorPQ} yields
+     * for the given sorted lists. If {@code BENCHMARK} is set, it then runs both implementations ten times and
+     * prints the ratio of comparisons made by the new one to those made by the reference one. The same
+     * reducer instance is shared by every iterator created here.
+     */
+    public <T> void testMergeIterator(Reducer<T, ?> reducer, List<List<T>> lists, Comparator<T> comparator)
+    {
+        IMergeIterator<T,?> tested = MergeIterator.get(closeableIterators(lists), comparator, reducer);
+        IMergeIterator<T,?> base = new MergeIteratorPQ<>(closeableIterators(lists), comparator, reducer);
+        // Drain the reference first (the reducer is shared, so the two must not be interleaved). Comparing
+        // arrays lets assertArrayEquals report the first differing index; Iterators.elementsEqual would not.
+        Object[] expected = Iterators.toArray(base, Object.class);
+        Object[] actual = Iterators.toArray(tested, Object.class);
+        Assert.assertArrayEquals(expected, actual);
+        if (!BENCHMARK)
+            return;
+
+        CountingComparator<T> newCount = new CountingComparator<>(comparator);
+        CountingComparator<T> oldCount = new CountingComparator<>(comparator);
+        System.out.println();
+        for (int run = 0; run < 10; run++)
+        {
+            benchmarkIterator(MergeIterator.get(closeableIterators(lists), newCount, reducer), newCount);
+            benchmarkIterator(new MergeIteratorPQ<>(closeableIterators(lists), oldCount, reducer), oldCount);
+        }
+        System.out.format("MI: %.2f\n", newCount.count / (double) oldCount.count);
+    }
+    
+    /**
+     * Drains {@code it} and prints its class name, the elapsed time, the comparator's current comparison
+     * count and, when the last value produced is a {@link Counted}, the simple class name of its item.
+     */
+    public <T> void benchmarkIterator(IMergeIterator<T, ?> it, CountingComparator<T> comparator)
+    {
+        System.out.format("Testing %30s... ", it.getClass().getSimpleName());
+        // nanoTime is monotonic and intended for measuring elapsed time; currentTimeMillis is wall-clock time,
+        // which can jump (e.g. clock adjustments) and has coarse granularity on some platforms.
+        long start = System.nanoTime();
+        Object value = null;
+        while (it.hasNext())
+            value = it.next();
+        long time = (System.nanoTime() - start) / 1000000L;
+        String type = "";
+        if (value instanceof Counted<?>)
+            type = "type " + ((Counted<?>)value).item.getClass().getSimpleName();
+        System.out.format("%15s time %5dms; comparisons: %d\n", type, time, comparator.count);
+    }
+
+    /**
+     * Wraps the iterator of each list in a {@link CLI}.
+     *
+     * The result is a concrete list, not a {@code Lists.transform} view. A view re-applies its function on
+     * every {@code get}, so each access would create a fresh iterator. Code that closes or re-reads the
+     * elements would then see different objects from the ones actually being consumed.
+     */
+    public <T> List<CloseableIterator<T>> closeableIterators(List<List<T>> iterators)
+    {
+        List<CloseableIterator<T>> wrapped = Lists.newArrayListWithCapacity(iterators.size());
+        for (List<T> list : iterators)
+            wrapped.add(new CLI<T>(list.iterator()));
+        return wrapped;
+    }
+
+    /**
+     * An item together with a count (incremented by {@link Counter} once per merged source supplying the item).
+     * Two instances are equal when both the item and the count are equal.
+     */
+    static class Counted<T> {
+        T item;
+        int count;
+
+        Counted(T item) {
+            this.item = item;
+            count = 0;
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            // instanceof is false for null, so no separate null check is needed.
+            if (!(obj instanceof Counted))
+                return false;
+            Counted<?> c = (Counted<?>) obj;
+            return Objects.equal(item, c.item) && count == c.count;
+        }
+
+        // equals() is overridden, so hashCode() must be too, or equal instances may hash differently.
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(item, count);
+        }
+
+        @Override
+        public String toString()
+        {
+            return item.toString() + "x" + count;
+        }
+    }
+    
+    /**
+     * Reducer that collapses equal items into one {@link Counted} holding the item and the number of times it
+     * was reduced. The {@code read} flag asserts that each result is fetched before the next key begins.
+     */
+    static class Counter<T> extends Reducer<T, Counted<T>> {
+        Counted<T> current = null;
+        boolean read = true;
+
+        @Override
+        public void reduce(int idx, T next)
+        {
+            Counted<T> acc = current;
+            if (acc == null)
+            {
+                acc = new Counted<T>(next);
+                current = acc;
+            }
+            assert acc.item.equals(next);
+            acc.count++;
+        }
+
+        @Override
+        protected void onKeyChange()
+        {
+            // The previous key's result must have been fetched through getReduced().
+            assert read;
+            read = false;
+            current = null;
+        }
+
+        @Override
+        protected Counted<T> getReduced()
+        {
+            assert current != null;
+            read = true;
+            return current;
+        }
+    }
+    
+    /**
+     * A key paired with a set of values, ordered by key alone. compareTo ignores the value set, whereas
+     * equality is inherited from {@link Pair} (presumably comparing both fields; not visible here).
+     */
+    static class KeyedSet<K extends Comparable<? super K>, V> extends Pair<K, Set<V>> implements Comparable<KeyedSet<K, V>>
+    {
+        /** Creates an entry holding a single value in an immutable set. */
+        protected KeyedSet(K left, V right)
+        {
+            super(left, ImmutableSet.of(right));
+        }
+
+        /** Creates an entry holding a mutable copy of the given values. */
+        protected KeyedSet(K left, Collection<V> right)
+        {
+            super(left, Sets.newHashSet(right));
+        }
+
+        @Override
+        public int compareTo(KeyedSet<K, V> o)
+        {
+            K mine = left;
+            K theirs = o.left;
+            return mine.compareTo(theirs);
+        }
+    }
+    
+    /**
+     * Reducer that merges {@link KeyedSet}s with equal keys by taking the union of their value sets.
+     * The {@code read} flag asserts that each result is fetched before the next key begins.
+     */
+    static class Union<K extends Comparable<K>, V> extends Reducer<KeyedSet<K, V>, KeyedSet<K, V>> {
+        KeyedSet<K, V> current = null;
+        boolean read = true;
+
+        @Override
+        public void reduce(int idx, KeyedSet<K, V> next)
+        {
+            if (current != null)
+            {
+                assert current.left.equals(next.left);
+                current.right.addAll(next.right);
+                return;
+            }
+            // The Collection constructor copies into a fresh HashSet, so the incoming (possibly immutable)
+            // set is never modified by later addAll calls.
+            current = new KeyedSet<>(next.left, next.right);
+        }
+
+        @Override
+        protected void onKeyChange()
+        {
+            // The previous key's result must have been fetched through getReduced().
+            assert read;
+            read = false;
+            current = null;
+        }
+
+        @Override
+        protected KeyedSet<K, V> getReduced()
+        {
+            assert current != null;
+            read = true;
+            return current;
+        }
+    }
+    
+    // closeable list iterator
+    /** Closeable adapter over a plain iterator; asserts that it is closed at most once. */
+    public static class CLI<E> extends AbstractIterator<E> implements CloseableIterator<E>
+    {
+        Iterator<E> iter;
+        boolean closed = false;
+
+        public CLI(Iterator<E> items)
+        {
+            this.iter = items;
+        }
+
+        protected E computeNext()
+        {
+            return iter.hasNext() ? iter.next() : endOfData();
+        }
+
+        public void close()
+        {
+            assert !closed;
+            closed = true;
+        }
+    }
+
+    // Old MergeIterator implementation for comparison.
+    /**
+     * The previous MergeIterator implementation, built on java.util.PriorityQueue. It is kept as the
+     * reference for correctness checks and comparison counts.
+     *
+     * Declared static: it uses no state of the enclosing test, so it should not carry a hidden reference to it.
+     */
+    public static class MergeIteratorPQ<In,Out> extends MergeIterator<In,Out> implements IMergeIterator<In, Out>
+    {
+        // a queue for return: all candidates must be open and have at least one item
+        protected final PriorityQueue<CandidatePQ<In>> queue;
+        // a stack of the last consumed candidates, so that we can lazily call 'advance()'
+        // TODO: if we had our own PriorityQueue implementation we could stash items
+        // at the end of its array, so we wouldn't need this storage
+        protected final ArrayDeque<CandidatePQ<In>> candidates;
+
+        public MergeIteratorPQ(List<? extends Iterator<In>> iters, Comparator<In> comp, Reducer<In, Out> reducer)
+        {
+            super(iters, reducer);
+            this.queue = new PriorityQueue<>(Math.max(1, iters.size()));
+            for (int i = 0; i < iters.size(); i++)
+            {
+                CandidatePQ<In> candidate = new CandidatePQ<>(i, iters.get(i), comp);
+                if (!candidate.advance())
+                    // was empty
+                    continue;
+                this.queue.add(candidate);
+            }
+            this.candidates = new ArrayDeque<>(queue.size());
+        }
+
+        protected final Out computeNext()
+        {
+            advance();
+            return consume();
+        }
+
+        /** Consume values by sending them to the reducer while they are equal. */
+        protected final Out consume()
+        {
+            CandidatePQ<In> candidate = queue.peek();
+            if (candidate == null)
+                return endOfData();
+            reducer.onKeyChange();
+            do
+            {
+                candidate = queue.poll();
+                candidates.push(candidate);
+                reducer.reduce(candidate.idx, candidate.item);
+            }
+            while (queue.peek() != null && queue.peek().compareTo(candidate) == 0);
+            return reducer.getReduced();
+        }
+
+        /** Advance and re-enqueue all items we consumed in the last iteration. */
+        protected final void advance()
+        {
+            CandidatePQ<In> candidate;
+            while ((candidate = candidates.pollFirst()) != null)
+                if (candidate.advance())
+                    queue.add(candidate);
+        }
+    }
+
+    // Holds and is comparable by the head item of an iterator it owns
+    /**
+     * One source of the merge: owns an iterator and compares, using {@code comp}, by the item most recently
+     * taken from it. (The unused {@code equalParent} flag, which only the new MergeIterator's candidates
+     * need, has been dropped.)
+     */
+    protected static final class CandidatePQ<In> implements Comparable<CandidatePQ<In>>
+    {
+        private final Iterator<? extends In> iter;
+        private final Comparator<? super In> comp;
+        private final int idx;
+        private In item;
+
+        public CandidatePQ(int idx, Iterator<? extends In> iter, Comparator<? super In> comp)
+        {
+            this.iter = iter;
+            this.comp = comp;
+            this.idx = idx;
+        }
+
+        /** @return true if our iterator had an item, and it is now available */
+        protected boolean advance()
+        {
+            if (!iter.hasNext())
+                return false;
+            item = iter.next();
+            return true;
+        }
+
+        public int compareTo(CandidatePQ<In> that)
+        {
+            return comp.compare(this.item, that.item);
+        }
+    }
+}


Mime
View raw message