cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [2/2] cassandra git commit: Use BTree to back default Row and ComplexColumnData objects
Date Tue, 28 Jul 2015 14:01:00 GMT
Use BTree to back default Row and ComplexColumnData objects

patch by benedict; reviewed by branimir for CASSANDRA-9888


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/639d4b24
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/639d4b24
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/639d4b24

Branch: refs/heads/trunk
Commit: 639d4b240c084900b6589222a0984babfc1890b1
Parents: de420e5
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Thu Jul 23 14:22:16 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Tue Jul 28 15:00:27 2015 +0100

----------------------------------------------------------------------
 .../cassandra/config/ColumnDefinition.java      |  16 +
 .../apache/cassandra/cql3/UpdateParameters.java |   4 +-
 .../cassandra/db/HintedHandOffManager.java      |   4 +-
 .../org/apache/cassandra/db/LegacyLayout.java   |   4 +-
 .../apache/cassandra/db/RowUpdateBuilder.java   |   6 +-
 .../cassandra/db/UnfilteredDeserializer.java    |   2 +-
 .../AbstractSimplePerColumnSecondaryIndex.java  |   4 +-
 .../db/index/composites/CompositesIndex.java    |   2 +-
 .../AbstractThreadUnsafePartition.java          |   2 +-
 .../db/partitions/AtomicBTreePartition.java     |   7 +-
 .../db/partitions/PartitionUpdate.java          |   4 +-
 .../apache/cassandra/db/rows/AbstractCell.java  |   8 +-
 .../cassandra/db/rows/ArrayBackedRow.java       | 927 -------------------
 .../cassandra/db/rows/BTreeBackedRow.java       | 535 +++++++++++
 .../apache/cassandra/db/rows/BufferCell.java    |   9 +-
 src/java/org/apache/cassandra/db/rows/Cell.java |  35 +-
 .../apache/cassandra/db/rows/ColumnData.java    |  25 +-
 .../cassandra/db/rows/ComplexColumnData.java    | 208 ++---
 src/java/org/apache/cassandra/db/rows/Row.java  |   6 +-
 src/java/org/apache/cassandra/db/rows/Rows.java |   4 +-
 .../rows/UnfilteredRowIteratorSerializer.java   |   2 +-
 .../db/rows/UnfilteredRowIterators.java         |   2 +-
 .../cassandra/db/rows/UnfilteredSerializer.java |   2 +-
 .../io/sstable/SSTableSimpleIterator.java       |   2 +-
 .../apache/cassandra/service/DataResolver.java  |   2 +-
 .../cassandra/thrift/CassandraServer.java       |   9 +-
 .../cassandra/thrift/ThriftResultsMerger.java   |   4 +-
 .../org/apache/cassandra/utils/btree/BTree.java | 320 ++++++-
 .../utils/btree/BTreeSearchIterator.java        |   8 +-
 .../apache/cassandra/utils/btree/BTreeSet.java  |  18 +-
 .../cassandra/utils/btree/NodeBuilder.java      |   3 +-
 .../cassandra/utils/btree/TreeCursor.java       |   4 +-
 .../utils/memory/AbstractAllocator.java         |   6 +-
 .../apache/cassandra/utils/LongBTreeTest.java   | 173 +++-
 test/unit/org/apache/cassandra/db/RowTest.java  |   4 +-
 .../rows/RowAndDeletionMergeIteratorTest.java   |   2 +-
 .../rows/UnfilteredRowIteratorsMergeTest.java   |   2 +-
 .../cassandra/triggers/TriggerExecutorTest.java |   2 +-
 .../org/apache/cassandra/utils/BTreeTest.java   |  75 +-
 39 files changed, 1200 insertions(+), 1252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 54a00f5..8d8a929 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.exceptions.*;
 
 public class ColumnDefinition extends ColumnSpecification implements Comparable<ColumnDefinition>
 {
+    public static final Comparator<Object> asymmetricColumnDataComparator = (a, b) -> ((ColumnData) a).column().compareTo((ColumnDefinition) b);
+
     /*
      * The type of CQL3 column this definition represents.
      * There is 4 main type of CQL3 columns: those parts of the partition key,
@@ -70,6 +72,8 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
     private final Integer componentIndex;
 
     private final Comparator<CellPath> cellPathComparator;
+    private final Comparator<Object> asymmetricCellPathComparator;
+    private final Comparator<? super Cell> cellComparator;
 
     /**
      * These objects are compared frequently, so we encode several of their comparison components
@@ -156,6 +160,8 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
         this.componentIndex = componentIndex;
         this.setIndexType(indexType, indexOptions);
         this.cellPathComparator = makeCellPathComparator(kind, validator);
+        this.cellComparator = cellPathComparator == null ? ColumnData.comparator : (a, b) -> cellPathComparator.compare(a.path(), b.path());
+        this.asymmetricCellPathComparator = cellPathComparator == null ? null : (a, b) -> cellPathComparator.compare(((Cell)a).path(), (CellPath) b);
         this.comparisonOrder = comparisonOrder(kind, isComplex(), position());
     }
 
@@ -407,6 +413,16 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
         return cellPathComparator;
     }
 
+    public Comparator<Object> asymmetricCellPathComparator()
+    {
+        return asymmetricCellPathComparator;
+    }
+
+    public Comparator<? super Cell> cellComparator()
+    {
+        return cellComparator;
+    }
+
     public boolean isComplex()
     {
         return cellPathComparator != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 8dcb7e5..519eb4b 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -120,13 +120,13 @@ public class UpdateParameters
         if (clustering == Clustering.STATIC_CLUSTERING)
         {
             if (staticBuilder == null)
-                staticBuilder = ArrayBackedRow.unsortedBuilder(updatedColumns.statics, nowInSec);
+                staticBuilder = BTreeBackedRow.unsortedBuilder(updatedColumns.statics, nowInSec);
             builder = staticBuilder;
         }
         else
         {
             if (regularBuilder == null)
-                regularBuilder = ArrayBackedRow.unsortedBuilder(updatedColumns.regulars, nowInSec);
+                regularBuilder = BTreeBackedRow.unsortedBuilder(updatedColumns.regulars, nowInSec);
             builder = regularBuilder;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 234ab97..6ff880c 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -137,7 +137,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
         Cell cell = BufferCell.expiring(hintColumn, now, ttl, FBUtilities.nowInSeconds(), value);
 
-        return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, ArrayBackedRow.singleCellRow(clustering, cell)));
+        return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, BTreeBackedRow.singleCellRow(clustering, cell)));
     }
 
     /*
@@ -181,7 +181,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     {
         DecoratedKey dk =  StorageService.getPartitioner().decorateKey(tokenBytes);
         Cell cell = BufferCell.tombstone(hintColumn, timestamp, FBUtilities.nowInSeconds());
-        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, ArrayBackedRow.singleCellRow(clustering, cell));
+        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, BTreeBackedRow.singleCellRow(clustering, cell));
         new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 501dbb2..696c1c9 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -353,7 +353,7 @@ public abstract class LegacyLayout
         for (ColumnDefinition column : statics)
             columnsToFetch.add(column.name.bytes);
 
-        Row.Builder builder = ArrayBackedRow.unsortedBuilder(statics, FBUtilities.nowInSeconds());
+        Row.Builder builder = BTreeBackedRow.unsortedBuilder(statics, FBUtilities.nowInSeconds());
         builder.newRow(Clustering.STATIC_CLUSTERING);
 
         boolean foundOne = false;
@@ -822,7 +822,7 @@ public abstract class LegacyLayout
             this.metadata = metadata;
             this.isStatic = isStatic;
             this.helper = helper;
-            this.builder = ArrayBackedRow.sortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars);
+            this.builder = BTreeBackedRow.sortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars);
         }
 
         public static CellGrouper staticGrouper(CFMetaData metadata, SerializationHelper helper)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
index 71b7bd8..c06a7f7 100644
--- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -80,7 +80,7 @@ public class RowUpdateBuilder
         assert staticBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
         assert regularBuilder == null : "Cannot add the clustering twice to the same row";
 
-        regularBuilder = ArrayBackedRow.unsortedBuilder(update.columns().regulars, FBUtilities.nowInSeconds());
+        regularBuilder = BTreeBackedRow.unsortedBuilder(update.columns().regulars, FBUtilities.nowInSeconds());
         regularBuilder.newRow(clustering);
 
         // If a CQL table, add the "row marker"
@@ -105,7 +105,7 @@ public class RowUpdateBuilder
         assert regularBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
         if (staticBuilder == null)
         {
-            staticBuilder = ArrayBackedRow.unsortedBuilder(update.columns().statics, FBUtilities.nowInSeconds());
+            staticBuilder = BTreeBackedRow.unsortedBuilder(update.columns().statics, FBUtilities.nowInSeconds());
             staticBuilder.newRow(Clustering.STATIC_CLUSTERING);
         }
         return staticBuilder;
@@ -186,7 +186,7 @@ public class RowUpdateBuilder
         assert clusteringValues.length == update.metadata().comparator.size() || (clusteringValues.length == 0 && !update.columns().statics.isEmpty());
 
         boolean isStatic = clusteringValues.length != update.metadata().comparator.size();
-        Row.Builder builder = ArrayBackedRow.sortedBuilder(isStatic ? update.columns().statics : update.columns().regulars);
+        Row.Builder builder = BTreeBackedRow.sortedBuilder(isStatic ? update.columns().statics : update.columns().regulars);
 
         if (isStatic)
             builder.newRow(Clustering.STATIC_CLUSTERING);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 36a372f..c00597a 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -121,7 +121,7 @@ public abstract class UnfilteredDeserializer
             super(metadata, in, helper);
             this.header = header;
             this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header);
-            this.builder = ArrayBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
+            this.builder = BTreeBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
         }
 
         public boolean hasNext() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index c3a3c08..842cbb9 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -115,7 +115,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cellValue, path));
 
-        Row row = ArrayBackedRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion);
+        Row row = BTreeBackedRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion);
         PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
 
         indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
@@ -132,7 +132,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell));
 
-        Row row = ArrayBackedRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info);
+        Row row = BTreeBackedRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info);
         PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
 
         if (logger.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index e073802..42861c5 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -112,7 +112,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
 
     public void delete(IndexedEntry entry, OpOrder.Group opGroup, int nowInSec)
     {
-        Row row = ArrayBackedRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec));
+        Row row = BTreeBackedRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec));
         PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, entry.indexValue, row);
         indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
index d79ab06..a716768 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
@@ -167,7 +167,7 @@ public abstract class AbstractThreadUnsafePartition implements Partition, Iterab
                     activeDeletion = rt.deletionTime();
 
                 if (row == null)
-                    return activeDeletion.isLive() ? null : ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion);
+                    return activeDeletion.isLive() ? null : BTreeBackedRow.emptyDeletedRow(clustering, activeDeletion);
 
                 return row.filter(columns, activeDeletion, true, metadata);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index c06ffd5..e8ec4c0 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.utils.memory.MemtableAllocator;
 import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.service.StorageService;
 import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
+import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
 
 /**
  * A thread-safe and atomic Partition implementation.
@@ -164,7 +165,7 @@ public class AtomicBTreePartition implements Partition
         final Holder current = ref;
         return new SearchIterator<Clustering, Row>()
         {
-            private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
+            private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, desc(reversed));
             private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
 
             public boolean hasNext()
@@ -188,7 +189,7 @@ public class AtomicBTreePartition implements Partition
                     activeDeletion = rt.deletionTime();
 
                 if (row == null)
-                    return activeDeletion.isLive() ? null : ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion);
+                    return activeDeletion.isLive() ? null : BTreeBackedRow.emptyDeletedRow(clustering, activeDeletion);
 
                 return row.filter(columns, activeDeletion, true, metadata);
             }
@@ -235,7 +236,7 @@ public class AtomicBTreePartition implements Partition
     {
         Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
         Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
-        Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, !reversed);
+        Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, desc(reversed));
 
         return new RowAndDeletionMergeIterator(metadata,
                                                partitionKey,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 689b832..102008f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -822,8 +822,8 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
         {
             // This is a bit of a giant hack as this is the only place where we mutate a Row object. This makes it more efficient
             // for counters however and this won't be needed post-#6506 so that's probably fine.
-            assert row instanceof ArrayBackedRow;
-            ((ArrayBackedRow)row).setValue(column, path, value);
+            assert row instanceof BTreeBackedRow;
+            ((BTreeBackedRow)row).setValue(column, path, value);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 807741a..f53322a 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
 import java.security.MessageDigest;
 import java.util.Objects;
 
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
@@ -32,8 +33,13 @@ import org.apache.cassandra.utils.FBUtilities;
  * Unless you have a very good reason not to, every cell implementation
  * should probably extend this class.
  */
-public abstract class AbstractCell implements Cell
+public abstract class AbstractCell extends Cell
 {
+    protected AbstractCell(ColumnDefinition column)
+    {
+        super(column);
+    }
+
     public void digest(MessageDigest digest)
     {
         digest.update(value().duplicate());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java b/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
deleted file mode 100644
index 12b23e1..0000000
--- a/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
+++ /dev/null
@@ -1,927 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.function.Predicate;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.utils.SearchIterator;
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Immutable implementation of a Row object.
- */
-public class ArrayBackedRow extends AbstractRow
-{
-    private static final ColumnData[] NO_DATA = new ColumnData[0];
-
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayBackedRow(Clustering.EMPTY, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, 0, NO_DATA, Integer.MAX_VALUE));
-
-    private final Clustering clustering;
-    private final Columns columns;
-    private final LivenessInfo primaryKeyLivenessInfo;
-    private final DeletionTime deletion;
-
-    // The data for each columns present in this row in column sorted order.
-    private final int size;
-    private final ColumnData[] data;
-
-    // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstone, and then after reconciliation to remove
-    // all tombstone since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstone at all, so
-    // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones,
-    // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allow at any given
-    // time to know if we have any deleted information or not. If we any "true" tombstone (i.e. not an expiring cell), this value will be forced to
-    // Integer.MIN_VALUE, but if we don't and have expiring cells, this will the time at which the first expiring cell expires. If we have no tombstones and
-    // no expiring cells, this will be Integer.MAX_VALUE;
-    private final int minLocalDeletionTime;
-
-    private ArrayBackedRow(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, int size, ColumnData[] data, int minLocalDeletionTime)
-    {
-        this.clustering = clustering;
-        this.columns = columns;
-        this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
-        this.deletion = deletion;
-        this.size = size;
-        this.data = data;
-        this.minLocalDeletionTime = minLocalDeletionTime;
-    }
-
-    // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases.
-    public static ArrayBackedRow create(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, int size, ColumnData[] data)
-    {
-        int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
-        if (minDeletionTime != Integer.MIN_VALUE)
-        {
-            for (int i = 0; i < size; i++)
-                minDeletionTime = Math.min(minDeletionTime, minDeletionTime(data[i]));
-        }
-
-        return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
-    }
-
-    public static ArrayBackedRow emptyRow(Clustering clustering)
-    {
-        return new ArrayBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, 0, NO_DATA, Integer.MAX_VALUE);
-    }
-
-    public static ArrayBackedRow singleCellRow(Clustering clustering, Cell cell)
-    {
-        if (cell.column().isSimple())
-            return new ArrayBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, 1, new ColumnData[]{ cell }, minDeletionTime(cell));
-
-        ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE);
-        return new ArrayBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, 1, new ColumnData[]{ complexData }, minDeletionTime(cell));
-    }
-
-    public static ArrayBackedRow emptyDeletedRow(Clustering clustering, DeletionTime deletion)
-    {
-        assert !deletion.isLive();
-        return new ArrayBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, deletion, 0, NO_DATA, Integer.MIN_VALUE);
-    }
-
-    public static ArrayBackedRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
-    {
-        assert !primaryKeyLivenessInfo.isEmpty();
-        return new ArrayBackedRow(clustering, Columns.NONE, primaryKeyLivenessInfo, DeletionTime.LIVE, 0, NO_DATA, minDeletionTime(primaryKeyLivenessInfo));
-    }
-
-    private static int minDeletionTime(Cell cell)
-    {
-        return cell.isTombstone() ? Integer.MIN_VALUE : cell.localDeletionTime();
-    }
-
-    private static int minDeletionTime(LivenessInfo info)
-    {
-        return info.isExpiring() ? info.localExpirationTime() : Integer.MAX_VALUE;
-    }
-
-    private static int minDeletionTime(DeletionTime dt)
-    {
-        return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE;
-    }
-
-    private static int minDeletionTime(ComplexColumnData cd)
-    {
-        int min = minDeletionTime(cd.complexDeletion());
-        for (Cell cell : cd)
-            min = Math.min(min, minDeletionTime(cell));
-        return min;
-    }
-
-    private static int minDeletionTime(ColumnData cd)
-    {
-        return cd.column().isSimple() ? minDeletionTime((Cell)cd) : minDeletionTime((ComplexColumnData)cd);
-    }
-
-    public Clustering clustering()
-    {
-        return clustering;
-    }
-
-    public Columns columns()
-    {
-        return columns;
-    }
-
-    public LivenessInfo primaryKeyLivenessInfo()
-    {
-        return primaryKeyLivenessInfo;
-    }
-
-    public DeletionTime deletion()
-    {
-        return deletion;
-    }
-
-    public Cell getCell(ColumnDefinition c)
-    {
-        assert !c.isComplex();
-        int idx = binarySearch(c);
-        return idx < 0 ? null : (Cell)data[idx];
-    }
-
-    public Cell getCell(ColumnDefinition c, CellPath path)
-    {
-        assert c.isComplex();
-        int idx = binarySearch(c);
-        if (idx < 0)
-            return null;
-
-        return ((ComplexColumnData)data[idx]).getCell(path);
-    }
-
-    public ComplexColumnData getComplexColumnData(ColumnDefinition c)
-    {
-        assert c.isComplex();
-        int idx = binarySearch(c);
-        return idx < 0 ? null : (ComplexColumnData)data[idx];
-    }
-
-    public Iterator<ColumnData> iterator()
-    {
-        return new ColumnDataIterator();
-    }
-
-    public Iterable<Cell> cells()
-    {
-        return CellIterator::new;
-    }
-
-    public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
-    {
-        return new ColumnSearchIterator();
-    }
-
-    public Row filter(ColumnFilter filter, CFMetaData metadata)
-    {
-        return filter(filter, DeletionTime.LIVE, false, metadata);
-    }
-
-    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata)
-    {
-        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns();
-
-        if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
-            return this;
-
-        boolean mayHaveShadowed = activeDeletion.supersedes(deletion);
-
-        LivenessInfo newInfo = primaryKeyLivenessInfo;
-        DeletionTime newDeletion = deletion;
-        if (mayHaveShadowed)
-        {
-            if (activeDeletion.deletes(newInfo.timestamp()))
-                newInfo = LivenessInfo.EMPTY;
-            // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if don't have setActiveDeletionToRow,
-            // the row deletion is shadowed and we shouldn't return it.
-            newDeletion = setActiveDeletionToRow ? activeDeletion : DeletionTime.LIVE;
-        }
-
-        ColumnData[] newData = new ColumnData[size];
-        int newMinDeletionTime = Math.min(minDeletionTime(newInfo), minDeletionTime(newDeletion));
-        Columns columns = filter.fetchedColumns().columns(isStatic());
-        Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester();
-        int newSize = 0;
-        for (int i = 0; i < size; i++)
-        {
-            ColumnData cd = data[i];
-            ColumnDefinition column = cd.column();
-            if (!inclusionTester.test(column))
-                continue;
-
-            CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes);
-            if (column.isSimple())
-            {
-                Cell cell = (Cell)cd;
-                if ((dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell)))
-                {
-                    newData[newSize++] = cell;
-                    newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(cell));
-                }
-            }
-            else
-            {
-                ColumnData newCd = ((ComplexColumnData)cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped);
-                if (newCd != null)
-                {
-                    newData[newSize++] = newCd;
-                    newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(newCd));
-                }
-            }
-        }
-
-        if (newSize == 0 && newInfo.isEmpty() && newDeletion.isLive())
-            return null;
-
-        return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, newSize, newData, newMinDeletionTime);
-    }
-
-    public boolean hasComplexDeletion()
-    {
-        // We start by the end cause we know complex columns sort before simple ones
-        for (int i = size - 1; i >= 0; i--)
-        {
-            ColumnData cd = data[i];
-            if (cd.column().isSimple())
-                return false;
-
-            if (!((ComplexColumnData)cd).complexDeletion().isLive())
-                return true;
-        }
-        return false;
-    }
-
-    public Row markCounterLocalToBeCleared()
-    {
-        ColumnData[] newData = null;
-        for (int i = 0; i < size; i++)
-        {
-            ColumnData cd = data[i];
-            ColumnData newCd = cd.column().cellValueType().isCounter()
-                             ? cd.markCounterLocalToBeCleared()
-                             : cd;
-            if (newCd != cd)
-            {
-                if (newData == null)
-                    newData = Arrays.copyOf(data, size);
-                newData[i] = newCd;
-            }
-        }
-
-        return newData == null
-             ? this
-             : new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, newData, minLocalDeletionTime);
-    }
-
-    public boolean hasDeletion(int nowInSec)
-    {
-        return nowInSec >= minLocalDeletionTime;
-    }
-
-    /**
-     * Returns a copy of the row where all timestamps for live data have replaced by {@code newTimestamp} and
-     * all deletion timestamp by {@code newTimestamp - 1}.
-     *
-     * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details.
-     */
-    public Row updateAllTimestamp(long newTimestamp)
-    {
-        LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp);
-        DeletionTime newDeletion = deletion.isLive() ? deletion : new DeletionTime(newTimestamp - 1, deletion.localDeletionTime());
-
-        ColumnData[] newData = new ColumnData[size];
-        for (int i = 0; i < size; i++)
-            newData[i] = data[i].updateAllTimestamp(newTimestamp);
-
-        return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, size, newData, minLocalDeletionTime);
-    }
-
-    public Row purge(DeletionPurger purger, int nowInSec)
-    {
-        if (!hasDeletion(nowInSec))
-            return this;
-
-        LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo;
-        DeletionTime newDeletion = purger.shouldPurge(deletion) ? DeletionTime.LIVE : deletion;
-
-        int newMinDeletionTime = Math.min(minDeletionTime(newInfo), minDeletionTime(newDeletion));
-        ColumnData[] newData = new ColumnData[size];
-        int newSize = 0;
-        for (int i = 0; i < size; i++)
-        {
-            ColumnData purged = data[i].purge(purger, nowInSec);
-            if (purged != null)
-            {
-                newData[newSize++] = purged;
-                newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(purged));
-            }
-        }
-
-        if (newSize == 0 && newInfo.isEmpty() && newDeletion.isLive())
-            return null;
-
-        return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, newSize, newData, newMinDeletionTime);
-    }
-
-    public int dataSize()
-    {
-        int dataSize = clustering.dataSize()
-                     + primaryKeyLivenessInfo.dataSize()
-                     + deletion.dataSize();
-
-        for (int i = 0; i < size; i++)
-            dataSize += data[i].dataSize();
-        return dataSize;
-    }
-
-    public long unsharedHeapSizeExcludingData()
-    {
-        long heapSize = EMPTY_SIZE
-                      + clustering.unsharedHeapSizeExcludingData()
-                      + ObjectSizes.sizeOfArray(data);
-
-        for (int i = 0; i < size; i++)
-            heapSize += data[i].unsharedHeapSizeExcludingData();
-        return heapSize;
-    }
-
-    public static Row.Builder sortedBuilder(Columns columns)
-    {
-        return new SortedBuilder(columns);
-    }
-
-    public static Row.Builder unsortedBuilder(Columns columns, int nowInSec)
-    {
-        return new UnsortedBuilder(columns, nowInSec);
-    }
-
-    // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general
-    // assumption that Row objects are immutable. This method should go away post-#6506 in particular.
-    // This method is in particular not exposed by the Row API on purpose.
-    // This method also *assumes* that the cell we're setting already exists.
-    public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value)
-    {
-        int idx = binarySearch(column);
-        assert idx >= 0;
-        if (column.isSimple())
-            data[idx] = ((Cell)data[idx]).withUpdatedValue(value);
-        else
-            ((ComplexColumnData)data[idx]).setValue(path, value);
-    }
-
-    private int binarySearch(ColumnDefinition column)
-    {
-        return binarySearch(column, 0, size);
-    }
-
-    /**
-     * Simple binary search for a given column (in the data list).
-     *
-     * The return value has the exact same meaning that the one of Collections.binarySearch() but
-     * we don't use the later because we're searching for a 'ColumnDefinition' in an array of 'ColumnData'.
-     */
-    private int binarySearch(ColumnDefinition column, int fromIndex, int toIndex)
-    {
-        int low = fromIndex;
-        int mid = toIndex;
-        int high = mid - 1;
-        int result = -1;
-        while (low <= high)
-        {
-            mid = (low + high) >> 1;
-            if ((result = column.compareTo(data[mid].column())) > 0)
-                low = mid + 1;
-            else if (result == 0)
-                return mid;
-            else
-                high = mid - 1;
-        }
-        return -mid - (result < 0 ? 1 : 2);
-    }
-
-    private class ColumnDataIterator extends AbstractIterator<ColumnData>
-    {
-        private int i;
-
-        protected ColumnData computeNext()
-        {
-            return i < size ? data[i++] : endOfData();
-        }
-    }
-
-    private class CellIterator extends AbstractIterator<Cell>
-    {
-        private int i;
-        private Iterator<Cell> complexCells;
-
-        protected Cell computeNext()
-        {
-            while (true)
-            {
-                if (complexCells != null)
-                {
-                    if (complexCells.hasNext())
-                        return complexCells.next();
-
-                    complexCells = null;
-                }
-
-                if (i >= size)
-                    return endOfData();
-
-                ColumnData cd = data[i++];
-                if (cd.column().isComplex())
-                    complexCells = ((ComplexColumnData)cd).iterator();
-                else
-                    return (Cell)cd;
-            }
-        }
-    }
-
-    private class ColumnSearchIterator implements SearchIterator<ColumnDefinition, ColumnData>
-    {
-        // The index at which the next call to "next" should start looking from
-        private int searchFrom = 0;
-
-        public boolean hasNext()
-        {
-            return searchFrom < size;
-        }
-
-        public ColumnData next(ColumnDefinition column)
-        {
-            int idx = binarySearch(column, searchFrom, size);
-            if (idx < 0)
-            {
-                searchFrom = -idx - 1;
-                return null;
-            }
-            else
-            {
-                // We've found it. We'll start after it next time.
-                searchFrom = idx + 1;
-                return data[idx];
-            }
-        }
-    }
-
-    private static abstract class AbstractBuilder implements Row.Builder
-    {
-        protected final Columns columns;
-
-        protected Clustering clustering;
-        protected LivenessInfo primaryKeyLivenessInfo;
-        protected DeletionTime deletion;
-
-        protected List<Cell> cells = new ArrayList<>();
-
-        // For complex column at index i of 'columns', we store at complexDeletions[i] its complex deletion.
-        protected DeletionTime[] complexDeletions;
-        protected int columnsWithComplexDeletion;
-
-        protected AbstractBuilder(Columns columns)
-        {
-            this.columns = columns;
-            this.complexDeletions = new DeletionTime[columns.complexColumnCount()];
-        }
-
-        public void newRow(Clustering clustering)
-        {
-            assert this.clustering == null; // Ensures we've properly called build() if we've use this builder before
-            this.clustering = clustering;
-        }
-
-        public Clustering clustering()
-        {
-            return clustering;
-        }
-
-        protected void reset()
-        {
-            this.clustering = null;
-            this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
-            this.deletion = DeletionTime.LIVE;
-            this.cells.clear();
-            Arrays.fill(this.complexDeletions, null);
-            this.columnsWithComplexDeletion = 0;
-        }
-
-        public void addPrimaryKeyLivenessInfo(LivenessInfo info)
-        {
-            this.primaryKeyLivenessInfo = info;
-        }
-
-        public void addRowDeletion(DeletionTime deletion)
-        {
-            this.deletion = deletion;
-        }
-
-        public void addCell(Cell cell)
-        {
-            assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering;
-            cells.add(cell);
-        }
-
-        public Row build()
-        {
-            Row row = buildInternal();
-            reset();
-            return row;
-        }
-
-        protected abstract Row buildInternal();
-
-        protected Row buildNoCells()
-        {
-            assert cells.isEmpty();
-            int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
-            if (columnsWithComplexDeletion == 0)
-                return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, 0, NO_DATA, minDeletionTime);
-
-            ColumnData[] data = new ColumnData[columnsWithComplexDeletion];
-            int size = 0;
-            for (int i = 0; i < complexDeletions.length; i++)
-            {
-                DeletionTime complexDeletion = complexDeletions[i];
-                if (complexDeletion != null)
-                {
-                    assert !complexDeletion.isLive();
-                    data[size++] = new ComplexColumnData(columns.getComplex(i), ComplexColumnData.NO_CELLS, complexDeletion);
-                    minDeletionTime = Integer.MIN_VALUE;
-                }
-            }
-            return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
-        }
-    }
-
-    public static class SortedBuilder extends AbstractBuilder
-    {
-        private int columnCount;
-
-        private ColumnDefinition column;
-
-        // The index of the last column for which we've called setColumn if complex.
-        private int complexColumnIndex;
-
-        // For complex column at index i of 'columns', we store at complexColumnCellsCount[i] its number of added cells.
-        private final int[] complexColumnCellsCount;
-
-        protected SortedBuilder(Columns columns)
-        {
-            super(columns);
-            this.complexColumnCellsCount = new int[columns.complexColumnCount()];
-            reset();
-        }
-
-        @Override
-        protected void reset()
-        {
-            super.reset();
-            this.column = null;
-            this.columnCount = 0;
-            this.complexColumnIndex = -1;
-            Arrays.fill(this.complexColumnCellsCount, 0);
-        }
-
-        public boolean isSorted()
-        {
-            return true;
-        }
-
-        private void setColumn(ColumnDefinition column)
-        {
-            int cmp = this.column == null ? -1 : this.column.compareTo(column);
-            assert cmp <= 0 : "current = " + this.column + ", new = " + column;
-            if (cmp != 0)
-            {
-                this.column = column;
-                ++columnCount;
-                if (column.isComplex())
-                    complexColumnIndex = columns.complexIdx(column, complexColumnIndex + 1);
-            }
-        }
-
-        @Override
-        public void addCell(Cell cell)
-        {
-            setColumn(cell.column());
-            super.addCell(cell);
-            if (column.isComplex())
-                complexColumnCellsCount[complexColumnIndex] += 1;
-        }
-
-        @Override
-        public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
-        {
-            if (complexDeletion.isLive())
-                return;
-
-            setColumn(column);
-            assert complexDeletions[complexColumnIndex] == null;
-            complexDeletions[complexColumnIndex] = complexDeletion;
-            ++columnsWithComplexDeletion;
-        }
-
-        protected Row buildInternal()
-        {
-            if (cells.isEmpty())
-                return buildNoCells();
-
-            int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
-
-            ColumnData[] data = new ColumnData[columnCount];
-            int complexIdx = 0;
-            int i = 0;
-            int size = 0;
-            while (i < cells.size())
-            {
-                Cell cell = cells.get(i);
-                ColumnDefinition column = cell.column();
-                if (column.isSimple())
-                {
-                    data[size++] = cell;
-                    minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cell));
-                    ++i;
-                }
-                else
-                {
-                    while (columns.getComplex(complexIdx).compareTo(column) < 0)
-                    {
-                        if (complexDeletions[complexIdx] != null)
-                        {
-                            data[size++] = new ComplexColumnData(columns.getComplex(complexIdx), ComplexColumnData.NO_CELLS, complexDeletions[complexIdx]);
-                            minDeletionTime = Integer.MIN_VALUE;
-                        }
-                        ++complexIdx;
-                    }
-
-                    DeletionTime complexDeletion = complexDeletions[complexIdx];
-                    if (complexDeletion != null)
-                        minDeletionTime = Integer.MIN_VALUE;
-                    int cellCount = complexColumnCellsCount[complexIdx];
-                    Cell[] complexCells = new Cell[cellCount];
-                    for (int j = 0; j < cellCount; j++)
-                    {
-                        Cell complexCell = cells.get(i + j);
-                        complexCells[j] = complexCell;
-                        minDeletionTime = Math.min(minDeletionTime, minDeletionTime(complexCell));
-                    }
-                    i += cellCount;
-
-                    data[size++] = new ComplexColumnData(column, complexCells, complexDeletion == null ? DeletionTime.LIVE : complexDeletion);
-                    ++complexIdx;
-                }
-            }
-            for (int j = complexIdx; j < complexDeletions.length; j++)
-            {
-                if (complexDeletions[j] != null)
-                {
-                    data[size++] = new ComplexColumnData(columns.getComplex(j), ComplexColumnData.NO_CELLS, complexDeletions[j]);
-                    minDeletionTime = Integer.MIN_VALUE;
-                }
-            }
-            assert size == data.length;
-            return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
-        }
-    }
-
-    private static class UnsortedBuilder extends AbstractBuilder
-    {
-        private final int nowInSec;
-
-        private UnsortedBuilder(Columns columns, int nowInSec)
-        {
-            super(columns);
-            this.nowInSec = nowInSec;
-            reset();
-        }
-
-        public boolean isSorted()
-        {
-            return false;
-        }
-
-        public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
-        {
-            assert column.isComplex();
-            assert column.isStatic() == (clustering == Clustering.STATIC_CLUSTERING);
-
-            if (complexDeletion.isLive())
-                return;
-
-            int complexColumnIndex = columns.complexIdx(column, 0);
-
-            DeletionTime previous = complexDeletions[complexColumnIndex];
-            if (previous == null || complexDeletion.supersedes(previous))
-            {
-                complexDeletions[complexColumnIndex] = complexDeletion;
-                if (previous == null)
-                    ++columnsWithComplexDeletion;
-            }
-        }
-
-        protected Row buildInternal()
-        {
-            // First, the easy cases
-            if (cells.isEmpty())
-                return buildNoCells();
-
-            // Cells have been added in an unsorted way, so sort them first
-            Collections.sort(cells, Cell.comparator);
-
-            // We now need to
-            //  1) merge equal cells together
-            //  2) group the cells for a given complex column together, and include their potential complex deletion time.
-            //     And this without forgetting that some complex columns may have a complex deletion but not cells.
-
-            int addedColumns = countAddedColumns();
-            ColumnData[] data = new ColumnData[addedColumns];
-
-            int nextComplexWithDeletion = findNextComplexWithDeletion(0);
-            ColumnDefinition previousColumn = null;
-
-            int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
-
-            int i = 0;
-            int size = 0;
-            while (i < cells.size())
-            {
-                Cell cell = cells.get(i++);
-                ColumnDefinition column = cell.column();
-                if (column.isSimple())
-                {
-                    // Either it's a cell for the same column than our previous cell and we merge them together, or it's a new column
-                    if (previousColumn != null && previousColumn.compareTo(column) == 0)
-                        data[size - 1] = Cells.reconcile((Cell)data[size - 1], cell, nowInSec);
-                    else
-                        data[size++] = cell;
-                }
-                else
-                {
-                    // First, collect the complex deletion time for the column we got the first complex column of. We'll
-                    // also find if there is columns that sorts before but had only a complex deletion and add them.
-                    DeletionTime complexDeletion = DeletionTime.LIVE;
-                    while (nextComplexWithDeletion >= 0)
-                    {
-                        int cmp = column.compareTo(columns.getComplex(nextComplexWithDeletion));
-                        if (cmp < 0)
-                        {
-                            // This is after the column we're gonna add cell for. We'll deal with it later
-                            break;
-                        }
-                        else if (cmp > 0)
-                        {
-                            // We have a column that only has a complex deletion and no column. Add its data first
-                            data[size++] = new ComplexColumnData(columns.getComplex(nextComplexWithDeletion), ComplexColumnData.NO_CELLS, complexDeletions[nextComplexWithDeletion]);
-                            minDeletionTime = Integer.MIN_VALUE;
-                            nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-                        }
-                        else // cmp == 0
-                        {
-                            // This is the column we'll about to add cell for. Record the deletion time and break to the cell addition
-                            complexDeletion = complexDeletions[nextComplexWithDeletion];
-                            minDeletionTime = Integer.MIN_VALUE;
-                            nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-                            break;
-                        }
-                    }
-
-                    // Find how many cells the complex column has (cellCount) and the index of the next cell that doesn't belong to it (nextColumnIdx).
-                    int nextColumnIdx = i; // i is on cell following the current one
-                    int cellCount = 1; // We have at least the current cell
-                    Cell previousCell = cell;
-                    while (nextColumnIdx < cells.size())
-                    {
-                        Cell newCell = cells.get(nextColumnIdx);
-                        if (column.compareTo(newCell.column()) != 0)
-                            break;
-
-                        ++nextColumnIdx;
-                        if (column.cellPathComparator().compare(previousCell.path(), newCell.path()) != 0)
-                            ++cellCount;
-                        previousCell = newCell;
-                    }
-                    Cell[] columnCells = new Cell[cellCount];
-                    int complexSize = 0;
-                    columnCells[complexSize++] = cell;
-                    previousCell = cell;
-                    for (int j = i; j < nextColumnIdx; j++)
-                    {
-                        Cell newCell = cells.get(j);
-                        // Either it's a cell for the same path than our previous cell and we merge them together, or it's a new path
-                        if (column.cellPathComparator().compare(previousCell.path(), newCell.path()) == 0)
-                            columnCells[complexSize - 1] = Cells.reconcile(previousCell, newCell, nowInSec);
-                        else
-                            columnCells[complexSize++] = newCell;
-                        previousCell = newCell;
-                    }
-                    i = nextColumnIdx;
-
-                    data[size++] = new ComplexColumnData(column, columnCells, complexDeletion);
-                }
-                previousColumn = column;
-            }
-            // We may still have some complex columns with only a complex deletion
-            while (nextComplexWithDeletion >= 0)
-            {
-                data[size++] = new ComplexColumnData(columns.getComplex(nextComplexWithDeletion), ComplexColumnData.NO_CELLS, complexDeletions[nextComplexWithDeletion]);
-                nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-                minDeletionTime = Integer.MIN_VALUE;
-            }
-            assert size == addedColumns;
-
-            // Reconciliation made it harder to compute minDeletionTime for cells in the loop above, so just do it now if we need to.
-            if (minDeletionTime != Integer.MIN_VALUE)
-            {
-                for (ColumnData cd : data)
-                    minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd));
-            }
-
-            return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
-        }
-
-        private int findNextComplexWithDeletion(int from)
-        {
-            for (int i = from; i < complexDeletions.length; i++)
-            {
-                if (complexDeletions[i] != null)
-                    return i;
-            }
-            return -1;
-        }
-
-        // Should only be called once the cells have been sorted
-        private int countAddedColumns()
-        {
-            int columnCount = 0;
-            int nextComplexWithDeletion = findNextComplexWithDeletion(0);
-            ColumnDefinition previousColumn = null;
-            for (Cell cell : cells)
-            {
-                if (previousColumn != null && previousColumn.compareTo(cell.column()) == 0)
-                    continue;
-
-                ++columnCount;
-                previousColumn = cell.column();
-
-                // We know that simple columns sort before the complex ones, so don't bother with the column having complex deletion
-                // until we've reached the cells of complex columns.
-                if (!previousColumn.isComplex())
-                    continue;
-
-                while (nextComplexWithDeletion >= 0)
-                {
-                    // Check how the column we just counted compared to the next with complex deletion
-                    int cmp = previousColumn.compareTo(columns.getComplex(nextComplexWithDeletion));
-                    if (cmp < 0)
-                    {
-                        // it's before, we'll handle nextColumnWithComplexDeletion later
-                        break;
-                    }
-                    else if (cmp > 0)
-                    {
-                        // it's after. nextColumnWithComplexDeletion has no cell but we should count it
-                        ++columnCount;
-                        nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-                    }
-                    else // cmp == 0
-                    {
-                        // it's the column we just counted. Ignore it and we know we're good with nextComplexWithDeletion for this loop
-                        nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-                        break;
-                    }
-                }
-            }
-            // Anything remaining in complexDeletionColumns are complex columns with no cells but some complex deletion
-            while (nextComplexWithDeletion >= 0)
-            {
-                ++columnCount;
-                nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-            }
-            return columnCount;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
new file mode 100644
index 0000000..7e9ceb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Predicate;
+
+import com.google.common.base.Function;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+/**
+ * Immutable implementation of a Row object.
+ */
+public class BTreeBackedRow extends AbstractRow
+{
+    private static final ColumnData[] NO_DATA = new ColumnData[0];
+
+    private static final long EMPTY_SIZE = ObjectSizes.measure(emptyRow(Clustering.EMPTY));
+
+    private final Clustering clustering;
+    private final Columns columns;
+    private final LivenessInfo primaryKeyLivenessInfo;
+    private final DeletionTime deletion;
+
+    // The data for each columns present in this row in column sorted order.
+    private final Object[] btree;
+
+    // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstone, and then after reconciliation to remove
+    // all tombstone since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstone at all, so
+    // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones,
+    // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allow at any given
+    // time to know if we have any deleted information or not. If we any "true" tombstone (i.e. not an expiring cell), this value will be forced to
+    // Integer.MIN_VALUE, but if we don't and have expiring cells, this will the time at which the first expiring cell expires. If we have no tombstones and
+    // no expiring cells, this will be Integer.MAX_VALUE;
+    private final int minLocalDeletionTime;
+
+    private BTreeBackedRow(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree, int minLocalDeletionTime)
+    {
+        this.clustering = clustering;
+        this.columns = columns;
+        this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
+        this.deletion = deletion;
+        this.btree = btree;
+        this.minLocalDeletionTime = minLocalDeletionTime;
+    }
+
+    // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases.
+    public static BTreeBackedRow create(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree)
+    {
+        int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
+        if (minDeletionTime != Integer.MIN_VALUE)
+        {
+            for (ColumnData cd : BTree.<ColumnData>iterable(btree))
+                minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd));
+        }
+
+        return new BTreeBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+    }
+
+    public static BTreeBackedRow emptyRow(Clustering clustering)
+    {
+        return new BTreeBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.empty(), Integer.MAX_VALUE);
+    }
+
+    public static BTreeBackedRow singleCellRow(Clustering clustering, Cell cell)
+    {
+        if (cell.column().isSimple())
+            return new BTreeBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(cell), minDeletionTime(cell));
+
+        ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE);
+        return new BTreeBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(complexData), minDeletionTime(cell));
+    }
+
+    public static BTreeBackedRow emptyDeletedRow(Clustering clustering, DeletionTime deletion)
+    {
+        assert !deletion.isLive();
+        return new BTreeBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, deletion, BTree.empty(), Integer.MIN_VALUE);
+    }
+
+    public static BTreeBackedRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
+    {
+        assert !primaryKeyLivenessInfo.isEmpty();
+        return new BTreeBackedRow(clustering, Columns.NONE, primaryKeyLivenessInfo, DeletionTime.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo));
+    }
+
+    private static int minDeletionTime(Cell cell)
+    {
+        return cell.isTombstone() ? Integer.MIN_VALUE : cell.localDeletionTime();
+    }
+
+    private static int minDeletionTime(LivenessInfo info)
+    {
+        return info.isExpiring() ? info.localExpirationTime() : Integer.MAX_VALUE;
+    }
+
+    private static int minDeletionTime(DeletionTime dt)
+    {
+        return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE;
+    }
+
+    private static int minDeletionTime(ComplexColumnData cd)
+    {
+        int min = minDeletionTime(cd.complexDeletion());
+        for (Cell cell : cd)
+        {
+            min = Math.min(min, minDeletionTime(cell));
+            if (min == Integer.MIN_VALUE)
+                break;
+        }
+        return min;
+    }
+
+    private static int minDeletionTime(ColumnData cd)
+    {
+        return cd.column().isSimple() ? minDeletionTime((Cell) cd) : minDeletionTime((ComplexColumnData)cd);
+    }
+
+    private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion)
+    {
+        int min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion));
+        for (ColumnData cd : BTree.<ColumnData>iterable(btree))
+        {
+            min = Math.min(min, minDeletionTime(cd));
+            if (min == Integer.MIN_VALUE)
+                break;
+        }
+        return min;
+    }
+
+    public Clustering clustering()
+    {
+        return clustering;
+    }
+
+    public Columns columns()
+    {
+        return columns;
+    }
+
+    public LivenessInfo primaryKeyLivenessInfo()
+    {
+        return primaryKeyLivenessInfo;
+    }
+
+    public DeletionTime deletion()
+    {
+        return deletion;
+    }
+
+    public Cell getCell(ColumnDefinition c)
+    {
+        assert !c.isComplex();
+        return (Cell) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c);
+    }
+
+    public Cell getCell(ColumnDefinition c, CellPath path)
+    {
+        assert c.isComplex();
+        ComplexColumnData cd = getComplexColumnData(c);
+        if (cd == null)
+            return null;
+        return cd.getCell(path);
+    }
+
+    public ComplexColumnData getComplexColumnData(ColumnDefinition c)
+    {
+        assert c.isComplex();
+        return (ComplexColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c);
+    }
+
+    public Iterator<ColumnData> iterator()
+    {
+        return searchIterator();
+    }
+
+    public Iterable<Cell> cells()
+    {
+        return CellIterator::new;
+    }
+
+    public BTreeSearchIterator<ColumnDefinition, ColumnData> searchIterator()
+    {
+        return BTree.slice(btree, ColumnDefinition.asymmetricColumnDataComparator, BTree.Dir.ASC);
+    }
+
+    public Row filter(ColumnFilter filter, CFMetaData metadata)
+    {
+        return filter(filter, DeletionTime.LIVE, false, metadata);
+    }
+
+    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata)
+    {
+        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns();
+
+        if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
+            return this;
+
+        boolean mayHaveShadowed = activeDeletion.supersedes(deletion);
+
+        LivenessInfo newInfo = primaryKeyLivenessInfo;
+        DeletionTime newDeletion = deletion;
+        if (mayHaveShadowed)
+        {
+            if (activeDeletion.deletes(newInfo.timestamp()))
+                newInfo = LivenessInfo.EMPTY;
+            // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if don't have setActiveDeletionToRow,
+            // the row deletion is shadowed and we shouldn't return it.
+            newDeletion = setActiveDeletionToRow ? activeDeletion : DeletionTime.LIVE;
+        }
+
+        Columns columns = filter.fetchedColumns().columns(isStatic());
+        Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester();
+        return transformAndFilter(newInfo, newDeletion, (cd) -> {
+
+            ColumnDefinition column = cd.column();
+            if (!inclusionTester.test(column))
+                return null;
+
+            CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes);
+            if (column.isComplex())
+                return ((ComplexColumnData)cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped);
+
+            Cell cell = (Cell)cd;
+            return (dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell))
+                   ? cell : null;
+        });
+    }
+
+    public boolean hasComplexDeletion()
+    {
+        // We start by the end cause we know complex columns sort before simple ones
+        for (ColumnData cd : BTree.<ColumnData>iterable(btree, BTree.Dir.DESC))
+        {
+            if (cd.column().isSimple())
+                return false;
+
+            if (!((ComplexColumnData)cd).complexDeletion().isLive())
+                return true;
+        }
+        return false;
+    }
+
+    public Row markCounterLocalToBeCleared()
+    {
+        return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().cellValueType().isCounter()
+                                                                            ? cd.markCounterLocalToBeCleared()
+                                                                            : cd);
+    }
+
+    public boolean hasDeletion(int nowInSec)
+    {
+        return nowInSec >= minLocalDeletionTime;
+    }
+
+    /**
+     * Returns a copy of the row where all timestamps for live data have replaced by {@code newTimestamp} and
+     * all deletion timestamp by {@code newTimestamp - 1}.
+     *
+     * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details.
+     */
+    public Row updateAllTimestamp(long newTimestamp)
+    {
+        LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp);
+        DeletionTime newDeletion = deletion.isLive() ? deletion : new DeletionTime(newTimestamp - 1, deletion.localDeletionTime());
+
+        return transformAndFilter(newInfo, newDeletion, (cd) -> cd.updateAllTimestamp(newTimestamp));
+    }
+
+    public Row purge(DeletionPurger purger, int nowInSec)
+    {
+        if (!hasDeletion(nowInSec))
+            return this;
+
+        LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo;
+        DeletionTime newDeletion = purger.shouldPurge(deletion) ? DeletionTime.LIVE : deletion;
+
+        return transformAndFilter(newInfo, newDeletion, (cd) -> cd.purge(purger, nowInSec));
+    }
+
+    private Row transformAndFilter(LivenessInfo info, DeletionTime deletion, Function<ColumnData, ColumnData> function)
+    {
+        Object[] transformed = BTree.transformAndFilter(btree, function);
+
+        if (btree == transformed && info == this.primaryKeyLivenessInfo && deletion == this.deletion)
+            return this;
+
+        if (info.isEmpty() && deletion.isLive() && BTree.isEmpty(transformed))
+            return null;
+
+        int minDeletionTime = minDeletionTime(transformed, info, deletion);
+        return new BTreeBackedRow(clustering, columns, info, deletion, transformed, minDeletionTime);
+    }
+
+    public int dataSize()
+    {
+        int dataSize = clustering.dataSize()
+                     + primaryKeyLivenessInfo.dataSize()
+                     + deletion.dataSize();
+
+        for (ColumnData cd : this)
+            dataSize += cd.dataSize();
+        return dataSize;
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        long heapSize = EMPTY_SIZE
+                      + clustering.unsharedHeapSizeExcludingData()
+                      + BTree.sizeOfStructureOnHeap(btree);
+
+        for (ColumnData cd : this)
+            heapSize += cd.unsharedHeapSizeExcludingData();
+        return heapSize;
+    }
+
+    public static Row.Builder sortedBuilder(Columns columns)
+    {
+        return new Builder(columns, true);
+    }
+
+    public static Row.Builder unsortedBuilder(Columns columns, int nowInSec)
+    {
+        return new Builder(columns, false, nowInSec);
+    }
+
+    // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general
+    // assumption that Row objects are immutable. This method should go away post-#6506 in particular.
+    // This method is in particular not exposed by the Row API on purpose.
+    // This method also *assumes* that the cell we're setting already exists.
+    public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value)
+    {
+        ColumnData current = (ColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, column);
+        if (column.isSimple())
+            BTree.replaceInSitu(btree, ColumnData.comparator, current, ((Cell) current).withUpdatedValue(value));
+        else
+            ((ComplexColumnData) current).setValue(path, value);
+    }
+
+    private class CellIterator extends AbstractIterator<Cell>
+    {
+        private Iterator<ColumnData> columnData = iterator();
+        private Iterator<Cell> complexCells;
+
+        protected Cell computeNext()
+        {
+            while (true)
+            {
+                if (complexCells != null)
+                {
+                    if (complexCells.hasNext())
+                        return complexCells.next();
+
+                    complexCells = null;
+                }
+
+                if (!columnData.hasNext())
+                    return endOfData();
+
+                ColumnData cd = columnData.next();
+                if (cd.column().isComplex())
+                    complexCells = ((ComplexColumnData)cd).iterator();
+                else
+                    return (Cell)cd;
+            }
+        }
+    }
+
+    public static class Builder implements Row.Builder
+    {
+        // a simple marker class that will sort to the beginning of a run of complex cells to store the deletion time
+        private static class ComplexColumnDeletion extends BufferCell
+        {
+            public ComplexColumnDeletion(ColumnDefinition column, DeletionTime deletionTime)
+            {
+                super(column, deletionTime.markedForDeleteAt(), 0, deletionTime.localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.BOTTOM);
+            }
+        }
+
+        // converts a run of Cell with equal column into a ColumnData
+        private static class CellResolver implements BTree.Builder.Resolver
+        {
+            final int nowInSec;
+            private CellResolver(int nowInSec)
+            {
+                this.nowInSec = nowInSec;
+            }
+
+            public ColumnData resolve(Object[] cells, int lb, int ub)
+            {
+                Cell cell = (Cell) cells[lb];
+                ColumnDefinition column = cell.column;
+                if (cell.column.isSimple())
+                {
+                    assert lb + 1 == ub || nowInSec != Integer.MIN_VALUE;
+                    while (++lb < ub)
+                        cell = Cells.reconcile(cell, (Cell) cells[lb], nowInSec);
+                    return cell;
+                }
+
+                // TODO: relax this in the case our outer provider is sorted (want to delay until remaining changes are
+                // bedded in, as less important; galloping makes it pretty cheap anyway)
+                Arrays.sort(cells, lb, ub, (Comparator<Object>) column.cellComparator());
+                cell = (Cell) cells[lb];
+                DeletionTime deletion = DeletionTime.LIVE;
+                if (cell instanceof ComplexColumnDeletion)
+                {
+                    // TODO: do we need to be robust to multiple of these being provided?
+                    deletion = new DeletionTime(cell.timestamp(), cell.localDeletionTime());
+                    lb++;
+                }
+
+                List<Object> buildFrom = Arrays.asList(cells).subList(lb, ub);
+                Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp());
+                return new ComplexColumnData(column, btree, deletion);
+            }
+
+        };
+        protected final Columns columns;
+
+        protected Clustering clustering;
+        protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+        protected DeletionTime deletion = DeletionTime.LIVE;
+
+        private final boolean isSorted;
+        private final BTree.Builder<Cell> cells;
+        private final CellResolver resolver;
+        private boolean hasComplex = false;
+
+        // For complex column at index i of 'columns', we store at complexDeletions[i] its complex deletion.
+
+        protected Builder(Columns columns, boolean isSorted)
+        {
+            this(columns, isSorted, Integer.MIN_VALUE);
+        }
+
+        protected Builder(Columns columns, boolean isSorted, int nowInSecs)
+        {
+            this.columns = columns;
+            this.cells = BTree.builder(ColumnData.comparator);
+            resolver = new CellResolver(nowInSecs);
+            this.isSorted = isSorted;
+            this.cells.auto(false);
+        }
+
+        public boolean isSorted()
+        {
+            return isSorted;
+        }
+
+        public void newRow(Clustering clustering)
+        {
+            assert this.clustering == null; // Ensures we've properly called build() if we've use this builder before
+            this.clustering = clustering;
+        }
+
+        public Clustering clustering()
+        {
+            return clustering;
+        }
+
+        protected void reset()
+        {
+            this.clustering = null;
+            this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+            this.deletion = DeletionTime.LIVE;
+            this.cells.reuse();
+        }
+
+        public void addPrimaryKeyLivenessInfo(LivenessInfo info)
+        {
+            this.primaryKeyLivenessInfo = info;
+        }
+
+        public void addRowDeletion(DeletionTime deletion)
+        {
+            this.deletion = deletion;
+        }
+
+        public void addCell(Cell cell)
+        {
+            assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering;
+            cells.add(cell);
+            hasComplex |= cell.column.isComplex();
+        }
+
+        public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
+        {
+            cells.add(new ComplexColumnDeletion(column, complexDeletion));
+            hasComplex = true;
+        }
+
+        public Row build()
+        {
+            if (!isSorted)
+                cells.sort();
+            // we can avoid resolving if we're sorted and have no complex values
+            // (because we'll only have unique simple cells, which are already in their final condition)
+            if (!isSorted | hasComplex)
+                cells.resolve(resolver);
+            Object[] btree = cells.build();
+            int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion);
+            Row row = new BTreeBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+            reset();
+            return row;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index e952748..81c42d4 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -36,8 +36,6 @@ public class BufferCell extends AbstractCell
 {
     private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCell(ColumnDefinition.regularDef("", "", "", ByteType.instance), 0L, 0, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, null));
 
