cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [21/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
Date Tue, 30 Jun 2015 10:47:45 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
new file mode 100644
index 0000000..5bfd1a3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Objects;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * Base class for {@link UnfilteredRowIterator} implementations.
+ * <p>
+ * It holds the partition-level information provided at construction (metadata,
+ * partition key, partition level deletion, columns, static row, iteration order
+ * and stats) and returns it from the corresponding accessors. Producing the
+ * {@link Unfiltered} elements themselves is left to subclasses, through
+ * {@link AbstractIterator}.
+ */
+public abstract class AbstractUnfilteredRowIterator extends AbstractIterator<Unfiltered> implements UnfilteredRowIterator
+{
+    protected final CFMetaData metadata;
+    protected final DecoratedKey partitionKey;
+    protected final DeletionTime partitionLevelDeletion;
+    protected final PartitionColumns columns;
+    protected final Row staticRow;
+    protected final boolean isReverseOrder;
+    protected final RowStats stats;
+
+    /**
+     * Stores the provided partition-level information as is (no copy, no validation).
+     *
+     * @param metadata the value returned by {@link #metadata()}.
+     * @param partitionKey the value returned by {@link #partitionKey()}.
+     * @param partitionLevelDeletion the value returned by {@link #partitionLevelDeletion()}.
+     * @param columns the value returned by {@link #columns()}.
+     * @param staticRow the value returned by {@link #staticRow()}.
+     * @param isReverseOrder the value returned by {@link #isReverseOrder()}.
+     * @param stats the value returned by {@link #stats()}.
+     */
+    protected AbstractUnfilteredRowIterator(CFMetaData metadata,
+                                            DecoratedKey partitionKey,
+                                            DeletionTime partitionLevelDeletion,
+                                            PartitionColumns columns,
+                                            Row staticRow,
+                                            boolean isReverseOrder,
+                                            RowStats stats)
+    {
+        this.metadata = metadata;
+        this.partitionKey = partitionKey;
+        this.partitionLevelDeletion = partitionLevelDeletion;
+        this.columns = columns;
+        this.staticRow = staticRow;
+        this.isReverseOrder = isReverseOrder;
+        this.stats = stats;
+    }
+
+    public CFMetaData metadata()
+    {
+        return metadata;
+    }
+
+    public PartitionColumns columns()
+    {
+        return columns;
+    }
+
+    public boolean isReverseOrder()
+    {
+        return isReverseOrder;
+    }
+
+    public DecoratedKey partitionKey()
+    {
+        return partitionKey;
+    }
+
+    public DeletionTime partitionLevelDeletion()
+    {
+        return partitionLevelDeletion;
+    }
+
+    public Row staticRow()
+    {
+        return staticRow;
+    }
+
+    public RowStats stats()
+    {
+        return stats;
+    }
+
+    /**
+     * Does nothing by default.
+     */
+    public void close()
+    {
+    }
+
+    /**
+     * Compares two iterators on both their partition-level information and the
+     * elements they produce.
+     * <p>
+     * The elements are only compared when all the partition-level information is equal.
+     * That comparison is done by {@link Iterators#elementsEqual}, which advances the
+     * iterators it is given: do not expect to be able to iterate over {@code a} or
+     * {@code b} after this call.
+     *
+     * @param a the first iterator to compare.
+     * @param b the second iterator to compare.
+     * @return {@code true} if both iterators have equal partition-level information
+     * and produce equal elements in the same order, {@code false} otherwise.
+     */
+    public static boolean equal(UnfilteredRowIterator a, UnfilteredRowIterator b)
+    {
+        return Objects.equals(a.columns(), b.columns())
+            && Objects.equals(a.metadata(), b.metadata())
+            && a.isReverseOrder() == b.isReverseOrder()
+            && Objects.equals(a.partitionKey(), b.partitionKey())
+            && Objects.equals(a.partitionLevelDeletion(), b.partitionLevelDeletion())
+            && Objects.equals(a.staticRow(), b.staticRow())
+            && Objects.equals(a.stats(), b.stats())
+            && Iterators.elementsEqual(a, b);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
new file mode 100644
index 0000000..80bf901
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.Aliasable;
+import org.apache.cassandra.db.LivenessInfo;
+
+/**
+ * A cell holds a single "simple" value for a given column, as well as "liveness"
+ * information regarding that value.
+ * <p>
+ * There are 2 kinds of columns: simple ones and complex ones.
+ * Simple columns have only a single associated cell, while complex ones,
+ * the ones corresponding to non-frozen collections and UDTs, are comprised
+ * of multiple cells. For complex columns, the different cells are distinguished
+ * by their cell path.
+ * <p>
+ * We can also distinguish different kinds of cells based on the properties of their
+ * {@link #livenessInfo}:
+ *  1) "Normal" cells: their liveness info has no ttl and no deletion time.
+ *  2) Expiring cells: their liveness info has both a ttl and a deletion time (the latter
+ *    deciding when the cell is actually expired).
+ *  3) Tombstones/deleted cells: their liveness info has a deletion time but no ttl. Those
+ *     cells don't really have a value but their {@link #value} method returns an empty
+ *     buffer by convention.
+ */
+public interface Cell extends Aliasable<Cell>
+{
+    /**
+     * The column this cell belongs to.
+     *
+     * @return the column this cell belongs to.
+     */
+    public ColumnDefinition column();
+
+    /**
+     * Whether the cell is a counter cell or not.
+     *
+     * @return whether the cell is a counter cell or not.
+     */
+    public boolean isCounterCell();
+
+    /**
+     * The cell value.
+     *
+     * @return the cell value.
+     */
+    public ByteBuffer value();
+
+    /**
+     * The liveness info of the cell, that is its timestamp and whether it is
+     * expiring, deleted or none of the above.
+     *
+     * @return the cell {@link LivenessInfo}.
+     */
+    public LivenessInfo livenessInfo();
+
+    /**
+     * Whether the cell is a tombstone or not.
+     *
+     * @return whether the cell is a tombstone or not.
+     */
+    public boolean isTombstone();
+
+    /**
+     * Whether the cell is an expiring one or not.
+     * <p>
+     * Note that this only corresponds to whether the cell liveness info
+     * has a TTL or not, but doesn't tell whether the cell is already expired
+     * or not. You should use {@link #isLive} for that latter information.
+     *
+     * @return whether the cell is an expiring one or not.
+     */
+    public boolean isExpiring();
+
+    /**
+     * Whether the cell is live or not given the current time.
+     *
+     * @param nowInSec the current time in seconds. This is used to
+     * decide if an expiring cell is expired or live.
+     * @return whether the cell is live or not at {@code nowInSec}.
+     */
+    public boolean isLive(int nowInSec);
+
+    /**
+     * For cells belonging to complex types (non-frozen collection and UDT), the
+     * path to the cell.
+     *
+     * @return the cell path for cells of complex column, and {@code null} for other cells.
+     */
+    public CellPath path();
+
+    /**
+     * Write the cell to the provided writer.
+     *
+     * @param writer the row writer to write the cell to.
+     */
+    public void writeTo(Row.Writer writer);
+
+    /**
+     * Adds the cell to the provided digest.
+     *
+     * @param digest the {@code MessageDigest} to add the cell to.
+     */
+    public void digest(MessageDigest digest);
+
+    /**
+     * Validate the cell value.
+     *
+     * @throws MarshalException if the cell value is not a valid value for
+     * the column type this is a cell of.
+     */
+    public void validate();
+
+    /**
+     * The size of the data held by this cell.
+     *
+     * This is mainly used to verify if batches go over a given size.
+     *
+     * @return the size used by the data of this cell.
+     */
+    public int dataSize();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/CellData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/CellData.java b/src/java/org/apache/cassandra/db/rows/CellData.java
new file mode 100644
index 0000000..29eac01
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/CellData.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Contains (non-counter) cell data for one or more rows.
+ */
+class CellData
+{
+    private boolean isCounter;
+
+    private ByteBuffer[] values;
+    private final LivenessInfoArray livenessInfos;
+
+    CellData(int initialCellCapacity, boolean isCounter)
+    {
+        this.isCounter = isCounter;
+        this.values = new ByteBuffer[initialCellCapacity];
+        this.livenessInfos = new LivenessInfoArray(initialCellCapacity);
+    }
+
+    public void setCell(int idx, ByteBuffer value, LivenessInfo info)
+    {
+        ensureCapacity(idx);
+        values[idx] = value;
+        livenessInfos.set(idx, info);
+    }
+
+    public boolean hasCell(int idx)
+    {
+        return idx < values.length && values[idx] != null;
+    }
+
+    public ByteBuffer value(int idx)
+    {
+        return values[idx];
+    }
+
+    public void setValue(int idx, ByteBuffer value)
+    {
+        values[idx] = value;
+    }
+
+    private void ensureCapacity(int idxToSet)
+    {
+        int originalCapacity = values.length;
+        if (idxToSet < originalCapacity)
+            return;
+
+        int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, idxToSet);
+
+        values = Arrays.copyOf(values, newCapacity);
+        livenessInfos.resize(newCapacity);
+    }
+
+    // Swap cell i and j
+    public void swapCell(int i, int j)
+    {
+        ensureCapacity(Math.max(i, j));
+
+        ByteBuffer value = values[j];
+        values[j] = values[i];
+        values[i] = value;
+
+        livenessInfos.swap(i, j);
+    }
+
+    // Merge cell i into j
+    public void mergeCell(int i, int j, int nowInSec)
+    {
+        if (isCounter)
+            mergeCounterCell(this, i, this, j, this, j, nowInSec);
+        else
+            mergeRegularCell(this, i, this, j, this, j, nowInSec);
+    }
+
+    private static boolean handleNoCellCase(CellData d1, int i1, CellData d2, int i2, CellData merged, int iMerged)
+    {
+        if (!d1.hasCell(i1))
+        {
+            if (d2.hasCell(i2))
+                d2.moveCell(i2, merged, iMerged);
+            return true;
+        }
+        if (!d2.hasCell(i2))
+        {
+            d1.moveCell(i1, merged, iMerged);
+            return true;
+        }
+        return false;
+    }
+
+    public static void mergeRegularCell(CellData d1, int i1, CellData d2, int i2, CellData merged, int iMerged, int nowInSec)
+    {
+        if (handleNoCellCase(d1, i1, d2, i2, merged, iMerged))
+            return;
+
+        Conflicts.Resolution res = Conflicts.resolveRegular(d1.livenessInfos.timestamp(i1),
+                                                            d1.livenessInfos.isLive(i1, nowInSec),
+                                                            d1.livenessInfos.localDeletionTime(i1),
+                                                            d1.values[i1],
+                                                            d2.livenessInfos.timestamp(i2),
+                                                            d2.livenessInfos.isLive(i2, nowInSec),
+                                                            d2.livenessInfos.localDeletionTime(i2),
+                                                            d2.values[i2]);
+
+        assert res != Conflicts.Resolution.MERGE;
+        if (res == Conflicts.Resolution.LEFT_WINS)
+            d1.moveCell(i1, merged, iMerged);
+        else
+            d2.moveCell(i2, merged, iMerged);
+    }
+
+    public static void mergeCounterCell(CellData d1, int i1, CellData d2, int i2, CellData merged, int iMerged, int nowInSec)
+    {
+        if (handleNoCellCase(d1, i1, d2, i2, merged, iMerged))
+            return;
+
+        Conflicts.Resolution res = Conflicts.resolveCounter(d1.livenessInfos.timestamp(i1),
+                                                            d1.livenessInfos.isLive(i1, nowInSec),
+                                                            d1.values[i1],
+                                                            d2.livenessInfos.timestamp(i2),
+                                                            d2.livenessInfos.isLive(i2, nowInSec),
+                                                            d2.values[i2]);
+
+        switch (res)
+        {
+            case LEFT_WINS:
+                d1.moveCell(i1, merged, iMerged);
+                break;
+            case RIGHT_WINS:
+                d2.moveCell(i2, merged, iMerged);
+                break;
+            default:
+                merged.values[iMerged] = Conflicts.mergeCounterValues(d1.values[i1], d2.values[i2]);
+                if (d1.livenessInfos.timestamp(i1) > d2.livenessInfos.timestamp(i2))
+                    merged.livenessInfos.set(iMerged, d1.livenessInfos.timestamp(i1), d1.livenessInfos.ttl(i1), d1.livenessInfos.localDeletionTime(i1));
+                else
+                    merged.livenessInfos.set(iMerged, d2.livenessInfos.timestamp(i2), d2.livenessInfos.ttl(i2), d2.livenessInfos.localDeletionTime(i2));
+                break;
+        }
+    }
+
+    // Move cell i into j
+    public void moveCell(int i, int j)
+    {
+        moveCell(i, this, j);
+    }
+
+    public void moveCell(int i, CellData target, int j)
+    {
+        if (!hasCell(i) || (target == this && i == j))
+            return;
+
+        target.ensureCapacity(j);
+
+        target.values[j] = values[i];
+        target.livenessInfos.set(j, livenessInfos.timestamp(i),
+                                    livenessInfos.ttl(i),
+                                    livenessInfos.localDeletionTime(i));
+    }
+
+    public int dataSize()
+    {
+        int size = livenessInfos.dataSize();
+        for (int i = 0; i < values.length; i++)
+            if (values[i] != null)
+                size += values[i].remaining();
+        return size;
+    }
+
+    public void clear()
+    {
+        Arrays.fill(values, null);
+        livenessInfos.clear();
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        return ObjectSizes.sizeOnHeapExcludingData(values)
+             + livenessInfos.unsharedHeapSize();
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("CellData(size=").append(values.length);
+        if (isCounter)
+            sb.append(", counter");
+        sb.append("){");
+        LivenessInfoArray.Cursor cursor = LivenessInfoArray.newCursor();
+        for (int i = 0; i < values.length; i++)
+        {
+            if (values[i] == null)
+            {
+                sb.append("[null]");
+                continue;
+            }
+            sb.append("[len(v)=").append(values[i].remaining());
+            sb.append(", info=").append(cursor.setTo(livenessInfos, i));
+            sb.append("]");
+        }
+        return sb.append("}").toString();
+    }
+
+    static class ReusableCell extends AbstractCell
+    {
+        private final LivenessInfoArray.Cursor cursor = LivenessInfoArray.newCursor();
+
+        private CellData data;
+        private ColumnDefinition column;
+        protected int idx;
+
+        ReusableCell setTo(CellData data, ColumnDefinition column, int idx)
+        {
+            if (!data.hasCell(idx))
+                return null;
+
+            this.data = data;
+            this.column = column;
+            this.idx = idx;
+
+            cursor.setTo(data.livenessInfos, idx);
+            return this;
+        }
+
+        public ColumnDefinition column()
+        {
+            return column;
+        }
+
+        public boolean isCounterCell()
+        {
+            return data.isCounter && !cursor.hasLocalDeletionTime();
+        }
+
+        public ByteBuffer value()
+        {
+            return data.value(idx);
+        }
+
+        public LivenessInfo livenessInfo()
+        {
+            return cursor;
+        }
+
+        public CellPath path()
+        {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/CellPath.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/CellPath.java b/src/java/org/apache/cassandra/db/rows/CellPath.java
new file mode 100644
index 0000000..8233ac2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/CellPath.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Objects;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * The path identifying a cell within a complex column (non-frozen collection or UDT).
+ * <p>
+ * A path is an indexed sequence of {@link ByteBuffer} components; equality and hash code
+ * are computed from those components only.
+ */
+public abstract class CellPath
+{
+    public static final CellPath BOTTOM = new EmptyCellPath();
+    public static final CellPath TOP = new EmptyCellPath();
+
+    public abstract int size();
+    public abstract ByteBuffer get(int i);
+
+    // Collections are the only complex columns we currently have, and their paths have a single component.
+    public static CellPath create(ByteBuffer value)
+    {
+        assert value != null;
+        ByteBuffer[] components = { value };
+        return new SimpleCellPath(components);
+    }
+
+    /**
+     * The sum of the remaining bytes of every component of this path.
+     */
+    public int dataSize()
+    {
+        int total = 0;
+        for (int idx = 0; idx < size(); idx++)
+        {
+            ByteBuffer component = get(idx);
+            total += component.remaining();
+        }
+        return total;
+    }
+
+    /**
+     * Adds every component of this path, in order, to the provided digest.
+     */
+    public void digest(MessageDigest digest)
+    {
+        for (int idx = 0; idx < size(); idx++)
+        {
+            // Updating the digest consumes the buffer it is given, so hand it a duplicate.
+            ByteBuffer copy = get(idx).duplicate();
+            digest.update(copy);
+        }
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        int hash = 31;
+        int idx = 0;
+        while (idx < size())
+        {
+            hash += 31 * Objects.hash(get(idx));
+            idx++;
+        }
+        return hash;
+    }
+
+    @Override
+    public final boolean equals(Object o)
+    {
+        if (!(o instanceof CellPath))
+            return false;
+
+        CellPath other = (CellPath) o;
+        boolean sameSize = this.size() == other.size();
+        if (!sameSize)
+            return false;
+
+        // Same number of components: the paths are equal if and only if all of them match.
+        int idx = 0;
+        while (idx < size())
+        {
+            ByteBuffer mine = this.get(idx);
+            ByteBuffer theirs = other.get(idx);
+            if (!Objects.equals(mine, theirs))
+                return false;
+            idx++;
+        }
+        return true;
+    }
+
+    public interface Serializer
+    {
+        public void serialize(CellPath path, DataOutputPlus out) throws IOException;
+        public CellPath deserialize(DataInput in) throws IOException;
+        public long serializedSize(CellPath path, TypeSizes sizes);
+        public void skip(DataInput in) throws IOException;
+    }
+
+    /**
+     * A path backed by an array of components (the array is used as is, not copied).
+     */
+    static class SimpleCellPath extends CellPath
+    {
+        protected final ByteBuffer[] values;
+
+        public SimpleCellPath(ByteBuffer[] values)
+        {
+            this.values = values;
+        }
+
+        @Override
+        public int size()
+        {
+            return values.length;
+        }
+
+        @Override
+        public ByteBuffer get(int i)
+        {
+            return values[i];
+        }
+    }
+
+    /**
+     * A path without any component.
+     */
+    private static class EmptyCellPath extends CellPath
+    {
+        @Override
+        public int size()
+        {
+            return 0;
+        }
+
+        @Override
+        public ByteBuffer get(int i)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Cells.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java
new file mode 100644
index 0000000..1e329e5
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/Cells.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Static methods to work on cells.
+ */
+public abstract class Cells
+{
+    // Holder for static helpers only: the private constructor prevents instantiation.
+    private Cells()
+    {
+    }
+
+    /**
+     * Writes a tombstone cell to the provided writer.
+     * <p>
+     * The cell is written with an empty value, no cell path, and a liveness info
+     * built by {@code SimpleLivenessInfo.forDeletion(timestamp, localDeletionTime)}.
+     *
+     * @param writer the {@code Row.Writer} to write the tombstone to.
+     * @param column the column for the tombstone.
+     * @param timestamp the timestamp for the tombstone.
+     * @param localDeletionTime the local deletion time (in seconds) for the tombstone.
+     */
+    public static void writeTombstone(Row.Writer writer, ColumnDefinition column, long timestamp, int localDeletionTime)
+    {
+        writer.writeCell(column,
+                         false, // presumably the "is counter" flag, as in create() -- verify against Row.Writer
+                         ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                         SimpleLivenessInfo.forDeletion(timestamp, localDeletionTime),
+                         null);
+    }
+
+    /**
+     * Computes the difference between a cell and the result of merging that
+     * cell with other cells.
+     * <p>
+     * This is used when cells from multiple sources are merged and we want to know,
+     * for a given source, whether it was up to date for that cell and, if not, what
+     * should be sent to that source to repair it.
+     *
+     * @param merged the cell that is the result of merging multiple sources.
+     * @param cell the cell from one of the sources that has been merged to yield
+     * {@code merged}.
+     * @return {@code null} if the source having {@code cell} is up-to-date for that
+     * cell, or otherwise a cell that, applied to the source, will "repair" it.
+     */
+    public static Cell diff(Cell merged, Cell cell)
+    {
+        // Checking only whether merged is a counter cell is enough. If it isn't but cell is
+        // one, then merged is a tombstone with a greater timestamp than cell (the only case
+        // where reconciling a counter with a tombstone doesn't yield a counter), and the
+        // regular path below returns the right thing.
+        if (!merged.isCounterCell())
+            return merged.livenessInfo().supersedes(cell.livenessInfo()) ? merged : null;
+
+        if (merged.livenessInfo().supersedes(cell.livenessInfo()))
+            return merged;
+
+        // Reconciliation never returns something with a timestamp strictly lower than its
+        // operands, so here merged.timestamp() == cell.timestamp(). As 1) tombstones always win
+        // over counters (CASSANDRA-7346) and 2) merged is a counter, cell can't be a tombstone
+        // or merged would be one too.
+        assert !cell.isTombstone();
+
+        CounterContext.Relationship relationship = CounterContext.instance().diff(merged.value(), cell.value());
+        if (relationship == CounterContext.Relationship.GREATER_THAN)
+            return merged;
+        return relationship == CounterContext.Relationship.DISJOINT ? merged : null;
+    }
+
+    /**
+     * Reconciles/merges two cells, one being an update to an existing cell,
+     * writing the result and signaling index updates if appropriate.
+     * <p>
+     * This method assumes that the provided cells can meaningfully be reconciled
+     * together, that is that they are for the same row and same column (and same
+     * cell path if the column is complex).
+     * <p>
+     * Which cell is passed as {@code existing} and which as {@code update} matters
+     * for index updates.
+     *
+     * @param clustering the clustering for the row the cells originate from.
+     * This is only used for index updates, so this can be {@code null} if
+     * {@code indexUpdater == SecondaryIndexManager.nullUpdater}.
+     * @param existing the pre-existing cell, the one that is updated. This can be
+     * {@code null} if this reconciliation corresponds to an insertion.
+     * @param update the newly added cell, the update. This can be {@code null} out
+     * of convenience, in which case this function simply copies {@code existing} to
+     * {@code writer}.
+     * @param deletion the deletion time that applies to the cells being considered.
+     * This deletion time may delete {@code existing}, {@code update} or both; a cell
+     * it deletes is treated exactly as if it had been passed as {@code null}.
+     * @param writer the row writer to which the result of the reconciliation is written.
+     * @param nowInSec the current time in seconds (which plays a role during reconciliation
+     * because deleted cells always have precedence on timestamp equality and deciding if a
+     * cell is live or not depends on the current time due to expiring cells).
+     * @param indexUpdater an index updater to which the result of the reconciliation is
+     * signaled (if relevant, that is if the update is not simply ignored by the reconciliation).
+     * This cannot be {@code null} but {@code SecondaryIndexManager.nullUpdater} can be passed.
+     *
+     * @return the absolute timestamp delta between existing and update, or
+     * {@code Long.MAX_VALUE} if one of them is {@code null} or deleted by {@code deletion}.
+     */
+    public static long reconcile(Clustering clustering,
+                                 Cell existing,
+                                 Cell update,
+                                 DeletionTime deletion,
+                                 Row.Writer writer,
+                                 int nowInSec,
+                                 SecondaryIndexManager.Updater indexUpdater)
+    {
+        // Ignore any cell shadowed by the deletion.
+        Cell keptExisting = (existing != null && !deletion.deletes(existing.livenessInfo())) ? existing : null;
+        Cell keptUpdate = (update != null && !deletion.deletes(update.livenessInfo())) ? update : null;
+
+        if (keptUpdate == null)
+        {
+            // Nothing new: carry over the existing cell, if there is one.
+            if (keptExisting != null)
+                keptExisting.writeTo(writer);
+            return Long.MAX_VALUE;
+        }
+
+        if (keptExisting == null)
+        {
+            // Plain insertion.
+            // It's inefficient that we call maybeIndex (which is for primary key indexes) on every
+            // cell, but the secondary index API needs fixing to avoid that.
+            updatePKIndexes(clustering, keptUpdate, nowInSec, indexUpdater);
+            indexUpdater.insert(clustering, keptUpdate);
+            keptUpdate.writeTo(writer);
+            return Long.MAX_VALUE;
+        }
+
+        Cell winner = reconcile(keptExisting, keptUpdate, nowInSec);
+        winner.writeTo(writer);
+
+        // This test relies on reconcile returning either of its two arguments. That's not true
+        // for counters, but we don't index them.
+        if (winner == keptUpdate)
+        {
+            updatePKIndexes(clustering, keptUpdate, nowInSec, indexUpdater);
+            indexUpdater.update(clustering, keptExisting, winner);
+        }
+
+        long existingTimestamp = keptExisting.livenessInfo().timestamp();
+        long updateTimestamp = keptUpdate.livenessInfo().timestamp();
+        return Math.abs(existingTimestamp - updateTimestamp);
+    }
+
+    // Signals a live cell to the index updater through maybeIndex(), passing the cell's
+    // timestamp and ttl together with DeletionTime.LIVE. Nothing is done when the updater is
+    // the null updater or when the cell is not live at nowInSec.
+    private static void updatePKIndexes(Clustering clustering, Cell cell, int nowInSec, SecondaryIndexManager.Updater indexUpdater)
+    {
+        if (indexUpdater == SecondaryIndexManager.nullUpdater || !cell.isLive(nowInSec))
+            return;
+
+        indexUpdater.maybeIndex(clustering, cell.livenessInfo().timestamp(), cell.livenessInfo().ttl(), DeletionTime.LIVE);
+    }
+
+    /**
+     * Reconciles/merge two cells.
+     * <p>
+     * Note that this method assumes that the provided cells can meaningfully
+     * be reconciled together, that is that cell are for the same row and same
+     * column (and same cell path if the column is complex).
+     * <p>
+     * This method is commutative over it's cells arguments: {@code reconcile(a, b, n) == reconcile(b, a, n)}.
+     *
+     * @param c1 the first cell participating in the reconciliation.
+     * @param c2 the second cell participating in the reconciliation.
+     * @param nowInSec the current time in seconds (which plays a role during reconciliation
+     * because deleted cells always have precedence on timestamp equality and deciding if a
+     * cell is a live or not depends on the current time due to expiring cells).
+     *
+     * @return a cell corresponding to the reconciliation of {@code c1} and {@code c2}.
+     * For non-counter cells, this will always be either {@code c1} or {@code c2}, but for
+     * counter cells this can be a newly allocated cell.
+     */
+    public static Cell reconcile(Cell c1, Cell c2, int nowInSec)
+    {
+        if (c1 == null)
+            return c2 == null ? null : c2;
+        if (c2 == null)
+            return c1;
+
+        if (c1.isCounterCell() || c2.isCounterCell())
+        {
+            Conflicts.Resolution res = Conflicts.resolveCounter(c1.livenessInfo().timestamp(),
+                                                                c1.isLive(nowInSec),
+                                                                c1.value(),
+                                                                c2.livenessInfo().timestamp(),
+                                                                c2.isLive(nowInSec),
+                                                                c2.value());
+
+            switch (res)
+            {
+                case LEFT_WINS: return c1;
+                case RIGHT_WINS: return c2;
+                default:
+                    ByteBuffer merged = Conflicts.mergeCounterValues(c1.value(), c2.value());
+                    LivenessInfo mergedInfo = c1.livenessInfo().mergeWith(c2.livenessInfo());
+
+                    // We save allocating a new cell object if it turns out that one cell was
+                    // a complete superset of the other
+                    if (merged == c1.value() && mergedInfo == c1.livenessInfo())
+                        return c1;
+                    else if (merged == c2.value() && mergedInfo == c2.livenessInfo())
+                        return c2;
+                    else // merge clocks and timestamps.
+                        return create(c1.column(), true, merged, mergedInfo, null);
+            }
+        }
+
+        Conflicts.Resolution res = Conflicts.resolveRegular(c1.livenessInfo().timestamp(),
+                                                            c1.isLive(nowInSec),
+                                                            c1.livenessInfo().localDeletionTime(),
+                                                            c1.value(),
+                                                            c2.livenessInfo().timestamp(),
+                                                            c2.isLive(nowInSec),
+                                                            c2.livenessInfo().localDeletionTime(),
+                                                            c2.value());
+        assert res != Conflicts.Resolution.MERGE;
+        return res == Conflicts.Resolution.LEFT_WINS ? c1 : c2;
+    }
+
+    /**
+     * Computes the reconciliation of a complex column given its pre-existing
+     * cells and the ones it is updated with, generating index updates if
+     * appropriate.
+     * <p>
+     * This method assumes that the provided cells can meaningfully be reconciled
+     * together, that is that the cells are for the same row and same complex column.
+     * Both iterators are walked in lockstep and are expected to be sorted according
+     * to {@code column.cellPathComparator()}.
+     * <p>
+     * Which cells are passed as {@code existing} and which as {@code update} matters
+     * for index updates.
+     *
+     * @param clustering the clustering for the row the cells originate from.
+     * This is only used for index updates, so this can be {@code null} if
+     * {@code indexUpdater == SecondaryIndexManager.nullUpdater}.
+     * @param column the complex column the cells are for.
+     * @param existing the pre-existing cells, the ones that are updated. This can be
+     * {@code null} if this reconciliation corresponds to an insertion.
+     * @param update the newly added cells, the update. This can be {@code null} out
+     * of convenience, in which case this function simply copies the cells from
+     * {@code existing} to {@code writer}.
+     * @param deletion the deletion time that applies to the cells being considered.
+     * This deletion time may delete cells in both {@code existing} and {@code update}.
+     * @param writer the row writer to which the result of the reconciliation is written.
+     * @param nowInSec the current time in seconds (which plays a role during reconciliation
+     * because deleted cells always have precedence on timestamp equality and deciding if a
+     * cell is live or not depends on the current time due to expiring cells).
+     * @param indexUpdater an index updater to which the result of the reconciliation is
+     * signaled (if relevant, that is if the updates are not simply ignored by the reconciliation).
+     * This cannot be {@code null} but {@code SecondaryIndexManager.nullUpdater} can be passed.
+     *
+     * @return the smallest timestamp delta between corresponding cells from existing and update,
+     * a timestamp delta being the difference between a cell from {@code update} and the
+     * cell in {@code existing} having the same cell path (if such a cell exists). If no cell
+     * path is shared between {@code existing} and {@code update}, this returns
+     * {@code Long.MAX_VALUE}.
+     */
+    public static long reconcileComplex(Clustering clustering,
+                                        ColumnDefinition column,
+                                        Iterator<Cell> existing,
+                                        Iterator<Cell> update,
+                                        DeletionTime deletion,
+                                        Row.Writer writer,
+                                        int nowInSec,
+                                        SecondaryIndexManager.Updater indexUpdater)
+    {
+        Comparator<CellPath> comparator = column.cellPathComparator();
+        Cell currentExisting = getNext(existing);
+        Cell currentUpdate = getNext(update);
+        long smallestDelta = Long.MAX_VALUE;
+
+        while (currentExisting != null || currentUpdate != null)
+        {
+            // An exhausted side sorts after everything that remains on the other side.
+            int cmp;
+            if (currentExisting == null)
+                cmp = 1;
+            else if (currentUpdate == null)
+                cmp = -1;
+            else
+                cmp = comparator.compare(currentExisting.path(), currentUpdate.path());
+
+            if (cmp == 0)
+            {
+                // Same cell path on both sides: a genuine reconciliation.
+                long delta = reconcile(clustering, currentExisting, currentUpdate, deletion, writer, nowInSec, indexUpdater);
+                smallestDelta = Math.min(smallestDelta, delta);
+                currentExisting = getNext(existing);
+                currentUpdate = getNext(update);
+            }
+            else if (cmp < 0)
+            {
+                // Path only present in existing.
+                reconcile(clustering, currentExisting, null, deletion, writer, nowInSec, indexUpdater);
+                currentExisting = getNext(existing);
+            }
+            else
+            {
+                // Path only present in update.
+                reconcile(clustering, null, currentUpdate, deletion, writer, nowInSec, indexUpdater);
+                currentUpdate = getNext(update);
+            }
+        }
+        return smallestDelta;
+    }
+
+    // Returns the next cell of the iterator, or null if the iterator is null or exhausted.
+    private static Cell getNext(Iterator<Cell> iterator)
+    {
+        if (iterator != null && iterator.hasNext())
+            return iterator.next();
+        return null;
+    }
+
+    /**
+     * Creates a simple, standalone cell object.
+     * <p>
+     * In general cell objects are created by the container they are in, so this method
+     * should only be used in the handful of cases where we know it's the right thing to do.
+     *
+     * @param column the column for the cell to create.
+     * @param isCounter whether the created cell should be a counter one.
+     * @param value the value for the cell.
+     * @param info the liveness info for the cell.
+     * @param path the cell path for the cell.
+     * @return the newly allocated cell object.
+     */
+    public static Cell create(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+    {
+        Cell created = new SimpleCell(column, isCounter, value, info, path);
+        return created;
+    }
+
+    /**
+     * Straightforward cell implementation that simply holds each of its
+     * components in a field.
+     */
+    private static class SimpleCell extends AbstractCell
+    {
+        private final ColumnDefinition column;
+        private final boolean isCounter;
+        private final ByteBuffer value;
+        private final LivenessInfo info;
+        private final CellPath path;
+
+        private SimpleCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+        {
+            this.column = column;
+            this.isCounter = isCounter;
+            this.value = value;
+            // Store what takeAlias() returns rather than the argument itself
+            // (presumably so a reusable info object can't change under us -- verify).
+            this.info = info.takeAlias();
+            this.path = path;
+        }
+
+        @Override
+        public ColumnDefinition column()
+        {
+            return this.column;
+        }
+
+        @Override
+        public boolean isCounterCell()
+        {
+            return this.isCounter;
+        }
+
+        @Override
+        public ByteBuffer value()
+        {
+            return this.value;
+        }
+
+        @Override
+        public LivenessInfo livenessInfo()
+        {
+            return this.info;
+        }
+
+        @Override
+        public CellPath path()
+        {
+            return this.path;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/ColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
new file mode 100644
index 0000000..ea472eb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.DeletionTime;
+
+/**
+ * Groups the data held for one column: either a single cell, or (for a complex
+ * column) an iterator over its cells together with its complex deletion.
+ */
+public class ColumnData
+{
+    private final ColumnDefinition column;
+    private final Cell cell;
+    private final Iterator<Cell> cells;
+    private final DeletionTime complexDeletion;
+
+    /**
+     * @param column the column the data is for; must not be {@code null}.
+     * @param cell the single cell of the column. May only be {@code null} when
+     * {@code column} is complex and both {@code cells} and {@code complexDeletion} are provided.
+     * @param cells the cells of the column.
+     * @param complexDeletion the complex deletion of the column.
+     */
+    ColumnData(ColumnDefinition column, Cell cell, Iterator<Cell> cells, DeletionTime complexDeletion)
+    {
+        assert column != null;
+        assert cell != null || (column.isComplex() && cells != null && complexDeletion != null);
+
+        this.column = column;
+        this.cell = cell;
+        this.cells = cells;
+        this.complexDeletion = complexDeletion;
+    }
+
+    /** The column this data is for. */
+    public ColumnDefinition column()
+    {
+        return this.column;
+    }
+
+    /** The single cell passed at construction (possibly {@code null}). */
+    public Cell cell()
+    {
+        return this.cell;
+    }
+
+    /**
+     * The cells iterator passed at construction (possibly {@code null}). The very same
+     * iterator object is returned on every call.
+     */
+    public Iterator<Cell> cells()
+    {
+        return this.cells;
+    }
+
+    /** The complex deletion passed at construction (possibly {@code null}). */
+    public DeletionTime complexDeletion()
+    {
+        return this.complexDeletion;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java b/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java
new file mode 100644
index 0000000..75df874
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Holds cells data and complex deletions for the complex columns of one or more rows.
+ * <p>
+ * Contrarily to {@code SimpleRowDataBlock}, each complex column can have multiple cells and
+ * we thus can't use a similar dense encoding. Instead, we still store the actual cell data
+ * in a {@code CellData} object, but we add a level of indirection (the cellIdx array in
+ * {@link ComplexCellBlock}) which for every column of every row stores 2 indexes: the index
+ * in the {@code CellData} where the first cell for this column is, and the index of the
+ * last cell (or rather, the index to the first cell that does not belong to that column).
+ * <p>
+ * What makes this a little bit more complicated however is that in some cases (for
+ * {@link PartitionUpdate} typically), we need to be able to swap rows inside a
+ * {@code ComplexRowDataBlock} and the extra level of indirection makes that more complex.
+ * So in practice, we have 2 separate sub-implementation of a {@code ComplexRowDataBlock}:
+ *   - The first one, {@code SimpleComplexRowDataBlock} does not support swapping rows
+ *     (and is thus only used when we don't need to) but it uses a single {@code CellData}
+ *     for all the rows stored.
+ *   - The second one, {@code SortableComplexRowDataBlock}, uses one separate {@code CellData}
+ *     per row (in fact, a {@code ComplexCellBlock} which groups the cell data with the
+ *     indexing array discussed above) and simply keeps those per-row block in a list. It
+ *     is thus less compact in memory but makes the swapping of rows trivial.
+ */
+public abstract class ComplexRowDataBlock
+{
+    private static final Logger logger = LoggerFactory.getLogger(ComplexRowDataBlock.class);
+
+    private final Columns columns;
+
+    // For each complex column, its deletion time (if any): the nth complex column of row i
+    // will have its deletion time at complexDelTimes[(i * ccs) + n] where ccs is the number
+    // of complex columns in 'columns'.
+    final DeletionTimeArray complexDelTimes;
+
+    // Creates a block for the provided columns, sizing the complex deletion times array with
+    // one slot per complex column for each of the 'rows' rows.
+    protected ComplexRowDataBlock(Columns columns, int rows)
+    {
+        this.columns = columns;
+        this.complexDelTimes = new DeletionTimeArray(rows * columns.complexColumnCount());
+    }
+
+    // Factory: picks the sub-implementation depending on whether rows must be swappable
+    // ('sortable') or not.
+    public static ComplexRowDataBlock create(Columns columns, int rows, boolean sortable, boolean isCounter)
+    {
+        if (sortable)
+            return new SortableComplexRowDataBlock(columns, rows, isCounter);
+
+        return new SimpleComplexRowDataBlock(columns, rows, isCounter);
+    }
+
+    // The columns this block was created with.
+    public Columns columns()
+    {
+        return columns;
+    }
+
+    // Returns the cell data of the block holding the given row, or null if that row has no
+    // block. cellBlock() is allowed to return null for a row without data, so the block must
+    // not be dereferenced blindly (callers already check the returned data for null).
+    public CellData cellData(int row)
+    {
+        ComplexCellBlock block = cellBlock(row);
+        return block == null ? null : block.data;
+    }
+
+    // Looks up the index, within the cell data of the given row, of the cell of complex
+    // column 'c' having the given path.
+    //
+    // Returns -1 if the row has no block, if 'c' is not one of the complex columns of this
+    // block, if the row lies beyond the indexing array, or if the column has no cell in that
+    // row. Otherwise returns the result of a binary search of 'path' among the paths of the
+    // column; as per Arrays.binarySearch, that result is negative (but not necessarily -1)
+    // when the path is not found.
+    public int cellIdx(int row, ColumnDefinition c, CellPath path)
+    {
+        ComplexCellBlock block = cellBlock(row);
+        if (block == null)
+            return -1;
+
+        int base = cellBlockBase(row);
+        int columnIdx = columns.complexIdx(c, 0);
+        // A negative index means the column is unknown (complexDeletionIdx() checks the same
+        // thing); using it would read the slots of another column or row.
+        if (columnIdx < 0)
+            return -1;
+
+        int i = base + 2 * columnIdx;
+
+        // Validate the position *before* reading the [start, end) pair: both i and i+1 are read.
+        if (i + 1 >= block.cellIdx.length)
+            return -1;
+
+        int start = block.cellIdx[i];
+        int end = block.cellIdx[i+1];
+        if (end <= start)
+            return -1;
+
+        return Arrays.binarySearch(block.complexPaths, start, end, path, c.cellPathComparator());
+    }
+
+    // The following methods abstract the fact that we have 2 sub-implementations: both
+    // implementation will use a ComplexCellBlock to store a row, but one will use one
+    // ComplexCellBlock per row, while the other will store all rows into the same block.
+
+    // Returns the cell block for a given row. Can return null if the asked row has no data.
+    protected abstract ComplexCellBlock cellBlock(int row);
+    // Same as cellBlock(), but creates the proper block if the row doesn't exist and returns it.
+    protected abstract ComplexCellBlock cellBlockForWritting(int row);
+    // The index in the block returned by cellBlock()/cellBlockForWritting() where the row starts.
+    protected abstract int cellBlockBase(int row);
+
+    protected abstract void swapCells(int i, int j);
+    protected abstract void mergeCells(int i, int j, int nowInSec);
+    protected abstract void moveCells(int i, int j);
+
+    protected abstract long cellDataUnsharedHeapSizeExcludingData();
+    protected abstract int dataCellSize();
+    protected abstract void clearCellData();
+
+    // Swap row i and j
+    // Swaps rows i and j: first their cells, then the deletion time of each complex column.
+    public void swap(int i, int j)
+    {
+        swapCells(i, j);
+
+        int perRow = columns.complexColumnCount();
+        int baseI = i * perRow;
+        int baseJ = j * perRow;
+        for (int k = 0; k < perRow; k++)
+            complexDelTimes.swap(baseI + k, baseJ + k);
+    }
+
+    // Merge row i into j
+    // Merges the cells of row i into row j (i must be greater than j), then, for each complex
+    // column, moves the deletion time of row i onto row j whenever the former supersedes the
+    // latter. The deletion times are left untouched if row i lies beyond the deletion times array.
+    public void merge(int i, int j, int nowInSec)
+    {
+        assert i > j;
+
+        mergeCells(i, j, nowInSec);
+
+        int perRow = columns.complexColumnCount();
+        int baseI = i * perRow;
+        if (baseI < complexDelTimes.size())
+        {
+            int baseJ = j * perRow;
+            for (int k = 0; k < perRow; k++)
+            {
+                if (complexDelTimes.supersedes(baseI + k, baseJ + k))
+                    complexDelTimes.move(baseI + k, baseJ + k);
+            }
+        }
+    }
+
+    // Move row i into j
+    // Moves row i into row j: the cells first, then (after making sure the deletion times
+    // array is large enough for both rows) the deletion time of each complex column.
+    public void move(int i, int j)
+    {
+        moveCells(i, j);
+
+        int highestRow = i > j ? i : j;
+        ensureDelTimesCapacity(highestRow);
+
+        int perRow = columns.complexColumnCount();
+        int baseI = i * perRow;
+        int baseJ = j * perRow;
+        for (int k = 0; k < perRow; k++)
+            complexDelTimes.move(baseI + k, baseJ + k);
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        return cellDataUnsharedHeapSizeExcludingData() + complexDelTimes.unsharedHeapSize();
+    }
+
+    public int dataSize()
+    {
+        return dataCellSize() + complexDelTimes.dataSize();
+    }
+
+    // Returns a new CellWriter, forwarding the inOrderCells flag to its constructor.
+    public CellWriter cellWriter(boolean inOrderCells)
+    {
+        return new CellWriter(inOrderCells);
+    }
+
+    // Returns the index in complexDelTimes of the deletion time of the given complex column
+    // for the given row, or -1 if the column is unknown to this block or if the computed
+    // index lies beyond the end of the array.
+    public int complexDeletionIdx(int row, ColumnDefinition column)
+    {
+        int columnIdx = columns.complexIdx(column, 0);
+        if (columnIdx >= 0)
+        {
+            int idx = (row * columns.complexColumnCount()) + columnIdx;
+            if (idx < complexDelTimes.size())
+                return idx;
+        }
+        return -1;
+    }
+
+    // Whether at least one complex column of the given row has a deletion time that is not live.
+    public boolean hasComplexDeletion(int row)
+    {
+        int first = row * columns.complexColumnCount();
+        int i = first;
+        while (i < first + columns.complexColumnCount())
+        {
+            if (!complexDelTimes.isLive(i))
+                return true;
+            i++;
+        }
+        return false;
+    }
+
+    // Returns the value of the cell of the given row, complex column and path. The index
+    // returned by cellIdx() is handed to the cell data as is, without checking that the
+    // cell was found.
+    public ByteBuffer getValue(int row, ColumnDefinition column, CellPath path)
+    {
+        CellData rowData = cellData(row);
+        assert rowData != null;
+        return rowData.value(cellIdx(row, column, path));
+    }
+
+    // Sets the value of the cell of the given row, complex column and path. The index
+    // returned by cellIdx() is handed to the cell data as is, without checking that the
+    // cell was found.
+    public void setValue(int row, ColumnDefinition column, CellPath path, ByteBuffer value)
+    {
+        CellData rowData = cellData(row);
+        assert rowData != null;
+        rowData.setValue(cellIdx(row, column, path), value);
+    }
+
+    // Returns a new ReusableIterator. Does exactly the same thing as reusableIterator().
+    public static ReusableIterator reusableComplexCells()
+    {
+        return new ReusableIterator();
+    }
+
+    // Returns a new DeletionTimeArray.Cursor (presumably meant to be pointed at complex
+    // deletion times by the caller -- the cursor is not bound to any block here).
+    public static DeletionTimeArray.Cursor complexDeletionCursor()
+    {
+        return new DeletionTimeArray.Cursor();
+    }
+
+    // Returns a new ReusableIterator. Does exactly the same thing as reusableComplexCells().
+    public static ReusableIterator reusableIterator()
+    {
+        return new ReusableIterator();
+    }
+
+    // Clears this block: the cell data (delegated to the sub-implementation) and then the
+    // complex deletion times.
+    public void clear()
+    {
+        clearCellData();
+        complexDelTimes.clear();
+    }
+
+    // Makes sure the complex deletion times array has room for row 'rowToSet', resizing it
+    // (to the capacity computed by RowDataBlock.computeNewCapacity) if it doesn't.
+    private void ensureDelTimesCapacity(int rowToSet)
+    {
+        int perRow = columns.complexColumnCount();
+        // Without complex columns no row needs any deletion time slot; returning here also
+        // avoids dividing by zero below.
+        if (perRow == 0)
+            return;
+
+        int originalCapacity = complexDelTimes.size() / perRow;
+        if (rowToSet < originalCapacity)
+            return;
+
+        int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, rowToSet);
+        complexDelTimes.resize(newCapacity * perRow);
+    }
+
+    /**
+     * Simple sub-implementation that doesn't support swapping/sorting rows.
+     * <p>
+     * The cell data for every row is stored in the same contiguous {@code ComplexCellBlock}
+     * object; swapping, merging and moving cells all throw
+     * {@code UnsupportedOperationException}.
+     */
+    private static class SimpleComplexRowDataBlock extends ComplexRowDataBlock
+    {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleComplexRowDataBlock(Columns.NONE, 0, false));
+
+        // The single block shared by all the rows.
+        private final ComplexCellBlock cells;
+
+        private SimpleComplexRowDataBlock(Columns columns, int rows, boolean isCounter)
+        {
+            super(columns, rows);
+            this.cells = new ComplexCellBlock(columns, rows, isCounter);
+        }
+
+        // Every row lives in the same block, which is never null.
+        @Override
+        protected ComplexCellBlock cellBlock(int row)
+        {
+            return this.cells;
+        }
+
+        // Same block as cellBlock(), after asking it to make room for the row.
+        @Override
+        protected ComplexCellBlock cellBlockForWritting(int row)
+        {
+            this.cells.ensureCapacity(row);
+            return this.cells;
+        }
+
+        // Each row takes 2 index slots per complex column in the shared block.
+        @Override
+        protected int cellBlockBase(int row)
+        {
+            return 2 * row * columns().complexColumnCount();
+        }
+
+        // Swapping cells from row i and j is not supported by this implementation.
+        @Override
+        public void swapCells(int i, int j)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        // Merging cells from row i into j is not supported by this implementation.
+        @Override
+        public void mergeCells(int i, int j, int nowInSec)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        // Moving cells from row i into j is not supported by this implementation.
+        @Override
+        public void moveCells(int i, int j)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        protected long cellDataUnsharedHeapSizeExcludingData()
+        {
+            return EMPTY_SIZE + this.cells.unsharedHeapSizeExcludingData();
+        }
+
+        @Override
+        protected int dataCellSize()
+        {
+            return this.cells.dataSize();
+        }
+
+        @Override
+        protected void clearCellData()
+        {
+            this.cells.clear();
+        }
+    }
+
+    /**
+     * Sub-implementation that supports swapping/sorting rows.
+     * The data for each row is stored in a different {@code ComplexCellBlock} object,
+     * making swapping rows easy: swapping or moving a row only re-points entries of the
+     * per-row list, and merging two rows builds a fresh block for the target row.
+     */
+    private static class SortableComplexRowDataBlock extends ComplexRowDataBlock
+    {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new SortableComplexRowDataBlock(Columns.NONE, 0, false));
+
+        // The cell data for each row, indexed by row number. An entry is null when no block was created for that row.
+        private final List<ComplexCellBlock> cells;
+        private final boolean isCounter; // handed to every ComplexCellBlock created here; also selects the merge function in merge()
+
+        private SortableComplexRowDataBlock(Columns columns, int rows, boolean isCounter)
+        {
+            super(columns, rows);
+            this.cells = new ArrayList<>(rows); // 'rows' is only an initial capacity: the list starts empty
+            this.isCounter = isCounter;
+        }
+
+        protected ComplexCellBlock cellBlockForWritting(int row)
+        {
+            if (row < cells.size())
+                return cells.get(row); // NOTE(review): this entry may be a null placeholder added by ensureCapacity -- confirm callers never write to such a row
+
+            // Pad the list with nulls, if necessary, until it holds at least 'row' entries (indexes 0..row-1),
+            // so that the block appended below lands exactly at index 'row'.
+            ensureCapacity(row-1);
+
+            assert row == cells.size();
+            ComplexCellBlock block = new ComplexCellBlock(columns(), 1, isCounter); // a block sized for a single row
+            cells.add(block);
+            return block;
+        }
+
+        private void ensureCapacity(int row)
+        {
+            while (row >= cells.size()) // append null placeholders until index 'row' is a valid index of the list
+                cells.add(null);
+        }
+
+        protected ComplexCellBlock cellBlock(int row)
+        {
+            return row >= cells.size() ? null : cells.get(row); // null both for rows past the end and for null placeholders
+        }
+
+        protected int cellBlockBase(int row)
+        {
+            return 0; // blocks are created for a single row each, so a row's cellIdx slots start at 0 in its own block
+        }
+
+        // Swap row i and j (growing the list with null placeholders first if either index is past its end)
+        protected void swapCells(int i, int j)
+        {
+            int max = Math.max(i, j);
+            if (max >= cells.size())
+                ensureCapacity(max);
+
+            ComplexCellBlock block = cells.get(j); // remember j's block before it gets overwritten
+            move(i, j); // NOTE(review): move() is not defined in this class; presumably inherited and putting row i's data at j -- confirm
+            cells.set(i, block);
+        }
+
+        // Merge row i into j: after the call, slot j holds the combination of the cells of both rows
+        protected void mergeCells(int i, int j, int nowInSec)
+        {
+            assert i > j;
+            if (i >= cells.size())
+                return; // no entry at all for row i, hence nothing to merge
+
+            ComplexCellBlock b1 = cells.get(i);
+            if (b1 == null)
+                return; // nothing to merge into j
+
+            ComplexCellBlock b2 = cells.get(j);
+            if (b2 == null)
+            {
+                cells.set(j, b1); // j has no cells of its own: the result is simply i's block (slot i still references it too)
+                return;
+            }
+
+            ComplexCellBlock merged = new ComplexCellBlock(columns(), 1, isCounter);
+
+            int idxMerged = 0; // next free cell index in 'merged'
+            int s = columns().complexColumnCount();
+            for (int k = 0; k < s; k++)
+            {
+                ColumnDefinition column = columns().getComplex(k);
+                Comparator<CellPath> comparator = column.cellPathComparator();
+
+                merged.cellIdx[2 * k] = idxMerged; // start (inclusive) of column k's cells in the merged block
+
+                int idx1 = b1.cellIdx[2 * k];
+                int end1 = b1.cellIdx[2 * k + 1];
+                int idx2 = b2.cellIdx[2 * k];
+                int end2 = b2.cellIdx[2 * k + 1];
+
+                while (idx1 < end1 || idx2 < end2) // two-way merge of the cells of column k from both blocks, ordered by cell path
+                {
+                    int cmp = idx1 >= end1 ? 1 // b1 exhausted for this column: take from b2
+                            : (idx2 >= end2 ? -1 // b2 exhausted for this column: take from b1
+                            : comparator.compare(b1.complexPaths[idx1], b2.complexPaths[idx2]));
+
+                    if (cmp == 0)
+                        merge(b1, idx1++, b2, idx2++, merged, idxMerged++, nowInSec); // same path on both sides: combine into a single cell
+                    else if (cmp < 0)
+                        copy(b1, idx1++, merged, idxMerged++);
+                    else
+                        copy(b2, idx2++, merged, idxMerged++);
+                }
+
+                merged.cellIdx[2 * k + 1] = idxMerged; // end (exclusive) of column k's cells in the merged block
+            }
+
+            cells.set(j, merged);
+        }
+
+        private void copy(ComplexCellBlock fromBlock, int fromIdx, ComplexCellBlock toBlock, int toIdx)
+        {
+            fromBlock.data.moveCell(fromIdx, toBlock.data, toIdx); // transfers the cell itself; the path is handled separately below
+            toBlock.ensureComplexPathsCapacity(toIdx);
+            toBlock.complexPaths[toIdx] = fromBlock.complexPaths[fromIdx];
+        }
+
+        private void merge(ComplexCellBlock b1, int idx1, ComplexCellBlock b2, int idx2, ComplexCellBlock mergedBlock, int mergedIdx, int nowInSec)
+        {
+            if (isCounter)
+                CellData.mergeCounterCell(b1.data, idx1, b2.data, idx2, mergedBlock.data, mergedIdx, nowInSec);
+            else
+                CellData.mergeRegularCell(b1.data, idx1, b2.data, idx2, mergedBlock.data, mergedIdx, nowInSec);
+            mergedBlock.ensureComplexPathsCapacity(mergedIdx);
+            mergedBlock.complexPaths[mergedIdx] = b1.complexPaths[idx1]; // only called when both paths compared equal, so b1's path is used
+        }
+
+        // Move row i into j (slot i is left untouched, so both slots reference the same block afterwards)
+        protected void moveCells(int i, int j)
+        {
+            int max = Math.max(i, j);
+            if (max >= cells.size())
+                ensureCapacity(max);
+
+            cells.set(j, cells.get(i));
+        }
+
+        protected long cellDataUnsharedHeapSizeExcludingData()
+        {
+            long size = EMPTY_SIZE;
+            for (ComplexCellBlock block : cells)
+                if (block != null) // skip null placeholders
+                    size += block.unsharedHeapSizeExcludingData();
+            return size;
+        }
+
+        protected int dataCellSize()
+        {
+            int size = 0;
+            for (ComplexCellBlock block : cells)
+                if (block != null) // skip null placeholders
+                    size += block.dataSize();
+            return size;
+        }
+
+        protected void clearCellData()
+        {
+            for (ComplexCellBlock block : cells) // the blocks are cleared in place; the list itself keeps its entries
+                if (block != null)
+                    block.clear();
+        }
+    }
+
+    /**
+     * Stores complex column cell data for one or more rows.
+     * <p>
+     * On top of a {@code CellData} object, this stores an index to where the cells
+     * of a given column start and stop in that {@code CellData} object (cellIdx)
+     * as well as the cell path for the cells (since {@code CellData} doesn't have those).
+     */
+    private static class ComplexCellBlock
+    {
+        private final Columns columns;
+
+        /*
+         * For a given complex column c, we have to store an unknown number of
+         * cells. So for each column of each row, we keep pointers (in data)
+         * to the start and end of the cells for this column (cells for a given
+         * columns are thus stored contiguously).
+         * For instance, if columns has 'c' complex columns, the x-th column of
+         * row 'n' will have it's cells in data at indexes
+         *    [cellIdx[2 * (n * c + x)], cellIdx[2 * (n * c + x) + 1])
+         */
+        private int[] cellIdx;
+
+        private final CellData data;
+
+        // The first free idx in data (for writing purposes).
+        private int idx;
+
+        // THe (complex) cells path. This is indexed exactly like the cells in data (so through cellIdx).
+        private CellPath[] complexPaths;
+
+        public ComplexCellBlock(Columns columns, int rows, boolean isCounter)
+        {
+            this.columns = columns;
+
+            int columnCount = columns.complexColumnCount();
+            this.cellIdx = new int[columnCount * 2 * rows];
+
+            // We start with an estimated 4 cells per complex column. The arrays
+            // will grow if needed so this is just a somewhat random estimation.
+            int cellCount =  columnCount * 4;
+            this.data = new CellData(cellCount, isCounter);
+            this.complexPaths = new CellPath[cellCount];
+        }
+
+        public void addCell(int columnIdx, ByteBuffer value, LivenessInfo info, CellPath path, boolean isFirstCell)
+        {
+            if (isFirstCell)
+                cellIdx[columnIdx] = idx;
+            cellIdx[columnIdx + 1] = idx + 1;
+
+            data.setCell(idx, value, info);
+            ensureComplexPathsCapacity(idx);
+            complexPaths[idx] = path;
+            idx++;
+        }
+
+        public long unsharedHeapSizeExcludingData()
+        {
+            long size = ObjectSizes.sizeOfArray(cellIdx)
+                      + data.unsharedHeapSizeExcludingData()
+                      + ObjectSizes.sizeOfArray(complexPaths);
+
+            for (int i = 0; i < complexPaths.length; i++)
+                if (complexPaths[i] != null)
+                    size += ((MemtableRowData.BufferCellPath)complexPaths[i]).unsharedHeapSizeExcludingData();
+            return size;
+        }
+
+        public int dataSize()
+        {
+            int size = data.dataSize() + cellIdx.length * 4;
+
+            for (int i = 0; i < complexPaths.length; i++)
+                if (complexPaths[i] != null)
+                    size += complexPaths[i].dataSize();
+
+            return size;
+        }
+
+        private void ensureCapacity(int rowToSet)
+        {
+            int columnCount = columns.complexColumnCount();
+            int originalCapacity = cellIdx.length / (2 * columnCount);
+            if (rowToSet < originalCapacity)
+                return;
+
+            int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, rowToSet);
+            cellIdx = Arrays.copyOf(cellIdx, newCapacity * 2 * columnCount);
+        }
+
+        private void ensureComplexPathsCapacity(int idxToSet)
+        {
+            int originalCapacity = complexPaths.length;
+            if (idxToSet < originalCapacity)
+                return;
+
+            int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, idxToSet);
+            complexPaths = Arrays.copyOf(complexPaths, newCapacity);
+        }
+
+        public void clear()
+        {
+            data.clear();
+            Arrays.fill(cellIdx, 0);
+            Arrays.fill(complexPaths, null);
+            idx = 0;
+        }
+    }
+
+    /**
+     * Simple sublcassing of {@code CellData.ReusableCell} to include the cell path.
+     */
+    private static class ReusableCell extends CellData.ReusableCell
+    {
+        private ComplexCellBlock cellBlock;
+
+        ReusableCell setTo(ComplexCellBlock cellBlock, ColumnDefinition column, int idx)
+        {
+            this.cellBlock = cellBlock;
+            super.setTo(cellBlock.data, column, idx);
+            return this;
+        }
+
+        @Override
+        public CellPath path()
+        {
+            return cellBlock.complexPaths[idx];
+        }
+    }
+
+    /**
+     * A reusable iterator over the complex cells of a given row.
+     * <p>
+     * It is used either to iterate over all the (complex) cells of the row, or only over
+     * the cells of a given column within the row, depending on which {@code setTo}
+     * overload positioned it. Both overloads return {@code null}, rather than an empty
+     * iterator, when there is nothing to iterate over.
+     * <p>
+     * The {@code Cell} returned by {@code next()} is always the same object, re-pointed
+     * on every call, so its content is only valid until the following call. Also,
+     * {@code next()} does not check anything by itself: it is {@code hasNext()} that
+     * moves the iterator to the next column having cells, so it must be called first.
+     */
+    static class ReusableIterator extends UnmodifiableIterator<Cell>
+    {
+        private ComplexCellBlock cellBlock;
+        private final ReusableCell cell = new ReusableCell();
+
+        // The offset in 'cellBlock.cellIdx' of the first slot of the row we're iterating over
+        private int rowIdx;
+
+        // columnIdx is the index in 'columns' of the current column we're iterating over.
+        // 'endColumnIdx' is the value of 'columnIdx' at which we should stop iterating.
+        private int columnIdx;
+        private int endColumnIdx;
+
+        // idx is the index in 'cellBlock.data' of the current cell this iterator is on. 'endIdx'
+        // is the index in 'cellBlock.data' of the first cell that does not belong to the current
+        // column we're iterating over (the one pointed by columnIdx).
+        private int idx;
+        private int endIdx;
+
+        private ReusableIterator()
+        {
+        }
+
+        /**
+         * Sets the iterator for iterating over the cells of 'column' in 'row'.
+         *
+         * @return this iterator, or {@code null} if there is no data block, no cell block for
+         * the row, the column is not one of the complex columns, or the column has no cells.
+         */
+        public ReusableIterator setTo(ComplexRowDataBlock dataBlock, int row, ColumnDefinition column)
+        {
+            if (dataBlock == null)
+            {
+                this.cellBlock = null;
+                return null;
+            }
+
+            this.cellBlock = dataBlock.cellBlock(row);
+            if (cellBlock == null)
+                return null;
+
+            rowIdx = dataBlock.cellBlockBase(row);
+
+            columnIdx = dataBlock.columns.complexIdx(column, 0);
+            if (columnIdx < 0)
+                return null;
+
+            // We only want the cells of 'column', so stop as soon as we've reached the next column
+            endColumnIdx = columnIdx + 1;
+
+            resetCellIdx();
+
+            return endIdx <= idx ? null : this;
+        }
+
+        /**
+         * Sets the iterator for iterating over all the cells of 'row'.
+         *
+         * @return this iterator, or {@code null} if there is no data block, no cell block for
+         * the row, or none of the columns has any cell.
+         */
+        public ReusableIterator setTo(ComplexRowDataBlock dataBlock, int row)
+        {
+            if (dataBlock == null)
+            {
+                this.cellBlock = null;
+                return null;
+            }
+
+            this.cellBlock = dataBlock.cellBlock(row);
+            if (cellBlock == null)
+                return null;
+
+            rowIdx = dataBlock.cellBlockBase(row);
+
+            // We want to iterate over all columns
+            columnIdx = 0;
+            endColumnIdx = dataBlock.columns.complexColumnCount();
+
+            // Not every column might have cells, so set things up so we're on the first
+            // column having cells (with idx and endIdx set properly for that column)
+            findNextColumnWithCells();
+
+            // If the search stopped before the end, we're positioned on a column with cells
+            // and there is something to iterate over. Otherwise the row has no complex cell
+            // and, as for the single column case, we return null.
+            return columnIdx < endColumnIdx ? this : null;
+        }
+
+        // Starting at the current columnIdx, moves to the first column that has cells (leaving
+        // idx and endIdx set for it), or to endColumnIdx if there is no such column.
+        private void findNextColumnWithCells()
+        {
+            while (columnIdx < endColumnIdx)
+            {
+                resetCellIdx();
+                if (idx < endIdx)
+                    return;
+                ++columnIdx;
+            }
+        }
+
+        // Provided that columnIdx and rowIdx are properly set, sets idx to the first
+        // cell of the pointed column, and endIdx to the first cell not for said column
+        private void resetCellIdx()
+        {
+            int i = rowIdx + 2 * columnIdx;
+            if (i >= cellBlock.cellIdx.length)
+            {
+                // cellIdx has no slot for that column: treat it as having no cells
+                idx = 0;
+                endIdx = 0;
+            }
+            else
+            {
+                idx = cellBlock.cellIdx[i];
+                endIdx = cellBlock.cellIdx[i + 1];
+            }
+        }
+
+        public boolean hasNext()
+        {
+            if (cellBlock == null)
+                return false;
+
+            if (columnIdx >= endColumnIdx)
+                return false;
+
+            // checks if we have more cells for the current column
+            if (idx < endIdx)
+                return true;
+
+            // otherwise, find the next column that has cells.
+            ++columnIdx;
+            findNextColumnWithCells();
+
+            return columnIdx < endColumnIdx;
+        }
+
+        public Cell next()
+        {
+            return cell.setTo(cellBlock, cellBlock.columns.getComplex(columnIdx), idx++);
+        }
+    }
+
+    /**
+     * Writes complex cells and complex deletions into the enclosing data block, one row
+     * at a time: everything written goes to the current row until {@code endOfRow()} is
+     * called, and {@code reset()} clears what was written and starts over at row 0.
+     */
+    public class CellWriter
+    {
+        // Whether column lookups may start from the last column written to rather than from 0.
+        private final boolean inOrderCells;
+
+        // 'row' is the row being written; 'base' is 'row' times the number of complex columns.
+        private int base;
+        private int row;
+        // Result of the last column lookup done by addCell() for the current row.
+        private int lastColumnIdx;
+
+        public CellWriter(boolean inOrderCells)
+        {
+            this.inOrderCells = inOrderCells;
+        }
+
+        // Appends a cell of (complex) column 'column' to the current row. 'path' must not be null.
+        public void addCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info, CellPath path)
+        {
+            assert path != null;
+
+            ComplexCellBlock target = cellBlockForWritting(row);
+
+            int searchFrom = 0;
+            if (inOrderCells)
+                searchFrom = lastColumnIdx;
+            lastColumnIdx = columns.complexIdx(column, searchFrom);
+            assert lastColumnIdx >= 0 : "Cannot find column " + column.name + " in " + columns;
+
+            // Position in cellIdx of the start slot of that column for the current row
+            int slot = cellBlockBase(row) + 2 * lastColumnIdx;
+
+            int first = target.cellIdx[slot];
+            int pastLast = target.cellIdx[slot + 1];
+
+            // An empty range means the column has no cell yet for this row
+            boolean noCellYet = pastLast <= first;
+            target.addCell(slot, value, info, path, noCellYet);
+        }
+
+        // Records the deletion time of (complex) column 'column' for the current row.
+        public void setComplexDeletion(ColumnDefinition column, DeletionTime deletionTime)
+        {
+            int position = base + columns.complexIdx(column, 0);
+            ensureDelTimesCapacity(row);
+            complexDelTimes.set(position, deletionTime);
+        }
+
+        // Ends the current row: subsequent writes go to the next one.
+        public void endOfRow()
+        {
+            base += columns.complexColumnCount();
+            row++;
+            lastColumnIdx = 0;
+        }
+
+        // Goes back to row 0 and clears the cell data and complex deletion times.
+        public void reset()
+        {
+            row = 0;
+            base = 0;
+            lastColumnIdx = 0;
+            clearCellData();
+            complexDelTimes.clear();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/CounterCells.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/CounterCells.java b/src/java/org/apache/cassandra/db/rows/CounterCells.java
new file mode 100644
index 0000000..732f195
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/CounterCells.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import org.apache.cassandra.db.context.CounterContext;
+
+/**
+ * Static helper methods for counter cells. This class cannot be instantiated.
+ */
+public abstract class CounterCells
+{
+    private static final CounterContext contextManager = CounterContext.instance();
+
+    private CounterCells()
+    {
+    }
+
+    /**
+     * Asks the {@code CounterContext} whether the value of the provided cell has legacy shards.
+     *
+     * @param cell the cell whose value is checked.
+     * @return what {@code CounterContext#hasLegacyShards} returns for {@code cell.value()}.
+     */
+    public static boolean hasLegacyShards(Cell cell)
+    {
+        return contextManager.hasLegacyShards(cell.value());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/FilteringRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/FilteringRow.java b/src/java/org/apache/cassandra/db/rows/FilteringRow.java
new file mode 100644
index 0000000..fb8f448
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/FilteringRow.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+
+/**
+ * A {@code WrappingRow} that hides part of the row it wraps: what is kept is decided by
+ * the {@code include} methods, which include everything unless overridden.
+ */
+public abstract class FilteringRow extends WrappingRow
+{
+    /**
+     * Creates a filtering row that only includes the columns contained in {@code toInclude}.
+     */
+    public static FilteringRow columnsFilteringRow(final Columns toInclude)
+    {
+        return new FilteringRow()
+        {
+            @Override
+            protected boolean include(ColumnDefinition def)
+            {
+                return toInclude.contains(def);
+            }
+        };
+    }
+
+    /**
+     * Creates a filtering row that only includes the columns and cells that
+     * {@code toInclude} includes.
+     */
+    public static FilteringRow columnsFilteringRow(final ColumnFilter toInclude)
+    {
+        return new FilteringRow()
+        {
+            @Override
+            protected boolean include(ColumnDefinition def)
+            {
+                return toInclude.includes(def);
+            }
+
+            @Override
+            protected boolean include(Cell c)
+            {
+                return toInclude.includes(c);
+            }
+        };
+    }
+
+    public FilteringRow setTo(Row row)
+    {
+        super.setTo(row);
+        return this;
+    }
+
+    // The include() methods below are the extension points: override the ones needed.
+    // By default, nothing is filtered out.
+
+    protected boolean include(Cell cell)
+    {
+        return true;
+    }
+
+    protected boolean include(LivenessInfo info)
+    {
+        return true;
+    }
+
+    protected boolean include(DeletionTime dt)
+    {
+        return true;
+    }
+
+    protected boolean include(ColumnDefinition column)
+    {
+        return true;
+    }
+
+    protected boolean include(ColumnDefinition c, DeletionTime dt)
+    {
+        return true;
+    }
+
+    // A subclass overriding this must take care of calling this version first, otherwise
+    // FilteringRow might be broken (i.e. it might not filter what it should).
+    @Override
+    protected Cell filterCell(Cell cell)
+    {
+        // A cell is kept only if its column, its liveness info and the cell itself are all included
+        if (!include(cell.column()))
+            return null;
+        if (!include(cell.livenessInfo()))
+            return null;
+        if (!include(cell))
+            return null;
+        return cell;
+    }
+
+    protected DeletionTime filterDeletionTime(DeletionTime deletion)
+    {
+        if (deletion != null && include(deletion))
+            return deletion;
+        return DeletionTime.LIVE;
+    }
+
+    @Override
+    public LivenessInfo primaryKeyLivenessInfo()
+    {
+        LivenessInfo wrappedInfo = super.primaryKeyLivenessInfo();
+        if (include(wrappedInfo))
+            return wrappedInfo;
+        return LivenessInfo.NONE;
+    }
+
+    @Override
+    public DeletionTime deletion()
+    {
+        DeletionTime wrappedDeletion = super.deletion();
+        if (include(wrappedDeletion))
+            return wrappedDeletion;
+        return DeletionTime.LIVE;
+    }
+
+    @Override
+    public Iterator<Cell> getCells(ColumnDefinition c)
+    {
+        // When the whole column is excluded, skip asking the wrapped row altogether
+        return include(c) ? super.getCells(c) : null;
+    }
+
+    @Override
+    public DeletionTime getDeletion(ColumnDefinition c)
+    {
+        if (include(c))
+        {
+            DeletionTime columnDeletion = super.getDeletion(c);
+            if (include(c, columnDeletion))
+                return columnDeletion;
+        }
+        return DeletionTime.LIVE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java b/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java
new file mode 100644
index 0000000..fd1c0a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import org.apache.cassandra.db.*;
+
+public class FilteringRowIterator extends WrappingUnfilteredRowIterator
+{
+    private final FilteringRow filter;
+    private Unfiltered next;
+
+    public FilteringRowIterator(UnfilteredRowIterator toFilter)
+    {
+        super(toFilter);
+        this.filter = makeRowFilter();
+    }
+
+    // Subclasses that want to filter withing row should overwrite this. Note that since FilteringRow
+    // is a reusable object, this method won't be called for every filtered row and the same filter will
+    // be used for every regular rows. However, this still can be called twice if we have a static row
+    // to filter, because we don't want to use the same object for them as this makes for weird behavior
+    // if calls to staticRow() are interleaved with hasNext().
+    protected FilteringRow makeRowFilter()
+    {
+        return null;
+    }
+
+    protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
+    {
+        return true;
+    }
+
+    // Allows to modify the range tombstone returned. This is called *after* includeRangeTombstoneMarker has been called.
+    protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
+    {
+        return marker;
+    }
+
+    protected boolean includeRow(Row row)
+    {
+        return true;
+    }
+
+    protected boolean includePartitionDeletion(DeletionTime dt)
+    {
+        return true;
+    }
+
+    @Override
+    public DeletionTime partitionLevelDeletion()
+    {
+        DeletionTime dt = wrapped.partitionLevelDeletion();
+        return includePartitionDeletion(dt) ? dt : DeletionTime.LIVE;
+    }
+
+    @Override
+    public Row staticRow()
+    {
+        Row row = super.staticRow();
+        if (row == Rows.EMPTY_STATIC_ROW)
+            return row;
+
+        FilteringRow filter = makeRowFilter();
+        if (filter != null)
+            row = filter.setTo(row);
+
+        return !row.isEmpty() && includeRow(row) ? row : Rows.EMPTY_STATIC_ROW;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        if (next != null)
+            return true;
+
+        while (super.hasNext())
+        {
+            Unfiltered unfiltered = super.next();
+            if (unfiltered.kind() == Unfiltered.Kind.ROW)
+            {
+                Row row = filter == null ? (Row) unfiltered : filter.setTo((Row) unfiltered);
+                if (!row.isEmpty() && includeRow(row))
+                {
+                    next = row;
+                    return true;
+                }
+            }
+            else
+            {
+                RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
+                if (includeRangeTombstoneMarker(marker))
+                {
+                    next = filterRangeTombstoneMarker(marker, isReverseOrder());
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Unfiltered next()
+    {
+        if (next == null)
+            hasNext();
+
+        Unfiltered toReturn = next;
+        next = null;
+        return toReturn;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
new file mode 100644
index 0000000..6241a89
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * Base class for {@code UnfilteredRowIterator} implementations that initialize themselves lazily.
+ *
+ * This is used during partition range queries, when the partition key is known upfront but
+ * building the rest of the UnfilteredRowIterator should be deferred until that information
+ * is actually needed. See {@link BigTableScanner#KeyScanningIterator} for instance.
+ * <p>
+ * The underlying iterator is created, through {@code initializeIterator()}, by the first
+ * call to any method other than {@code partitionKey()} and {@code close()}.
+ */
+public abstract class LazilyInitializedUnfilteredRowIterator extends AbstractIterator<Unfiltered> implements UnfilteredRowIterator
+{
+    private final DecoratedKey partitionKey;
+
+    // The underlying iterator; null as long as it has not been needed.
+    private UnfilteredRowIterator iterator;
+
+    public LazilyInitializedUnfilteredRowIterator(DecoratedKey partitionKey)
+    {
+        this.partitionKey = partitionKey;
+    }
+
+    protected abstract UnfilteredRowIterator initializeIterator();
+
+    // Returns the underlying iterator, creating it first if that has not been done yet.
+    private UnfilteredRowIterator delegate()
+    {
+        if (iterator == null)
+            iterator = initializeIterator();
+        return iterator;
+    }
+
+    public CFMetaData metadata()
+    {
+        return delegate().metadata();
+    }
+
+    public PartitionColumns columns()
+    {
+        return delegate().columns();
+    }
+
+    public boolean isReverseOrder()
+    {
+        return delegate().isReverseOrder();
+    }
+
+    // The key is known from the start, so this does not trigger the initialization.
+    public DecoratedKey partitionKey()
+    {
+        return partitionKey;
+    }
+
+    public DeletionTime partitionLevelDeletion()
+    {
+        return delegate().partitionLevelDeletion();
+    }
+
+    public Row staticRow()
+    {
+        return delegate().staticRow();
+    }
+
+    public RowStats stats()
+    {
+        return delegate().stats();
+    }
+
+    protected Unfiltered computeNext()
+    {
+        UnfilteredRowIterator underlying = delegate();
+        if (underlying.hasNext())
+            return underlying.next();
+        return endOfData();
+    }
+
+    // There is nothing to close if the underlying iterator was never created.
+    public void close()
+    {
+        if (iterator == null)
+            return;
+        iterator.close();
+    }
+}


Mime
View raw message