cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] cassandra git commit: Adds "shadowable" row tombstones for MV timestamp issues
Date Fri, 11 Sep 2015 15:26:18 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk c59a352e1 -> 43e13d908


Adds "shadowable" row tombstones for MV timestamp issues

The patch allows row tombstones to be either regular (as before this
patch) or shadowable. If a row tombstone is shadowable, it only applies to the row
and its content as long as the row timestamp is less than the tombstone
timestamp. As soon as the row is updated with a timestamp greater than
the one of the shadowable tombstone, that tombstone is removed, even if some
cells inside the row may have older timestamps.

Patch by Sylvain Lebresne and tjake; reviewed by Tyler Hobbs for CASSANDRA-10261


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/665f7474
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/665f7474
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/665f7474

Branch: refs/heads/trunk
Commit: 665f74740ce48bf476a9941793d44d9c588eca3c
Parents: 30f39a2
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Wed Sep 9 11:42:24 2015 +0200
Committer: T Jake Luciani <jake@apache.org>
Committed: Fri Sep 11 11:23:58 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cql3/UpdateParameters.java |   2 +-
 .../apache/cassandra/db/ClusteringPrefix.java   |   4 +-
 .../org/apache/cassandra/db/LegacyLayout.java   |   4 +-
 .../apache/cassandra/db/RowUpdateBuilder.java   |   2 +-
 .../cassandra/db/UnfilteredDeserializer.java    |   9 +-
 .../db/partitions/AbstractBTreePartition.java   |   2 +-
 .../apache/cassandra/db/rows/AbstractRow.java   |   2 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  56 +++++---
 src/java/org/apache/cassandra/db/rows/Row.java  | 143 ++++++++++++++++++-
 .../cassandra/db/rows/RowDiffListener.java      |   2 +-
 src/java/org/apache/cassandra/db/rows/Rows.java |  16 ++-
 .../cassandra/db/rows/UnfilteredSerializer.java |  92 ++++++++----
 .../cassandra/db/view/MaterializedView.java     |   8 +-
 .../apache/cassandra/db/view/TemporalRow.java   |   4 +-
 .../cassandra/index/SecondaryIndexManager.java  |   4 +-
 .../index/internal/CassandraIndex.java          |   6 +-
 .../apache/cassandra/service/DataResolver.java  |   2 +-
 .../cassandra/thrift/CassandraServer.java       |   2 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |   4 +-
 .../cassandra/cql3/MaterializedViewTest.java    | 119 +++++++++++++++
 .../rows/UnfilteredRowIteratorsMergeTest.java   |   4 +-
 .../index/internal/CustomCassandraIndex.java    |   6 +-
 23 files changed, 403 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 124b797..3397369 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-rc1
+ * Add "shadowable" row tombstones to deal with mv timestamp issues (CASSANDRA-10261)
  * CFS.loadNewSSTables() broken for pre-3.0 sstables
  * Cache selected index in read command to reduce lookups (CASSANDRA-10215)
  * Small optimizations of sstable index serialization (CASSANDRA-10232)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index dbcf803..03468f0 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -153,7 +153,7 @@ public class UpdateParameters
         if (metadata.isCompactTable() && builder.clustering() != Clustering.STATIC_CLUSTERING)
             addTombstone(metadata.compactValueColumn());
         else
-            builder.addRowDeletion(deletionTime);
+            builder.addRowDeletion(Row.Deletion.regular(deletionTime));
     }
 
     public void addTombstone(ColumnDefinition column) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 713ad1b..9477651 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -416,9 +416,9 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
             this.serializationHeader = header;
         }
 
-        public void prepare(int flags) throws IOException
+        public void prepare(int flags, int extendedFlags) throws IOException
         {
-            assert !UnfilteredSerializer.isStatic(flags) : "Flags = " + flags;
+            assert !UnfilteredSerializer.isStatic(extendedFlags) : "Flags = " + flags;
             this.nextIsRow = UnfilteredSerializer.kind(flags) == Unfiltered.Kind.ROW;
             this.nextKind = nextIsRow ? Kind.CLUSTERING : ClusteringPrefix.Kind.values()[in.readByte()];
             this.nextSize = nextIsRow ? comparator.size() : in.readUnsignedShort();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index d73d9cb..2719105 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -682,7 +682,7 @@ public abstract class LegacyLayout
             LegacyBound start = new LegacyLayout.LegacyBound(startBound, false, null);
             LegacyBound end = new LegacyLayout.LegacyBound(endBound, false, null);
 
-            deletions.add(start, end, row.deletion().markedForDeleteAt(), row.deletion().localDeletionTime());
+            deletions.add(start, end, row.deletion().time().markedForDeleteAt(), row.deletion().time().localDeletionTime());
         }
 
         for (ColumnData cd : row)
@@ -1155,7 +1155,7 @@ public abstract class LegacyLayout
 
                 clustering = tombstone.start.getAsClustering(metadata);
                 builder.newRow(clustering);
-                builder.addRowDeletion(tombstone.deletionTime);
+                builder.addRowDeletion(Row.Deletion.regular(tombstone.deletionTime));
                 rowDeletion = tombstone;
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
index 8e6ce3e..8ace988 100644
--- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -194,7 +194,7 @@ public class RowUpdateBuilder
             builder.newRow(Clustering.STATIC_CLUSTERING);
         else
             builder.newRow(clusteringValues.length == 0 ? Clustering.EMPTY : update.metadata().comparator.make(clusteringValues));
-        builder.addRowDeletion(new DeletionTime(timestamp, localDeletionTime));
+        builder.addRowDeletion(Row.Deletion.regular(new DeletionTime(timestamp, localDeletionTime)));
 
         update.add(builder.build());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index d47da3c..5c76c63 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -108,6 +108,7 @@ public abstract class UnfilteredDeserializer
         private final SerializationHeader header;
 
         private int nextFlags;
