cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject cassandra git commit: Add row size to sstable format for faster skipping
Date Fri, 09 Oct 2015 16:07:10 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 028c3f779 -> 6584331c8


Add row size to sstable format for faster skipping

patch by slebresne; reviewed by benedict for CASSANDRA-10378


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6584331c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6584331c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6584331c

Branch: refs/heads/cassandra-3.0
Commit: 6584331c881329c2cb9afbcef19997e8a2a612d9
Parents: 028c3f7
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Tue Sep 22 13:53:22 2015 -0700
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Fri Oct 9 18:07:01 2015 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ColumnIndex.java    |  10 +-
 src/java/org/apache/cassandra/db/Memtable.java  |   2 +-
 .../cassandra/db/SerializationHeader.java       |  26 ++-
 .../cassandra/db/UnfilteredDeserializer.java    |   8 +-
 .../rows/UnfilteredRowIteratorSerializer.java   |  10 +-
 .../cassandra/db/rows/UnfilteredSerializer.java | 174 +++++++++++--------
 .../io/sstable/AbstractSSTableSimpleWriter.java |   2 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |   4 +-
 .../apache/cassandra/db/RowIndexEntryTest.java  |   4 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   3 +-
 .../db/compaction/AntiCompactionTest.java       |   2 +-
 .../io/sstable/BigTableWriterTest.java          |   2 +-
 .../io/sstable/SSTableRewriterTest.java         |   4 +-
 .../cassandra/io/sstable/SSTableUtils.java      |   2 +-
 14 files changed, 155 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index add5fa7..ede3f79 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -76,6 +76,7 @@ public class ColumnIndex
         private long startPosition = -1;
 
         private int written;
+        private long previousRowStart;
 
         private ClusteringPrefix firstClustering;
         private ClusteringPrefix lastClustering;
@@ -99,7 +100,7 @@ public class ColumnIndex
             ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
             DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
             if (header.hasStatic())
-                UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer,
version);
+                UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(),
header, writer, version);
         }
 
         public ColumnIndex build() throws IOException
@@ -131,15 +132,18 @@ public class ColumnIndex
 
         private void add(Unfiltered unfiltered) throws IOException
         {
+            long pos = currentPosition();
+
             if (firstClustering == null)
             {
                 // Beginning of an index block. Remember the start and position
                 firstClustering = unfiltered.clustering();
-                startPosition = currentPosition();
+                startPosition = pos;
             }
 
-            UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, version);
+            UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart,
version);
             lastClustering = unfiltered.clustering();
+            previousRowStart = pos;
             ++written;
 
             if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 014467e..f47efe3 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -430,7 +430,7 @@ public class Memtable implements Comparable<Memtable>
                                                                      (long)partitions.size(),
                                                                      ActiveRepairService.UNREPAIRED_SSTABLE,
                                                                      sstableMetadataCollector,
