cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [2/2] git commit: replace SnapTreeMap/AtomicSortedColumns with BTree/AtomicBTreeColumns patch by Benedict Elliott Smith; reviewed by jbellis for CASSANDRA-6271
Date Sat, 04 Jan 2014 01:11:26 GMT
replace SnapTreeMap/AtomicSortedColumns with BTree/AtomicBTreeColumns
patch by Benedict Elliott Smith; reviewed by jbellis for CASSANDRA-6271


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/412b053c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/412b053c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/412b053c

Branch: refs/heads/trunk
Commit: 412b053ca171495b33dd92eaa692df40d83a35c4
Parents: 9ce5e1a
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Fri Jan 3 14:50:26 2014 -0600
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Fri Jan 3 19:11:20 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra-env.sh                           |   2 +-
 .../apache/cassandra/db/AtomicBTreeColumns.java | 414 +++++++++++++++++
 .../cassandra/db/AtomicSortedColumns.java       | 348 ---------------
 src/java/org/apache/cassandra/db/Memtable.java  |  30 +-
 .../apache/cassandra/db/RowIteratorFactory.java |   2 +-
 .../apache/cassandra/db/filter/ColumnSlice.java |  65 ++-
 .../org/apache/cassandra/utils/btree/BTree.java | 440 +++++++++++++++++++
 .../apache/cassandra/utils/btree/BTreeSet.java  | 365 +++++++++++++++
 .../apache/cassandra/utils/btree/Builder.java   | 101 +++++
 .../apache/cassandra/utils/btree/Cursor.java    | 199 +++++++++
 .../cassandra/utils/btree/NodeBuilder.java      | 330 ++++++++++++++
 .../org/apache/cassandra/utils/btree/Path.java  | 274 ++++++++++++
 .../cassandra/utils/btree/ReplaceFunction.java  |  12 +
 .../apache/cassandra/utils/LongBTreeTest.java   | 335 ++++++++++++++
 15 files changed, 2548 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/412b053c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 79957e5..af2f6e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1
+ * Introduce AtomicBTreeColumns (CASSANDRA-6271)
  * Multithreaded commitlog (CASSANDRA-3578)
  * allocate fixed index summary memory pool and resample cold index summaries 
    to use less memory (CASSANDRA-5519)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/412b053c/conf/cassandra-env.sh
----------------------------------------------------------------------
diff --git a/conf/cassandra-env.sh b/conf/cassandra-env.sh
index 95acc4e..98fec11 100644
--- a/conf/cassandra-env.sh
+++ b/conf/cassandra-env.sh
@@ -159,7 +159,7 @@ JMX_PORT="7199"
 
 # enable assertions.  disabling this in production will give a modest
 # performance benefit (around 5%).
-JVM_OPTS="$JVM_OPTS -ea"
+#JVM_OPTS="$JVM_OPTS -ea"
 
 # add the jamm javaagent
 if [ "$JVM_VENDOR" != "OpenJDK" -o "$JVM_VERSION" \> "1.6.0" ] \