+        private int nextExtendedFlags;
         private boolean isReady;
         private boolean isDone;
 
@@ -146,7 +147,9 @@ public abstract class UnfilteredDeserializer
                 return;
             }
 
-            clusteringDeserializer.prepare(nextFlags);
+            nextExtendedFlags = UnfilteredSerializer.isExtended(nextFlags) ? in.readUnsignedByte() : 0;
+
+            clusteringDeserializer.prepare(nextFlags, nextExtendedFlags);
             isReady = true;
         }
 
@@ -185,7 +188,7 @@ public abstract class UnfilteredDeserializer
             else
             {
                 builder.newRow(clusteringDeserializer.deserializeNextClustering());
-                return UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, builder);
+                return UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, nextExtendedFlags, builder);
             }
         }
 
@@ -199,7 +202,7 @@ public abstract class UnfilteredDeserializer
             }
             else
             {
-                UnfilteredSerializer.serializer.skipRowBody(in, header, nextFlags);
+                UnfilteredSerializer.serializer.skipRowBody(in, header, nextFlags, nextExtendedFlags);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index da20c91..b7fa691 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -160,7 +160,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
                     activeDeletion = rt.deletionTime();
 
                 if (row == null)
-                    return activeDeletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, activeDeletion);
+                    return activeDeletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion));
 
                 return row.filter(columns, activeDeletion, true, metadata);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 555146e..8958575 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -78,7 +78,7 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme
         }
 
         primaryKeyLivenessInfo().validate();
-        if (deletion().localDeletionTime() < 0)
+        if (deletion().time().localDeletionTime() < 0)
             throw new MarshalException("A local deletion time should not be negative");
 
         for (ColumnData cd : this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index af09cb5..1bc1162 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -48,7 +48,7 @@ public class BTreeRow extends AbstractRow
 
     private final Clustering clustering;
     private final LivenessInfo primaryKeyLivenessInfo;
-    private final DeletionTime deletion;
+    private final Deletion deletion;
 
     // The data for each columns present in this row in column sorted order.
     private final Object[] btree;
@@ -62,8 +62,9 @@ public class BTreeRow extends AbstractRow
     // no expiring cells, this will be Integer.MAX_VALUE;
     private final int minLocalDeletionTime;
 
-    private BTreeRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree, int minLocalDeletionTime)
+    private BTreeRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree, int minLocalDeletionTime)
     {
+        assert !deletion.isShadowedBy(primaryKeyLivenessInfo);
         this.clustering = clustering;
         this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
         this.deletion = deletion;
@@ -71,10 +72,15 @@ public class BTreeRow extends AbstractRow
         this.minLocalDeletionTime = minLocalDeletionTime;
     }
 
+    private BTreeRow(Clustering clustering, Object[] btree, int minLocalDeletionTime)
+    {
+        this(clustering, LivenessInfo.EMPTY, Deletion.LIVE, btree, minLocalDeletionTime);
+    }
+
     // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases.
-    public static BTreeRow create(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree)
+    public static BTreeRow create(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree)
     {
-        int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
+        int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion.time()));
         if (minDeletionTime != Integer.MIN_VALUE)
         {
             for (ColumnData cd : BTree.<ColumnData>iterable(btree))
@@ -86,19 +92,19 @@ public class BTreeRow extends AbstractRow
 
     public static BTreeRow emptyRow(Clustering clustering)
     {
-        return new BTreeRow(clustering, LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.empty(), Integer.MAX_VALUE);
+        return new BTreeRow(clustering, BTree.empty(), Integer.MAX_VALUE);
     }
 
     public static BTreeRow singleCellRow(Clustering clustering, Cell cell)
     {
         if (cell.column().isSimple())
-            return new BTreeRow(clustering, LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(cell), minDeletionTime(cell));
+            return new BTreeRow(clustering, BTree.singleton(cell), minDeletionTime(cell));
 
         ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE);
-        return new BTreeRow(clustering, LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(complexData), minDeletionTime(cell));
+        return new BTreeRow(clustering, BTree.singleton(complexData), minDeletionTime(cell));
     }
 
-    public static BTreeRow emptyDeletedRow(Clustering clustering, DeletionTime deletion)
+    public static BTreeRow emptyDeletedRow(Clustering clustering, Deletion deletion)
     {
         assert !deletion.isLive();
         return new BTreeRow(clustering, LivenessInfo.EMPTY, deletion, BTree.empty(), Integer.MIN_VALUE);
@@ -107,7 +113,7 @@ public class BTreeRow extends AbstractRow
     public static BTreeRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
     {
         assert !primaryKeyLivenessInfo.isEmpty();
-        return new BTreeRow(clustering, primaryKeyLivenessInfo, DeletionTime.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo));
+        return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo));
     }
 
     private static int minDeletionTime(Cell cell)
@@ -176,7 +182,7 @@ public class BTreeRow extends AbstractRow
                && BTree.isEmpty(btree);
     }
 
-    public DeletionTime deletion()
+    public Deletion deletion()
     {
         return deletion;
     }
@@ -234,17 +240,17 @@ public class BTreeRow extends AbstractRow
         if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
             return this;
 
-        boolean mayHaveShadowed = activeDeletion.supersedes(deletion);
+        boolean mayHaveShadowed = activeDeletion.supersedes(deletion.time());
 
         LivenessInfo newInfo = primaryKeyLivenessInfo;
-        DeletionTime newDeletion = deletion;
+        Deletion newDeletion = deletion;
         if (mayHaveShadowed)
         {
             if (activeDeletion.deletes(newInfo.timestamp()))
                 newInfo = LivenessInfo.EMPTY;
             // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if don't have setActiveDeletionToRow,
             // the row deletion is shadowed and we shouldn't return it.
-            newDeletion = setActiveDeletionToRow ? activeDeletion : DeletionTime.LIVE;
+            newDeletion = setActiveDeletionToRow ? Deletion.regular(activeDeletion) : Deletion.LIVE;
         }
 
         Columns columns = filter.fetchedColumns().columns(isStatic());