-                                                                     new SerializationHeader(cfs.metadata,
columns, stats),
+                                                                     new SerializationHeader(true,
cfs.metadata, columns, stats),
                                                                      txn));
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index decac49..0706d06 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -45,6 +45,8 @@ public class SerializationHeader
 {
     public static final Serializer serializer = new Serializer();
 
+    private final boolean isForSSTable;
+
     private final AbstractType<?> keyType;
     private final List<AbstractType<?>> clusteringTypes;
 
@@ -53,12 +55,14 @@ public class SerializationHeader
 
     private final Map<ByteBuffer, AbstractType<?>> typeMap;
 
-    private SerializationHeader(AbstractType<?> keyType,
+    private SerializationHeader(boolean isForSSTable,
+                                AbstractType<?> keyType,
                                 List<AbstractType<?>> clusteringTypes,
                                 PartitionColumns columns,
                                 EncodingStats stats,
                                 Map<ByteBuffer, AbstractType<?>> typeMap)
     {
+        this.isForSSTable = isForSSTable;
         this.keyType = keyType;
         this.clusteringTypes = clusteringTypes;
         this.columns = columns;
@@ -77,7 +81,8 @@ public class SerializationHeader
         List<AbstractType<?>> clusteringTypes = new ArrayList<>(size);
         for (int i = 0; i < size; i++)
             clusteringTypes.add(BytesType.instance);
-        return new SerializationHeader(BytesType.instance,
+        return new SerializationHeader(false,
+                                       BytesType.instance,
                                        clusteringTypes,
                                        PartitionColumns.NONE,
                                        EncodingStats.NO_STATS,
@@ -108,14 +113,16 @@ public class SerializationHeader
             else
                 columns.addAll(sstable.header.columns());
         }
-        return new SerializationHeader(metadata, columns.build(), stats.get());
+        return new SerializationHeader(true, metadata, columns.build(), stats.get());
     }
 
-    public SerializationHeader(CFMetaData metadata,
+    public SerializationHeader(boolean isForSSTable,
+                               CFMetaData metadata,
                                PartitionColumns columns,
                                EncodingStats stats)
     {
-        this(metadata.getKeyValidator(),
+        this(isForSSTable,
+             metadata.getKeyValidator(),
              typesOf(metadata.clusteringColumns()),
              columns,
              stats,
@@ -137,6 +144,11 @@ public class SerializationHeader
         return !columns.statics.isEmpty();
     }
 
+    public boolean isForSSTable()
+    {
+        return isForSSTable;
+    }
+
     public EncodingStats stats()
     {
         return stats;
@@ -320,7 +332,7 @@ public class SerializationHeader
                 }
                 builder.add(column);
             }
-            return new SerializationHeader(keyType, clusteringTypes, builder.build(), stats,
typeMap);
+            return new SerializationHeader(true, keyType, clusteringTypes, builder.build(),
stats, typeMap);
         }
 
         @Override
@@ -390,7 +402,7 @@ public class SerializationHeader
                 regulars = Columns.serializer.deserializeSubset(selection.fetchedColumns().regulars,
in);
             }
 
-            return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics,
regulars), stats, null);
+            return new SerializationHeader(false, keyType, clusteringTypes, new PartitionColumns(statics,
regulars), stats, null);
         }
 
         public long serializedSizeForMessaging(SerializationHeader header, ColumnFilter selection,
boolean hasStatic)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 5c76c63..ef30289 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -147,7 +147,7 @@ public abstract class UnfilteredDeserializer
                 return;
             }
 
-            nextExtendedFlags = UnfilteredSerializer.isExtended(nextFlags) ? in.readUnsignedByte()
: 0;
+            nextExtendedFlags = UnfilteredSerializer.readExtendedFlags(in, nextFlags);
 
             clusteringDeserializer.prepare(nextFlags, nextExtendedFlags);
             isReady = true;