-    private final ColumnDefinition column;
-
     private final long timestamp;
     private final int ttl;
     private final int localDeletionTime;
@@ -47,8 +45,8 @@ public class BufferCell extends AbstractCell
 
     public BufferCell(ColumnDefinition column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value, CellPath path)
     {
+        super(column);
         assert column.isComplex() == (path != null);
-        this.column = column;
         this.timestamp = timestamp;
         this.ttl = ttl;
         this.localDeletionTime = localDeletionTime;
@@ -90,11 +88,6 @@ public class BufferCell extends AbstractCell
         return new BufferCell(column, timestamp, NO_TTL, nowInSec, ByteBufferUtil.EMPTY_BYTE_BUFFER, path);
     }
 
-    public ColumnDefinition column()
-    {
-        return column;
-    }
-
     public boolean isCounterCell()
     {
         return !isTombstone() && column.cellValueType().isCounter();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
index ccb9708..1820de2 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
  *   2) expiring cells: on top of regular cells, those have a ttl and a local deletion time (when they are expired).
  *   3) tombstone cells: those won't have value, but they have a local deletion time (when the tombstone was created).
  */
-public interface Cell extends ColumnData
+public abstract class Cell extends ColumnData
 {
     public static final int NO_TTL = 0;
     public static final int NO_DELETION_TIME = Integer.MAX_VALUE;
@@ -51,35 +51,40 @@ public interface Cell extends ColumnData
         return pathComparator == null ? 0 : pathComparator.compare(c1.path(), c2.path());
     };
 