@@ -307,7 +313,11 @@ public class BTreeRow extends AbstractRow
     public Row updateAllTimestamp(long newTimestamp)
     {
         LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp);
-        DeletionTime newDeletion = deletion.isLive() ? deletion : new DeletionTime(newTimestamp - 1, deletion.localDeletionTime());
+        // If the deletion is shadowable and the row has a timestamp, we'll force the deletion timestamp to be less than the row one, so we
+        // should get rid of said deletion.
+        Deletion newDeletion = deletion.isLive() || (deletion.isShadowable() && !primaryKeyLivenessInfo.isEmpty())
+                             ? Deletion.LIVE
+                             : new Deletion(new DeletionTime(newTimestamp - 1, deletion.time().localDeletionTime()), deletion.isShadowable());
 
         return transformAndFilter(newInfo, newDeletion, (cd) -> cd.updateAllTimestamp(newTimestamp));
     }
@@ -318,12 +328,12 @@ public class BTreeRow extends AbstractRow
             return this;
 
         LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo;
-        DeletionTime newDeletion = purger.shouldPurge(deletion) ? DeletionTime.LIVE : deletion;
+        Deletion newDeletion = purger.shouldPurge(deletion.time()) ? Deletion.LIVE : deletion;
 
         return transformAndFilter(newInfo, newDeletion, (cd) -> cd.purge(purger, nowInSec));
     }
 
-    private Row transformAndFilter(LivenessInfo info, DeletionTime deletion, Function<ColumnData, ColumnData> function)
+    private Row transformAndFilter(LivenessInfo info, Deletion deletion, Function<ColumnData, ColumnData> function)
     {
         Object[] transformed = BTree.transformAndFilter(btree, function);
 
@@ -333,7 +343,7 @@ public class BTreeRow extends AbstractRow
         if (info.isEmpty() && deletion.isLive() && BTree.isEmpty(transformed))
             return null;
 
-        int minDeletionTime = minDeletionTime(transformed, info, deletion);
+        int minDeletionTime = minDeletionTime(transformed, info, deletion.time());
         return new BTreeRow(clustering, info, deletion, transformed, minDeletionTime);
     }
 
@@ -523,7 +533,7 @@ public class BTreeRow extends AbstractRow
         };
         protected Clustering clustering;
         protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
-        protected DeletionTime deletion = DeletionTime.LIVE;
+        protected Deletion deletion = Deletion.LIVE;
 
         private final boolean isSorted;
         private final BTree.Builder<Cell> cells;
@@ -565,7 +575,7 @@ public class BTreeRow extends AbstractRow
         {
             this.clustering = null;
             this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
-            this.deletion = DeletionTime.LIVE;
+            this.deletion = Deletion.LIVE;
             this.cells.reuse();
         }
 
@@ -574,7 +584,7 @@ public class BTreeRow extends AbstractRow
             this.primaryKeyLivenessInfo = info;
         }
 
-        public void addRowDeletion(DeletionTime deletion)
+        public void addRowDeletion(Deletion deletion)
         {
             this.deletion = deletion;
         }
@@ -601,7 +611,11 @@ public class BTreeRow extends AbstractRow
             if (!isSorted | hasComplex)
                 cells.resolve(resolver);
             Object[] btree = cells.build();
-            int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion);
+
+            if (deletion.isShadowedBy(primaryKeyLivenessInfo))
+                deletion = Deletion.LIVE;
+
+            int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion.time());
             Row row = new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
             reset();
             return row;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index a351306..a80325f 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -18,12 +18,14 @@
 package org.apache.cassandra.db.rows;
 
 import java.util.*;
+import java.security.MessageDigest;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTree;
@@ -64,7 +66,7 @@ public interface Row extends Unfiltered, Collection<ColumnData>
      *
      * @return the row deletion.
      */
