cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [06/15] cassandra git commit: Simplify some 8099's implementations
Date Wed, 22 Jul 2015 16:05:35 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 545da7a..ad21c69 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -17,35 +17,31 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.collect.Iterators;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.SearchIterator;
 
 /**
  * Storage engine representation of a row.
  *
- * A row is identified by it's clustering column values (it's an Unfiltered),
- * has row level informations (deletion and partition key liveness infos (see below))
- * and contains data (Cells) regarding the columns it contains.
+ * A row mainly contains the following information:
+ *   1) Its {@code Clustering}, which holds the values for the clustering columns identifying the row.
+ *   2) Its row level information: the primary key liveness info and the row deletion (see
+ *      {@link #primaryKeyLivenessInfo()} and {@link #deletion()} for more details).
+ *   3) Data for the columns it contains, or in other words, it's a (sorted) collection of
+ *      {@code ColumnData}.
  *
- * A row implements {@code WithLivenessInfo} and has thus a timestamp, ttl and
- * local deletion time. Those information do not apply to the row content, they
- * apply to the partition key columns. In other words, the timestamp is the
- * timestamp for the partition key columns: it is what allows to distinguish
- * between a dead row, and a live row but for which only the partition key columns
- * are set. The ttl and local deletion time information are for the case where
- * a TTL is set on those partition key columns. Note however that a row can have
- * live cells but no partition key columns timestamp, because said timestamp (and
- * its corresponding ttl) is only set on INSERT (not UPDATE).
+ * Also note that, as for every other storage engine object, a {@code Row} object cannot shadow
+ * its own data. For instance, a {@code Row} cannot contain a cell that is deleted by its own
+ * row deletion.
  */
-public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row>
+public interface Row extends Unfiltered, Iterable<ColumnData>
 {
     /**
      * The clustering values for this row.
@@ -79,17 +75,14 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row>
      * As a row is uniquely identified by its primary key, all its primary key columns
      * share the same {@code LivenessInfo}. This liveness information is what allows us
      * to distinguish between a dead row (it has no live cells and its primary key liveness
-     * info has no timestamp) and a live row but where all non PK columns are null (it has no
-     * live cells, but its primary key liveness has a timestamp). Please note that the ttl
-     * (and local deletion time) of the PK liveness information only apply to the
-     * liveness info timestamp, and not to the content of the row. Also note that because
-     * in practice there is not way to only delete the primary key columns (without deleting
-     * the row itself), the returned {@code LivenessInfo} can only have a local deletion time
-     * if it has a TTL.
+     * info is empty) and a live row but where all non PK columns are null (it has no
+     * live cells, but its primary key liveness is not empty). Please note that the liveness
+     * info (including its ttl/local deletion time, if any) only applies to the primary key
+     * columns and has no impact on the row content.
      * <p>
-     * Lastly, note that it is possible for a row to have live cells but no PK liveness
-     * info timestamp, because said timestamp is only set on {@code INSERT} (which makes sense
-     * in itself, see #6782) but live cells can be add through {@code UPDATE} even if the row
+     * Note in particular that a row may have live cells but no PK liveness info, because the
+     * primary key liveness information is only set on {@code INSERT} (which makes sense
+     * in itself, see #6782) but live cells can be added through {@code UPDATE} even if the row
      * wasn't pre-existing (which users are encouraged not to do, but we can't validate).
      */
     public LivenessInfo primaryKeyLivenessInfo();
@@ -102,10 +95,10 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row>
     public boolean isStatic();
 
     /**
-     * Whether the row has no information whatsoever. This means no row infos
-     * (timestamp, ttl, deletion), no cells and no complex deletion info.
+     * Whether the row has no information whatsoever. This means no PK liveness info, no row
+     * deletion, no cells and no complex deletion info.
      *
-     * @return {@code true} if the row has no data whatsoever, {@code false} otherwise.
+     * @return {@code true} if the row has no data, {@code false} otherwise.
      */
     public boolean isEmpty();
 
@@ -115,20 +108,8 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row>
     public boolean hasLiveData(int nowInSec);
 
     /**
-     * Whether or not this row contains any deletion for a complex column. That is if
-     * there is at least one column for which {@code getDeletion} returns a non
-     * live deletion time.
-     */
-    public boolean hasComplexDeletion();
-
-    /**
      * Returns a cell for a simple column.
      *
-     * Calls to this method are allowed to return the same Cell object, and hence the returned
-     * object is only valid until the next getCell/getCells call on the same Row object. You will need
-     * to copy the returned data if you plan on using a reference to the Cell object
-     * longer than that.
-     *
      * @param c the simple column for which to fetch the cell.
      * @return the corresponding cell or {@code null} if the row has no such cell.
      */
@@ -137,11 +118,6 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row>
     /**
      * Return a cell for a given complex column and cell path.
      *
-     * Calls to this method are allowed to return the same Cell object, and hence the returned
-     * object is only valid until the next getCell/getCells call on the same Row object. You will need
-     * to copy the returned data if you plan on using a reference to the Cell object
-     * longer than that.
-     *
      * @param c the complex column for which to fetch the cell.
      * @param path the cell path for which to fetch the cell.
      * @return the corresponding cell or {@code null} if the row has no such cell.
@@ -149,43 +125,35 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row>
     public Cell getCell(ColumnDefinition c, CellPath path);
 
     /**
-     * Returns an iterator on the cells of a complex column c.
-     *
-     * Calls to this method are allowed to return the same iterator object, and
-     * hence the returned object is only valid until the next getCell/getCells call
-     * on the same Row object. You will need to copy the returned data if you
-     * plan on using a reference to the Cell object longer than that.
+     * The data for a complex column.
+     * <p>
+     * The returned object groups all the cells for the column, as well as its complex deletion (if relevant).
      *
-     * @param c the complex column for which to fetch the cells.
-     * @return an iterator on the cells of complex column {@code c} or {@code null} if the row has no
-     * cells for that column.
+     * @param c the complex column for which to return the complex data.
+     * @return the data for {@code c} or {@code null} if the row has no data for this column.
      */
-    public Iterator<Cell> getCells(ColumnDefinition c);
+    public ComplexColumnData getComplexColumnData(ColumnDefinition c);
 
     /**
-     * Deletion informations for complex columns.
+     * An iterable over the cells of this row.
+     * <p>
+     * The iterable guarantees that cells are returned in order of {@link Cell#comparator}.
      *
-     * @param c the complex column for which to fetch deletion info.
-     * @return the deletion time for complex column {@code c} in this row.
+     * @return an iterable over the cells of this row.
      */
-    public DeletionTime getDeletion(ColumnDefinition c);
+    public Iterable<Cell> cells();
 
     /**
-     * An iterator over the cells of this row.
-     *
-     * The iterator guarantees that for 2 rows of the same partition, columns
-     * are returned in a consistent order in the sense that if the cells for
-     * column c1 is returned before the cells for column c2 by the first iterator,
-     * it is also the case for the 2nd iterator.
-     *
-     * The object returned by a call to next() is only guaranteed to be valid until
-     * the next call to hasNext() or next(). If a consumer wants to keep a
-     * reference on the returned Cell objects for longer than the iteration, it must
-     * make a copy of it explicitly.
+     * Whether the row stores any (non-live) complex deletion for any complex column.
+     */
+    public boolean hasComplexDeletion();
+
+    /**
+     * Whether the row has any deletion info (row deletion, cell tombstone, expired cell or complex deletion).
      *
-     * @return an iterator over the cells of this row.
+     * @param nowInSec the current time in seconds, used to decide if a cell is expired.
      */
-    public Iterator<Cell> iterator();
+    public boolean hasDeletion(int nowInSec);
 
     /**
      * An iterator to efficiently search data for a given column.
@@ -195,134 +163,167 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row>
     public SearchIterator<ColumnDefinition, ColumnData> searchIterator();
 
     /**
-     * Copy this row to the provided writer.
+     * Returns a copy of this row that:
+     *   1) only includes the data for the columns included by {@code filter}.
+     *   2) doesn't include any data that belongs to a dropped column (recorded in {@code metadata}).
+     */
+    public Row filter(ColumnFilter filter, CFMetaData metadata);
+
+    /**
+     * Returns a copy of this row that:
+     *   1) only includes the data for the columns included by {@code filter}.
+     *   2) doesn't include any data that belongs to a dropped column (recorded in {@code metadata}).
+     *   3) doesn't include any data that is shadowed/deleted by {@code activeDeletion}.
+     *   4) uses {@code activeDeletion} as row deletion iff {@code setActiveDeletionToRow} and {@code activeDeletion} supersedes the row deletion.
+     */
+    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata);
+
+    /**
+     * Returns a copy of this row without any deletion info that should be purged according to {@code purger}.
      *
-     * @param writer the row writer to write this row to.
+     * @param purger the {@code DeletionPurger} to use to decide what can be purged.
+     * @param nowInSec the current time to decide what is deleted and what isn't (in the case of expired cells).
+     * @return this row but without any deletion info purged by {@code purger}.
+     */
+    public Row purge(DeletionPurger purger, int nowInSec);
+
+    /**
+     * Returns a copy of this row where all counter cells have their "local" shard marked for clearing.
+     */
+    public Row markCounterLocalToBeCleared();
+
+    /**
+     * Returns a copy of this row where every live timestamp has been replaced by {@code newTimestamp} and every deletion timestamp
+     * by {@code newTimestamp - 1}. See {@link Commit} for why we need this.
      */