-    public final Serializer serializer = new BufferCell.Serializer();
+    public static final Serializer serializer = new BufferCell.Serializer();
+
+    protected Cell(ColumnDefinition column)
+    {
+        super(column);
+    }
 
     /**
      * Whether the cell is a counter cell or not.
      *
      * @return whether the cell is a counter cell or not.
      */
-    public boolean isCounterCell();
+    public abstract boolean isCounterCell();
 
     /**
      * The cell value.
      *
      * @return the cell value.
      */
-    public ByteBuffer value();
+    public abstract ByteBuffer value();
 
     /**
      * The cell timestamp.
      * <p>
      * @return the cell timestamp.
      */
-    public long timestamp();
+    public abstract long timestamp();
 
     /**
      * The cell ttl.
      *
      * @return the cell ttl, or {@code NO_TTL} if the cell isn't an expiring one.
      */
-    public int ttl();
+    public abstract int ttl();
 
     /**
      * The cell local deletion time.
@@ -87,14 +92,14 @@ public interface Cell extends ColumnData
      * @return the cell local deletion time, or {@code NO_DELETION_TIME} if the cell is neither
      * a tombstone nor an expiring one.
      */
-    public int localDeletionTime();
+    public abstract int localDeletionTime();
 
     /**
      * Whether the cell is a tombstone or not.
      *
      * @return whether the cell is a tombstone or not.
      */