-    public DeletionTime deletion();
+    public Deletion deletion();
 
     /**
      * Liveness information for the primary key columns of this row.
@@ -219,6 +221,130 @@ public interface Row extends Unfiltered, Collection<ColumnData>
     public String toString(CFMetaData metadata, boolean fullDetails);
 
     /**
+     * A row deletion/tombstone.
+     * <p>
+     * A row deletion mostly consists of the time of said deletion, but there are 2 variants: shadowable
+     * and regular row deletion.
+     * <p>
+     * A shadowable row deletion only exists if the row timestamp ({@code primaryKeyLivenessInfo().timestamp()})
+     * is lower than the deletion timestamp. That is, if a row has a shadowable deletion with timestamp A and an update is made
+     * to that row with a timestamp B such that B > A, then the shadowable deletion is 'shadowed' by that update. A concrete
+     * consequence is that if said update has cells with timestamp lower than A, then those cells are preserved
+     * (since the deletion is removed), contrary to a normal (regular) deletion where the deletion is preserved
+     * and such cells are removed.
+     * <p>
+     * Currently, the only use of shadowable row deletions is Materialized Views, see CASSANDRA-10261.
+     */
+    public static class Deletion
+    {
+        public static final Deletion LIVE = new Deletion(DeletionTime.LIVE, false);
+
+        private final DeletionTime time;
+        private final boolean isShadowable;
+
+        public Deletion(DeletionTime time, boolean isShadowable)
+        {
+            assert !time.isLive() || !isShadowable;
+            this.time = time;
+            this.isShadowable = isShadowable;
+        }
+
+        public static Deletion regular(DeletionTime time)
+        {
+            return time.isLive() ? LIVE : new Deletion(time, false);
+        }
+
+        public static Deletion shadowable(DeletionTime time)
+        {
+            return new Deletion(time, true);
+        }
+
+        /**
+         * The time of the row deletion.
+         *
+         * @return the time of the row deletion.
+         */
+        public DeletionTime time()
+        {
+            return time;
+        }
+
+        /**
+         * Whether the deletion is a shadowable one or not.
+         *
+         * @return whether the deletion is a shadowable one. Note that if {@code isLive()}, then this is
+         * guaranteed to return {@code false}.
+         */
+        public boolean isShadowable()
+        {
+            return isShadowable;
+        }
+
+        /**
+         * Whether the deletion is live or not, that is, if it's an actual deletion or not.
+         *
+         * @return {@code true} if this represents no deletion of the row, {@code false} if that's an actual
+         * deletion.
+         */
+        public boolean isLive()
+        {
+            return time().isLive();
+        }
+
+        public boolean supersedes(DeletionTime that)
+        {
+            return time.supersedes(that);
+        }
+
+        public boolean supersedes(Deletion that)
+        {
+            return time.supersedes(that.time);
+        }
+
+        public boolean isShadowedBy(LivenessInfo primaryKeyLivenessInfo)
+        {
+            return isShadowable && primaryKeyLivenessInfo.timestamp() > time.markedForDeleteAt();
+        }
+
+        public boolean deletes(LivenessInfo info)
+        {
+            return time.deletes(info);
+        }
+
+        public void digest(MessageDigest digest)
+        {
+            time.digest(digest);
+            FBUtilities.updateWithBoolean(digest, isShadowable);
+        }
+
+        public int dataSize()
+        {
+            return time.dataSize() + 1;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if(!(o instanceof Deletion))
+                return false;
+            Deletion that = (Deletion)o;
+            return this.time.equals(that.time) && this.isShadowable == that.isShadowable;
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            return Objects.hash(time, isShadowable);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("%s%s", time, isShadowable ? "(shadowable)" : "");
+        }
+    }
+
+    /**
      * Interface for building rows.
      * <p>
      * The builder of a row should always abid to the following rules:
@@ -280,9 +406,9 @@ public interface Row extends Unfiltered, Collection<ColumnData>
          *
          * This call is optional and can be skipped if the row is not deleted.
          *
-         * @param deletion the row deletion time, or {@code DeletionTime.LIVE} if the row isn't deleted.
+         * @param deletion the row deletion time, or {@code Deletion.LIVE} if the row isn't deleted.
          */
-        public void addRowDeletion(DeletionTime deletion);
+        public void addRowDeletion(Deletion deletion);
 
         /**
          * Adds a cell to this builder.
@@ -358,7 +484,7 @@ public interface Row extends Unfiltered, Collection<ColumnData>
             }
 
             LivenessInfo rowInfo = LivenessInfo.EMPTY;
-            DeletionTime rowDeletion = DeletionTime.LIVE;
+            Deletion rowDeletion = Deletion.LIVE;
             for (Row row : rows)
             {
                 if (row == null)
@@ -370,10 +496,13 @@ public interface Row extends Unfiltered, Collection<ColumnData>
                     rowDeletion = row.deletion();
             }
 
-            if (activeDeletion.supersedes(rowDeletion))
-                rowDeletion = DeletionTime.LIVE;
+            if (rowDeletion.isShadowedBy(rowInfo))
+                rowDeletion = Deletion.LIVE;
+
+            if (rowDeletion.supersedes(activeDeletion))
+                activeDeletion = rowDeletion.time();
             else
-                activeDeletion = rowDeletion;
+                rowDeletion = Deletion.LIVE;
 
             if (activeDeletion.deletes(rowInfo))
                 rowInfo = LivenessInfo.EMPTY;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowDiffListener.java b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
index 50d6d32..f209bfc 100644
--- a/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
+++ b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
@@ -50,7 +50,7 @@ public interface RowDiffListener
      * but the merged result doesn't (i.e. the deletion has been shadowed).
      * @param original the deletion of input {@code i}. May be {@code null} if input {@code i} had no deletion but the merged row has.
      */