@@ -195,14 +195,14 @@ public abstract class UnfilteredDeserializer
         public void skipNext() throws IOException
         {
             isReady = false;
-            ClusteringPrefix.Kind kind = clusteringDeserializer.skipNext();
+            clusteringDeserializer.skipNext();
             if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
             {
-                UnfilteredSerializer.serializer.skipMarkerBody(in, header, kind.isBoundary());
+                UnfilteredSerializer.serializer.skipMarkerBody(in);
             }
             else
             {
-                UnfilteredSerializer.serializer.skipRowBody(in, header, nextFlags, nextExtendedFlags);
+                UnfilteredSerializer.serializer.skipRowBody(in);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index df006d7..3a0558e 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -80,7 +80,8 @@ public class UnfilteredRowIteratorSerializer
     // Should only be used for the on-wire format.
     public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus
out, int version, int rowEstimate) throws IOException
     {
-        SerializationHeader header = new SerializationHeader(iterator.metadata(),
+        SerializationHeader header = new SerializationHeader(false,
+                                                             iterator.metadata(),
                                                              iterator.columns(),
                                                              iterator.stats());
         serialize(iterator, header, selection, out, version, rowEstimate);
@@ -89,6 +90,8 @@ public class UnfilteredRowIteratorSerializer
     // Should only be used for the on-wire format.
     public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter
selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
     {
+        assert !header.isForSSTable();
+
         ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out);
 
         int flags = 0;
@@ -134,7 +137,8 @@ public class UnfilteredRowIteratorSerializer
     // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition.
     public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int
version, int rowEstimate)
     {
-        SerializationHeader header = new SerializationHeader(iterator.metadata(),
+        SerializationHeader header = new SerializationHeader(false,
+                                                             iterator.metadata(),
                                                              iterator.columns(),
                                                              iterator.stats());
 
@@ -175,7 +179,7 @@ public class UnfilteredRowIteratorSerializer
         boolean isReversed = (flags & IS_REVERSED) != 0;
         if ((flags & IS_EMPTY) != 0)
         {
-            SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE,
EncodingStats.NO_STATS);
+            SerializationHeader sh = new SerializationHeader(false, metadata, PartitionColumns.NONE,
EncodingStats.NO_STATS);
             return new Header(sh, key, isReversed, true, null, null, 0);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index b83ccf9..4efc5eb 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -35,10 +35,12 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  *       flag is defined/explained below as the "Unfiltered flags" constants. One of those
flags
  *       is an extension flag, and if present, triggers the read of another byte that contains
more
  *       flags. If the extension is not set, defaults are assumed for the flags of that 2nd
byte.
- *   <row> is <clustering>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj>
where
- *       <clustering> is the row clustering as serialized by
- *       {@code Clustering.serializer}. Note that static row are an exception and
- *       don't have this. <timestamp>, <ttl> and <deletion> are the row
timestamp, ttl and deletion
+ *   <row> is <clustering><size>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj>
where
+ *       <clustering> is the row clustering as serialized by {@code Clustering.serializer}
(note
+ *       that static row are an exception and don't have this).
+ *       <size> is the size of the whole unfiltered on disk (it's only used for sstables
and is
+ *       used to efficiently skip rows).
+ *       <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and
deletion
  *       whose presence is determined by the flags. <sci> is the simple columns of
the row and <ccj> the
  *       complex ones.
  *       The columns for the row are then serialized if they differ from those in the header,
@@ -90,22 +92,35 @@ public class UnfilteredSerializer
     public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus
out, int version)
     throws IOException
     {
+        assert !header.isForSSTable();
+        serialize(unfiltered, header, out, 0, version);
+    }
+
+    public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus
out, long previousUnfilteredSize, int version)
+    throws IOException
+    {
         if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
         {
-            serialize((RangeTombstoneMarker) unfiltered, header, out, version);
+            serialize((RangeTombstoneMarker) unfiltered, header, out, previousUnfilteredSize,
version);
         }
         else
         {
-            serialize((Row) unfiltered, header, out, version);
+            serialize((Row) unfiltered, header, out, previousUnfilteredSize, version);
         }
     }
 
-    public void serialize(Row row, SerializationHeader header, DataOutputPlus out, int version)
+    public void serializeStaticRow(Row row, SerializationHeader header, DataOutputPlus out,
int version)
+    throws IOException
+    {
+        assert row.isStatic();
+        serialize(row, header, out, 0, version);
+    }
+
+    private void serialize(Row row, SerializationHeader header, DataOutputPlus out, long
previousUnfilteredSize, int version)
     throws IOException
     {
         int flags = 0;
         int extendedFlags = 0;
-        boolean hasExtendedFlags = false;
 
         boolean isStatic = row.isStatic();
         Columns headerColumns = header.columns(isStatic);
@@ -113,12 +128,10 @@ public class UnfilteredSerializer
         Row.Deletion deletion = row.deletion();
         boolean hasComplexDeletion = row.hasComplexDeletion();
         boolean hasAllColumns = (row.size() == headerColumns.size());
+        boolean hasExtendedFlags = hasExtendedFlags(row);
 
         if (isStatic)
-        {
-            hasExtendedFlags = true;
             extendedFlags |= IS_STATIC;
-        }
 
         if (!pkLiveness.isEmpty())
             flags |= HAS_TIMESTAMP;
@@ -128,10 +141,7 @@ public class UnfilteredSerializer
         {
             flags |= HAS_DELETION;
             if (deletion.isShadowable())
-            {
-                hasExtendedFlags = true;
                 extendedFlags |= HAS_SHADOWABLE_DELETION;
-            }
         }
         if (hasComplexDeletion)
             flags |= HAS_COMPLEX_DELETION;
@@ -148,6 +158,12 @@ public class UnfilteredSerializer
         if (!isStatic)
             Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes());
 
+        if (header.isForSSTable())
+        {
+            out.writeUnsignedVInt(serializedRowBodySize(row, header, previousUnfilteredSize,
version));
+            out.writeUnsignedVInt(previousUnfilteredSize);
+        }
+
         if ((flags & HAS_TIMESTAMP) != 0)
             header.writeTimestamp(pkLiveness.timestamp(), out);
         if ((flags & HAS_TTL) != 0)
@@ -181,12 +197,18 @@ public class UnfilteredSerializer
             Cell.serializer.serialize(cell, out, rowLiveness, header);
     }
 
-    public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus
out, int version)
+    private void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus
out, long previousUnfilteredSize, int version)
     throws IOException
     {
         out.writeByte((byte)IS_MARKER);
         RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes());
 
