cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject cassandra git commit: Introduce test-burn ant target
Date Wed, 06 May 2015 16:52:05 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk aa811c393 -> a3e041848


Introduce test-burn ant target

patch by ariel for CASSANDRA-9307


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3e04184
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3e04184
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3e04184

Branch: refs/heads/trunk
Commit: a3e041848bd6410f0627dce52725811602fd579f
Parents: aa811c3
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Wed May 6 17:51:48 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Wed May 6 17:51:48 2015 +0100

----------------------------------------------------------------------
 .../cassandra/concurrent/LongOpOrderTest.java   | 240 +++++++++
 .../concurrent/LongSharedExecutorPoolTest.java  | 226 +++++++++
 .../apache/cassandra/utils/LongBTreeTest.java   | 502 +++++++++++++++++++
 3 files changed, 968 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3e04184/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
new file mode 100644
index 0000000..d7105df
--- /dev/null
+++ b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
@@ -0,0 +1,240 @@
+package org.apache.cassandra.concurrent;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.junit.Assert.assertTrue;
+
+// TODO: we don't currently test SAFE functionality at all!
+// TODO: should also test markBlocking and SyncOrdered
+public class LongOpOrderTest
+{
+
+    private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class);
+
+    static final int CONSUMERS = 4;
+    static final int PRODUCERS = 32;
+
+    static final long RUNTIME = TimeUnit.MINUTES.toMillis(5);
+    static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
+
+    static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler()
+    {
+        @Override
+        public void uncaughtException(Thread t, Throwable e)
+        {
+            System.err.println(t.getName() + ": " + e.getMessage());
+            e.printStackTrace();
+        }
+    };
+
+    final OpOrder order = new OpOrder();
+    final AtomicInteger errors = new AtomicInteger();
+
+    class TestOrdering implements Runnable
+    {
+
+        final int[] waitNanos = new int[1 << 16];
+        volatile State state = new State();
+        final ScheduledExecutorService sched;
+
+        TestOrdering(ExecutorService exec, ScheduledExecutorService sched)
+        {
+            this.sched = sched;
+            final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            for (int i = 0 ; i < waitNanos.length ; i++)
+                waitNanos[i] = rnd.nextInt(5000);
+            for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
+                exec.execute(new Producer());
+            exec.execute(this);
+        }
+
+        @Override
+        public void run()
+        {
+            final long until = System.currentTimeMillis() + RUNTIME;
+            long lastReport = System.currentTimeMillis();
+            long count = 0;
+            long opCount = 0;
+            while (true)
+            {
+                long now = System.currentTimeMillis();
+                if (now > until)
+                    break;
+                if (now > lastReport + REPORT_INTERVAL)
+                {
+                    lastReport = now;
+                    logger.info(String.format("%s: Executed %d barriers with %d operations. %.0f%% complete.",
+                            Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME))));
+                }
+                try
+                {
+                    Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]);
+                } catch (InterruptedException e)
+                {
+                    e.printStackTrace();
+                }
+
+                final State s = state;
+                s.barrier = order.newBarrier();
+                s.replacement = new State();
+                s.barrier.issue();
+                s.barrier.await();
+                s.check();
+                opCount += s.totalCount();
+                state = s.replacement;
+                sched.schedule(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        s.check();
+                    }
+                }, 1, TimeUnit.SECONDS);
+                count++;
+            }
+        }
+
+        class State
+        {
+
+            volatile OpOrder.Barrier barrier;
+            volatile State replacement;
+            final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+            int checkCount = -1;
+
+            boolean accept(OpOrder.Group opGroup)
+            {
+                if (barrier != null && !barrier.isAfter(opGroup))
+                    return false;
+                AtomicInteger c;
+                if (null == (c = count.get(opGroup)))
+                {
+                    count.putIfAbsent(opGroup, new AtomicInteger());
+                    c = count.get(opGroup);
+                }
+                c.incrementAndGet();
+                return true;
+            }
+
+            int totalCount()
+            {
+                int c = 0;
+                for (AtomicInteger v : count.values())
+                    c += v.intValue();
+                return c;
+            }
+
+            void check()
+            {
+                boolean delete;
+                if (checkCount >= 0)
+                {
+                    if (checkCount != totalCount())
+                    {
+                        errors.incrementAndGet();
+                        logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount());
+                    }
+                    delete = true;
+                }
+                else
+                {
+                    checkCount = totalCount();
+                    delete = false;
+                }
+                for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet())
+                {
+                    if (e.getKey().compareTo(barrier.getSyncPoint()) > 0)
+                    {
+                        errors.incrementAndGet();
+                        logger.error("Received an operation that was created after the barrier was issued.");
+                    }
+                    if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue())
+                    {
+                        errors.incrementAndGet();
+                        logger.error("Missing registered operations. {} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue());
+                    }
+                    if (delete)
+                        TestOrdering.this.count.remove(e.getKey());
+                }
+            }
+
+        }
+
+        final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+
+        class Producer implements Runnable
+        {
+            public void run()
+            {
+                while (true)
+                {
+                    AtomicInteger c;
+                    try (OpOrder.Group opGroup = order.start())
+                    {
+                        if (null == (c = count.get(opGroup)))
+                        {
+                            count.putIfAbsent(opGroup, new AtomicInteger());
+                            c = count.get(opGroup);
+                        }
+                        c.incrementAndGet();
+                        State s = state;
+                        while (!s.accept(opGroup))
+                            s = s.replacement;
+                    }
+                }
+            }
+        }
+
+    }
+
+    @Test
+    public void testOrdering() throws InterruptedException
+    {
+        errors.set(0);
+        Thread.setDefaultUncaughtExceptionHandler(handler);
+        final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker"));
+        final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
+        for (int i = 0 ; i < CONSUMERS ; i++)
+            new TestOrdering(exec, checker);
+        exec.shutdown();
+        exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS);
+        assertTrue(exec.isShutdown());
+        assertTrue(errors.get() == 0);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3e04184/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
new file mode 100644
index 0000000..fe464c7
--- /dev/null
+++ b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.BitSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.math3.distribution.WeibullDistribution;
+import org.junit.Test;
+
+public class LongSharedExecutorPoolTest
+{
+
+    private static final class WaitTask implements Runnable
+    {
+        final long nanos;
+
+        private WaitTask(long nanos)
+        {
+            this.nanos = nanos;
+        }
+
+        public void run()
+        {
+            LockSupport.parkNanos(nanos);
+        }
+    }
+
+    private static final class Result implements Comparable<Result>
+    {
+        final Future<?> future;
+        final long forecastedCompletion;
+
+        private Result(Future<?> future, long forecastedCompletion)
+        {
+            this.future = future;
+            this.forecastedCompletion = forecastedCompletion;
+        }
+
+        public int compareTo(Result that)
+        {
+            int c = Long.compare(this.forecastedCompletion, that.forecastedCompletion);
+            if (c != 0)
+                return c;
+            c = Integer.compare(this.hashCode(), that.hashCode());
+            if (c != 0)
+                return c;
+            return Integer.compare(this.future.hashCode(), that.future.hashCode());
+        }
+    }
+
+    private static final class Batch implements Comparable<Batch>
+    {
+        final TreeSet<Result> results;
+        final long timeout;
+        final int executorIndex;
+
+        private Batch(TreeSet<Result> results, long timeout, int executorIndex)
+        {
+            this.results = results;
+            this.timeout = timeout;
+            this.executorIndex = executorIndex;
+        }
+
+        public int compareTo(Batch that)
+        {
+            int c = Long.compare(this.timeout, that.timeout);
+            if (c != 0)
+                return c;
+            c = Integer.compare(this.results.size(), that.results.size());
+            if (c != 0)
+                return c;
+            return Integer.compare(this.hashCode(), that.hashCode());
+        }
+    }
+
+    @Test
+    public void testPromptnessOfExecution() throws InterruptedException, ExecutionException
+    {
+        testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f);
+    }
+
+    private void testPromptnessOfExecution(long intervalNanos, float loadIncrement) throws InterruptedException, ExecutionException
+    {
+        final int executorCount = 4;
+        int threadCount = 8;
+        int maxQueued = 1024;
+        final WeibullDistribution workTime = new WeibullDistribution(3, 200000);
+        final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1);
+        final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1);
+
+        final int[] threadCounts = new int[executorCount];
+        final WeibullDistribution[] workCount = new WeibullDistribution[executorCount];
+        final ExecutorService[] executors = new ExecutorService[executorCount];
+        for (int i = 0 ; i < executors.length ; i++)
+        {
+            executors[i] = SharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i);
+            threadCounts[i] = threadCount;
+            workCount[i] = new WeibullDistribution(2, maxQueued);
+            threadCount *= 2;
+            maxQueued *= 2;
+        }
+
+        long runs = 0;
+        long events = 0;
+        final TreeSet<Batch> pending = new TreeSet<>();
+        final BitSet executorsWithWork = new BitSet(executorCount);
+        long until = 0;
+        // basic idea is to go through different levels of load on the executor service; initially is all small batches
+        // (mostly within max queue size) of very short operations, moving to progressively larger batches
+        // (beyond max queued size), and longer operations
+        for (float multiplier = 0f ; multiplier < 2.01f ; )
+        {
+            if (System.nanoTime() > until)
+            {
+                System.out.println(String.format("Completed %.0fK batches with %.1fM events", runs * 0.001f, events * 0.000001f));
+                events = 0;
+                until = System.nanoTime() + intervalNanos;
+                multiplier += loadIncrement;
+                System.out.println(String.format("Running for %ds with load multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier));
+            }
+
+            // wait a random amount of time so we submit new tasks in various stages of
+            long timeout;
+            if (pending.isEmpty()) timeout = 0;
+            else if (Math.random() > 0.98) timeout = Long.MAX_VALUE;
+            else if (pending.size() == executorCount) timeout = pending.first().timeout;
+            else timeout = (long) (Math.random() * pending.last().timeout);
+
+            while (!pending.isEmpty() && timeout > System.nanoTime())
+            {
+                Batch first = pending.first();
+                boolean complete = false;
+                try
+                {
+                    for (Result result : first.results.descendingSet())
+                        result.future.get(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
+                    complete = true;
+                }
+                catch (TimeoutException e)
+                {
+                }
+                if (!complete && System.nanoTime() > first.timeout)
+                {
+                    for (Result result : first.results)
+                        if (!result.future.isDone())
+                            throw new AssertionError();
+                    complete = true;
+                }
+                if (complete)
+                {
+                    pending.pollFirst();
+                    executorsWithWork.clear(first.executorIndex);
+                }
+            }
+
+            // if we've emptied the executors, give all our threads an opportunity to spin down
+            if (timeout == Long.MAX_VALUE)
+                Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+
+            // submit a random batch to the first free executor service
+            int executorIndex = executorsWithWork.nextClearBit(0);
+            if (executorIndex >= executorCount)
+                continue;
+            executorsWithWork.set(executorIndex);
+            ExecutorService executor = executors[executorIndex];
+            TreeSet<Result> results = new TreeSet<>();
+            int count = (int) (workCount[executorIndex].sample() * multiplier);
+            long targetTotalElapsed = 0;
+            long start = System.nanoTime();
+            long baseTime;
+            if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() * multiplier);
+            else  baseTime = 0;
+            for (int j = 0 ; j < count ; j++)
+            {
+                long time;
+                if (baseTime == 0) time = (long) (workTime.sample() * multiplier);
+                else time = (long) (baseTime * Math.random());
+                if (time < minWorkTime)
+                    time = minWorkTime;
+                if (time > maxWorkTime)
+                    time = maxWorkTime;
+                targetTotalElapsed += time;
+                Future<?> future = executor.submit(new WaitTask(time));
+                results.add(new Result(future, System.nanoTime() + time));
+            }
+            long end = start + (long) Math.ceil(targetTotalElapsed / (double) threadCounts[executorIndex])
+                       + TimeUnit.MILLISECONDS.toNanos(100L);
+            long now = System.nanoTime();
+            if (runs++ > executorCount && now > end)
+                throw new AssertionError();
+            events += results.size();
+            pending.add(new Batch(results, end, executorIndex));
+//            System.out.println(String.format("Submitted batch to executor %d with %d items and %d permitted millis", executorIndex, count, TimeUnit.NANOSECONDS.toMillis(end - start)));
+        }
+    }
+
+    public static void main(String[] args) throws InterruptedException, ExecutionException
+    {
+        // do longer test
+        new LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L), 0.1f);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3e04184/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
new file mode 100644
index 0000000..9641930
--- /dev/null
+++ b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+// TODO : should probably lower fan-factor for tests to make them more intensive
+public class LongBTreeTest
+{
+
+    private static final MetricRegistry metrics = new MetricRegistry();
+    private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
+    private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
+    private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
+    private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
+    private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
+
+    static
+    {
+        System.setProperty("cassandra.btree.fanfactor", "4");
+    }
+
+    @Test
+    public void testOversizedMiddleInsert()
+    {
+        TreeSet<Integer> canon = new TreeSet<>();
+        for (int i = 0 ; i < 10000000 ; i++)
+            canon.add(i);
+        Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
+        btree = BTree.update(btree, ICMP, canon, true);
+        canon.add(Integer.MIN_VALUE);
+        canon.add(Integer.MAX_VALUE);
+        Assert.assertTrue(BTree.isWellFormed(btree, ICMP));
+        testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator());
+    }
+
+    @Test
+    public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
+    {
+        testInsertions(10000000, 50, 1, 1, true);
+    }
+
+    @Test
+    public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
+    {
+        testInsertions(10000000, 50, 1, 5, true);
+    }
+
+    @Test
+    public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
+    {
+        testInsertions(10000000, 500, 10, 1, true);
+    }
+
+    @Test
+    public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
+    {
+        testInsertions(10000000, 500, 10, 10, true);
+    }
+
+    @Test
+    public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
+    {
+        testInsertions(100000000, 5000, 3, 100, true);
+    }
+
+    @Test
+    public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
+    {
+        testInsertions(10000, 50, 10, 10, false);
+    }
+
+    @Test
+    public void testSearchIterator() throws InterruptedException
+    {
+        int threads = Runtime.getRuntime().availableProcessors();
+        final CountDownLatch latch = new CountDownLatch(threads);
+        final AtomicLong errors = new AtomicLong();
+        final AtomicLong count = new AtomicLong();
+        final int perThreadTrees = 100;
+        final int perTreeSelections = 100;
+        final long totalCount = threads * perThreadTrees * perTreeSelections;
+        for (int t = 0 ; t < threads ; t++)
+        {
+            MODIFY.execute(new Runnable()
+            {
+                public void run()
+                {
+                    ThreadLocalRandom random = ThreadLocalRandom.current();
+                    for (int i = 0 ; i < perThreadTrees ; i++)
+                    {
+                        Object[] tree = randomTree(10000, random);
+                        for (int j = 0 ; j < perTreeSelections ; j++)
+                        {
+                            BTreeSearchIterator<Integer, Integer, Integer> searchIterator = new BTreeSearchIterator<>(tree, ICMP);
+                            for (Integer key : randomSelection(tree, random))
+                                if (key != searchIterator.next(key))
+                                    errors.incrementAndGet();
+                            searchIterator = new BTreeSearchIterator<Integer, Integer, Integer>(tree, ICMP);
+                            for (Integer key : randomMix(tree, random))
+                                if (key != searchIterator.next(key))
+                                    if (BTree.find(tree, ICMP, key) == key)
+                                        errors.incrementAndGet();
+                            count.incrementAndGet();
+                        }
+                    }
+                    latch.countDown();
+                }
+            });
+        }
+        while (latch.getCount() > 0)
+        {
+            latch.await(10L, TimeUnit.SECONDS);
+            System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : ""));
+            assert errors.get() == 0;
+        }
+    }
+
+    private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
+    {
+        int batchesPerTest = perTestCount / modificationBatchSize;
+        int maximumRunLength = 100;
+        int testKeyRange = perTestCount * testKeyRatio;
+        int tests = totalCount / perTestCount;
+        System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
+                tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
+
+        // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
+        int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2));
+        for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
+        {
+            final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
+            for (int i = 0 ; i < chunkSize ; i++)
+            {
+                outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
+            }
+
+            final List<ListenableFuture<?>> inner = new ArrayList<>();
+            int complete = 0;
+            int reportInterval = totalCount / 100;
+            int lastReportAt = 0;
+            for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
+            {
+                inner.addAll(f.get());
+                complete += perTestCount;
+                if (complete - lastReportAt >= reportInterval)
+                {
+                    System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
+                    lastReportAt = complete;
+                }
+            }
+            Futures.allAsList(inner).get();
+        }
+        Snapshot snap = BTREE_TIMER.getSnapshot();
+        System.out.println(String.format("btree   : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+        snap = TREE_TIMER.getSnapshot();
+        System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+        System.out.println("Done");
+    }
+
+    private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
+    {
+        ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new
Callable<List<ListenableFuture<?>>>()
+        {
+            @Override
+            public List<ListenableFuture<?>> call()
+            {
+                final List<ListenableFuture<?>> r = new ArrayList<>();
+                NavigableMap<Integer, Integer> canon = new TreeMap<>();
+                Object[] btree = BTree.empty();
+                final TreeMap<Integer, Integer> buffer = new TreeMap<>();
+                final Random rnd = new Random();
+                for (int i = 0 ; i < iterations ; i++)
+                {
+                    buffer.clear();
+                    int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
+                    while (mods > 0)
+                    {
+                        int v = rnd.nextInt(upperBound);
+                        int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
+                        int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
+                        for (int j = 0 ; j < c ; j++)
+                        {
+                            buffer.put(v, v);
+                            v++;
+                        }
+                        mods -= c;
+                    }
+                    Timer.Context ctxt;
+                    ctxt = TREE_TIMER.time();
+                    canon.putAll(buffer);
+                    ctxt.stop();
+                    ctxt = BTREE_TIMER.time();
+                    Object[] next = null;
+                    while (next == null)
+                        next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT);
+                    btree = next;
+                    ctxt.stop();
+
+                    if (!BTree.isWellFormed(btree, ICMP))
+                    {
+                        System.out.println("ERROR: Not well formed");
+                        throw new AssertionError("Not well formed!");
+                    }
+                    if (quickEquality)
+                        testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator());
+                    else
+                        r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
+                }
+                return r;
+            }
+        });
+        MODIFY.execute(f);
+        return f;
+    }
+
+    /**
+     * Grows a tree one key at a time from empty to 128 keys, and before each insertion checks
+     * every slice of the current tree against a TreeSet holding the same keys.
+     */
+    @Test
+    public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException
+    {
+        // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated
+        final int maxSize = 128;
+        TreeSet<Integer> expected = new TreeSet<>();
+        Object[] tree = BTree.empty();
+        int key = 0;
+        while (key < maxSize)
+        {
+            // all comparisons for the current size must finish before the tree is grown
+            String label = String.format("[0..%d)", expected.size());
+            System.out.println("Testing " + label);
+            Futures.allAsList(testAllSlices(label, tree, expected)).get();
+
+            // a null result means the update did not complete, so try again
+            Object[] updated;
+            do
+            {
+                updated = BTree.update(tree, ICMP, Arrays.asList(key), true, SPORADIC_ABORT);
+            }
+            while (updated == null);
+            tree = updated;
+            expected.add(key);
+            key++;
+        }
+    }
+
+    /** Orders boxed integers by their natural (numeric) order. */
+    static final Comparator<Integer> ICMP = new Comparator<Integer>()
+    {
+        @Override
+        public int compare(Integer left, Integer right)
+        {
+            return left.compareTo(right);
+        }
+    };
+
+    private static List<ListenableFuture<?>> testAllSlices(String id, Object[]
btree, NavigableSet<Integer> canon)
+    {
+        List<ListenableFuture<?>> waitFor = new ArrayList<>();
+        testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor);
+        testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(),
false, waitFor);
+        return waitFor;
+    }
+
+    /**
+     * Schedules a comparison of btree against canon for the whole set, for every head and tail
+     * set, and for every sub set, with each combination of inclusive and exclusive bounds.
+     *
+     * Bounds come from range(canon.size(), ...), so the slices only cover the whole set when the
+     * keys are the contiguous integers 0..size-1. The labels use interval notation in the
+     * iteration order of the set being sliced, e.g. "(..5]" is headSet(5, true).
+     *
+     * @param id label prefixed to every diagnostic message
+     * @param btree the set under test
+     * @param canon the reference set, in the same direction as btree
+     * @param ascending whether both sets iterate in ascending order; selects the direction in
+     *                  which bounds are enumerated
+     * @param results receives the future of every scheduled comparison
+     */
+    private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
+    {
+        testOneSlice(id, btree, canon, results);
+        for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
+        {
+            // test head/tail sets
+            testOneSlice(String.format("%s->(..%d]", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
+            testOneSlice(String.format("%s->(..%d)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
+            testOneSlice(String.format("%s->[%d..)", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
+            testOneSlice(String.format("%s->(%d..)", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
+            // upper bounds start at lb and move in the iteration direction, so they never precede lb
+            for (Integer ub : range(canon.size(), lb, ascending))
+            {
+                // test subsets
+                testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
+                testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
+                testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
+                testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
+            }
+        }
+    }
+
+    /**
+     * Schedules, on the COMPARE executor, a comparison of one slice against its reference: the
+     * sizes, then forward and descending iteration of the slice and of its descending view.
+     *
+     * @param id label used in diagnostic messages
+     * @param test the slice under test
+     * @param canon the reference slice
+     * @param results receives the future of the scheduled comparison
+     */
+    private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results)
+    {
+        final Runnable comparison = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                test(id + " Count", test.size(), canon.size());
+
+                // the slice itself, iterated in both directions
+                testEqual(id, test.iterator(), canon.iterator());
+                testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator());
+
+                // the descending view of the slice, iterated in both directions
+                testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator());
+                testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator());
+            }
+        };
+        final ListenableFutureTask<?> task = ListenableFutureTask.create(comparison, null);
+        results.add(task);
+        COMPARE.execute(task);
+    }
+
+    /**
+     * Checks that two counts agree. A mismatch is printed and then raised as an AssertionError,
+     * so that it fails the enclosing task in the same way testEqual does.
+     *
+     * @param id label used in the diagnostic message
+     * @param test the observed value
+     * @param expect the expected value
+     * @throws AssertionError if the two values differ
+     */
+    private static void test(String id, int test, int expect)
+    {
+        if (test == expect)
+            return;
+
+        String message = String.format("%s: Expected %d, Got %d", id, expect, test);
+        System.out.println(message);
+        throw new AssertionError(message);
+    }
+
+    /**
+     * Walks two iterators in lock step and checks they yield equal elements in the same order.
+     * Every difference is printed, including surplus elements on either side, before the
+     * comparison fails.
+     *
+     * @param id label used in diagnostic messages
+     * @param btree iterator over the collection under test
+     * @param canon iterator over the reference collection
+     * @throws AssertionError if the iterators differ in any element or in length
+     */
+    private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon)
+    {
+        boolean equal = true;
+        while (btree.hasNext() && canon.hasNext())
+        {
+            Object i = btree.next();
+            Object j = canon.next();
+            // null-safe comparison, so a null element is reported as a mismatch rather than an NPE
+            if (i == null ? j != null : !i.equals(j))
+            {
+                // %s rather than %d: V is not necessarily an integral type
+                System.out.println(String.format("%s: Expected %s, Got %s", id, j, i));
+                equal = false;
+            }
+        }
+        // report anything left over once the shorter iterator is exhausted
+        while (btree.hasNext())
+        {
+            System.out.println(String.format("%s: Expected <Nil>, Got %s", id, btree.next()));
+            equal = false;
+        }
+        while (canon.hasNext())
+        {
+            System.out.println(String.format("%s: Expected %s, Got Nil", id, canon.next()));
+            equal = false;
+        }
+        if (!equal)
+            throw new AssertionError("Not equal");
+    }
+
+    /**
+     * Enumerates candidate slice bounds for a set holding the keys 0..size-1, including one
+     * value beyond each end of the key space.
+     *
+     * Ascending, the values run from the start up to and including size; descending, they run
+     * from the start down to and including -1. The start is {@code from}, or one step before
+     * the first key (-1 ascending, size descending) when {@code from} is Integer.MIN_VALUE.
+     *
+     * Should only be called on sets that range from 0->N or N->0, with {@code from} inside the
+     * enumerated span; otherwise the end value is never reached.
+     *
+     * Every call to iterator() starts a fresh traversal.
+     *
+     * @param size number of keys in the set
+     * @param from first value to return, or Integer.MIN_VALUE to start before the first key
+     * @param ascending direction of enumeration
+     * @return an iterable over the bounds
+     */
+    private static final Iterable<Integer> range(final int size, final int from, final boolean ascending)
+    {
+        final int start;
+        final int end;
+        final int delta;
+        if (ascending)
+        {
+            start = from == Integer.MIN_VALUE ? -1 : from;
+            end = size + 1;
+            delta = 1;
+        }
+        else
+        {
+            start = from == Integer.MIN_VALUE ? size : from;
+            end = -2;
+            delta = -1;
+        }
+        return new Iterable<Integer>()
+        {
+            @Override
+            public Iterator<Integer> iterator()
+            {
+                return new Iterator<Integer>()
+                {
+                    // the cursor belongs to the iterator, so the iterable can be traversed repeatedly
+                    int cur = start;
+
+                    @Override
+                    public boolean hasNext()
+                    {
+                        return cur != end;
+                    }
+
+                    @Override
+                    public Integer next()
+                    {
+                        if (cur == end)
+                            throw new java.util.NoSuchElementException();
+                        Integer r = cur;
+                        cur += delta;
+                        return r;
+                    }
+
+                    @Override
+                    public void remove()
+                    {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+    }
+
+    private static Object[] randomTree(int maxSize, Random random)
+    {
+        TreeSet<Integer> build = new TreeSet<>();
+        int size = random.nextInt(maxSize);
+        for (int i = 0 ; i < size ; i++)
+        {
+            build.add(random.nextInt());
+        }
+        return BTree.build(build, ICMP, true, UpdateFunction.NoOp.<Integer>instance());
+    }
+
+    /**
+     * Returns a lazily filtered view of the given btree in which each element is kept with a
+     * probability that is drawn once per call. The random draws happen as the view is iterated.
+     *
+     * @param iter the btree to select from
+     * @param rnd source of randomness, used for both the probability and the per-element draws
+     * @return the filtered view
+     */
+    private static Iterable<Integer> randomSelection(Object[] iter, final Random rnd)
+    {
+        final float keepChance = rnd.nextFloat();
+        final Iterable<Integer> source = new BTreeSet<>(iter, ICMP);
+        final Predicate<Integer> keep = new Predicate<Integer>()
+        {
+            public boolean apply(Integer ignored)
+            {
+                return keepChance > rnd.nextFloat();
+            }
+        };
+        return Iterables.filter(source, keep);
+    }
+
+    /**
+     * Returns a lazily transformed view of the given btree. Each element is either passed
+     * through unchanged, with a probability drawn once per call, or replaced by half the
+     * difference between it and the previously visited element.
+     *
+     * NOTE(review): the replacement is (v - previous) / 2, i.e. half the gap and not the
+     * midpoint between the two elements - confirm that this is what callers expect.
+     *
+     * @param iter the btree to derive values from
+     * @param rnd source of randomness, used for both the probability and the per-element draws
+     * @return the transformed view
+     */
+    private static Iterable<Integer> randomMix(Object[] iter, final Random rnd)
+    {
+        final float passThroughChance = rnd.nextFloat();
+        final Iterable<Integer> source = new BTreeSet<>(iter, ICMP);
+        final Function<Integer, Integer> mixer = new Function<Integer, Integer>()
+        {
+            // held as a long so that the subtraction below cannot overflow
+            long previous = Integer.MIN_VALUE;
+
+            public Integer apply(Integer v)
+            {
+                final long before = previous;
+                previous = v;
+                if (rnd.nextFloat() >= passThroughChance)
+                    return (int) ((v - before) / 2);
+                return v;
+            }
+        };
+        return Iterables.transform(source, mixer);
+    }
+
+    /**
+     * An UpdateFunction that never alters values and whose abortEarly() returns true at random,
+     * with the configured probability.
+     */
+    private static final class RandomAbort<V> implements UpdateFunction<V>
+    {
+        final Random rnd;
+        final float chance;
+
+        /**
+         * @param random source of randomness consulted on every abortEarly() call
+         * @param abortChance probability with which abortEarly() returns true
+         */
+        private RandomAbort(Random random, float abortChance)
+        {
+            rnd = random;
+            chance = abortChance;
+        }
+
+        /** Returns the value unchanged. */
+        public V apply(V v)
+        {
+            return v;
+        }
+
+        /** Always resolves to the updating value. */
+        public V apply(V replacing, V update)
+        {
+            return update;
+        }
+
+        /** Draws a fresh random number on each call and compares it against the chance. */
+        public boolean abortEarly()
+        {
+            return chance > rnd.nextFloat();
+        }
+
+        /** Allocation notifications are ignored. */
+        public void allocated(long heapSize)
+        {
+        }
+    }
+}


Mime
View raw message