-    public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original);
+    public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original);
 
     /**
      * Called for every (non-live) complex deletion of any complex column present in either the merged row of input {@code i}.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index bf9ed5e..c3b4a92 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -70,7 +70,7 @@ public abstract class Rows
         assert !row.isEmpty();
 
         collector.update(row.primaryKeyLivenessInfo());
-        collector.update(row.deletion());
+        collector.update(row.deletion().time());
 
         int columnCount = 0;
         int cellCount = 0;
@@ -115,12 +115,12 @@ public abstract class Rows
     {
         Clustering clustering = merged.clustering();
         LivenessInfo mergedInfo = merged.primaryKeyLivenessInfo().isEmpty() ? null : merged.primaryKeyLivenessInfo();
-        DeletionTime mergedDeletion = merged.deletion().isLive() ? null : merged.deletion();
+        Row.Deletion mergedDeletion = merged.deletion().isLive() ? null : merged.deletion();
         for (int i = 0; i < inputs.length; i++)
         {
             Row input = inputs[i];
             LivenessInfo inputInfo = input == null || input.primaryKeyLivenessInfo().isEmpty() ? null : input.primaryKeyLivenessInfo();
-            DeletionTime inputDeletion = input == null || input.deletion().isLive() ? null : input.deletion();
+            Row.Deletion inputDeletion = input == null || input.deletion().isLive() ? null : input.deletion();
 
             if (mergedInfo != null || inputInfo != null)
                 diffListener.onPrimaryKeyLivenessInfo(i, clustering, mergedInfo, inputInfo);
@@ -237,13 +237,17 @@ public abstract class Rows
 
         long timeDelta = Math.abs(existingInfo.timestamp() - mergedInfo.timestamp());
 
-        DeletionTime deletion = existing.deletion().supersedes(update.deletion()) ? existing.deletion() : update.deletion();
+        Row.Deletion rowDeletion = existing.deletion().supersedes(update.deletion()) ? existing.deletion() : update.deletion();
 
-        if (deletion.deletes(mergedInfo))
+        if (rowDeletion.deletes(mergedInfo))
             mergedInfo = LivenessInfo.EMPTY;
+        else if (rowDeletion.isShadowedBy(mergedInfo))
+            rowDeletion = Row.Deletion.LIVE;
 
         builder.addPrimaryKeyLivenessInfo(mergedInfo);
-        builder.addRowDeletion(deletion);
+        builder.addRowDeletion(rowDeletion);
+
+        DeletionTime deletion = rowDeletion.time();
 
         Iterator<ColumnData> a = existing.iterator();
         Iterator<ColumnData> b = update.iterator();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 605a67b..b83ccf9 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -31,8 +31,10 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  *
  * The encoded format for an unfiltered is <flags>(<row>|<marker>) where:
  *
- *   <flags> is a byte whose bits are flags used by the rest of the serialization. Each
- *       flag is defined/explained below as the "Unfiltered flags" constants.
+ *   <flags> is a byte (or two) whose bits are flags used by the rest of the serialization. Each
+ *       flag is defined/explained below as the "Unfiltered flags" constants. One of those flags
+ *       is an extension flag, and if present, triggers the read of another byte that contains more
+ *       flags. If the extension is not set, defaults are assumed for the flags of that 2nd byte.
  *   <row> is <clustering>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where
  *       <clustering> is the row clustering as serialized by
  *       {@code Clustering.serializer}. Note that static row are an exception and
@@ -72,12 +74,18 @@ public class UnfilteredSerializer
      */
     private final static int END_OF_PARTITION     = 0x01; // Signal the end of the partition. Nothing follows a <flags> field with that flag.
     private final static int IS_MARKER            = 0x02; // Whether the encoded unfiltered is a marker or a row. All following flags apply only to rows.
-    private final static int IS_STATIC            = 0x04; // Whether the encoded row is a static.
-    private final static int HAS_TIMESTAMP        = 0x08; // Whether the encoded row has a timestamp (i.e. if row.partitionKeyLivenessInfo().hasTimestamp() == true).
-    private final static int HAS_TTL              = 0x10; // Whether the encoded row has some expiration info (i.e. if row.partitionKeyLivenessInfo().hasTTL() == true).
-    private final static int HAS_DELETION         = 0x20; // Whether the encoded row has some deletion info.
+    private final static int HAS_TIMESTAMP        = 0x04; // Whether the encoded row has a timestamp (i.e. if row.partitionKeyLivenessInfo().hasTimestamp() == true).
+    private final static int HAS_TTL              = 0x08; // Whether the encoded row has some expiration info (i.e. if row.partitionKeyLivenessInfo().hasTTL() == true).
+    private final static int HAS_DELETION         = 0x10; // Whether the encoded row has some deletion info.
+    private final static int HAS_ALL_COLUMNS      = 0x20; // Whether the encoded row has all of the columns from the header present.
     private final static int HAS_COMPLEX_DELETION = 0x40; // Whether the encoded row has some complex deletion for at least one of its columns.
-    private final static int HAS_ALL_COLUMNS      = 0x80; // Whether the encoded row has all of the columns from the header present
+    private final static int EXTENSION_FLAG       = 0x80; // If present, another byte is read containing the "extended flags" below.
+
+    /*
+     * Extended flags
+     */
+    private final static int IS_STATIC               = 0x01; // Whether the encoded row is a static. If there is no extended flag, the row is assumed not static.
+    private final static int HAS_SHADOWABLE_DELETION = 0x02; // Whether the row deletion is shadowable. If there is no extended flag (or no row deletion), the deletion is assumed not shadowable.
 
     public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
     throws IOException
@@ -96,28 +104,47 @@ public class UnfilteredSerializer
     throws IOException
     {
         int flags = 0;
-        boolean isStatic = row.isStatic();
+        int extendedFlags = 0;
+        boolean hasExtendedFlags = false;
 
+        boolean isStatic = row.isStatic();
         Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
-        DeletionTime deletion = row.deletion();
+        Row.Deletion deletion = row.deletion();
         boolean hasComplexDeletion = row.hasComplexDeletion();
         boolean hasAllColumns = (row.size() == headerColumns.size());
 
         if (isStatic)
-            flags |= IS_STATIC;
+        {
+            hasExtendedFlags = true;
+            extendedFlags |= IS_STATIC;
+        }
+
         if (!pkLiveness.isEmpty())
             flags |= HAS_TIMESTAMP;
         if (pkLiveness.isExpiring())
             flags |= HAS_TTL;
         if (!deletion.isLive())
+        {
             flags |= HAS_DELETION;
+            if (deletion.isShadowable())
+            {
+                hasExtendedFlags = true;
+                extendedFlags |= HAS_SHADOWABLE_DELETION;
+            }
+        }
         if (hasComplexDeletion)
             flags |= HAS_COMPLEX_DELETION;
         if (hasAllColumns)
             flags |= HAS_ALL_COLUMNS;
 
+        if (hasExtendedFlags)
+            flags |= EXTENSION_FLAG;
+
         out.writeByte((byte)flags);
+        if (hasExtendedFlags)
+            out.writeByte((byte)extendedFlags);
+
         if (!isStatic)
             Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes());
 
@@ -129,7 +156,7 @@ public class UnfilteredSerializer
             header.writeLocalDeletionTime(pkLiveness.localExpirationTime(), out);
         }
         if ((flags & HAS_DELETION) != 0)
-            header.writeDeletionTime(deletion, out);
+            header.writeDeletionTime(deletion.time(), out);
 
         if (!hasAllColumns)
             Columns.serializer.serializeSubset(Collections2.transform(row, ColumnData::column), headerColumns, out);