+        if (header.isForSSTable())
+        {
+            out.writeUnsignedVInt(serializedMarkerBodySize(marker, header, previousUnfilteredSize,
version));
+            out.writeUnsignedVInt(previousUnfilteredSize);
+        }
+
         if (marker.isBoundary())
         {
             RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
@@ -201,15 +223,37 @@ public class UnfilteredSerializer
 
     public long serializedSize(Unfiltered unfiltered, SerializationHeader header, int version)
     {
+        assert !header.isForSSTable();
+        return serializedSize(unfiltered, header, 0, version);
+    }
+
+    public long serializedSize(Unfiltered unfiltered, SerializationHeader header, long previousUnfilteredSize,int
version)
+    {
         return unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER
-             ? serializedSize((RangeTombstoneMarker) unfiltered, header, version)
-             : serializedSize((Row) unfiltered, header, version);
+             ? serializedSize((RangeTombstoneMarker) unfiltered, header, previousUnfilteredSize,
version)
+             : serializedSize((Row) unfiltered, header, previousUnfilteredSize, version);
     }
 
-    public long serializedSize(Row row, SerializationHeader header, int version)
+    private long serializedSize(Row row, SerializationHeader header, long previousUnfilteredSize,
int version)
     {
         long size = 1; // flags
 
+        if (hasExtendedFlags(row))
+            size += 1; // extended flags
+
+        if (!row.isStatic())
+            size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
+
+        return size + serializedRowBodySize(row, header, previousUnfilteredSize, version);
+    }
+
+    private long serializedRowBodySize(Row row, SerializationHeader header, long previousUnfilteredSize,
int version)
+    {
+        long size = 0;
+
+        if (header.isForSSTable())
+            size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
+
         boolean isStatic = row.isStatic();
         Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
@@ -217,12 +261,6 @@ public class UnfilteredSerializer
         boolean hasComplexDeletion = row.hasComplexDeletion();
         boolean hasAllColumns = (row.size() == headerColumns.size());
 
-        if (isStatic || deletion.isShadowable())
-            size += 1; // extended flags
-
-        if (!isStatic)
-            size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
-
         if (!pkLiveness.isEmpty())
             size += header.timestampSerializedSize(pkLiveness.timestamp());
         if (pkLiveness.isExpiring())
@@ -261,10 +299,19 @@ public class UnfilteredSerializer
         return size;
     }
 
-    public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int
version)
+    private long serializedSize(RangeTombstoneMarker marker, SerializationHeader header,
long previousUnfilteredSize, int version)
     {
-        long size = 1 // flags
-                  + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version,
header.clusteringTypes());
+        assert !header.isForSSTable();
+        return 1 // flags
+             + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version,
header.clusteringTypes())
+             + serializedMarkerBodySize(marker, header, previousUnfilteredSize, version);
+    }
+
+    private long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader
header, long previousUnfilteredSize, int version)
+    {
+        long size = 0;
+        if (header.isForSSTable())
+            size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
 
         if (marker.isBoundary())
         {
@@ -299,7 +346,7 @@ public class UnfilteredSerializer
         if (isEndOfPartition(flags))
             return null;
 
-        int extendedFlags = isExtended(flags) ? in.readUnsignedByte() : 0;
+        int extendedFlags = readExtendedFlags(in, flags);
 
         if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
         {
@@ -328,6 +375,12 @@ public class UnfilteredSerializer
     public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, SerializationHeader
header, RangeTombstone.Bound bound)
     throws IOException
     {
+        if (header.isForSSTable())
+        {
+            in.readUnsignedVInt(); // marker size
+            in.readUnsignedVInt(); // previous unfiltered size
+        }
+
         if (bound.isBoundary())
             return new RangeTombstoneBoundaryMarker(bound, header.readDeletionTime(in), header.readDeletionTime(in));
         else
@@ -353,6 +406,12 @@ public class UnfilteredSerializer
             boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
             Columns headerColumns = header.columns(isStatic);
 
+            if (header.isForSSTable())
+            {
+                in.readUnsignedVInt(); // Skip row size
+                in.readUnsignedVInt(); // previous unfiltered size
+            }
+
             LivenessInfo rowLiveness = LivenessInfo.EMPTY;
             if (hasTimestamp)
             {
@@ -430,36 +489,10 @@ public class UnfilteredSerializer
         }
     }
 
-    public void skipRowBody(DataInputPlus in, SerializationHeader header, int flags, int
extendedFlags) throws IOException
+    public void skipRowBody(DataInputPlus in) throws IOException
     {
-        boolean isStatic = isStatic(extendedFlags);
-        boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
-        boolean hasTTL = (flags & HAS_TTL) != 0;
-        boolean hasDeletion = (flags & HAS_DELETION) != 0;
-        boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
-        boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
-        Columns headerColumns = header.columns(isStatic);
-
-        // Note that we don't want want to use FileUtils.skipBytesFully for anything that
may not have
-        // the size we think due to VINT encoding
-        if (hasTimestamp)
-            header.skipTimestamp(in);
-        if (hasTTL)
-        {
-            header.skipLocalDeletionTime(in);
-            header.skipTTL(in);
-        }
-        if (hasDeletion)
-            header.skipDeletionTime(in);
-
-        Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns,
in);
-        for (ColumnDefinition column : columns)
-        {
-            if (column.isSimple())
-                Cell.serializer.skip(in, column, header);
-            else
-                skipComplexColumn(in, column, header, hasComplexDeletion);
-        }
+        int rowSize = (int)in.readUnsignedVInt();
+        in.skipBytesFully(rowSize);
     }
 
     public void skipStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper
helper) throws IOException
@@ -468,20 +501,13 @@ public class UnfilteredSerializer
         assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW &&