http://git-wip-us.apache.org/repos/asf/cassandra/blob/412b053c/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
new file mode 100644
index 0000000..8d37356
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import com.google.common.base.*;
+import com.google.common.collect.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.btree.ReplaceFunction;
+
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
+
+/**
+ * A thread-safe and atomic ISortedColumns implementation.
+ * Operations (in particular addAll) on this implementation are atomic and
+ * isolated (in the sense of ACID). Typically an addAll is guaranteed that no
+ * other thread can see the state where only parts but not all columns have
+ * been added.
+ * <p/>
+ * WARNING: removing elements through getSortedColumns().iterator() is *not* supported
+ */
+public class AtomicBTreeColumns extends ColumnFamily
+{
+    private static final Function<Cell, CellName> NAME = new Function<Cell, CellName>()
+    {
+        public CellName apply(Cell column)
+        {
+            return column.name;
+        }
+    };
+
+    public static final Factory<AtomicBTreeColumns> factory = new Factory<AtomicBTreeColumns>()
+    {
+        public AtomicBTreeColumns create(CFMetaData metadata, boolean insertReversed)
+        {
+            if (insertReversed)
+                throw new IllegalArgumentException();
+            return new AtomicBTreeColumns(metadata);
+        }
+    };
+
+    private static final DeletionInfo LIVE = DeletionInfo.live();
+    private static final Holder EMPTY = new Holder(BTree.empty(), LIVE);
+
+    private volatile Holder ref;
+
+    private static final AtomicReferenceFieldUpdater<AtomicBTreeColumns, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreeColumns.class, Holder.class, "ref");
+
+    private AtomicBTreeColumns(CFMetaData metadata)
+    {
+        this(metadata, EMPTY);
+    }
+
+    private AtomicBTreeColumns(CFMetaData metadata, Holder holder)
+    {
+        super(metadata);
+        this.ref = holder;
+    }
+
+    public CellNameType getComparator()
+    {
+        return metadata.comparator;
+    }
+
+    public Factory getFactory()
+    {
+        return factory;
+    }
+
+    public ColumnFamily cloneMe()
+    {
+        return new AtomicBTreeColumns(metadata, ref);
+    }
+
+    public DeletionInfo deletionInfo()
+    {
+        return ref.deletionInfo;
+    }
+
+    public void delete(DeletionTime delTime)
+    {
+        delete(new DeletionInfo(delTime));
+    }
+
+    protected void delete(RangeTombstone tombstone)
+    {
+        delete(new DeletionInfo(tombstone, getComparator()));
+    }
+
+    public void delete(DeletionInfo info)
+    {
+        if (info.isLive())
+            return;
+
+        // Keeping deletion info for max markedForDeleteAt value
+        while (true)
+        {
+            Holder current = ref;
+            DeletionInfo newDelInfo = current.deletionInfo.copy().add(info);
+            if (refUpdater.compareAndSet(this, current, current.with(newDelInfo)))
+                break;
+        }
+    }
+
+    public void setDeletionInfo(DeletionInfo newInfo)
+    {
+        ref = ref.with(newInfo);
+    }
+
+    public void purgeTombstones(int gcBefore)
+    {
+        while (true)
+        {
+            Holder current = ref;
+            if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
+                break;
+
+            DeletionInfo purgedInfo = current.deletionInfo.copy();
+            purgedInfo.purge(gcBefore);
+            if (refUpdater.compareAndSet(this, current, current.with(purgedInfo)))
+                break;
+        }
+    }
+
+    public void addColumn(Cell column, Allocator allocator)
+    {
+        while (true)
+        {
+            Holder current = ref;
+            Holder update = ref.update(this, current.deletionInfo, metadata.comparator.columnComparator(), Arrays.asList(column), null);
+            if (refUpdater.compareAndSet(this, current, update))
+                return;
+        }
+    }
+
+    public void addAll(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation)
+    {
+        addAllWithSizeDelta(cm, allocator, transformation, SecondaryIndexManager.nullUpdater);
+    }
+
+    // the function we provide to the btree utilities to perform any column replacements
+    private static final class ColumnUpdater implements ReplaceFunction<Cell>
+    {
+        final Allocator allocator;
+        final Function<Cell, Cell> transform;
+        final Updater indexer;
+        long delta;
+
+        private ColumnUpdater(Allocator allocator, Function<Cell, Cell> transform, Updater indexer)
+        {
+            this.allocator = allocator;
+            this.transform = transform;
+            this.indexer = indexer;
+        }
+
+        public Cell apply(Cell replaced, Cell update)
+        {
+            if (replaced == null)
+            {
+                indexer.insert(update);
+                delta += update.dataSize();
+            }
+            else
+            {
+                Cell reconciled = update.reconcile(replaced, allocator);
+                if (reconciled == update)
+                    indexer.update(replaced, reconciled);
+                else
+                    indexer.update(update, reconciled);
+                delta += reconciled.dataSize() - replaced.dataSize();
+            }
+
+            return transform.apply(update);
+        }
+    }
+
+    private static Collection<Cell> transform(Comparator<Cell> cmp, ColumnFamily cf, Function<Cell, Cell> transformation, boolean sort)
+    {
+        Cell[] tmp = new Cell[cf.getColumnCount()];
+
+        int i = 0;
+        for (Cell c : cf)
+            tmp[i++] = transformation.apply(c);
+
+        if (sort)
+            Arrays.sort(tmp, cmp);
+
+        return Arrays.asList(tmp);
+    }
+
+    /**
+     * This is only called by Memtable.resolve, so only AtomicBTreeColumns needs to implement it.
+     *
+     * @return the difference in size seen after merging the given columns
+     */
+    public long addAllWithSizeDelta(final ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation, Updater indexer)
+    {
+        boolean transformed = false;
+        Collection<Cell> insert;
+        if (cm instanceof UnsortedColumns)
+        {
+            insert = transform(metadata.comparator.columnComparator(), cm, transformation, true);
+            transformed = true;
+        }
+        else
+            insert = cm.getSortedColumns();
+
+        while (true)
+        {
+            Holder current = ref;
+
+            DeletionInfo deletionInfo = cm.deletionInfo();
+            if (deletionInfo.hasRanges())
+            {
+                for (Iterator<Cell> iter : new Iterator[] { insert.iterator(), BTree.<Cell>slice(current.tree, true) })
+                {
+                    while (iter.hasNext())
+                    {
+                        Cell col = iter.next();
+                        if (deletionInfo.isDeleted(col))
+                            indexer.remove(col);
+                    }
+                }
+            }
+            deletionInfo = current.deletionInfo.copy().add(deletionInfo);
+
+            ColumnUpdater updater = new ColumnUpdater(allocator, transformation, indexer);
+            Holder h = current.update(this, deletionInfo, metadata.comparator.columnComparator(), insert, updater);
+            if (h != null && refUpdater.compareAndSet(this, current, h))
+            {
+                indexer.updateRowLevelIndexes();
+                return updater.delta;
+            }
+
+            if (!transformed)
+            {
+                // After failing once, transform Columns into a new collection to avoid repeatedly allocating Slab space
+                insert = transform(metadata.comparator.columnComparator(), cm, transformation, false);
+                transformed = true;
+            }
+        }
+
+    }
+
+    public boolean replace(Cell oldColumn, Cell newColumn)
+    {
+        if (!oldColumn.name().equals(newColumn.name()))
+            throw new IllegalArgumentException();
+
+        while (true)
+        {
+            Holder current = ref;
+            Holder modified = current.update(this, current.deletionInfo, metadata.comparator.columnComparator(), Arrays.asList(newColumn), null);
+            if (modified == current)
+                return false;
+            if (refUpdater.compareAndSet(this, current, modified))
+                return true;
+        }
+    }
+
+    public void clear()
+    {
+        // no particular reason not to implement this, we just haven't needed it yet
+        throw new UnsupportedOperationException();
+    }
+
+    public Cell getColumn(CellName name)
+    {
+        return (Cell) BTree.find(ref.tree, asymmetricComparator(), name);
+    }
+
+    private Comparator<Object> asymmetricComparator()
+    {
+        final Comparator<? super CellName> cmp = metadata.comparator;
+        return new Comparator<Object>()
+        {
+            public int compare(Object o1, Object o2)
+            {
+                return cmp.compare((CellName) o1, ((Cell) o2).name);
+            }
+        };
+    }
+
+    public Iterable<CellName> getColumnNames()
+    {
+        return collection(false, NAME);
+    }
+
+    public Collection<Cell> getSortedColumns()
+    {
+        return collection(true, Functions.<Cell>identity());
+    }
+
+    public Collection<Cell> getReverseSortedColumns()
+    {
+        return collection(false, Functions.<Cell>identity());
+    }
+
+    private <V> Collection<V> collection(final boolean forwards, final Function<Cell, V> f)
+    {
+        final Holder ref = this.ref;
+        return new AbstractCollection<V>()
+        {
+            public Iterator<V> iterator()
+            {
+                return Iterators.transform(BTree.<Cell>slice(ref.tree, forwards), f);
+            }
+
+            public int size()
+            {
+                return BTree.slice(ref.tree, true).count();
+            }
+        };
+    }
+
+    public int getColumnCount()
+    {
+        return BTree.slice(ref.tree, true).count();
+    }
+
+    public Iterator<Cell> iterator(ColumnSlice[] slices)
+    {
+        return new ColumnSlice.NavigableSetIterator(new BTreeSet<>(ref.tree, getComparator().columnComparator()), slices);
+    }
+
+    public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
+    {
+        return new ColumnSlice.NavigableSetIterator(new BTreeSet<>(ref.tree, getComparator().columnComparator()).descendingSet(), slices);
+    }
+
+    public boolean isInsertReversed()
+    {
+        return false;
+    }
+
+    private static class Holder
+    {
+        // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in this class,
+        // so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
+        final DeletionInfo deletionInfo;
+        // the btree of columns
+        final Object[] tree;
+
+        Holder(Object[] tree, DeletionInfo deletionInfo)
+        {
+            this.tree = tree;
+            this.deletionInfo = deletionInfo;
+        }
+
+        Holder with(DeletionInfo info)
+        {
+            return new Holder(this.tree, info);
+        }
+
+        Holder update(AtomicBTreeColumns container, DeletionInfo deletionInfo, Comparator<Cell> cmp, Collection<Cell> update, ReplaceFunction<Cell> replaceF)
+        {
+            Object[] r = BTree.update(tree, cmp, update, true, replaceF, new TerminateEarly(container, this));
+            // result can be null if terminate early kicks in, in which case we need to propagate the early failure so we can retry
+            if (r == null)
+                return null;
+            return new Holder(r, deletionInfo);
+        }
+    }
+
+    // a function provided to the btree functions that aborts the modification
+    // if we already know the final cas will fail
+    private static final class TerminateEarly implements Function<Object, Boolean>
+    {
+        final AtomicBTreeColumns columns;
+        final Holder ref;
+
+        private TerminateEarly(AtomicBTreeColumns columns, Holder ref)
+        {
+            this.columns = columns;
+            this.ref = ref;
+        }
+
+        public Boolean apply(Object o)
+        {
+            if (ref != columns.ref)
+                return Boolean.TRUE;
+            return Boolean.FALSE;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/412b053c/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
deleted file mode 100644
index b1f1e59..0000000
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
-import edu.stanford.ppl.concurrent.SnapTreeMap;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.utils.Allocator;
-
-/**
- * A thread-safe and atomic ISortedColumns implementation.
- * Operations (in particular addAll) on this implemenation are atomic and
- * isolated (in the sense of ACID). Typically a addAll is guaranteed that no
- * other thread can see the state where only parts but not all columns have
- * been added.
- *
- * The implementation uses snaptree (https://github.com/nbronson/snaptree),
- * and in particular it's copy-on-write clone operation to achieve its
- * atomicity guarantee.
- *
- * WARNING: removing element through getSortedColumns().iterator() is *not*
- * isolated of other operations and could actually be fully ignored in the
- * face of a concurrent. Don't use it unless in a non-concurrent context.
- */
-public class AtomicSortedColumns extends ColumnFamily
-{
-    private volatile Holder ref;
-    private static final AtomicReferenceFieldUpdater<AtomicSortedColumns, Holder> refUpdater
-            = AtomicReferenceFieldUpdater.newUpdater(AtomicSortedColumns.class, Holder.class, "ref");
-
-    public static final ColumnFamily.Factory<AtomicSortedColumns> factory = new Factory<AtomicSortedColumns>()
-    {
-        public AtomicSortedColumns create(CFMetaData metadata, boolean insertReversed)
-        {
-            return new AtomicSortedColumns(metadata);
-        }
-    };
-
-    private AtomicSortedColumns(CFMetaData metadata)
-    {
-        this(metadata, new Holder(metadata.comparator));
-    }
-
-    private AtomicSortedColumns(CFMetaData metadata, Holder holder)
-    {
-        super(metadata);
-        this.ref = holder;
-    }
-
-    public CellNameType getComparator()
-    {
-        return (CellNameType)ref.map.comparator();
-    }
-
-    public ColumnFamily.Factory getFactory()
-    {
-        return factory;
-    }
-
-    public ColumnFamily cloneMe()
-    {
-        return new AtomicSortedColumns(metadata, ref.cloneMe());
-    }
-
-    public DeletionInfo deletionInfo()
-    {
-        return ref.deletionInfo;
-    }
-
-    public void delete(DeletionTime delTime)
-    {
-        delete(new DeletionInfo(delTime));
-    }
-
-    protected void delete(RangeTombstone tombstone)
-    {
-        delete(new DeletionInfo(tombstone, getComparator()));
-    }
-
-    public void delete(DeletionInfo info)
-    {
-        if (info.isLive())
-            return;
-
-        // Keeping deletion info for max markedForDeleteAt value
-        while (true)
-        {
-            Holder current = ref;
-            DeletionInfo newDelInfo = current.deletionInfo.copy().add(info);
-            if (refUpdater.compareAndSet(this, current, current.with(newDelInfo)))
-                break;
-        }
-    }
-
-    public void setDeletionInfo(DeletionInfo newInfo)
-    {
-        ref = ref.with(newInfo);
-    }
-
-    public void purgeTombstones(int gcBefore)
-    {
-        while (true)
-        {
-            Holder current = ref;
-            if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
-                break;
-
-            DeletionInfo purgedInfo = current.deletionInfo.copy();
-            purgedInfo.purge(gcBefore);
-            if (refUpdater.compareAndSet(this, current, current.with(purgedInfo)))
-                break;
-        }
-    }
-
-    public void addColumn(Cell cell, Allocator allocator)
-    {
-        Holder current, modified;
-        do
-        {
-            current = ref;
-            modified = current.cloneMe();
-            modified.addColumn(cell, allocator, SecondaryIndexManager.nullUpdater);
-        }
-        while (!refUpdater.compareAndSet(this, current, modified));
-    }
-
-    public void addAll(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation)
-    {
-        addAllWithSizeDelta(cm, allocator, transformation, SecondaryIndexManager.nullUpdater);
-    }
-
-    /**
-     *  This is only called by Memtable.resolve, so only AtomicSortedColumns needs to implement it.
-     *
-     *  @return the difference in size seen after merging the given columns
-     */
-    public long addAllWithSizeDelta(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation, SecondaryIndexManager.Updater indexer)
-    {
-        /*
-         * This operation needs to atomicity and isolation. To that end, we
-         * add the new column to a copy of the map (a cheap O(1) snapTree
-         * clone) and atomically compare and swap when everything has been
-         * added. Of course, we must not forget to update the deletion times
-         * too.
-         * In case we are adding a lot of columns, failing the final compare
-         * and swap could be expensive. To mitigate, we check we haven't been
-         * beaten by another thread after every column addition. If we have,
-         * we bail early, avoiding unnecessary work if possible.
-         */
-        Holder current, modified;
-        long sizeDelta;
-
-        main_loop:
-        do
-        {
-            sizeDelta = 0;
-            current = ref;
-            DeletionInfo newDelInfo = current.deletionInfo.copy().add(cm.deletionInfo());
-            modified = new Holder(current.map.clone(), newDelInfo);
-
-            if (cm.deletionInfo().hasRanges())
-            {
-                for (Cell currentCell : Iterables.concat(current.map.values(), cm))
-                {
-                    if (cm.deletionInfo().isDeleted(currentCell))
-                        indexer.remove(currentCell);
-                }
-            }
-
-            for (Cell cell : cm)
-            {
-                sizeDelta += modified.addColumn(transformation.apply(cell), allocator, indexer);
-                // bail early if we know we've been beaten
-                if (ref != current)
-                    continue main_loop;
-            }
-        }
-        while (!refUpdater.compareAndSet(this, current, modified));
-
-        indexer.updateRowLevelIndexes();
-
-        return sizeDelta;
-    }
-
-    public boolean replace(Cell oldCell, Cell newCell)
-    {
-        if (!oldCell.name().equals(newCell.name()))
-            throw new IllegalArgumentException();
-
-        Holder current, modified;
-        boolean replaced;
-        do
-        {
-            current = ref;
-            modified = current.cloneMe();
-            replaced = modified.map.replace(oldCell.name(), oldCell, newCell);
-        }
-        while (!refUpdater.compareAndSet(this, current, modified));
-        return replaced;
-    }
-
-    public void clear()
-    {
-        Holder current, modified;
-        do
-        {
-            current = ref;
-            modified = current.clear();
-        }
-        while (!refUpdater.compareAndSet(this, current, modified));
-    }
-
-    public Cell getColumn(CellName name)
-    {
-        return ref.map.get(name);
-    }
-
-    public SortedSet<CellName> getColumnNames()
-    {
-        return ref.map.keySet();
-    }
-
-    public Collection<Cell> getSortedColumns()
-    {
-        return ref.map.values();
-    }
-
-    public Collection<Cell> getReverseSortedColumns()
-    {
-        return ref.map.descendingMap().values();
-    }
-
-    public int getColumnCount()
-    {
-        return ref.map.size();
-    }
-
-    public Iterator<Cell> iterator(ColumnSlice[] slices)
-    {
-        return new ColumnSlice.NavigableMapIterator(ref.map, slices);
-    }
-
-    public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
-    {
-        return new ColumnSlice.NavigableMapIterator(ref.map.descendingMap(), slices);
-    }
-
-    public boolean isInsertReversed()
-    {
-        return false;
-    }
-
-    private static class Holder
-    {
-        // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class,
-        // so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
-        private static final DeletionInfo LIVE = DeletionInfo.live();
-
-        final SnapTreeMap<CellName, Cell> map;
-        final DeletionInfo deletionInfo;
-
-        Holder(CellNameType comparator)
-        {
-            this(new SnapTreeMap<CellName, Cell>(comparator), LIVE);
-        }
-
-        Holder(SnapTreeMap<CellName, Cell> map, DeletionInfo deletionInfo)
-        {
-            this.map = map;
-            this.deletionInfo = deletionInfo;
-        }
-
-        Holder cloneMe()
-        {
-            return with(map.clone());
-        }
-
-        Holder with(DeletionInfo info)
-        {
-            return new Holder(map, info);
-        }
-
-        Holder with(SnapTreeMap<CellName, Cell> newMap)
-        {
-            return new Holder(newMap, deletionInfo);
-        }
-
-        // There is no point in cloning the underlying map to clear it
-        // afterwards.
-        Holder clear()
-        {
-            return new Holder(new SnapTreeMap<CellName, Cell>(map.comparator()), LIVE);
-        }
-
-        long addColumn(Cell cell, Allocator allocator, SecondaryIndexManager.Updater indexer)
-        {
-            CellName name = cell.name();
-            while (true)
-            {
-                Cell oldCell = map.putIfAbsent(name, cell);
-                if (oldCell == null)
-                {
-                    indexer.insert(cell);
-                    return cell.dataSize();
-                }
-
-                Cell reconciledCell = cell.reconcile(oldCell, allocator);
-                if (map.replace(name, oldCell, reconciledCell))
-                {
-                    // for memtable updates we only care about oldcolumn, reconciledcolumn, but when compacting
-                    // we need to make sure we update indexes no matter the order we merge
-                    if (reconciledCell == cell)
-                        indexer.update(oldCell, reconciledCell);
-                    else
-                        indexer.update(cell, reconciledCell);
-                    return reconciledCell.dataSize() - oldCell.dataSize();
-                }
-                // We failed to replace cell due to a concurrent update or a concurrent removal. Keep trying.
-                // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/412b053c/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 11f11f8..3761826 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -93,7 +93,7 @@ public class Memtable
     // We index the memtable by RowPosition only for the purpose of being able
     // to select key range using Token.KeyBound. However put() ensures that we
     // actually only store DecoratedKey.
-    private final ConcurrentNavigableMap<RowPosition, AtomicSortedColumns> rows = new ConcurrentSkipListMap<RowPosition, AtomicSortedColumns>();
+    private final ConcurrentNavigableMap<RowPosition, AtomicBTreeColumns> rows = new ConcurrentSkipListMap<>();
     public final ColumnFamilyStore cfs;
     private final long creationTime = System.currentTimeMillis();
     private final long creationNano = System.nanoTime();
@@ -180,11 +180,11 @@ public class Memtable
 
     private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
     {
-        AtomicSortedColumns previous = rows.get(key);
+        AtomicBTreeColumns previous = rows.get(key);
 
         if (previous == null)
         {
-            AtomicSortedColumns empty = cf.cloneMeShallow(AtomicSortedColumns.factory, false);
+            AtomicBTreeColumns empty = cf.cloneMeShallow(AtomicBTreeColumns.factory, false);
             // We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
             previous = rows.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), empty);
             if (previous == null)
@@ -203,7 +203,7 @@ public class Memtable
     {
         StringBuilder builder = new StringBuilder();
         builder.append("{");
-        for (Map.Entry<RowPosition, AtomicSortedColumns> entry : rows.entrySet())
+        for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
         {
             builder.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
         }
@@ -226,29 +226,29 @@ public class Memtable
      * @param startWith Include data in the result from and including this key and to the end of the memtable
      * @return An iterator of entries with the data from the start key
      */
-    public Iterator<Map.Entry<DecoratedKey, AtomicSortedColumns>> getEntryIterator(final RowPosition startWith, final RowPosition stopAt)
+    public Iterator<Map.Entry<DecoratedKey, AtomicBTreeColumns>> getEntryIterator(final RowPosition startWith, final RowPosition stopAt)
     {
-        return new Iterator<Map.Entry<DecoratedKey, AtomicSortedColumns>>()
+        return new Iterator<Map.Entry<DecoratedKey, AtomicBTreeColumns>>()
         {
-            private Iterator<Map.Entry<RowPosition, AtomicSortedColumns>> iter = stopAt.isMinimum(cfs.partitioner)
-                                                                               ? rows.tailMap(startWith).entrySet().iterator()
-                                                                               : rows.subMap(startWith, true, stopAt, true).entrySet().iterator();
-            private Map.Entry<RowPosition, AtomicSortedColumns> currentEntry;
+            private Iterator<? extends Map.Entry<RowPosition, AtomicBTreeColumns>> iter = stopAt.isMinimum(cfs.partitioner)
+                                                                                        ? rows.tailMap(startWith).entrySet().iterator()
+                                                                                        : rows.subMap(startWith, true, stopAt, true).entrySet().iterator();
+            private Map.Entry<RowPosition, AtomicBTreeColumns> currentEntry;
 
             public boolean hasNext()
             {
                 return iter.hasNext();
             }
 
-            public Map.Entry<DecoratedKey, AtomicSortedColumns> next()
+            public Map.Entry<DecoratedKey, AtomicBTreeColumns> next()
             {
-                Map.Entry<RowPosition, AtomicSortedColumns> entry = iter.next();
+                Map.Entry<RowPosition, AtomicBTreeColumns> entry = iter.next();
                 // Store the reference to the current entry so that remove() can update the current size.
                 currentEntry = entry;
                 // Actual stored key should be true DecoratedKey
                 assert entry.getKey() instanceof DecoratedKey;
                 // Object cast is required since otherwise we can't turn RowPosition into DecoratedKey
-                return (Map.Entry<DecoratedKey, AtomicSortedColumns>) (Object)entry;
+                return (Map.Entry<DecoratedKey, AtomicBTreeColumns>) (Object)entry;
             }
 
             public void remove()
@@ -339,7 +339,7 @@ public class Memtable
             {
                 // (we can't clear out the map as-we-go to free up memory,
                 //  since the memtable is being used for queries in the "pending flush" category)
-                for (Map.Entry<RowPosition, AtomicSortedColumns> entry : rows.entrySet())
+                for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
                 {
                     ColumnFamily cf = entry.getValue();
                     if (cf.isMarkedForDelete())
@@ -413,7 +413,7 @@ public class Memtable
                 // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
                 long deepSize = memtable.meter.measure(memtable.rows);
                 int objects = 0;
-                for (Map.Entry<RowPosition, AtomicSortedColumns> entry : memtable.rows.entrySet())
+                for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : memtable.rows.entrySet())
                 {
                     deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
                     objects += entry.getValue().getColumnCount();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/412b053c/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index c02da1d..93698a0 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -61,7 +61,7 @@ public class RowIteratorFactory
         // memtables
         for (Memtable memtable : memtables)
         {
-            iterators.add(new ConvertToColumnIterator<AtomicSortedColumns>(range, memtable.getEntryIterator(range.startKey(), range.stopKey())));
+            iterators.add(new ConvertToColumnIterator<>(range, memtable.getEntryIterator(range.startKey(), range.stopKey())));
         }
 
         for (SSTableReader sstable : sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/412b053c/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index 4fd53bd..0e096fe 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
+import java.util.NavigableSet;
 
 import com.google.common.collect.AbstractIterator;
 
@@ -165,15 +166,15 @@ public class ColumnSlice
                     if (slice.finish.isEmpty())
                         currentSlice = map.values().iterator();
                     else
-                        currentSlice = map.headMap(new FakeCell(slice.finish), true).values().iterator();
+                        currentSlice = map.headMap(new FakeCellName(slice.finish), true).values().iterator();
                 }
                 else if (slice.finish.isEmpty())
                 {
-                    currentSlice = map.tailMap(new FakeCell(slice.start), true).values().iterator();
+                    currentSlice = map.tailMap(new FakeCellName(slice.start), true).values().iterator();
                 }
                 else
                 {
-                    currentSlice = map.subMap(new FakeCell(slice.start), true, new FakeCell(slice.finish), true).values().iterator();
+                    currentSlice = map.subMap(new FakeCellName(slice.start), true, new FakeCellName(slice.finish), true).values().iterator();
                 }
             }
 
@@ -185,6 +186,60 @@ public class ColumnSlice
         }
     }
 
+    public static class NavigableSetIterator extends AbstractIterator<Cell>
+    {
+        private final NavigableSet<Cell> set;
+        private final ColumnSlice[] slices;
+
+        private int idx = 0;
+        private Iterator<Cell> currentSlice;
+
+        public NavigableSetIterator(NavigableSet<Cell> set, ColumnSlice[] slices)
+        {
+            this.set = set;
+            this.slices = slices;
+        }
+
+        protected Cell computeNext()
+        {
+            if (currentSlice == null)
+            {
+                if (idx >= slices.length)
+                    return endOfData();
+
+                ColumnSlice slice = slices[idx++];
+                // We specialize the case of start == "" and finish = "" because it is slightly more efficient,
+                // but also they have a specific meaning (namely, they always extend to the beginning/end of the range).
+                if (slice.start.isEmpty())
+                {
+                    if (slice.finish.isEmpty())
+                        currentSlice = set.iterator();
+                    else
+                        currentSlice = set.headSet(fakeCell(slice.finish), true).iterator();
+                }
+                else if (slice.finish.isEmpty())
+                {
+                    currentSlice = set.tailSet(fakeCell(slice.start), true).iterator();
+                }
+                else
+                {
+                    currentSlice = set.subSet(fakeCell(slice.start), true, fakeCell(slice.finish), true).iterator();
+                }
+            }
+
+            if (currentSlice.hasNext())
+                return currentSlice.next();
+
+            currentSlice = null;
+            return computeNext();
+        }
+    }
+
+    private static Cell fakeCell(Composite name)
+    {
+        return new Cell(new FakeCellName(name), ByteBufferUtil.EMPTY_BYTE_BUFFER);
+    }
+
     /*
     * We need to take a slice (headMap/tailMap/subMap) of a CellName map
     * based on a Composite. While CellName and Composite are comparable
@@ -194,11 +249,11 @@ public class ColumnSlice
     * CellNameType, but this doesn't matter here (since we only care about
     * comparison). This is arguably a bit of a hack.
     */
-    private static class FakeCell extends AbstractComposite implements CellName
+    private static class FakeCellName extends AbstractComposite implements CellName
     {
         private final Composite prefix;
 
-        private FakeCell(Composite prefix)
+        private FakeCellName(Composite prefix)
         {
             this.prefix = prefix;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/412b053c/src/java/org/apache/cassandra/utils/btree/BTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
new file mode 100644
index 0000000..44f75b1
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -0,0 +1,440 @@
+package org.apache.cassandra.utils.btree;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+
+import com.google.common.base.Function;
+
+public class BTree
+{
+    /**
+     * Leaf Nodes are a raw array of values: Object[V1, V2, ...].
+     *
+     * Branch Nodes: Object[V1, V2, ..., child[&lt;V1.key], child[&lt;V2.key], ..., child[&lt; Inf]], where
+     * each child is another node, i.e., an Object[].  Thus, the value elements in a branch node are the
+     * first half of the array, rounding down.  In our implementation, each value must include its own key;
+     * we access these via Comparator, rather than directly. 
+     *
+     * So we can quickly distinguish between leaves and branches, we require that leaf nodes are always even number
+     * of elements (padded with a null, if necessary), and branches are always an odd number of elements.
+     *
+     * BTrees are immutable; updating one returns a new tree that reuses unmodified nodes.
+     *
+     * There are no references back to a parent node from its children.  (This would make it impossible to re-use
+     * subtrees when modifying the tree, since the modified tree would need new parent references.)
+     * Instead, we store these references in a Path as needed when navigating the tree.
+     */
+
+    // The maximum fan factor used for B-Trees
+    static final int FAN_SHIFT;
+    static
+    {
+        int fanfactor = 32;
+        if (System.getProperty("cassandra.btree.fanfactor") != null)
+            fanfactor = Integer.parseInt(System.getProperty("cassandra.btree.fanfactor"));
+        int shift = 1;
+        while (1 << shift < fanfactor)
+            shift += 1;
+        FAN_SHIFT = shift;
+    }
+    // NB we encode Path indexes as Bytes, so this needs to be less than Byte.MAX_VALUE / 2
+    static final int FAN_FACTOR = 1 << FAN_SHIFT;
+    static final int QUICK_MERGE_LIMIT = Math.min(FAN_FACTOR, 16) * 2;
+
+    // Maximum depth of any B-Tree. In reality this is just an arbitrary limit, and is currently imposed on iterators only,
+    // but a maximum depth sufficient to store at worst Integer.MAX_VALUE items seems reasonable
+    // 2^n = (2^k).(2^(n/k)) => 2^31 <= 2^(FAN_SHIFT-1) . 2^ceil(31 / (FAN_SHIFT - 1))
+    static final int MAX_DEPTH = (int) Math.ceil(31d / (FAN_SHIFT - 1));
+
+    // An empty BTree Leaf - which is the same as an empty BTree
+    static final Object[] EMPTY_LEAF = new Object[0];
+
+    // An empty BTree branch - used only for internal purposes in Modifier
+    static final Object[] EMPTY_BRANCH = new Object[1];
+
+    /**
+     * Returns an empty BTree
+     *
+     * @return the shared EMPTY_LEAF array; an empty leaf is the same as an empty BTree
+     */
+    public static Object[] empty()
+    {
+        return EMPTY_LEAF;
+    }
+
+    /**
+     * Creates a BTree containing all of the objects in the provided collection
+     *
+     * @param source     the items to build the tree with
+     * @param comparator the comparator that defines the ordering over the items in the tree
+     * @param sorted     if false, the collection will be copied and sorted to facilitate construction
+     * @param <V>        the type of the items stored in the tree
+     * @return the root node: a single (even-length) leaf if there are fewer than FAN_FACTOR items, else the thread-local Builder's result
+     */
+    public static <V> Object[] build(Collection<V> source, Comparator<V> comparator, boolean sorted)
+    {
+        int size = source.size();
+
+        if (size < FAN_FACTOR)
+        {
+            // pad to even length to match contract that all leaf nodes are even
+            V[] values = source.toArray((V[]) new Object[size + (size & 1)]);
+            // inline sorting since we're already calling toArray
+            if (!sorted)
+                Arrays.sort(values, 0, size, comparator); // sorts only the first size slots, so a trailing padding slot is left alone
+            return values;
+        }
+
+        if (!sorted)
+            source = sorted(source, comparator, size);
+
+        return modifier.get().build(source, size);
+    }
+
+    /**
+     * Returns a new BTree with the provided set inserting/replacing as necessary any equal items
+     *
+     * @param btree              the tree to update
+     * @param comparator         the comparator that defines the ordering over the items in the tree
+     * @param updateWith         the items to either insert / update
+     * @param updateWithIsSorted if false, updateWith will be copied and sorted to facilitate construction
+     * @param <V>
+     * @return
+     */
+    public static <V> Object[] update(Object[] btree, Comparator<V> comparator, Collection<V> updateWith, boolean updateWithIsSorted)
+    {
+        return update(btree, comparator, updateWith, updateWithIsSorted, null, null);
+    }
+
+    /**
+     * Returns a new BTree with the provided set inserting/replacing as necessary any equal items
+     *
+     * @param btree              the tree to update
+     * @param comparator         the comparator that defines the ordering over the items in the tree
+     * @param updateWith         the items to either insert / update
+     * @param updateWithIsSorted if false, updateWith will be copied and sorted to facilitate construction
+     * @param replaceF           may be null; the quick merge calls it as (existing, new) on a match and as (null, new) for a new item
+     * @param terminateEarly     a function that returns Boolean.TRUE if we should terminate before finishing our work.
+     *                           the argument to terminateEarly is ignored. Only handed to the Builder; the quick merge never reads it.
+     * @param <V>                the type of the items stored in the tree
+     * @return the root node of the resulting tree, produced by build(), by the quick merge, or by the thread-local Builder
+     */
+    public static <V> Object[] update(Object[] btree,
+                                      Comparator<V> comparator,
+                                      Collection<V> updateWith,
+                                      boolean updateWithIsSorted,
+                                      ReplaceFunction<V> replaceF,
+                                      Function<?, Boolean> terminateEarly)
+    {
+        if (btree.length == 0)
+            return build(updateWith, comparator, updateWithIsSorted); // NOTE(review): replaceF is never applied on this path, unlike new elements in the quick merge below - confirm intended
+
+        if (!updateWithIsSorted)
+            updateWith = sorted(updateWith, comparator, updateWith.size());
+
+        // if the b-tree is just a single root leaf, we can try a quick merge into a freshly allocated scratch array
+        if (isLeaf(btree) && btree.length + updateWith.size() < QUICK_MERGE_LIMIT)
+        {
+            // since updateWith is sorted, we can skip elements from earlier iterations tracked by this offset
+            int btreeOffset = 0;
+            int keyEnd = getLeafKeyEnd(btree);
+            Object[] merged = new Object[QUICK_MERGE_LIMIT];
+            int mergedCount = 0;
+            for (V v : updateWith)
+            {
+                // find the index i where v would belong in the original btree
+                int i = find(comparator, v, btree, btreeOffset, keyEnd);
+                boolean found = i >= 0;
+                if (!found)
+                    i = -i - 1;
+
+                // copy original elements up to i into the merged array
+                int count = i - btreeOffset;
+                if (count > 0)
+                {
+                    System.arraycopy(btree, btreeOffset, merged, mergedCount, count);
+                    mergedCount += count;
+                    btreeOffset = i;
+                }
+
+                if (found)
+                {
+                    // apply replaceF if it matches an existing element
+                    btreeOffset++;
+                    if (replaceF != null)
+                        v = replaceF.apply((V) btree[i], v);
+                }
+                else if (replaceF != null)
+                {
+                    // new element but still need to apply replaceF to handle indexing and size-tracking
+                    v = replaceF.apply(null, v);
+                }
+
+                merged[mergedCount++] = v;
+            }
+
+            // copy any remaining original elements
+            if (btreeOffset < keyEnd)
+            {
+                int count = keyEnd - btreeOffset;
+                System.arraycopy(btree, btreeOffset, merged, mergedCount, count);
+                mergedCount += count;
+            }
+
+            if (mergedCount > FAN_FACTOR)
+            {
+                // TODO unreachable by default (fan factor 32 gives QUICK_MERGE_LIMIT == FAN_FACTOR); reachable if FAN_FACTOR <= 16
+                int mid = (mergedCount >> 1) & ~1; // divide by two, rounding down to an even number
+                return new Object[] { merged[mid],
+                                      Arrays.copyOfRange(merged, 0, mid),
+                                      Arrays.copyOfRange(merged, 1 + mid, mergedCount + ((mergedCount + 1) & 1)), };
+            }
+
+            return Arrays.copyOfRange(merged, 0, mergedCount + (mergedCount & 1)); // round up to an even length so the result is a leaf
+        }
+
+        return modifier.get().update(btree, comparator, updateWith, replaceF, terminateEarly);
+    }
+
+    /**
+     * Returns an Iterator over the entire tree
+     *
+     * @param btree    the tree to iterate over
+     * @param forwards if false, the iterator will start at the end and move backwards
+     * @param <V>
+     * @return
+     */
+    public static <V> Cursor<V> slice(Object[] btree, boolean forwards)
+    {
+        Cursor<V> r = Cursor.newCursor();
+        r.reset(btree, forwards);
+        return r;
+    }
+
+    /**
+     * Returns an Iterator over a sub-range of the tree
+     *
+     * @param btree      the tree to iterate over
+     * @param comparator the comparator that defines the ordering over the items in the tree
+     * @param start      the first item to include
+     * @param end        the last item to include
+     * @param forwards   if false, the iterator will start at end and move backwards
+     * @param <V>
+     * @return
+     */
+    public static <V> Cursor<V> slice(Object[] btree, Comparator<V> comparator, V start, V end, boolean forwards)
+    {
+        Cursor<V> r = Cursor.newCursor();
+        r.reset(btree, comparator, start, end, forwards);
+        return r;
+    }
+
+    /**
+     * Returns an Iterator over a sub-range of the tree, with an explicit inclusivity flag for each bound
+     *
+     * @param btree          the tree to iterate over
+     * @param comparator     the comparator that defines the ordering over the items in the tree
+     * @param start          the first bound of the range
+     * @param startInclusive handed to Cursor.reset alongside start; presumably whether start itself is included (confirm in Cursor)
+     * @param end            the last bound of the range
+     * @param endInclusive   handed to Cursor.reset alongside end; presumably whether end itself is included (confirm in Cursor)
+     * @param forwards       if false, the iterator will start at end and move backwards
+     */
+    public static <V> Cursor<V> slice(Object[] btree, Comparator<V> comparator, V start, boolean startInclusive, V end, boolean endInclusive, boolean forwards)
+    {
+        Cursor<V> r = Cursor.newCursor();
+        r.reset(btree, comparator, start, startInclusive, end, endInclusive, forwards);
+        return r;
+    }
+
+    public static <V> V find(Object[] node, Comparator<V> comparator, V find)
+    {
+        while (true)
+        {
+            int keyEnd = getKeyEnd(node);
+            int i = BTree.find(comparator, find, node, 0, keyEnd);
+            if (i >= 0)
+            {
+                return (V) node[i];
+            }
+            else if (!isLeaf(node))
+            {
+                i = -i - 1;
+                node = (Object[]) node[keyEnd + i];
+            }
+            else
+            {
+                return null;
+            }
+        }
+    }
+
+
+    // UTILITY METHODS
+
+    // same basic semantics as Arrays.binarySearch, but delegates to compare() method to avoid
+    // wrapping generic Comparator with support for Special +/- infinity sentinels
+    static <V> int find(Comparator<V> comparator, Object key, Object[] a, final int fromIndex, final int toIndex)
+    {
+        // attempt to terminate quickly by checking the first element,
+        // as many uses of this class will (probably) be updating identical sets
+        if (fromIndex >= toIndex)
+            return -(fromIndex + 1);
+
+        int c = compare(comparator, key, a[fromIndex]);
+        if (c <= 0)
+        {
+            if (c == 0)
+                return fromIndex;
+            else
+                return -(fromIndex + 1);
+        }
+
+        int low = fromIndex + 1;
+        int high = toIndex - 1;
+
+        while (low <= high)
+        {
+            int mid = (low + high) / 2;
+            int cmp = compare(comparator, key, a[mid]);
+
+            if (cmp > 0)
+                low = mid + 1;
+            else if (cmp < 0)
+                high = mid - 1;
+            else
+                return mid; // key found
+        }
+        return -(low + 1);  // key not found.
+    }
+
+    // get the upper bound we should search in for keys in the node
+    static int getKeyEnd(Object[] node)
+    {
+        if (isLeaf(node))
+            return getLeafKeyEnd(node);
+        else
+            return getBranchKeyEnd(node);
+    }
+
+    // get the exclusive end index of the keys in the leaf node: its length, less one if the final slot is null padding
+    static int getLeafKeyEnd(Object[] node)
+    {
+        int len = node.length;
+        if (len == 0)
+            return 0;
+        else if (node[len - 1] == null)
+            return len - 1;
+        else
+            return len;
+    }
+
+    // return the boundary position between keys/children for the branch node
+    static int getBranchKeyEnd(Object[] node)
+    {
+        return node.length / 2;
+    }
+
+    // returns true if the provided node is a leaf, false if it is a branch
+    static boolean isLeaf(Object[] node)
+    {
+        return (node.length & 1) == 0;
+    }
+
+    // Special class for making certain operations easier, so we can define a +/- Inf
+    private static interface Special extends Comparable<Object> { }
+    static final Special POSITIVE_INFINITY = new Special()
+    {
+        public int compareTo(Object o)
+        {
+            return o == this ? 0 : 1;
+        }
+    };
+    static final Special NEGATIVE_INFINITY = new Special()
+    {
+        public int compareTo(Object o)
+        {
+            return o == this ? 0 : -1;
+        }
+    };
+
+    private static final ThreadLocal<Builder> modifier = new ThreadLocal<Builder>()
+    {
+        @Override
+        protected Builder initialValue()
+        {
+            return new Builder();
+        }
+    };
+
+    // return a sorted collection
+    private static <V> Collection<V> sorted(Collection<V> collection, Comparator<V> comparator, int size)
+    {
+        V[] vs = collection.toArray((V[]) new Object[size]);
+        Arrays.sort(vs, comparator);
+        return Arrays.asList(vs);
+    }
+
+    /** simple static wrapper to calls to cmp.compare() which checks if either a or b are Special (i.e. represent an infinity) */
+    // TODO : cheaper to check for POSITIVE/NEGATIVE infinity in callers, rather than here
+    static <V> int compare(Comparator<V> cmp, Object a, Object b)
+    {
+        if (a instanceof Special)
+            return ((Special) a).compareTo(b);
+        if (b instanceof Special)
+            return -((Special) b).compareTo(a);
+        return cmp.compare((V) a, (V) b);
+    }
+
+    public static boolean isWellFormed(Object[] btree)
+    {
+        return isWellFormed(null, btree, true, NEGATIVE_INFINITY, POSITIVE_INFINITY);
+    }
+
+    public static boolean isWellFormed(Object[] btree, Comparator<? extends Object> cmp)
+    {
+        return isWellFormed(cmp, btree, true, NEGATIVE_INFINITY, POSITIVE_INFINITY);
+    }
+
+    private static boolean isWellFormed(Comparator<?> cmp, Object[] node, boolean isRoot, Object min, Object max)
+    {
+        if (cmp != null && !isNodeWellFormed(cmp, node, min, max))
+            return false;
+
+        if (isLeaf(node))
+        {
+            if (isRoot)
+                return node.length <= FAN_FACTOR;
+            return node.length >= FAN_FACTOR / 2 && node.length <= FAN_FACTOR;
+        }
+
+        int type = 0;
+        int childOffset = getBranchKeyEnd(node);
+        // compare each child node with the branch element at the head of this node it corresponds with
+        for (int i = childOffset; i < node.length; i++)
+        {
+            Object[] child = (Object[]) node[i];
+            Object localmax = i < node.length - 1 ? node[i - childOffset] : max;
+            if (!isWellFormed(cmp, child, false, min, localmax))
+                return false;
+            type |= isLeaf(child) ? 1 : 2;
+            min = localmax;
+        }
+        return type < 3; // either all leaves or all branches but not a mix
+    }
+
+    private static boolean isNodeWellFormed(Comparator<?> cmp, Object[] node, Object min, Object max)
+    {
+        Object previous = min;
+        int end = getKeyEnd(node);
+        for (int i = 0; i < end; i++)
+        {
+            Object current = node[i];
+            if (compare(cmp, previous, current) >= 0)
+                return false;
+            previous = current;
+        }
+        return compare(cmp, previous, max) < 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/412b053c/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
new file mode 100644
index 0000000..1730cfc
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
@@ -0,0 +1,365 @@
+package org.apache.cassandra.utils.btree;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NavigableSet;
+import java.util.SortedSet;
+
+public class BTreeSet<V> implements NavigableSet<V>
+{
+    protected final Comparator<V> comparator;
+    protected final Object[] tree;
+
+    public BTreeSet(Object[] tree, Comparator<V> comparator)
+    {
+        this.tree = tree;
+        this.comparator = comparator;
+    }
+
+    public BTreeSet<V> update(Collection<V> updateWith, boolean isSorted)
+    {
+        return new BTreeSet<>(BTree.update(tree, comparator, updateWith, isSorted, null, null), comparator);
+    }
+
+    @Override
+    public Comparator<? super V> comparator()
+    {
+        return comparator;
+    }
+
+    protected Cursor<V> slice(boolean forwards, boolean permitInversion)
+    {
+        return BTree.<V>slice(tree, forwards);
+    }
+
+    @Override
+    public int size()
+    {
+        return slice(true, false).count();
+    }
+
+    @Override
+    public boolean isEmpty() // true iff a forward cursor over this set yields no element
+    {
+        return !slice(true, false).hasNext(); // previously returned hasNext(), i.e. true when the set was NOT empty
+    }
+
+    @Override
+    public Iterator<V> iterator()
+    {
+        return slice(true, true);
+    }
+
+    @Override
+    public Iterator<V> descendingIterator()
+    {
+        return slice(false, true);
+    }
+
+    @Override
+    public Object[] toArray()
+    {
+        return toArray(new Object[0]);
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a)
+    {
+        int size = size();
+        if (a.length < size)
+            a = Arrays.copyOf(a, size);
+        int i = 0;
+        for (V v : this)
+            a[i++] = (T) v;
+        return a;
+    }
+
+    @Override
+    public NavigableSet<V> subSet(V fromElement, boolean fromInclusive, V toElement, boolean toInclusive)
+    {
+        return new BTreeRange<>(tree, comparator, fromElement, fromInclusive, toElement, toInclusive);
+    }
+
+    @Override
+    public NavigableSet<V> headSet(V toElement, boolean inclusive)
+    {
+        return new BTreeRange<>(tree, comparator, null, true, toElement, inclusive);
+    }
+
+    @Override
+    public NavigableSet<V> tailSet(V fromElement, boolean inclusive)
+    {
+        return new BTreeRange<>(tree, comparator, fromElement, inclusive, null, true);
+    }
+
+    @Override
+    public SortedSet<V> subSet(V fromElement, V toElement)
+    {
+        return subSet(fromElement, true, toElement, false);
+    }
+
+    @Override
+    public SortedSet<V> headSet(V toElement)
+    {
+        return headSet(toElement, false);
+    }
+
+    @Override
+    public SortedSet<V> tailSet(V fromElement)
+    {
+        return tailSet(fromElement, true);
+    }
+
+    @Override
+    public V first()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public V last()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends V> c)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public V pollFirst()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public V pollLast()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean add(V v)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean remove(Object o)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public V lower(V v)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public V floor(V v)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public V ceiling(V v)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public V higher(V v)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean contains(Object o)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public NavigableSet<V> descendingSet()
+    {
+        return new BTreeRange<>(this.tree, this.comparator).descendingSet();
+    }
+
+    /**
+     * A set over a btree that is limited to the keys between a lower and an upper bound, each of which may be
+     * inclusive or exclusive.  A null bound is treated as absent, i.e. the range is not limited on that side.
+     */
+    public static class BTreeRange<V> extends BTreeSet<V> implements NavigableSet<V>
+    {
+
+        protected final V lowerBound;
+        protected final V upperBound;
+        protected final boolean inclusiveLowerBound;
+        protected final boolean inclusiveUpperBound;
+
+        // a range with no bounds on either side
+        BTreeRange(Object[] tree, Comparator<V> comparator)
+        {
+            this(tree, comparator, null, true, null, true);
+        }
+
+        // copy constructor: same tree, comparator and bounds as the provided range
+        BTreeRange(BTreeRange<V> from)
+        {
+            this(from.tree, from.comparator, from.lowerBound, from.inclusiveLowerBound, from.upperBound, from.inclusiveUpperBound);
+        }
+
+        BTreeRange(Object[] tree, Comparator<V> comparator, V lowerBound, boolean inclusiveLowerBound, V upperBound, boolean inclusiveUpperBound)
+        {
+            super(tree, comparator);
+            this.lowerBound = lowerBound;
+            this.inclusiveLowerBound = inclusiveLowerBound;
+            this.upperBound = upperBound;
+            this.inclusiveUpperBound = inclusiveUpperBound;
+        }
+
+        // narrowing range constructor - makes this the intersection of the two ranges over the same tree
+        BTreeRange(BTreeRange<V> a, BTreeRange<V> b)
+        {
+            super(a.tree, a.comparator);
+            assert a.tree == b.tree;
+
+            // each side of the result is copied wholesale (value + inclusivity) from whichever input is tighter
+            final BTreeRange<V> lowerSource = tighterLowerBound(comparator, a, b);
+            final BTreeRange<V> upperSource = tighterUpperBound(comparator, a, b);
+
+            lowerBound = lowerSource.lowerBound;
+            inclusiveLowerBound = lowerSource.inclusiveLowerBound;
+            upperBound = upperSource.upperBound;
+            inclusiveUpperBound = upperSource.inclusiveUpperBound;
+        }
+
+        // picks the range whose lower bound is the more restrictive; on equal bounds an exclusive a wins, else b
+        private static <T> BTreeRange<T> tighterLowerBound(Comparator<T> order, BTreeRange<T> a, BTreeRange<T> b)
+        {
+            if (a.lowerBound == null)
+                return b;
+            if (b.lowerBound == null)
+                return a;
+
+            int cmp = order.compare(a.lowerBound, b.lowerBound);
+            if (cmp != 0)
+                return cmp < 0 ? b : a;
+            return a.inclusiveLowerBound ? b : a;
+        }
+
+        // picks the range whose upper bound is the more restrictive; on equal bounds an exclusive a wins, else b
+        private static <T> BTreeRange<T> tighterUpperBound(Comparator<T> order, BTreeRange<T> a, BTreeRange<T> b)
+        {
+            if (a.upperBound == null)
+                return b;
+            if (b.upperBound == null)
+                return a;
+
+            int cmp = order.compare(b.upperBound, a.upperBound);
+            if (cmp != 0)
+                return cmp < 0 ? b : a;
+            return a.inclusiveUpperBound ? b : a;
+        }
+
+        // permitInversion is not consulted here: the requested direction is passed to BTree.slice as-is
+        @Override
+        protected Cursor<V> slice(boolean forwards, boolean permitInversion)
+        {
+            return BTree.slice(tree, comparator, lowerBound, inclusiveLowerBound, upperBound, inclusiveUpperBound, forwards);
+        }
+
+        // the result is the intersection of this range with the requested [from, to] range
+        @Override
+        public NavigableSet<V> subSet(V fromElement, boolean fromInclusive, V toElement, boolean toInclusive)
+        {
+            BTreeRange<V> requested = new BTreeRange<>(tree, comparator, fromElement, fromInclusive, toElement, toInclusive);
+            return new BTreeRange<>(this, requested);
+        }
+
+        // the result is the intersection of this range with one that ends at toElement
+        @Override
+        public NavigableSet<V> headSet(V toElement, boolean inclusive)
+        {
+            BTreeRange<V> requested = new BTreeRange<>(tree, comparator, lowerBound, true, toElement, inclusive);
+            return new BTreeRange<>(this, requested);
+        }
+
+        // the result is the intersection of this range with one that starts at fromElement
+        @Override
+        public NavigableSet<V> tailSet(V fromElement, boolean inclusive)
+        {
+            BTreeRange<V> requested = new BTreeRange<>(tree, comparator, fromElement, inclusive, null, true);
+            return new BTreeRange<>(this, requested);
+        }
+
+        @Override
+        public NavigableSet<V> descendingSet()
+        {
+            return new BTreeDescRange<>(this);
+        }
+    }
+
+    public static class BTreeDescRange<V> extends BTreeRange<V>
+    {
+        BTreeDescRange(BTreeRange<V> from)
+        {
+            super(from.tree, from.comparator, from.lowerBound, from.inclusiveLowerBound, from.upperBound, from.inclusiveUpperBound);
+        }
+
+        @Override
+        protected Cursor<V> slice(boolean forwards, boolean permitInversion)
+        {
+            return super.slice(permitInversion ? !forwards : forwards, false);
+        }
+
+        @Override
+        public NavigableSet<V> subSet(V fromElement, boolean fromInclusive, V toElement, boolean toInclusive)
+        {
+            return super.subSet(toElement, toInclusive, fromElement, fromInclusive).descendingSet();
+        }
+
+        @Override
+        public NavigableSet<V> headSet(V toElement, boolean inclusive)
+        {
+            return super.tailSet(toElement, inclusive).descendingSet();
+        }
+
+        @Override
+        public NavigableSet<V> tailSet(V fromElement, boolean inclusive)
+        {
+            return super.headSet(fromElement, inclusive).descendingSet();
+        }
+
+        @Override
+        public NavigableSet<V> descendingSet()
+        {
+            return new BTreeRange<>(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/412b053c/src/java/org/apache/cassandra/utils/btree/Builder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/Builder.java b/src/java/org/apache/cassandra/utils/btree/Builder.java
new file mode 100644
index 0000000..0865146
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/btree/Builder.java
@@ -0,0 +1,101 @@
+package org.apache.cassandra.utils.btree;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+import com.google.common.base.Function;
+
+import static org.apache.cassandra.utils.btree.BTree.EMPTY_BRANCH;
+import static org.apache.cassandra.utils.btree.BTree.EMPTY_LEAF;
+import static org.apache.cassandra.utils.btree.BTree.FAN_SHIFT;
+import static org.apache.cassandra.utils.btree.BTree.POSITIVE_INFINITY;
+
+/**
+ * A class for constructing a new BTree, either from an existing one and some set of modifications
+ * or a new tree from a sorted collection of items.
+ * <p/>
+ * This is a fairly heavy-weight object, so a ThreadLocal instance is created for making modifications to a tree
+ */
+final class Builder
+{
+    private final NodeBuilder rootBuilder = new NodeBuilder();
+
+    /**
+     * At the highest level, we adhere to the classic b-tree insertion algorithm:
+     *
+     * 1. Add to the appropriate leaf
+     * 2. Split the leaf if necessary, add the median to the parent
+     * 3. Split the parent if necessary, etc.
+     *
+     * There is one important difference: we don't actually modify the original tree, but copy each node that we
+     * modify.  Note that every node on the path to the key being inserted or updated will be modified; this
+     * implies that at a minimum, the root node will be modified for every update, so every root is a "snapshot"
+     * of a tree that can be iterated or sliced without fear of concurrent modifications.
+     *
+     * The NodeBuilder class handles the details of buffering the copied contents of the original tree and
+     * adding in our changes.  Since NodeBuilder maintains parent/child references, it also handles parent-splitting
+     * (easy enough, since any node affected by the split will already be copied into a NodeBuilder).
+     *
+     * One other difference from the simple algorithm is that we perform modifications in bulk;
+     * we assume source has been sorted, e.g. by BTree.update, so the update of each key resumes where
+     * the previous left off.
+     *
+     * @param btree          the existing tree the modifications are applied on top of
+     * @param comparator     the ordering handed to NodeBuilder.update for every key
+     * @param source         the keys to apply; assumed to already be sorted
+     * @param replaceF       handed to NodeBuilder.update for every key
+     * @param terminateEarly may be null; otherwise it is invoked with a null argument before every update step,
+     *                       and the update is abandoned as soon as it yields a true value
+     * @return the root of the updated tree, or null if terminateEarly caused the update to be abandoned
+     */
+    public <V> Object[] update(Object[] btree, Comparator<V> comparator, Collection<V> source, ReplaceFunction<V> replaceF, Function<?, Boolean> terminateEarly)
+    {
+        NodeBuilder current = rootBuilder;
+        current.reset(btree, POSITIVE_INFINITY);
+
+        for (V key : source)
+        {
+            while (true)
+            {
+                // compare by value, not identity: a true Boolean that is not the canonical Boolean.TRUE instance
+                // must abort the update as well (and a null result is simply treated as "carry on")
+                if (terminateEarly != null && Boolean.TRUE.equals(terminateEarly.apply(null)))
+                {
+                    rootBuilder.clear();
+                    return null;
+                }
+                NodeBuilder next = current.update(key, comparator, replaceF);
+                if (next == null)
+                    break;
+                // we were in a subtree from a previous key that didn't contain this new key;
+                // retry against the correct subtree
+                current = next;
+            }
+        }
+
+        // finish copying any remaining keys from the original btree
+        while (true)
+        {
+            NodeBuilder next = current.update(POSITIVE_INFINITY, comparator, replaceF);
+            if (next == null)
+                break;
+            current = next;
+        }
+
+        // updating with POSITIVE_INFINITY means that current should be back to the root
+        assert current.isRoot();
+
+        Object[] r = current.toNode();
+        current.clear();
+        return r;
+    }
+
+    /**
+     * Builds a new tree containing the items of the provided collection, added in iteration order.
+     *
+     * @param source the items to add to the new tree
+     * @param size   used only to decide how many levels of child builders to descend through before adding
+     *               items; presumably the number of items in source
+     * @return the root of the newly built tree
+     */
+    public <V> Object[] build(Collection<V> source, int size)
+    {
+        NodeBuilder current = rootBuilder;
+        // we descend only to avoid wasting memory; in update() we will often descend into existing trees
+        // so here we want to descend also, so we don't have lg max(N) depth in both directions
+        while ((size >>= FAN_SHIFT) > 0)
+            current = current.ensureChild();
+
+        current.reset(EMPTY_LEAF, POSITIVE_INFINITY);
+        for (V key : source)
+            current.addNewKey(key, null);
+
+        current = current.ascendToRoot();
+
+        Object[] r = current.toNode();
+        current.clear();
+        return r;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/412b053c/src/java/org/apache/cassandra/utils/btree/Cursor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/Cursor.java b/src/java/org/apache/cassandra/utils/btree/Cursor.java
new file mode 100644
index 0000000..aa747be
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/btree/Cursor.java
@@ -0,0 +1,199 @@
+package org.apache.cassandra.utils.btree;
+
+import java.util.Comparator;
+import java.util.Iterator;
+
+import static org.apache.cassandra.utils.btree.BTree.MAX_DEPTH;
+import static org.apache.cassandra.utils.btree.BTree.NEGATIVE_INFINITY;
+import static org.apache.cassandra.utils.btree.BTree.POSITIVE_INFINITY;
+import static org.apache.cassandra.utils.btree.BTree.getLeafKeyEnd;
+import static org.apache.cassandra.utils.btree.BTree.isLeaf;
+
+/**
+ * An extension of Path which provides a public interface for iterating over or counting a subrange of the tree
+ *
+ * @param <V> the type of the items handed out by the cursor
+ */
+public final class Cursor<V> extends Path implements Iterator<V>
+{
+    /*
+     * Conceptually, a Cursor derives two Paths, one for the first object in the slice requested (inclusive),
+     * and one for the last (exclusive).  Then hasNext just checks, have we reached the last yet, and next
+     * calls successor() (or predecessor(), when moving backwards) to get to the next item in the Tree.
+     *
+     * To optimize memory use, we summarize the last Path as just endNode/endIndex, and inherit from Path for
+     * the first one.
+     */
+
+    /**
+     * Returns a cursor that can be reused to iterate over trees
+     *
+     * @param <V> the type of the items the cursor will hand out
+     * @return a new cursor with room for MAX_DEPTH levels; it must be reset against a tree before use
+     */
+    static <V> Cursor<V> newCursor()
+    {
+        // try to encourage stack allocation - may be misguided. but no harm
+        Object[][] stack = new Object[MAX_DEPTH][];
+        byte[] index = new byte[MAX_DEPTH];
+        // instantiate with the type argument rather than as a raw type
+        return new Cursor<V>(stack, index);
+    }
+
+    // the last node covered by the requested range
+    private Object[] endNode;
+    // the index within endNode that signals we're finished -- that is, endNode[endIndex] is NOT part of the Cursor
+    private byte endIndex;
+
+    // direction of travel, as requested by the most recent reset
+    private boolean forwards;
+
+    private Cursor(Object[][] stack, byte[] index)
+    {
+        super(stack, index);
+    }
+
+    /**
+     * Reset this cursor for the provided tree, to iterate over its entire range
+     *
+     * @param btree    the tree to iterate over
+     * @param forwards if false, the cursor will start at the end and move backwards
+     */
+    public void reset(Object[] btree, boolean forwards)
+    {
+        _reset(btree, null, NEGATIVE_INFINITY, false, POSITIVE_INFINITY, false, forwards);
+    }
+
+    /**
+     * Reset this cursor for the provided tree, to iterate between the provided start and end
+     *
+     * @param btree      the tree to iterate over
+     * @param comparator the comparator that defines the ordering over the items in the tree
+     * @param lowerBound the first item to include, inclusive
+     * @param upperBound the last item to include, exclusive
+     * @param forwards   if false, the cursor will start at the end and move backwards
+     */
+    public void reset(Object[] btree, Comparator<V> comparator, V lowerBound, V upperBound, boolean forwards)
+    {
+        _reset(btree, comparator, lowerBound, true, upperBound, false, forwards);
+    }
+
+    /**
+     * Reset this cursor for the provided tree, to iterate between the provided start and end
+     *
+     * @param btree               the tree to iterate over
+     * @param comparator          the comparator that defines the ordering over the items in the tree
+     * @param lowerBound          the first item to include
+     * @param inclusiveLowerBound should include start in the iterator, if present in the tree
+     * @param upperBound          the last item to include
+     * @param inclusiveUpperBound should include end in the iterator, if present in the tree
+     * @param forwards            if false, the cursor will start at the end and move backwards
+     */
+    public void reset(Object[] btree, Comparator<V> comparator, V lowerBound, boolean inclusiveLowerBound, V upperBound, boolean inclusiveUpperBound, boolean forwards)
+    {
+        _reset(btree, comparator, lowerBound, inclusiveLowerBound, upperBound, inclusiveUpperBound, forwards);
+    }
+
+    // shared implementation of the public reset methods; a null bound is replaced by the matching infinity
+    private void _reset(Object[] btree, Comparator<V> comparator, Object lowerBound, boolean inclusiveLowerBound, Object upperBound, boolean inclusiveUpperBound, boolean forwards)
+    {
+        if (lowerBound == null)
+            lowerBound = NEGATIVE_INFINITY;
+        if (upperBound == null)
+            upperBound = POSITIVE_INFINITY;
+
+        this.forwards = forwards;
+
+        // findLast locates the position at which iteration stops; this cursor itself is moved to the start
+        Path findLast = Path.newPath();
+        if (forwards)
+        {
+            findLast.find(btree, comparator, upperBound, inclusiveUpperBound ? Op.HIGHER : Op.CEIL, true);
+            find(btree, comparator, lowerBound, inclusiveLowerBound ? Op.CEIL : Op.HIGHER, true);
+        }
+        else
+        {
+            findLast.find(btree, comparator, lowerBound, inclusiveLowerBound ? Op.LOWER : Op.FLOOR, false);
+            find(btree, comparator, upperBound, inclusiveUpperBound ? Op.FLOOR : Op.LOWER, false);
+        }
+        int c = this.compareTo(findLast, forwards);
+        if (forwards ? c > 0 : c < 0)
+        {
+            // the start already lies beyond the end in the direction of travel: use the current position as
+            // the end marker, which presumably leaves nothing to iterate
+            endNode = currentNode();
+            endIndex = currentIndex();
+        }
+        else
+        {
+            endNode = findLast.currentNode();
+            endIndex = findLast.currentIndex();
+        }
+    }
+
+    /**
+     * @return true unless the cursor is positioned on the end marker (endNode/endIndex)
+     */
+    @Override
+    public boolean hasNext()
+    {
+        return path[depth] != endNode || indexes[depth] != endIndex;
+    }
+
+    /**
+     * Returns the key at the current position and then moves one step in the direction of travel.
+     * No check against the end marker is made here; callers are expected to consult hasNext() first.
+     *
+     * @return the key the cursor was positioned on before moving
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public V next()
+    {
+        Object r = currentKey();
+        if (forwards)
+            successor();
+        else
+            predecessor();
+        // unchecked: keys are held as Object; the tree is assumed to contain only items of type V
+        return (V) r;
+    }
+
+    /**
+     * Counts the items between the current position and the end marker, moving the cursor as it goes.
+     *
+     * @return the number of items counted
+     * @throws IllegalStateException if the cursor was reset to move backwards
+     */
+    public int count()
+    {
+        if (!forwards)
+            throw new IllegalStateException("Count can only be run on forward cursors");
+        int count = 0;
+        int next;
+        // a negative result from consumeNextLeaf signals that the end marker has been reached
+        while ((next = consumeNextLeaf()) >= 0)
+            count += next;
+        return count;
+    }
+
+    /**
+     * @return the number of objects consumed by moving out of the next (possibly current) leaf,
+     *         or -1 if the cursor was already at the end marker
+     */
+    private int consumeNextLeaf()
+    {
+        Object[] node = currentNode();
+        int r = 0;
+
+        if (!isLeaf(node))
+        {
+            // if we're not in a leaf, then calling successor once will take us to a leaf, since the next
+            // key will be in the leftmost subtree of whichever branch is next.  For instance, if we
+            // are in the root node of the tree depicted by http://cis.stvincent.edu/html/tutorials/swd/btree/btree1.gif,
+            // successor() will take us to the leaf containing N and O.
+            int i = currentIndex();
+            if (node == endNode && i == endIndex)
+                return -1;
+            // account for the branch key we are stepping off
+            r = 1;
+            successor();
+            node = currentNode();
+        }
+
+        if (node == endNode)
+        {
+            // only count up to endIndex, and don't call successor()
+            if (currentIndex() == endIndex)
+                return r > 0 ? r : -1;
+            r += endIndex - currentIndex();
+            setIndex(endIndex);
+            return r;
+        }
+
+        // count the remaining objects in this leaf
+        int keyEnd = getLeafKeyEnd(node);
+        r += keyEnd - currentIndex();
+        setIndex(keyEnd);
+        successor();
+        return r;
+    }
+
+    /**
+     * Removal through the cursor is not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+}


Mime
View raw message