@@ -186,10 +213,13 @@ public class UnfilteredSerializer
         boolean isStatic = row.isStatic();
         Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
-        DeletionTime deletion = row.deletion();
+        Row.Deletion deletion = row.deletion();
         boolean hasComplexDeletion = row.hasComplexDeletion();
         boolean hasAllColumns = (row.size() == headerColumns.size());
 
+        if (isStatic || deletion.isShadowable())
+            size += 1; // extended flags
+
         if (!isStatic)
             size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
 
@@ -201,7 +231,7 @@ public class UnfilteredSerializer
             size += header.localDeletionTimeSerializedSize(pkLiveness.localExpirationTime());
         }
         if (!deletion.isLive())
-            size += header.deletionTimeSerializedSize(deletion);
+            size += header.deletionTimeSerializedSize(deletion.time());
 
         if (!hasAllColumns)
             size += Columns.serializer.serializedSubsetSize(Collections2.transform(row, ColumnData::column), header.columns(isStatic));
@@ -269,6 +299,8 @@ public class UnfilteredSerializer
         if (isEndOfPartition(flags))
             return null;
 
+        int extendedFlags = isExtended(flags) ? in.readUnsignedByte() : 0;
+
         if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
         {
             RangeTombstone.Bound bound = RangeTombstone.Bound.serializer.deserialize(in, helper.version, header.clusteringTypes());
@@ -276,9 +308,9 @@ public class UnfilteredSerializer
         }
         else
         {
-            assert !isStatic(flags); // deserializeStaticRow should be used for that.
+            assert !isStatic(extendedFlags); // deserializeStaticRow should be used for that.
             builder.newRow(Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes()));
-            return deserializeRowBody(in, header, helper, flags, builder);
+            return deserializeRowBody(in, header, helper, flags, extendedFlags, builder);
         }
     }
 
@@ -286,10 +318,11 @@ public class UnfilteredSerializer
     throws IOException
     {
         int flags = in.readUnsignedByte();
-        assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : flags;
+        assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isExtended(flags) : flags;
+        int extendedFlags = in.readUnsignedByte();
         Row.Builder builder = BTreeRow.sortedBuilder();
         builder.newRow(Clustering.STATIC_CLUSTERING);
-        return deserializeRowBody(in, header, helper, flags, builder);
+        return deserializeRowBody(in, header, helper, flags, extendedFlags, builder);
     }
 
     public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, SerializationHeader header, RangeTombstone.Bound bound)
@@ -305,15 +338,17 @@ public class UnfilteredSerializer
                                   SerializationHeader header,
                                   SerializationHelper helper,
                                   int flags,
+                                  int extendedFlags,
                                   Row.Builder builder)
     throws IOException
     {
         try
         {
-            boolean isStatic = isStatic(flags);
+            boolean isStatic = isStatic(extendedFlags);
             boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
             boolean hasTTL = (flags & HAS_TTL) != 0;
             boolean hasDeletion = (flags & HAS_DELETION) != 0;
+            boolean deletionIsShadowable = (extendedFlags & HAS_SHADOWABLE_DELETION) != 0;
             boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
             boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
             Columns headerColumns = header.columns(isStatic);
@@ -328,7 +363,7 @@ public class UnfilteredSerializer
             }
 
             builder.addPrimaryKeyLivenessInfo(rowLiveness);
-            builder.addRowDeletion(hasDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE);
+            builder.addRowDeletion(hasDeletion ? new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable) : Row.Deletion.LIVE);
 
             Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
             for (ColumnDefinition column : columns)
@@ -395,9 +430,9 @@ public class UnfilteredSerializer
         }
     }
 
-    public void skipRowBody(DataInputPlus in, SerializationHeader header, int flags) throws IOException
+    public void skipRowBody(DataInputPlus in, SerializationHeader header, int flags, int extendedFlags) throws IOException
     {
-        boolean isStatic = isStatic(flags);
+        boolean isStatic = isStatic(extendedFlags);
         boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
         boolean hasTTL = (flags & HAS_TTL) != 0;
         boolean hasDeletion = (flags & HAS_DELETION) != 0;
@@ -430,8 +465,10 @@ public class UnfilteredSerializer
     public void skipStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) throws IOException
     {
         int flags = in.readUnsignedByte();
-        assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : "Flags is " + flags;
-        skipRowBody(in, header, flags);
+        assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isExtended(flags) : "Flags is " + flags;
+        int extendedFlags = in.readUnsignedByte();
+        assert isStatic(extendedFlags);
+        skipRowBody(in, header, flags, extendedFlags);
     }
 
     public void skipMarkerBody(DataInputPlus in, SerializationHeader header, boolean isBoundary) throws IOException
@@ -468,8 +505,13 @@ public class UnfilteredSerializer
         return (flags & IS_MARKER) != 0 ? Unfiltered.Kind.RANGE_TOMBSTONE_MARKER : Unfiltered.Kind.ROW;
     }
 
-    public static boolean isStatic(int flags)
+    public static boolean isStatic(int extendedFlags)
+    {
+        return (extendedFlags & IS_STATIC) != 0;
+    }
+
+    public static boolean isExtended(int flags)
     {
-        return (flags & IS_MARKER) == 0 && (flags & IS_STATIC) != 0;
+        return (flags & EXTENSION_FLAG) != 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
index 5128b7c..52034cc 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -263,14 +263,14 @@ public class MaterializedView
      */
     private PartitionUpdate createTombstone(TemporalRow temporalRow,
                                             DecoratedKey partitionKey,
-                                            DeletionTime deletionTime,
+                                            Row.Deletion deletion,
                                             TemporalRow.Resolver resolver,
                                             int nowInSec)
     {
         CFMetaData viewCfm = getViewCfs().metadata;
         Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
         builder.newRow(viewClustering(temporalRow, resolver));
-        builder.addRowDeletion(deletionTime);
+        builder.addRowDeletion(deletion);
         return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
     }
 
@@ -342,7 +342,7 @@ public class MaterializedView
         TemporalRow.Resolver resolver = TemporalRow.earliest;
         return createTombstone(temporalRow,
                                viewPartitionKey(temporalRow, resolver),
-                               new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec),
+                               Row.Deletion.shadowable(new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec)),
                                resolver,
                                temporalRow.nowInSec);
     }