isExtended(flags) : "Flags is " + flags;
         int extendedFlags = in.readUnsignedByte();
         assert isStatic(extendedFlags);
-        skipRowBody(in, header, flags, extendedFlags);
+        skipRowBody(in);
     }
 
-    public void skipMarkerBody(DataInputPlus in, SerializationHeader header, boolean isBoundary)
throws IOException
+    public void skipMarkerBody(DataInputPlus in) throws IOException
     {
-        if (isBoundary)
-        {
-            header.skipDeletionTime(in);
-            header.skipDeletionTime(in);
-        }
-        else
-        {
-            header.skipDeletionTime(in);
-        }
+        int markerSize = (int)in.readUnsignedVInt();
+        in.skipBytesFully(markerSize);
     }
 
     private void skipComplexColumn(DataInputPlus in, ColumnDefinition column, SerializationHeader
header, boolean hasComplexDeletion)
@@ -510,8 +536,18 @@ public class UnfilteredSerializer
         return (extendedFlags & IS_STATIC) != 0;
     }
 
-    public static boolean isExtended(int flags)
+    private static boolean isExtended(int flags)
     {
         return (flags & EXTENSION_FLAG) != 0;
     }
+
+    public static int readExtendedFlags(DataInputPlus in, int flags) throws IOException
+    {
+        return isExtended(flags) ? in.readUnsignedByte() : 0;
+    }
+
+    public static boolean hasExtendedFlags(Row row)
+    {
+        return row.isStatic() || row.deletion().isShadowable();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index d94b219..62348ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -65,7 +65,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
                                        0,
                                        ActiveRepairService.UNREPAIRED_SSTABLE,
                                        0,
-                                       new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
+                                       new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS));
     }
 
     private static Descriptor createDescriptor(File directory, final String keyspace, final
String columnFamily, final SSTableFormat.Type fmt)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index f4b9adf..6d3a714 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -63,7 +63,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     {
         super(directory, metadata, columns);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
-        this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS);
+        this.header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS);
         diskWriter.start();
     }
 