-    public void copyTo(Row.Writer writer);
+    public Row updateAllTimestamp(long newTimestamp);
+
+    public int dataSize();
+
+    public long unsharedHeapSizeExcludingData();
 
     public String toString(CFMetaData metadata, boolean fullDetails);
 
     /**
-     * Interface for writing a row.
+     * Interface for building rows.
      * <p>
-     * Clients of this interface should abid to the following assumptions:
-     *   1) if the row has a non empty clustering (it's not a static one and it doesn't belong to a table without
-     *      clustering columns), then that clustering should be the first thing written (through
-     *      {@link ClusteringPrefix.Writer#writeClusteringValue})).
-     *   2) for a given complex column, calls to {@link #writeCell} are performed consecutively (without
-     *      any call to {@code writeCell} for another column intermingled) and in {@code CellPath} order.
-     *   3) {@link #endOfRow} is always called to end the writing of a given row.
+     * The builder of a row should always abide by the following rules:
+     *   1) {@link #newRow} is always called as the first thing for the row.
+     *   2) {@link #addPrimaryKeyLivenessInfo} and {@link #addRowDeletion}, if called, are called before
+     *      any {@link #addCell}/{@link #addComplexDeletion} call.
+     *   3) {@link #build} is called to construct the new row. The builder can then be reused.
+     *
+     * There are 2 variants of a builder: sorted and unsorted ones. A sorted builder expects users to abide by the
+     * following additional rules:
+     *   4) Calls to {@link #addCell}/{@link #addComplexDeletion} are done in strictly increasing column order.
+     *      In other words, all calls to these methods for a given column {@code c} are done after any call for
+     *      any column before {@code c} and before any call for any column after {@code c}.
+     *   5) Calls to {@link #addCell} are further done in strictly increasing cell order (the one defined by
+     *      {@link Cell#comparator}). That is, for a given column, cells are passed in {@code CellPath} order.
+     *
+     * An unsorted builder does not impose those last 2 rules however: calls to {@link #addCell} and {@link #addComplexDeletion}
+     * can be done in any order. In particular, an unsorted builder allows multiple calls for the same column/cell. In
+     * that latter case, the result will follow the usual reconciliation rules (so equal cells are reconciled with
+     * {@link Cells#reconcile} and the "biggest" of multiple complex deletions for the same column wins).
      */
-    public interface Writer extends ClusteringPrefix.Writer
+    public interface Builder
     {
         /**
-         * Writes the livness information for the partition key columns of this row.
+         * Whether the builder is a sorted one or not.
+         *
+         * @return if the builder requires calls to be done in sorted order or not (see above).
+         */
+        public boolean isSorted();
+
+        /**
+         * Prepares the builder to build a new row of clustering {@code clustering}.
+         * <p>
+         * This should always be the first call for a given row.
          *
-         * This call is optional: skipping it is equivalent to calling {@code writePartitionKeyLivenessInfo(LivenessInfo.NONE)}.
+         * @param clustering the clustering for the new row.
+         */
+        public void newRow(Clustering clustering);
+
+        /**
+         * The clustering for the row that is currently being built.
          *
-         * @param info the liveness information for the partition key columns of the written row.
+         * @return the clustering for the row that is currently being built, or {@code null} if {@link #newRow} hasn't
+         * yet been called.
          */
-        public void writePartitionKeyLivenessInfo(LivenessInfo info);
+        public Clustering clustering();
 
         /**
-         * Writes the deletion information for this row.
+         * Adds the liveness information for the primary key columns of this row.
+         *
+         * This call is optional (skipping it is equivalent to calling {@code addPrimaryKeyLivenessInfo(LivenessInfo.EMPTY)}).
+         *
+         * @param info the liveness information for the primary key columns of the built row.
+         */
+        public void addPrimaryKeyLivenessInfo(LivenessInfo info);
+
+        /**
+         * Adds the deletion information for this row.
          *
          * This call is optional and can be skipped if the row is not deleted.
          *
          * @param deletion the row deletion time, or {@code DeletionTime.LIVE} if the row isn't deleted.
          */
-        public void writeRowDeletion(DeletionTime deletion);
+        public void addRowDeletion(DeletionTime deletion);
 
         /**
-         * Writes a cell to the writer.
+         * Adds a cell to this builder.
          *
-         * As mentionned above, add cells for a given column should be added consecutively (and in {@code CellPath} order for complex columns).
-         *
-         * @param column the column for the written cell.
-         * @param isCounter whether or not this is a counter cell.
-         * @param value the value for the cell. For tombstones, which don't have values, this should be an empty buffer.
-         * @param info the cell liveness information.
-         * @param path the {@link CellPath} for complex cells and {@code null} for regular cells.
+         * @param cell the cell to add.
          */
-        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path);
+        public void addCell(Cell cell);
 
         /**
-         * Writes a deletion for a complex column, that is one that apply to all cells of the complex column.
+         * Adds a complex deletion.
          *
-         * @param column the (complex) column this is a deletion for.
-         * @param complexDeletion the deletion time.
+         * @param column the column for which to add the {@code complexDeletion}.
+         * @param complexDeletion the complex deletion time to add.
          */