-    public boolean isTombstone();
+    public abstract boolean isTombstone();
 
     /**
      * Whether the cell is an expiring one or not.
@@ -105,7 +110,7 @@ public interface Cell extends ColumnData
      *
      * @return whether the cell is an expiring one or not.
      */
-    public boolean isExpiring();
+    public abstract boolean isExpiring();
 
     /**
      * Whether the cell is live or not given the current time.
@@ -114,7 +119,7 @@ public interface Cell extends ColumnData
      * decide if an expiring cell is expired or live.
      * @return whether the cell is live or not at {@code nowInSec}.
      */
-    public boolean isLive(int nowInSec);
+    public abstract boolean isLive(int nowInSec);
 
     /**
      * For cells belonging to complex types (non-frozen collection and UDT), the
@@ -122,19 +127,19 @@ public interface Cell extends ColumnData
      *
      * @return the cell path for cells of complex column, and {@code null} for other cells.
      */
-    public CellPath path();
+    public abstract CellPath path();
 
-    public Cell withUpdatedValue(ByteBuffer newValue);
+    public abstract Cell withUpdatedValue(ByteBuffer newValue);
 
-    public Cell copy(AbstractAllocator allocator);
+    public abstract Cell copy(AbstractAllocator allocator);
 
     @Override
     // Overrides super type to provide a more precise return type.
-    public Cell markCounterLocalToBeCleared();
+    public abstract Cell markCounterLocalToBeCleared();
 
     @Override
     // Overrides super type to provide a more precise return type.
-    public Cell purge(DeletionPurger purger, int nowInSec);
+    public abstract Cell purge(DeletionPurger purger, int nowInSec);
 
     public interface Serializer
     {


Mime
View raw message