@@ -89,7 +89,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         // improve that. In particular, what we count is closer to the serialized value,
but it's debatable that it's the right thing
         // to count since it will take a lot more space in memory and the bufferSize is first
and foremost used to avoid OOM when
         // using this writer.
-        currentSize += UnfilteredSerializer.serializer.serializedSize(row, header, formatType.info.getLatestVersion().correspondingMessagingVersion());
+        currentSize += UnfilteredSerializer.serializer.serializedSize(row, header, 0, formatType.info.getLatestVersion().correspondingMessagingVersion());
     }
 
     private void maybeSync() throws SyncException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 25baa4e..62c88a0 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -60,7 +60,7 @@ public class RowIndexEntryTest extends CQLTester
 
         DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
 
-        SerializationHeader header = new SerializationHeader(cfMeta, cfMeta.partitionColumns(),
EncodingStats.NO_STATS);
+        SerializationHeader header = new SerializationHeader(true, cfMeta, cfMeta.partitionColumns(),
EncodingStats.NO_STATS);
         IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfMeta,
BigFormat.latestVersion, header);
 
         DataOutputBuffer dob = new DataOutputBuffer();
@@ -119,7 +119,7 @@ public class RowIndexEntryTest extends CQLTester
         final RowIndexEntry simple = new RowIndexEntry(123);
 
         DataOutputBuffer buffer = new DataOutputBuffer();
-        SerializationHeader header = new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
+        SerializationHeader header = new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
         RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header);
 
         serializer.serialize(simple, buffer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 2fc8436..ab99750 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -325,7 +325,8 @@ public class ScrubTest
                                                                    keys.size(),
                                                                    0L,
                                                                    0,
-                                                                   new SerializationHeader(cfs.metadata,
+                                                                   new SerializationHeader(true,
+                                                                                           cfs.metadata,
                                                                                            cfs.metadata.partitionColumns(),
                                                                                            EncodingStats.NO_STATS)))
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index cd82b19..db07eb8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -166,7 +166,7 @@ public class AntiCompactionTest
         File dir = cfs.getDirectories().getDirectoryForNewSSTables();
         String filename = cfs.getSSTablePath(dir);
 
-        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
+        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(true, cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
         {
             for (int i = 0; i < count; i++)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index e1ab48f..78964f4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -69,7 +69,7 @@ public class BigTableWriterTest extends AbstractTransactionalTest
 
         private TestableBTW(String file)
         {
-            this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
+            this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
         }
 
         private TestableBTW(String file, SSTableTxnWriter sw)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 093bffd..bd286e4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -943,7 +943,7 @@ public class SSTableRewriterTest extends SchemaLoader
             File dir = cfs.getDirectories().getDirectoryForNewSSTables();
             String filename = cfs.getSSTablePath(dir);
 
-            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
+            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
             {
                 int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
                 for ( ; i < end ; i++)
@@ -1011,7 +1011,7 @@ public class SSTableRewriterTest extends SchemaLoader
     public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
     {
         String filename = cfs.getSSTablePath(directory);
-        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
+        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
     }
 
     public static ByteBuffer random(int i, int size)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6584331c/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index fcd2d71..5c7ff02 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -187,7 +187,7 @@ public class SSTableUtils
             {
                 public SerializationHeader header()
                 {
-                    return new SerializationHeader(Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
+                    return new SerializationHeader(true, Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
                 }
 
                 @Override


Mime
View raw message