-        public void writeComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion);
+        public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion);
 
         /**
-         * Should be called to indicates that the row has been fully written.
+         * Builds and returns the built row.
+         *
+         * @return the last row built by this builder.
          */
-        public void endOfRow();
+        public Row build();
     }
 
     /**
      * Utility class to help merging rows from multiple inputs (UnfilteredRowIterators).
      */
-    public abstract static class Merger
+    public static class Merger
     {
-        private final CFMetaData metadata;
-        private final int nowInSec;
-        private final UnfilteredRowIterators.MergeListener listener;
         private final Columns columns;
+        private final Row[] rows;
+        private final List<Iterator<ColumnData>> columnDataIterators;
 
         private Clustering clustering;
-        private final Row[] rows;
         private int rowsToMerge;
+        private int lastRowSet = -1;
 
-        private LivenessInfo rowInfo = LivenessInfo.NONE;
-        private DeletionTime rowDeletion = DeletionTime.LIVE;
-
-        private final Cell[] cells;
-        private final List<Iterator<Cell>> complexCells;
-        private final ComplexColumnReducer complexReducer = new ComplexColumnReducer();
+        private final List<ColumnData> dataBuffer = new ArrayList<>();
+        private final ColumnDataReducer columnDataReducer;
 
-        // For the sake of the listener if there is one
-        private final DeletionTime[] complexDelTimes;
-
-        private boolean signaledListenerForRow;
-
-        public static Merger createStatic(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
-        {
-            return new StaticMerger(metadata, size, nowInSec, columns, listener);
-        }
-
-        public static Merger createRegular(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+        public Merger(int size, int nowInSec, Columns columns)
         {
-            return new RegularMerger(metadata, size, nowInSec, columns, listener);
-        }
-
-        protected Merger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
-        {
-            this.metadata = metadata;
-            this.nowInSec = nowInSec;
-            this.listener = listener;
             this.columns = columns;
             this.rows = new Row[size];
-            this.complexCells = new ArrayList<>(size);
-
-            this.cells = new Cell[size];
-            this.complexDelTimes = listener == null ? null : new DeletionTime[size];
+            this.columnDataIterators = new ArrayList<>(size);
+            this.columnDataReducer = new ColumnDataReducer(size, nowInSec, columns.hasComplex());
         }
 
         public void clear()
         {
+            dataBuffer.clear();
             Arrays.fill(rows, null);
-            Arrays.fill(cells, null);
-            if (complexDelTimes != null)
-                Arrays.fill(complexDelTimes, null);
-            complexCells.clear();
+            columnDataIterators.clear();
             rowsToMerge = 0;
-
-            rowInfo = LivenessInfo.NONE;
-            rowDeletion = DeletionTime.LIVE;
-
-            signaledListenerForRow = false;
+            lastRowSet = -1;
         }
 
         public void add(int i, Row row)
@@ -330,225 +331,187 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row>
             clustering = row.clustering();
             rows[i] = row;
             ++rowsToMerge;
+            lastRowSet = i;
         }
 
-        protected abstract Row.Writer getWriter();
-        protected abstract Row getRow();
-
         public Row merge(DeletionTime activeDeletion)
         {
             // If for this clustering we have only one row version and have no activeDeletion (i.e. nothing to filter out),
-            // then we can just return that single row (we also should have no listener)
-            if (rowsToMerge == 1 && activeDeletion.isLive() && listener == null)
+            // then we can just return that single row
+            if (rowsToMerge == 1 && activeDeletion.isLive())
             {
-                for (int i = 0; i < rows.length; i++)
-                    if (rows[i] != null)
-                        return rows[i];
-                throw new AssertionError();
+                Row row = rows[lastRowSet];
+                assert row != null;
+                return row;
             }
 
-            Row.Writer writer = getWriter();
-            Rows.writeClustering(clustering, writer);
-
-            for (int i = 0; i < rows.length; i++)
+            LivenessInfo rowInfo = LivenessInfo.EMPTY;
+            DeletionTime rowDeletion = DeletionTime.LIVE;
+            for (Row row : rows)
             {
-                if (rows[i] == null)
+                if (row == null)
                     continue;
 
-                rowInfo = rowInfo.mergeWith(rows[i].primaryKeyLivenessInfo());
-
-                if (rows[i].deletion().supersedes(rowDeletion))
-                    rowDeletion = rows[i].deletion();
+                if (row.primaryKeyLivenessInfo().supersedes(rowInfo))
+                    rowInfo = row.primaryKeyLivenessInfo();
+                if (row.deletion().supersedes(rowDeletion))
+                    rowDeletion = row.deletion();
             }
 
-            if (rowDeletion.supersedes(activeDeletion))
+            if (activeDeletion.supersedes(rowDeletion))
+                rowDeletion = DeletionTime.LIVE;
+            else
                 activeDeletion = rowDeletion;
 
             if (activeDeletion.deletes(rowInfo))
-                rowInfo = LivenessInfo.NONE;
+                rowInfo = LivenessInfo.EMPTY;
 
-            writer.writePartitionKeyLivenessInfo(rowInfo);
-            writer.writeRowDeletion(rowDeletion);
+            for (Row row : rows)
+                columnDataIterators.add(row == null ? Collections.emptyIterator() : row.iterator());
 
-            for (int i = 0; i < columns.simpleColumnCount(); i++)
+            columnDataReducer.setActiveDeletion(activeDeletion);
+            Iterator<ColumnData> merged = MergeIterator.get(columnDataIterators, ColumnData.comparator, columnDataReducer);
+            while (merged.hasNext())
             {
-                ColumnDefinition c = columns.getSimple(i);
-                for (int j = 0; j < rows.length; j++)
-                    cells[j] = rows[j] == null ? null : rows[j].getCell(c);
-
-                reconcileCells(activeDeletion, writer);
+                ColumnData data = merged.next();
+                if (data != null)
+                    dataBuffer.add(data);
             }
 
-            complexReducer.activeDeletion = activeDeletion;
-            complexReducer.writer = writer;
-            for (int i = 0; i < columns.complexColumnCount(); i++)
-            {
-                ColumnDefinition c = columns.getComplex(i);
-
-                DeletionTime maxComplexDeletion = DeletionTime.LIVE;
-                for (int j = 0; j < rows.length; j++)
-                {
-                    if (rows[j] == null)
-                        continue;
-
-                    DeletionTime dt = rows[j].getDeletion(c);
-                    if (complexDelTimes != null)
-                        complexDelTimes[j] = dt;
-
-                    if (dt.supersedes(maxComplexDeletion))
-                        maxComplexDeletion = dt;
-                }
-
-                boolean overrideActive = maxComplexDeletion.supersedes(activeDeletion);
-                maxComplexDeletion =  overrideActive ? maxComplexDeletion : DeletionTime.LIVE;
-                writer.writeComplexDeletion(c, maxComplexDeletion);
-                if (listener != null)
-                    listener.onMergedComplexDeletion(c, maxComplexDeletion, complexDelTimes);
-
-                mergeComplex(overrideActive ? maxComplexDeletion : activeDeletion, c);
-            }
-            writer.endOfRow();
-
-            Row row = getRow();
-            // Because shadowed cells are skipped, the row could be empty. In which case
-            // we return null (we also don't want to signal anything in that case since that
-            // means everything in the row was shadowed and the listener will have been signalled
-            // for whatever shadows it).
-            if (row.isEmpty())
-                return null;
-
-            maybeSignalEndOfRow();
-            return row;
+            // Because some data might have been shadowed by the 'activeDeletion', we could have an empty row
+            return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty()
+                 ? null
+                 : ArrayBackedRow.create(clustering, columns, rowInfo, rowDeletion, dataBuffer.size(), dataBuffer.toArray(new ColumnData[dataBuffer.size()]));
         }
 
-        private void maybeSignalListenerForRow()
+        public Clustering mergedClustering()
         {
-            if (listener != null && !signaledListenerForRow)
-            {
-                listener.onMergingRows(clustering, rowInfo, rowDeletion, rows);
-                signaledListenerForRow = true;
-            }
+            return clustering;
         }
 
-        private void maybeSignalListenerForCell(Cell merged, Cell[] versions)
+        public Row[] mergedRows()
         {
-            if (listener != null)
-            {
-                maybeSignalListenerForRow();
-                listener.onMergedCells(merged, versions);
-            }
+            return rows;
         }
 
-        private void maybeSignalEndOfRow()
+        private static class ColumnDataReducer extends MergeIterator.Reducer<ColumnData, ColumnData>
         {
-            if (listener != null)
-            {
-                // If we haven't signaled the listener yet (we had no cells but some deletion info), do it now
-                maybeSignalListenerForRow();
-                listener.onRowDone();
-            }
-        }
+            private final int nowInSec;
 
-        private void reconcileCells(DeletionTime activeDeletion, Row.Writer writer)
-        {
-            Cell reconciled = null;
-            for (int j = 0; j < cells.length; j++)
-            {
-                Cell cell = cells[j];
-                if (cell != null && !activeDeletion.deletes(cell.livenessInfo()))
-                    reconciled = Cells.reconcile(reconciled, cell, nowInSec);
-            }
+            private ColumnDefinition column;
+            private final List<ColumnData> versions;
 
-            if (reconciled != null)
+            private DeletionTime activeDeletion;
+
+            private final ComplexColumnData.Builder complexBuilder;
+            private final List<Iterator<Cell>> complexCells;
+            private final CellReducer cellReducer;
+
+            public ColumnDataReducer(int size, int nowInSec, boolean hasComplex)
             {
-                reconciled.writeTo(writer);
-                maybeSignalListenerForCell(reconciled, cells);
+                this.nowInSec = nowInSec;
+                this.versions = new ArrayList<>(size);
+                this.complexBuilder = hasComplex ? ComplexColumnData.builder() : null;
+                this.complexCells = hasComplex ? new ArrayList<>(size) : null;
+                this.cellReducer = new CellReducer(nowInSec);
             }
-        }
 
-        private void mergeComplex(DeletionTime activeDeletion, ColumnDefinition c)
-        {
-            complexCells.clear();
-            for (int j = 0; j < rows.length; j++)
+            public void setActiveDeletion(DeletionTime activeDeletion)
             {
-                Row row = rows[j];
-                Iterator<Cell> iter = row == null ? null : row.getCells(c);
-                complexCells.add(iter == null ? Iterators.<Cell>emptyIterator() : iter);
+                this.activeDeletion = activeDeletion;
             }
 
-            complexReducer.column = c;
-            complexReducer.activeDeletion = activeDeletion;
-
-            // Note that we use the mergeIterator only to group cells to merge, but we
-            // write the result to the writer directly in the reducer, so all we care
-            // about is iterating over the result.
-            Iterator<Void> iter = MergeIterator.get(complexCells, c.cellComparator(), complexReducer);
-            while (iter.hasNext())
-                iter.next();
-        }
-
-        private class ComplexColumnReducer extends MergeIterator.Reducer<Cell, Void>
-        {
-            private DeletionTime activeDeletion;
-            private Row.Writer writer;
-            private ColumnDefinition column;
-
-            public void reduce(int idx, Cell current)
+            public void reduce(int idx, ColumnData data)
             {
-                cells[idx] = current;
+                column = data.column();
+                versions.add(data);
             }
 
-            protected Void getReduced()
+            protected ColumnData getReduced()
             {
-                reconcileCells(activeDeletion, writer);
-                return null;
+                if (column.isSimple())
+                {
+                    Cell merged = null;
+                    for (ColumnData data : versions)
+                    {
+                        Cell cell = (Cell)data;
+                        if (!activeDeletion.deletes(cell))
+                            merged = merged == null ? cell : Cells.reconcile(merged, cell, nowInSec);
+                    }
+                    return merged;
+                }
+                else
+                {
+                    complexBuilder.newColumn(column);
+                    complexCells.clear();
+                    DeletionTime complexDeletion = DeletionTime.LIVE;
+                    for (ColumnData data : versions)
+                    {
+                        ComplexColumnData cd = (ComplexColumnData)data;
+                        if (cd.complexDeletion().supersedes(complexDeletion))
+                            complexDeletion = cd.complexDeletion();
+                        complexCells.add(cd.iterator());
+                    }
+
+                    if (complexDeletion.supersedes(activeDeletion))
+                    {
+                        cellReducer.setActiveDeletion(complexDeletion);
+                        complexBuilder.addComplexDeletion(complexDeletion);
+                    }
+                    else
+                    {
+                        cellReducer.setActiveDeletion(activeDeletion);
+                    }
+
+                    Iterator<Cell> cells = MergeIterator.get(complexCells, ColumnData.cellComparator, cellReducer);
+                    while (cells.hasNext())
+                    {
+                        Cell merged = cells.next();
+                        if (merged != null)
+                            complexBuilder.addCell(merged);
+                    }
+                    return complexBuilder.build();
+                }
             }
 
             protected void onKeyChange()
             {
-                Arrays.fill(cells, null);
+                versions.clear();
             }
         }
 
-        private static class StaticMerger extends Merger
+        private static class CellReducer extends MergeIterator.Reducer<Cell, Cell>
         {
-            private final StaticRow.Builder builder;
+            private final int nowInSec;
 
-            private StaticMerger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
-            {
-                super(metadata, size, nowInSec, columns, listener);
-                this.builder = StaticRow.builder(columns, true, metadata.isCounter());
-            }
+            private DeletionTime activeDeletion;
+            private Cell merged;
 
-            protected Row.Writer getWriter()
+            public CellReducer(int nowInSec)
             {
-                return builder;
+                this.nowInSec = nowInSec;
             }
 
-            protected Row getRow()
+            public void setActiveDeletion(DeletionTime activeDeletion)
             {
-                return builder.build();
+                this.activeDeletion = activeDeletion;
+                onKeyChange();
             }
-        }
-
-        private static class RegularMerger extends Merger
-        {
-            private final ReusableRow row;
 
-            private RegularMerger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+            public void reduce(int idx, Cell cell)
             {
-                super(metadata, size, nowInSec, columns, listener);
-                this.row = new ReusableRow(metadata.clusteringColumns().size(), columns, true, metadata.isCounter());
+                if (!activeDeletion.deletes(cell))
+                    merged = merged == null ? cell : Cells.reconcile(merged, cell, nowInSec);
             }
 
-            protected Row.Writer getWriter()
+            protected Cell getReduced()
             {
-                return row.writer();
+                return merged;
             }
 
-            protected Row getRow()
+            protected void onKeyChange()
             {
-                return row;
+                merged = null;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
new file mode 100644
index 0000000..2a10199
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+
+/**
+ * An iterator that merges a source of rows with the range tombstones and partition level deletion of a given partition.
+ * <p>
+ * This is used by our {@code Partition} implementations to produce a {@code UnfilteredRowIterator} by merging the rows
+ * and deletion infos that are kept separate. This also has 2 additional roles:
+ *   1) it makes sure the rows returned only include the columns selected for the resulting iterator.
+ *   2) it (optionally) removes any data that can be shadowed (see the comment on 'removeShadowedData' below for more details)
+ */
+public class RowAndDeletionMergeIterator extends AbstractUnfilteredRowIterator
+{
+    // For some of our Partition implementations, we can't guarantee that the deletion information (partition level
+    // deletion and range tombstones) doesn't shadow data in the rows. If that is the case, this class also takes
+    // care of skipping such shadowed data (since it is the contract of an UnfilteredRowIterator that it doesn't
+    // shadow its own data). Sometimes however, we know this can't happen, in which case we can skip that step.
+    private final boolean removeShadowedData;
+    private final Comparator<Clusterable> comparator;
+    private final ColumnFilter selection;
+
+    private final Iterator<Row> rows;
+    private Row nextRow;
+
+    private final Iterator<RangeTombstone> ranges;
+    private RangeTombstone nextRange;
+
+    // The currently open tombstone. Note that unless this is null, there is no point in checking nextRange.
+    private RangeTombstone openRange;
+
+    public RowAndDeletionMergeIterator(CFMetaData metadata,
+                                       DecoratedKey partitionKey,
+                                       DeletionTime partitionLevelDeletion,
+                                       ColumnFilter selection,
+                                       Row staticRow,
+                                       boolean isReversed,
+                                       RowStats stats,
+                                       Iterator<Row> rows,
+                                       Iterator<RangeTombstone> ranges,
+                                       boolean removeShadowedData)
+    {
+        super(metadata, partitionKey, partitionLevelDeletion, selection.fetchedColumns(), staticRow, isReversed, stats);
+        this.comparator = isReversed ? metadata.comparator.reversed() : metadata.comparator;
+        this.selection = selection;
+        this.removeShadowedData = removeShadowedData;
+        this.rows = rows;
+        this.ranges = ranges;
+    }
+
+    protected Unfiltered computeNext()
+    {
+        while (true)
+        {
+            updateNextRow();
+            if (nextRow == null)
+            {
+                if (openRange != null)
+                    return closeOpenedRange();
+
+                updateNextRange();
+                return nextRange == null ? endOfData() : openRange();
+            }
+
+            // We have a next row
+
+            if (openRange == null)
+            {
+                // We have no currently open tombstone range. So check if we have a next range and if it sorts before this row.
+                // If it does, the opening of that range should go first. Otherwise, the row goes first.
+                updateNextRange();
+                if (nextRange != null && comparator.compare(openBound(nextRange), nextRow.clustering()) < 0)
+                    return openRange();
+
+                Row row = consumeNextRow();
+                // the row may come back null, e.g. when fully shadowed by the active deletion (the partition level deletion here, since no range is open); if so, keep looping
+                if (row != null)
+                    return row;
+            }
+            else
+            {
+                // We have both a next row and a currently opened tombstone. Check which goes first between the range closing and the row.
+                if (comparator.compare(closeBound(openRange), nextRow.clustering()) < 0)
+                    return closeOpenedRange();
+
+                Row row = consumeNextRow();
+                if (row != null)
+                    return row;
+            }
+        }
+    }
+
+    private void updateNextRow()
+    {
+        if (nextRow == null && rows.hasNext())
+            nextRow = rows.next();
+    }
+
+    private void updateNextRange()
+    {
+        while (nextRange == null && ranges.hasNext())
+        {
+            nextRange = ranges.next();
+            if (removeShadowedData && partitionLevelDeletion().supersedes(nextRange.deletionTime()))
+                nextRange = null;
+        }
+    }
+
+    private Row consumeNextRow()
+    {
+        Row row = nextRow;
+        nextRow = null;
+        if (!removeShadowedData)
+            return row.filter(selection, metadata());
+
+        DeletionTime activeDeletion = openRange == null ? partitionLevelDeletion() : openRange.deletionTime();
+        return row.filter(selection, activeDeletion, false, metadata());
+    }
+
+    private RangeTombstone consumeNextRange()
+    {
+        RangeTombstone range = nextRange;
+        nextRange = null;
+        return range;
+    }
+
+    private RangeTombstone consumeOpenRange()
+    {
+        RangeTombstone range = openRange;
+        openRange = null;
+        return range;
+    }
+
+    private Slice.Bound openBound(RangeTombstone range)
+    {
+        return range.deletedSlice().open(isReverseOrder());
+    }
+
+    private Slice.Bound closeBound(RangeTombstone range)
+    {
+        return range.deletedSlice().close(isReverseOrder());
+    }
+
+    private RangeTombstoneMarker closeOpenedRange()
+    {
+        // Check if that close is actually a boundary between markers
+        updateNextRange();
+        RangeTombstoneMarker marker;
+        if (nextRange != null && comparator.compare(closeBound(openRange), openBound(nextRange)) == 0)
+        {
+            marker = RangeTombstoneBoundaryMarker.makeBoundary(isReverseOrder(), closeBound(openRange), openBound(nextRange), openRange.deletionTime(), nextRange.deletionTime());
+            openRange = consumeNextRange();
+        }
+        else
+        {
+            RangeTombstone toClose = consumeOpenRange();
+            marker = new RangeTombstoneBoundMarker(closeBound(toClose), toClose.deletionTime());
+        }
+        return marker;
+    }
+
+    private RangeTombstoneMarker openRange()
+    {
+        assert openRange == null && nextRange != null;
+        openRange = consumeNextRange();
+        return new RangeTombstoneBoundMarker(openBound(openRange), openRange.deletionTime());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
deleted file mode 100644
index 3d204d3..0000000
--- a/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.util.Comparator;
-import java.util.Iterator;
-
-import com.google.common.collect.PeekingIterator;
-import com.google.common.collect.UnmodifiableIterator;
-
-import org.apache.cassandra.db.*;
-
-public class RowAndTombstoneMergeIterator extends UnmodifiableIterator<Unfiltered> implements PeekingIterator<Unfiltered>
-{
-    private final Comparator<Clusterable> comparator;
-    private final boolean reversed;
-
-    private Iterator<Row> rowIter;
-    private Row nextRow;
-
-    private Iterator<RangeTombstone> tombstoneIter;
-    private RangeTombstone nextTombstone;
-    private boolean inTombstone;
-
-    private Unfiltered next;
-
-    public RowAndTombstoneMergeIterator(ClusteringComparator comparator, boolean reversed)
-    {
-        this.comparator = reversed ? comparator.reversed() : comparator;
-        this.reversed = reversed;
-    }
-
-    public RowAndTombstoneMergeIterator setTo(Iterator<Row> rowIter, Iterator<RangeTombstone> tombstoneIter)
-    {
-        this.rowIter = rowIter;
-        this.tombstoneIter = tombstoneIter;
-        this.nextRow = null;
-        this.nextTombstone = null;
-        this.next = null;
-        this.inTombstone = false;
-        return this;
-    }
-
-    public boolean isSet()
-    {
-        return rowIter != null;
-    }
-
-    private void prepareNext()
-    {
-        if (next != null)
-            return;
-
-        if (nextTombstone == null && tombstoneIter.hasNext())
-            nextTombstone = tombstoneIter.next();
-        if (nextRow == null && rowIter.hasNext())
-            nextRow = rowIter.next();
-
-        if (nextTombstone == null)
-        {
-            if (nextRow == null)
-                return;
-
-            next = nextRow;
-            nextRow = null;
-        }
-        else if (nextRow == null)
-        {
-            if (inTombstone)
-            {
-                RangeTombstone rt = nextTombstone;
-                nextTombstone = tombstoneIter.hasNext() ? tombstoneIter.next() : null;
-                // An end and a start makes a boundary if they sort similarly
-                if (nextTombstone != null
-                        && comparator.compare(rt.deletedSlice().close(reversed), nextTombstone.deletedSlice().open(reversed)) == 0)
-                {
-                    next = RangeTombstoneBoundaryMarker.makeBoundary(reversed,
-                                                                     rt.deletedSlice().close(reversed),
-                                                                     nextTombstone.deletedSlice().open(reversed),
-                                                                     rt.deletionTime(),
-                                                                     nextTombstone.deletionTime());
-                }
-                else
-                {
-                    inTombstone = false;
-                    next = new RangeTombstoneBoundMarker(rt.deletedSlice().close(reversed), rt.deletionTime());
-                }
-            }
-            else
-            {
-                inTombstone = true;
-                next = new RangeTombstoneBoundMarker(nextTombstone.deletedSlice().open(reversed), nextTombstone.deletionTime());
-            }
-        }
-        else if (inTombstone)
-        {
-            if (comparator.compare(nextTombstone.deletedSlice().close(reversed), nextRow.clustering()) < 0)
-            {
-                RangeTombstone rt = nextTombstone;
-                nextTombstone = tombstoneIter.hasNext() ? tombstoneIter.next() : null;
-                if (nextTombstone != null
-                        && comparator.compare(rt.deletedSlice().close(reversed), nextTombstone.deletedSlice().open(reversed)) == 0)
-                {
-                    next = RangeTombstoneBoundaryMarker.makeBoundary(reversed,
-                                                                     rt.deletedSlice().close(reversed),
-                                                                     nextTombstone.deletedSlice().open(reversed),
-                                                                     rt.deletionTime(),
-                                                                     nextTombstone.deletionTime());
-                }
-                else
-                {
-                    inTombstone = false;
-                    next = new RangeTombstoneBoundMarker(rt.deletedSlice().close(reversed), rt.deletionTime());
-                }
-            }
-            else
-            {
-                next = nextRow;
-                nextRow = null;
-            }
-        }
-        else
-        {
-            if (comparator.compare(nextTombstone.deletedSlice().open(reversed), nextRow.clustering()) < 0)
-            {
-                inTombstone = true;
-                next = new RangeTombstoneBoundMarker(nextTombstone.deletedSlice().open(reversed), nextTombstone.deletionTime());
-            }
-            else
-            {
-                next = nextRow;
-                nextRow = null;
-            }
-        }
-    }
-
-    public boolean hasNext()
-    {
-        prepareNext();
-        return next != null;
-    }
-
-    public Unfiltered next()
-    {
-        prepareNext();
-        Unfiltered toReturn = next;
-        next = null;
-        return toReturn;
-    }
-
-    public Unfiltered peek()
-    {
-        prepareNext();
-        return next();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowDataBlock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowDataBlock.java b/src/java/org/apache/cassandra/db/rows/RowDataBlock.java
deleted file mode 100644
index b1e2b13..0000000
--- a/src/java/org/apache/cassandra/db/rows/RowDataBlock.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * A {@code RowDataBlock} holds data for one or more row (of a given table). More precisely, it contains
- * cell data  and complex deletion data (for complex columns) and allow access to this data. Please note
- * however that {@code RowDataBlock} only holds the data inside the row, it does not hold the data
- * pertaining to the row itself: clustering, partition key liveness info and row deletion.
- * <p>
- * {@code RowDataBlock} is largely an implementation detail: it is only there to be reused by
- * {@link AbstractPartitionData} and every concrete row implementation.
- */
-public class RowDataBlock
-{
-    private static final Logger logger = LoggerFactory.getLogger(RowDataBlock.class);
-
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new RowDataBlock(Columns.NONE, 0, false, false));
-
-    // We distinguish 2 sub-objects: SimpleRowDataBlock that contains the data for the simple columns only,
-    // and ComplexRowDataBlock that only contains data for complex columns. The reason for having 2 separate
-    // objects is that simple columns are much easier to handle since we have only a single cell per-object
-    // and thus having a more specialized object allow a simpler and more efficient handling.
-    final SimpleRowDataBlock simpleData;
-    final ComplexRowDataBlock complexData;
-
-    public RowDataBlock(Columns columns, int rows, boolean sortable, boolean isCounter)
-    {
-        this.simpleData = columns.hasSimple() ? new SimpleRowDataBlock(columns, rows, isCounter) : null;
-        this.complexData = columns.hasComplex() ? ComplexRowDataBlock.create(columns, rows, sortable, isCounter) : null;
-    }
-
-    public Columns columns()
-    {
-        if (simpleData != null)
-            return simpleData.columns();
-        if (complexData != null)
-            return complexData.columns();
-        return Columns.NONE;
-    }
-
-    /**
-     * Return the cell value for a given column of a given row.
-     *
-     * @param row the row for which to return the cell value.
-     * @param column the column for which to return the cell value.
-     * @param path the cell path for which to return the cell value. Can be null for
-     * simple columns.
-     *
-     * @return the value of the cell of path {@code path} for {@code column} in row {@code row}, or
-     * {@code null} if their is no such cell.
-     */
-    public ByteBuffer getValue(int row, ColumnDefinition column, CellPath path)
-    {
-        if (column.isComplex())
-        {
-            return complexData.getValue(row, column, path);
-        }
-        else
-        {
-            int idx = columns().simpleIdx(column, 0);
-            assert idx >= 0;
-            return simpleData.data.value((row * columns().simpleColumnCount()) + idx);
-        }
-    }
-
-    /**
-     * Sets the cell value for a given simple column of a given row.
-     *
-     * @param row the row for which to set the cell value.
-     * @param column the simple column for which to set the cell value.
-     * @param path the cell path for which to return the cell value. Can be null for
-     * simple columns.
-     * @param value the value to set.
-     */
-    public void setValue(int row, ColumnDefinition column, CellPath path, ByteBuffer value)
-    {
-        if (column.isComplex())
-        {
-            complexData.setValue(row, column, path, value);
-        }
-        else
-        {
-            int idx = columns().simpleIdx(column, 0);
-            assert idx >= 0;
-            simpleData.data.setValue((row * columns().simpleColumnCount()) + idx, value);
-        }
-    }
-
-    public static ReusableIterator reusableIterator()
-    {
-        return new ReusableIterator();
-    }
-
-    // Swap row i and j
-    public void swap(int i, int j)
-    {
-        if (simpleData != null)
-            simpleData.swap(i, j);
-        if (complexData != null)
-            complexData.swap(i, j);
-    }
-
-    // Merge row i into j
-    public void merge(int i, int j, int nowInSec)
-    {
-        if (simpleData != null)
-            simpleData.merge(i, j, nowInSec);
-        if (complexData != null)
-            complexData.merge(i, j, nowInSec);
-    }
-
-    // Move row i into j
-    public void move(int i, int j)
-    {
-        if (simpleData != null)
-            simpleData.move(i, j);
-        if (complexData != null)
-            complexData.move(i, j);
-    }
-
-    public boolean hasComplexDeletion(int row)
-    {
-        return complexData != null && complexData.hasComplexDeletion(row);
-    }
-
-    public long unsharedHeapSizeExcludingData()
-    {
-        return EMPTY_SIZE
-             + (simpleData == null ? 0 : simpleData.unsharedHeapSizeExcludingData())
-             + (complexData == null ? 0 : complexData.unsharedHeapSizeExcludingData());
-    }
-
-    public static int computeNewCapacity(int currentCapacity, int idxToSet)
-    {
-        int newCapacity = currentCapacity == 0 ? 4 : currentCapacity;
-        while (idxToSet >= newCapacity)
-            newCapacity = 1 + (newCapacity * 3) / 2;
-        return newCapacity;
-    }
-
-    public int dataSize()
-    {
-        return (simpleData == null ? 0 : simpleData.dataSize())
-             + (complexData == null ? 0 : complexData.dataSize());
-    }
-
-    public void clear()
-    {
-        if (simpleData != null)
-            simpleData.clear();
-        if (complexData != null)
-            complexData.clear();
-    }
-
-    public abstract static class Writer implements Row.Writer
-    {
-        private final boolean inOrderCells;
-
-        protected int row;
-
-        protected SimpleRowDataBlock.CellWriter simpleWriter;
-        protected ComplexRowDataBlock.CellWriter complexWriter;
-
-        protected Writer(boolean inOrderCells)
-        {
-            this.inOrderCells = inOrderCells;
-        }
-
-        protected Writer(RowDataBlock data, boolean inOrderCells)
-        {
-            this(inOrderCells);
-            updateWriter(data);
-        }
-
-        protected void updateWriter(RowDataBlock data)
-        {
-            this.simpleWriter = data.simpleData == null ? null : data.simpleData.cellWriter(inOrderCells);
-            this.complexWriter = data.complexData == null ? null : data.complexData.cellWriter(inOrderCells);
-        }
-
-        public Writer reset()
-        {
-            row = 0;
-
-            if (simpleWriter != null)
-                simpleWriter.reset();
-            if (complexWriter != null)
-                complexWriter.reset();
-
-            return this;
-        }
-
-        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
-        {
-            if (column.isComplex())
-                complexWriter.addCell(column, value, info, path);
-            else
-                simpleWriter.addCell(column, value, info);
-        }
-
-        public void writeComplexDeletion(ColumnDefinition c, DeletionTime complexDeletion)
-        {
-            if (complexDeletion.isLive())
-                return;
-
-            complexWriter.setComplexDeletion(c, complexDeletion);
-        }
-
-        public void endOfRow()
-        {
-            ++row;
-            if (simpleWriter != null)
-                simpleWriter.endOfRow();
-            if (complexWriter != null)
-                complexWriter.endOfRow();
-        }
-    }
-
-    static class ReusableIterator extends UnmodifiableIterator<Cell> implements Iterator<Cell>
-    {
-        private SimpleRowDataBlock.ReusableIterator simpleIterator;
-        private ComplexRowDataBlock.ReusableIterator complexIterator;
-
-        public ReusableIterator()
-        {
-            this.simpleIterator = SimpleRowDataBlock.reusableIterator();
-            this.complexIterator = ComplexRowDataBlock.reusableIterator();
-        }
-
-        public ReusableIterator setTo(RowDataBlock dataBlock, int row)
-        {
-            simpleIterator.setTo(dataBlock.simpleData, row);
-            complexIterator.setTo(dataBlock.complexData, row);
-            return this;
-        }
-
-        public boolean hasNext()
-        {
-            return simpleIterator.hasNext() || complexIterator.hasNext();
-        }
-
-        public Cell next()
-        {
-            return simpleIterator.hasNext() ? simpleIterator.next() : complexIterator.next();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowDiffListener.java b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
new file mode 100644
index 0000000..50d6d32
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+
+/**
+ * Interface that allows to act on the result of merging multiple rows.
+ *
+ * More precisely, given N rows and the result of merging them, one can call {@link Rows#diff()}
+ * with a {@code RowDiffListener} and that listener will be informed for each input row of the diff between
+ * that input and the merged row.
+ */
+public interface RowDiffListener
+{
+    /**
+     * Called for the row primary key liveness info of input {@code i}.
+     *
+     * @param i the input row that {@code original} comes from.
+     * @param clustering the clustering for the row that is merged.
+     * @param merged the primary key liveness info of the merged row. Will be {@code null} if input {@code i} had
+     * a {@code LivenessInfo}, but the merged result doesn't (i.e. the original info has been shadowed/deleted).
+     * @param original the primary key liveness info of input {@code i}. May be {@code null} if input {@code i}
+     * has no primary key liveness info (i.e. it has {@code LivenessInfo.NONE}) but the merged result has.
+     */
+    public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original);
+
+    /**
+     * Called for the row deletion of input {@code i}.
+     *
+     * @param i the input row that {@code original} comes from.
+     * @param clustering the clustering for the row that is merged.
+     * @param merged the deletion of the merged row. Will be {@code null} if input {@code i} had deletion
+     * but the merged result doesn't (i.e. the deletion has been shadowed).
+     * @param original the deletion of input {@code i}. May be {@code null} if input {@code i} had no deletion but the merged row has.
+     */
+    public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original);
+
+    /**
+     * Called for every (non-live) complex deletion of any complex column present in either the merged row or input {@code i}.
+     *
+     * @param i the input row that {@code original} comes from.
+     * @param clustering the clustering for the row that is merged.
+     * @param column the column that this is a complex deletion of.
+     * @param merged the complex deletion of the merged row. Will be {@code null} if input {@code i} had a complex deletion
+     * for {@code column} but the merged result doesn't (i.e. the deletion has been shadowed).
+     * @param original the complex deletion of input {@code i} for column {@code column}. May be {@code null} if input {@code i}
+     * had no complex deletion but the merged row has.
+     */
+    public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original);
+
+    /**
+     * Called for any cell that is either in the merged row or in input {@code i}.
+     *
+     * @param i the input row that {@code original} comes from.
+     * @param clustering the clustering for the row that is merged.
+     * @param merged the cell of the merged row. Will be {@code null} if input {@code i} had a cell but that cell is not present
+     * in the merged result (it has been deleted/shadowed).
+     * @param original the cell of input {@code i}. May be {@code null} if input {@code i} had no cell corresponding to {@code merged}.
+     */
+    public void onCell(int i, Clustering clustering, Cell merged, Cell original);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index a3bd913..766cf19 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -37,19 +36,6 @@ public abstract class RowIterators
 
     private RowIterators() {}
 
-    public static PartitionUpdate toUpdate(RowIterator iterator)
-    {
-        PartitionUpdate update = new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), 1);
-
-        if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
-            iterator.staticRow().copyTo(update.staticWriter());
-
-        while (iterator.hasNext())
-            iterator.next().copyTo(update.writer());
-
-        return update;
-    }
-
     public static void digest(RowIterator iterator, MessageDigest digest)
     {
         // TODO: we're not computing digest the same way that old nodes so we'll need
@@ -123,11 +109,11 @@ public abstract class RowIterators
     {
         CFMetaData metadata = iterator.metadata();
         logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}",
-                    new Object[]{ id,
-                                  metadata.ksName,
-                                  metadata.cfName,
-                                  metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
-                                  iterator.isReverseOrder() });
+                    id,
+                    metadata.ksName,
+                    metadata.cfName,
+                    metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
+                    iterator.isReverseOrder());
 
         return new WrappingRowIterator(iterator)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowStats.java b/src/java/org/apache/cassandra/db/rows/RowStats.java
index c672490..5b0d3bd 100644
--- a/src/java/org/apache/cassandra/db/rows/RowStats.java
+++ b/src/java/org/apache/cassandra/db/rows/RowStats.java
@@ -17,17 +17,17 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.Objects;
 
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 import static org.apache.cassandra.db.LivenessInfo.NO_TIMESTAMP;
 import static org.apache.cassandra.db.LivenessInfo.NO_TTL;
-import static org.apache.cassandra.db.LivenessInfo.NO_DELETION_TIME;
+import static org.apache.cassandra.db.LivenessInfo.NO_EXPIRATION_TIME;
 
 /**
  * General statistics on rows (and and tombstones) for a given source.
@@ -45,7 +45,7 @@ import static org.apache.cassandra.db.LivenessInfo.NO_DELETION_TIME;
 public class RowStats
 {
     // We should use this sparingly obviously
-    public static final RowStats NO_STATS = new RowStats(NO_TIMESTAMP, NO_DELETION_TIME, NO_TTL, -1);
+    public static final RowStats NO_STATS = new RowStats(NO_TIMESTAMP, NO_EXPIRATION_TIME, NO_TTL, -1);
 
     public static final Serializer serializer = new Serializer();
 
@@ -74,7 +74,7 @@ public class RowStats
 
     public boolean hasMinLocalDeletionTime()
     {
-        return minLocalDeletionTime != NO_DELETION_TIME;
+        return minLocalDeletionTime != NO_EXPIRATION_TIME;
     }
 
     /**
@@ -89,9 +89,9 @@ public class RowStats
                           ? that.minTimestamp
                           : (that.minTimestamp == NO_TIMESTAMP ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp));
 
-        int minDelTime = this.minLocalDeletionTime == NO_DELETION_TIME
+        int minDelTime = this.minLocalDeletionTime == NO_EXPIRATION_TIME
                        ? that.minLocalDeletionTime
-                       : (that.minLocalDeletionTime == NO_DELETION_TIME ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime));
+                       : (that.minLocalDeletionTime == NO_EXPIRATION_TIME ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime));
 
         int minTTL = this.minTTL == NO_TTL
                    ? that.minTTL
@@ -132,7 +132,7 @@ public class RowStats
         return String.format("RowStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
     }
 
-    public static class Collector
+    public static class Collector implements PartitionStatisticsCollector
     {
         private boolean isTimestampSet;
         private long minTimestamp = Long.MAX_VALUE;
@@ -147,6 +147,27 @@ public class RowStats
         private long totalColumnsSet;
         private long rows;
 
+        public void update(LivenessInfo info)
+        {
+            if (info.isEmpty())
+                return;
+
+            updateTimestamp(info.timestamp());
+
+            if (info.isExpiring())
+            {
+                updateTTL(info.ttl());
+                updateLocalDeletionTime(info.localExpirationTime());
+            }
+        }
+
+        public void update(Cell cell)
+        {
+            updateTimestamp(cell.timestamp());
+            updateTTL(cell.ttl());
+            updateLocalDeletionTime(cell.localDeletionTime());
+        }
+
         public void updateTimestamp(long timestamp)
         {
             if (timestamp == NO_TIMESTAMP)
@@ -158,14 +179,14 @@ public class RowStats
 
         public void updateLocalDeletionTime(int deletionTime)
         {
-            if (deletionTime == NO_DELETION_TIME)
+            if (deletionTime == NO_EXPIRATION_TIME)
                 return;
 
             isDelTimeSet = true;
             minDeletionTime = Math.min(minDeletionTime, deletionTime);
         }
 
-        public void updateDeletionTime(DeletionTime deletionTime)
+        public void update(DeletionTime deletionTime)
         {
             if (deletionTime.isLive())
                 return;
@@ -183,7 +204,7 @@ public class RowStats
             minTTL = Math.min(minTTL, ttl);
         }
 
-        public void updateColumnSetPerRow(int columnSetInRow)
+        public void updateColumnSetPerRow(long columnSetInRow)
         {
             updateColumnSetPerRow(columnSetInRow, 1);
         }
@@ -198,12 +219,17 @@ public class RowStats
             this.rows += rows;
         }
 
+        public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
+        {
+            // We don't care about this, but it comes with PartitionStatisticsCollector
+        }
+
         public RowStats get()
         {
             return new RowStats(isTimestampSet ? minTimestamp : NO_TIMESTAMP,
-                                 isDelTimeSet ? minDeletionTime : NO_DELETION_TIME,
-                                 isTTLSet ? minTTL : NO_TTL,
-                                 isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1);
+                                isDelTimeSet ? minDeletionTime : NO_EXPIRATION_TIME,
+                                isTTLSet ? minTTL : NO_TTL,
+                                isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1);
         }
     }
 
@@ -211,26 +237,26 @@ public class RowStats
     {
         public void serialize(RowStats stats, DataOutputPlus out) throws IOException
         {
-            out.writeLong(stats.minTimestamp);
-            out.writeInt(stats.minLocalDeletionTime);
-            out.writeInt(stats.minTTL);
-            out.writeInt(stats.avgColumnSetPerRow);
+            out.writeVInt(stats.minTimestamp);
+            out.writeVInt(stats.minLocalDeletionTime);
+            out.writeVInt(stats.minTTL);
+            out.writeVInt(stats.avgColumnSetPerRow);
         }
 
         public int serializedSize(RowStats stats)
         {
-            return TypeSizes.sizeof(stats.minTimestamp)
-                 + TypeSizes.sizeof(stats.minLocalDeletionTime)
-                 + TypeSizes.sizeof(stats.minTTL)
-                 + TypeSizes.sizeof(stats.avgColumnSetPerRow);
+            return TypeSizes.sizeofVInt(stats.minTimestamp)
+                 + TypeSizes.sizeofVInt(stats.minLocalDeletionTime)
+                 + TypeSizes.sizeofVInt(stats.minTTL)
+                 + TypeSizes.sizeofVInt(stats.avgColumnSetPerRow);
         }
 
-        public RowStats deserialize(DataInput in) throws IOException
+        public RowStats deserialize(DataInputPlus in) throws IOException
         {
-            long minTimestamp = in.readLong();
-            int minLocalDeletionTime = in.readInt();
-            int minTTL = in.readInt();
-            int avgColumnSetPerRow = in.readInt();
+            long minTimestamp = in.readVInt();
+            int minLocalDeletionTime = (int)in.readVInt();
+            int minTTL = (int)in.readVInt();
+            int avgColumnSetPerRow = (int)in.readVInt();
             return new RowStats(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
         }
     }


Mime
View raw message