@@ -523,7 +523,7 @@ public class MaterializedView
                     DecoratedKey value = viewPartitionKey(temporalRow, resolver);
                     if (value != null)
                     {
-                        PartitionUpdate update = createTombstone(temporalRow, value, deletionTime, resolver, temporalRow.nowInSec);
+                        PartitionUpdate update = createTombstone(temporalRow, value, Row.Deletion.regular(deletionTime), resolver, temporalRow.nowInSec);
                         if (update != null)
                             mutations.add(new Mutation(update));
                     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index be1c584..6eb9071 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -256,7 +256,7 @@ public class TemporalRow
         this.nowInSec = nowInSec;
 
         LivenessInfo liveness = row.primaryKeyLivenessInfo();
-        this.viewClusteringLocalDeletionTime = minValueIfSet(viewClusteringLocalDeletionTime, row.deletion().localDeletionTime(), NO_DELETION_TIME);
+        this.viewClusteringLocalDeletionTime = minValueIfSet(viewClusteringLocalDeletionTime, row.deletion().time().localDeletionTime(), NO_DELETION_TIME);
         this.viewClusteringTimestamp = minValueIfSet(viewClusteringTimestamp, liveness.timestamp(), NO_TIMESTAMP);
         this.viewClusteringTtl = minValueIfSet(viewClusteringTtl, liveness.ttl(), NO_TTL);
 
@@ -402,7 +402,7 @@ public class TemporalRow
             return clusterTombstone.deletionTime();
 
         Row row = partition.getRow(baseClustering);
-        return row == null || row.deletion().isLive() ? DeletionTime.LIVE : row.deletion();
+        return row == null || row.deletion().isLive() ? DeletionTime.LIVE : row.deletion().time();
     }
 
     public Collection<org.apache.cassandra.db.rows.Cell> values(ColumnDefinition definition, Resolver resolver)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index bd3202d..a916bd2 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -742,7 +742,7 @@ public class SecondaryIndexManager implements IndexRegistry
                         toInsert.addPrimaryKeyLivenessInfo(merged);
                 }
 
-                public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
+                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
                 {
                 }
 
@@ -834,7 +834,7 @@ public class SecondaryIndexManager implements IndexRegistry
                 {
                 }
 
-                public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
+                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
                 {
                 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index fd19e7a..1e2deee 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -411,13 +411,13 @@ public abstract class CassandraIndex implements Index
 
             private void indexPrimaryKey(final Clustering clustering,
                                          final LivenessInfo liveness,
-                                         final DeletionTime deletion)
+                                         final Row.Deletion deletion)
             {
                 if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
                     insert(key.getKey(), clustering, null, liveness, opGroup);
 
                 if (!deletion.isLive())
-                    delete(key.getKey(), clustering, deletion, opGroup);
+                    delete(key.getKey(), clustering, deletion.time(), opGroup);
             }
 
             private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
@@ -516,7 +516,7 @@ public abstract class CassandraIndex implements Index
                           DeletionTime deletion,
                           OpOrder.Group opGroup)
     {
-        Row row = BTreeRow.emptyDeletedRow(indexClustering, deletion);
+        Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion));
         PartitionUpdate upd = partitionUpdate(indexKey, row);
         indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
         logger.debug("Removed index entry for value {}", indexKey);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 0eb13c3..f24c29f 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -180,7 +180,7 @@ public class DataResolver extends ResponseResolver
                             currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
                     }
 
-                    public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
+                    public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
                     {
                         if (merged != null && !merged.equals(original))
                             currentRow(i, clustering).addRowDeletion(merged);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 14cd812..207bb6f 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1343,7 +1343,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         else if (column_path.super_column != null && column_path.column == null)
         {
-            Row row = BTreeRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec));
+            Row row = BTreeRow.emptyDeletedRow(new Clustering(column_path.super_column), Row.Deletion.regular(new DeletionTime(timestamp, nowInSec)));
             update = PartitionUpdate.singleRowUpdate(metadata, dk, row);
         }
         else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 25dfc28..61e4fc2 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -753,12 +753,12 @@ public abstract class CQLTester
             Object[] expected = rows[i];
             UntypedResultSet.Row actual = iter.next();
 
-            Assert.assertEquals(String.format("Invalid number of (expected) values provided for row %d", i), expected.length, meta.size());
+            Assert.assertEquals(String.format("Invalid number of (expected) values provided for row %d", i), expected == null ? 1 : expected.length, meta.size());
 
             for (int j = 0; j < meta.size(); j++)
             {
                 ColumnSpecification column = meta.get(j);
-                ByteBuffer expectedByteValue = makeByteBuffer(expected[j], column.type);
+                ByteBuffer expectedByteValue = makeByteBuffer(expected == null ? null : expected[j], column.type);
                 ByteBuffer actualValue = actual.getBytes(column.name.toString());
 
                 if (!Objects.equal(expectedByteValue, actualValue))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
index 75379fb..f4b4a82 100644
--- a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.serializers.SimpleDateSerializer;
 import org.apache.cassandra.serializers.TimeSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class MaterializedViewTest extends CQLTester
 {
@@ -341,6 +342,124 @@ public class MaterializedViewTest extends CQLTester
     }
 
     @Test
+    public void complexTimestampUpdateTestWithFlush() throws Throwable
+    {
+        complexTimestampUpdateTest(true);
+    }
+
+    @Test
+    public void complexTimestampUpdateTestWithoutFlush() throws Throwable
+    {
+        complexTimestampUpdateTest(false);
+    }
+
+    public void complexTimestampUpdateTest(boolean flush) throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, c int, d int, e int, PRIMARY KEY (a, b))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+        Keyspace ks = Keyspace.open(keyspace());
+
+        createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, a, b)");
+        ks.getColumnFamilyStore("mv").disableAutoCompaction();
+
+        //Set initial values TS=0, leaving e null and verify view
+        executeNet(protocolVersion, "INSERT INTO %s (a, b, c, d) VALUES (0, 0, 1, 0) USING TIMESTAMP 0");
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0));
+
+        if (flush)
+            FBUtilities.waitOnFutures(ks.flush());
+
+        //update c's timestamp TS=2
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET c = ? WHERE a = ? and b = ? ", 1, 0, 0);
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0));
+
+        if (flush)
+            FBUtilities.waitOnFutures(ks.flush());
+
+            //change c's value and TS=3, tombstones c=1 and adds c=0 record
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE a = ? and b = ? ", 0, 0, 0);
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0));
+
+        if(flush)
+        {
+            ks.getColumnFamilyStore("mv").forceMajorCompaction();
+            FBUtilities.waitOnFutures(ks.flush());
+        }
+
+
+        //change c's value back to 1 with TS=4, check we can see d
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 4 SET c = ? WHERE a = ? and b = ? ", 1, 0, 0);
+        if (flush)
+        {
+            ks.getColumnFamilyStore("mv").forceMajorCompaction();
+            FBUtilities.waitOnFutures(ks.flush());
+        }
+
+        assertRows(execute("SELECT d,e from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0, null));
+
+
+            //Add e value @ TS=1
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 1 SET e = ? WHERE a = ? and b = ? ", 1, 0, 0);
+        assertRows(execute("SELECT d,e from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0, 1));
+
+        if (flush)
+            FBUtilities.waitOnFutures(ks.flush());
+
+
+        //Change d value @ TS=2
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET d = ? WHERE a = ? and b = ? ", 2, 0, 0);
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(2));
+
+        if (flush)
+            FBUtilities.waitOnFutures(ks.flush());
+
+
+        //Change d value @ TS=3
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET d = ? WHERE a = ? and b = ? ", 1, 0, 0);
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(1));
+
+
+        //Tombstone c
+        executeNet(protocolVersion, "DELETE FROM %s WHERE a = ? and b = ?", 0, 0);
+        assertRows(execute("SELECT d from mv"));
+
+        //Add back without D
+        executeNet(protocolVersion, "INSERT INTO %s (a, b, c) VALUES (0, 0, 1)");
+
+        //Make sure D doesn't pop back in.
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row((Object) null));
+
+
+        //New partition
+        // insert a row with timestamp 0
+        executeNet(protocolVersion, "INSERT INTO %s (a, b, c, d, e) VALUES (?, ?, ?, ?, ?) USING TIMESTAMP 0", 1, 0, 0, 0, 0);
+
+        // overwrite pk and e with timestamp 1, but don't overwrite d
+        executeNet(protocolVersion, "INSERT INTO %s (a, b, c, e) VALUES (?, ?, ?, ?) USING TIMESTAMP 1", 1, 0, 0, 0);
+
+        // delete with timestamp 0 (which should only delete d)
+        executeNet(protocolVersion, "DELETE FROM %s USING TIMESTAMP 0 WHERE a = ? AND b = ?", 1, 0);
+        assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 0, 1, 0),
+                   row(1, 0, 0, null, 0)
+        );
+
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET c = ? WHERE a = ? AND b = ?", 1, 1, 0);
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE a = ? AND b = ?", 0, 1, 0);
+        assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 0, 1, 0),
+                   row(1, 0, 0, null, 0)
+        );
+
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET d = ? WHERE a = ? AND b = ?", 0, 1, 0);
+        assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 0, 1, 0),
+                   row(1, 0, 0, 0, 0)
+        );
+
+
+    }
+
+    @Test
     public void testBuilderWidePartition() throws Throwable
     {
         createTable("CREATE TABLE %s (" +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
index 08b9b8e..7637fa0 100644
--- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
@@ -337,8 +337,8 @@ public class UnfilteredRowIteratorsMergeTest
         else
         {
             Row row = (Row) list.get(index);
-            if (row.deletion() != null && row.deletion().supersedes(def))
-                def = row.deletion();
+            if (row.deletion().supersedes(def))
+                def = row.deletion().time();
         }
 
         if (index >= list.size())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/665f7474/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 3326b3f..9d9c0b3 100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -395,13 +395,13 @@ public class CustomCassandraIndex implements Index
 
             private void indexPrimaryKey(final Clustering clustering,
                                          final LivenessInfo liveness,
-                                         final DeletionTime deletion)
+                                         final Row.Deletion deletion)
             {
                 if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
                     insert(key.getKey(), clustering, null, liveness, opGroup);
 
                 if (!deletion.isLive())
-                    delete(key.getKey(), clustering, deletion, opGroup);
+                    delete(key.getKey(), clustering, deletion.time(), opGroup);
             }
 
             private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
@@ -500,7 +500,7 @@ public class CustomCassandraIndex implements Index
                           DeletionTime deletion,
                           OpOrder.Group opGroup)
     {
-        Row row = BTreeRow.emptyDeletedRow(indexClustering, deletion);
+        Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion));
         PartitionUpdate upd = partitionUpdate(indexKey, row);
         indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
         logger.debug("Removed index entry for value {}", indexKey);


Mime
View raw message