cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [1/2] cassandra git commit: Implement backward compatibility for paging states
Date Fri, 18 Sep 2015 11:24:26 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk bb826240b -> 9967303f6


Implement backward compatibility for paging states

Pre-3.0 nodes serialize a full cellname in the paging state, but doing
so is wasteful for 3.0 nodes. However, we can't guarantee which nodes
a particular paging state will be sent to, so we need to remain
backward compatible.

The patch preserves the pre-existing paging state format (thus
requiring more work from 3.0 nodes) for native protocol v3, but
switches to a format that is more efficient for 3.0 nodes for protocol
v4. The protocol specs now document that paging states shouldn't be
used across protocol versions.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b99c8631
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b99c8631
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b99c8631

Branch: refs/heads/trunk
Commit: b99c8631586e734532dfd9fa84e7d88551edd229
Parents: 106b1cd
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Tue Sep 8 16:00:58 2015 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Fri Sep 18 13:01:59 2015 +0200

----------------------------------------------------------------------
 doc/native_protocol_v3.spec                     |   3 +
 doc/native_protocol_v4.spec                     |   5 +
 .../org/apache/cassandra/cql3/QueryOptions.java |   6 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |   3 +-
 .../org/apache/cassandra/cql3/ResultSet.java    |   6 +-
 .../cql3/statements/SelectStatement.java        |   4 +-
 .../org/apache/cassandra/db/Clustering.java     |  29 +++
 .../org/apache/cassandra/db/LegacyLayout.java   |   2 +-
 .../cassandra/db/PartitionRangeReadCommand.java |   6 +-
 src/java/org/apache/cassandra/db/ReadQuery.java |   5 +-
 .../db/SinglePartitionNamesCommand.java         |   1 +
 .../db/SinglePartitionReadCommand.java          |  14 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  48 +++-
 .../cassandra/db/rows/ComplexColumnData.java    |   5 +
 src/java/org/apache/cassandra/db/rows/Row.java  |   3 +-
 src/java/org/apache/cassandra/db/view/View.java |   5 +-
 .../apache/cassandra/db/view/ViewBuilder.java   |   3 +-
 .../service/pager/AbstractQueryPager.java       |   4 +-
 .../service/pager/MultiPartitionPager.java      |   8 +-
 .../cassandra/service/pager/PagingState.java    | 220 ++++++++++++++++---
 .../cassandra/service/pager/QueryPagers.java    |   3 +-
 .../service/pager/RangeNamesQueryPager.java     |   4 +-
 .../service/pager/RangeSliceQueryPager.java     |  16 +-
 .../service/pager/SinglePartitionPager.java     |  16 +-
 .../org/apache/cassandra/transport/CBUtil.java  |   7 +
 .../cassandra/service/QueryPagerTest.java       |  97 +++++++-
 .../service/pager/PagingStateTest.java          |  99 +++++++++
 27 files changed, 526 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/doc/native_protocol_v3.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v3.spec b/doc/native_protocol_v3.spec
index 312c8c1..1c8e924 100644
--- a/doc/native_protocol_v3.spec
+++ b/doc/native_protocol_v3.spec
@@ -917,6 +917,9 @@ Table of Contents
     <result_page_size> results. While the current implementation always respect
     the exact value of <result_page_size>, we reserve ourselves the right to return
     slightly smaller or bigger pages in the future for performance reasons.
+  - The <paging_state> is specific to a protocol version and drivers should not
+    send a <paging_state> returned by a node using the protocol v3 to query a node
+    using the protocol v4 for instance.
 
 
 9. Error codes

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index f040323..852ae60 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -1006,6 +1006,9 @@ Table of Contents
     <result_page_size> results. While the current implementation always respect
     the exact value of <result_page_size>, we reserve ourselves the right to return
     slightly smaller or bigger pages in the future for performance reasons.
+  - The <paging_state> is specific to a protocol version and drivers should not
+    send a <paging_state> returned by a node using the protocol v3 to query a node
+    using the protocol v4 for instance.
 
 
 9. Error codes
@@ -1160,3 +1163,5 @@ Table of Contents
   * Add warnings to frames for responses for which the server generated a warning during processing, which the client needs to address.
   * Add the date and time data types
   * Add the tinyint and smallint data types
+  * The <paging_state> returned by the v4 protocol is not compatible with the v3 protocol. In other words, a <paging_state> returned by a
+    node using the protocol v4 should not be used to query a node using the protocol v3 (and vice-versa).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index ad554ed..e6e80e3 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -384,7 +384,7 @@ public abstract class QueryOptions
             if (!flags.isEmpty())
             {
                 int pageSize = flags.contains(Flag.PAGE_SIZE) ? body.readInt() : -1;
-                PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValue(body)) : null;
+                PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValue(body), version) : null;
                 ConsistencyLevel serialConsistency = flags.contains(Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : ConsistencyLevel.SERIAL;
                 long timestamp = Long.MIN_VALUE;
                 if (flags.contains(Flag.TIMESTAMP))
@@ -413,7 +413,7 @@ public abstract class QueryOptions
             if (flags.contains(Flag.PAGE_SIZE))
                 dest.writeInt(options.getPageSize());
             if (flags.contains(Flag.PAGING_STATE))
-                CBUtil.writeValue(options.getPagingState().serialize(), dest);
+                CBUtil.writeValue(options.getPagingState().serialize(version), dest);
             if (flags.contains(Flag.SERIAL_CONSISTENCY))
                 CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest);
             if (flags.contains(Flag.TIMESTAMP))
@@ -438,7 +438,7 @@ public abstract class QueryOptions
             if (flags.contains(Flag.PAGE_SIZE))
                 size += 4;
             if (flags.contains(Flag.PAGING_STATE))
-                size += CBUtil.sizeOfValue(options.getPagingState().serialize());
+                size += CBUtil.sizeOfValue(options.getPagingState().serializedSize(version));
             if (flags.contains(Flag.SERIAL_CONSISTENCY))
                 size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
             if (flags.contains(Flag.TIMESTAMP))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 92549f9..8f2a4f4 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.service.*;
 import org.apache.cassandra.service.pager.QueryPager;
 import org.apache.cassandra.thrift.ThriftClientState;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.*;
 import org.github.jamm.MemoryMeter;
@@ -322,7 +323,7 @@ public class QueryProcessor implements QueryHandler
             throw new IllegalArgumentException("Only SELECTs can be paged");
 
         SelectStatement select = (SelectStatement)prepared.statement;
-        QueryPager pager = select.getQuery(makeInternalOptions(prepared, values), FBUtilities.nowInSeconds()).getPager(null);
+        QueryPager pager = select.getQuery(makeInternalOptions(prepared, values), FBUtilities.nowInSeconds()).getPager(null, Server.CURRENT_VERSION);
         return UntypedResultSet.create(select, pager, pageSize);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index ea26f34..9d2fbec 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -325,7 +325,7 @@ public class ResultSet
 
                 PagingState state = null;
                 if (flags.contains(Flag.HAS_MORE_PAGES))
-                    state = PagingState.deserialize(CBUtil.readValue(body));
+                    state = PagingState.deserialize(CBUtil.readValue(body), version);
 
                 if (flags.contains(Flag.NO_METADATA))
                     return new ResultMetadata(flags, null, columnCount, state);
@@ -365,7 +365,7 @@ public class ResultSet
                 dest.writeInt(m.columnCount);
 
                 if (hasMorePages)
-                    CBUtil.writeValue(m.pagingState.serialize(), dest);
+                    CBUtil.writeValue(m.pagingState.serialize(version), dest);
 
                 if (!noMetadata)
                 {
@@ -397,7 +397,7 @@ public class ResultSet
 
                 int size = 8;
                 if (hasMorePages)
-                    size += CBUtil.sizeOfValue(m.pagingState.serialize());
+                    size += CBUtil.sizeOfValue(m.pagingState.serializedSize(version));
 
                 if (!noMetadata)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index cb6de2b..170bfdf 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -209,7 +209,7 @@ public class SelectStatement implements CQLStatement
         if (pageSize <= 0 || query.limits().count() <= pageSize)
             return execute(query, options, state, nowInSec);
 
-        QueryPager pager = query.getPager(options.getPagingState());
+        QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
         return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec);
     }
 
@@ -389,7 +389,7 @@ public class SelectStatement implements CQLStatement
             }
             else
             {
-                QueryPager pager = query.getPager(options.getPagingState());
+                QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
                 return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index 2fb92d9..6bffd45 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -24,7 +24,9 @@ import java.util.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
@@ -138,6 +140,20 @@ public class Clustering extends AbstractClusteringPrefix
             ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, version, types);
         }
 
+        public ByteBuffer serialize(Clustering clustering, int version, List<AbstractType<?>> types)
+        {
+            try
+            {
+                DataOutputBuffer buffer = new DataOutputBuffer((int)serializedSize(clustering, version, types));
+                serialize(clustering, buffer, version, types);
+                return buffer.buffer();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException("Writing to an in-memory buffer shouldn't trigger an IOException", e);
+            }
+        }
+
         public long serializedSize(Clustering clustering, int version, List<AbstractType<?>> types)
         {
             return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types);
@@ -151,5 +167,18 @@ public class Clustering extends AbstractClusteringPrefix
             ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, types.size(), version, types);
             return new Clustering(values);
         }
+
+        public Clustering deserialize(ByteBuffer in, int version, List<AbstractType<?>> types)
+        {
+            try
+            {
+                DataInputBuffer buffer = new DataInputBuffer(in, true);
+                return deserialize(buffer, version, types);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException("Reading from an in-memory buffer shouldn't trigger an IOException", e);
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 2719105..d57bc6b 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -708,7 +708,7 @@ public abstract class LegacyLayout
 
         Iterator<LegacyCell> cells = new AbstractIterator<LegacyCell>()
         {
-            private final Iterator<Cell> cells = row.cellsInLegacyOrder(metadata).iterator();
+            private final Iterator<Cell> cells = row.cellsInLegacyOrder(metadata, false).iterator();
             // we don't have (and shouldn't have) row markers for compact tables.
             private boolean hasReturnedRowMarker = metadata.isCompactTable();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 965e9af..4e96d81 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -155,12 +155,12 @@ public class PartitionRangeReadCommand extends ReadCommand
         return StorageProxy.getRangeSlice(this, consistency);
     }
 
-    public QueryPager getPager(PagingState pagingState)
+    public QueryPager getPager(PagingState pagingState, int protocolVersion)
     {
         if (isNamesQuery())
-            return new RangeNamesQueryPager(this, pagingState);
+            return new RangeNamesQueryPager(this, pagingState, protocolVersion);
         else
-            return new RangeSliceQueryPager(this, pagingState);
+            return new RangeSliceQueryPager(this, pagingState, protocolVersion);
     }
 
     protected void recordLatency(TableMetrics metric, long latencyNanos)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index 3ad0f82..3abffd5 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -58,7 +58,7 @@ public interface ReadQuery
             return DataLimits.cqlLimits(0);
         }
 
-        public QueryPager getPager(PagingState state)
+        public QueryPager getPager(PagingState state, int protocolVersion)
         {
             return QueryPager.EMPTY;
         }
@@ -104,10 +104,11 @@ public interface ReadQuery
      *
      * @param pagingState the {@code PagingState} to start from if this is a paging continuation. This can be
      * {@code null} if this is the start of paging.
+     * @param protocolVersion the protocol version to use for the paging state of that pager.
      *
      * @return a pager for the query.
      */
-    public QueryPager getPager(PagingState pagingState);
+    public QueryPager getPager(PagingState pagingState, int protocolVersion);
 
     /**
      * The limits for the query.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index 1b41005..430e4a1 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.utils.memory.HeapAllocator;
 public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<ClusteringIndexNamesFilter>
 {
     private int oldestUnrepairedDeletionTime = Integer.MAX_VALUE;
+
     protected SinglePartitionNamesCommand(boolean isDigest,
                                           int digestVersion,
                                           boolean isForThrift,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index cd01748..49cf07c 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -230,14 +230,14 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
         return StorageProxy.read(Group.one(this), consistency, clientState);
     }
 
-    public SinglePartitionPager getPager(PagingState pagingState)
+    public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion)
     {
-        return getPager(this, pagingState);
+        return getPager(this, pagingState, protocolVersion);
     }
 
-    private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState)
+    private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, int protocolVersion)
     {
-        return new SinglePartitionPager(command, pagingState);
+        return new SinglePartitionPager(command, pagingState, protocolVersion);
     }
 
     protected void recordLatency(TableMetrics metric, long latencyNanos)
@@ -495,12 +495,12 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
             return limits.filter(PartitionIterators.concat(partitions), nowInSec);
         }
 
-        public QueryPager getPager(PagingState pagingState)
+        public QueryPager getPager(PagingState pagingState, int protocolVersion)
         {
             if (commands.size() == 1)
-                return SinglePartitionReadCommand.getPager(commands.get(0), pagingState);
+                return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);
 
-            return new MultiPartitionPager(this, pagingState);
+            return new MultiPartitionPager(this, pagingState, protocolVersion);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 1bc1162..a2a8c5f 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -392,9 +392,9 @@ public class BTreeRow extends AbstractRow
             ((ComplexColumnData) current).setValue(path, value);
     }
 
-    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata)
+    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata, boolean reversed)
     {
-        return () -> new CellInLegacyOrderIterator(metadata);
+        return () -> new CellInLegacyOrderIterator(metadata, reversed);
     }
 
     private class CellIterator extends AbstractIterator<Cell>
@@ -429,15 +429,17 @@ public class BTreeRow extends AbstractRow
     private class CellInLegacyOrderIterator extends AbstractIterator<Cell>
     {
         private final AbstractType<?> comparator;
+        private final boolean reversed;
         private final int firstComplexIdx;
         private int simpleIdx;
         private int complexIdx;
         private Iterator<Cell> complexCells;
         private final Object[] data;
 
-        private CellInLegacyOrderIterator(CFMetaData metadata)
+        private CellInLegacyOrderIterator(CFMetaData metadata, boolean reversed)
         {
             this.comparator = metadata.getColumnDefinitionNameComparator(isStatic() ? ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR);
+            this.reversed = reversed;
 
             // copy btree into array for simple separate iteration of simple and complex columns
             this.data = new Object[BTree.size(btree)];
@@ -448,6 +450,36 @@ public class BTreeRow extends AbstractRow
             this.complexIdx = firstComplexIdx;
         }
 
+        private int getSimpleIdx()
+        {
+            return reversed ? firstComplexIdx - simpleIdx - 1 : simpleIdx;
+        }
+
+        private int getSimpleIdxAndIncrement()
+        {
+            int idx = getSimpleIdx();
+            ++simpleIdx;
+            return idx;
+        }
+
+        private int getComplexIdx()
+        {
+            return reversed ? data.length + firstComplexIdx - complexIdx - 1 : complexIdx; // complexIdx counts up from firstComplexIdx; mirror it over [firstComplexIdx, data.length) when reversed
+        }
+
+        private int getComplexIdxAndIncrement()
+        {
+            int idx = getComplexIdx();
+            ++complexIdx;
+            return idx;
+        }
+
+        private Iterator<Cell> makeComplexIterator(Object complexData)
+        {
+            ComplexColumnData ccd = (ComplexColumnData)complexData;
+            return reversed ? ccd.reverseIterator() : ccd.iterator();
+        }
+
         protected Cell computeNext()
         {
             while (true)
@@ -465,17 +497,17 @@ public class BTreeRow extends AbstractRow
                     if (complexIdx >= data.length)
                         return endOfData();
 
-                    complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+                    complexCells = makeComplexIterator(data[getComplexIdxAndIncrement()]);
                 }
                 else
                 {
                     if (complexIdx >= data.length)
-                        return (Cell)data[simpleIdx++];
+                        return (Cell)data[getSimpleIdxAndIncrement()];
 
-                    if (comparator.compare(((ColumnData) data[simpleIdx]).column().name.bytes, ((ColumnData) data[complexIdx]).column().name.bytes) < 0)
-                        return (Cell)data[simpleIdx++];
+                    if ((comparator.compare(((ColumnData) data[getSimpleIdx()]).column().name.bytes, ((ColumnData) data[getComplexIdx()]).column().name.bytes) < 0) != reversed) // ascending: smaller name first; reversed: larger name first
+                        return (Cell)data[getSimpleIdxAndIncrement()];
                     else
-                        complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+                        complexCells = makeComplexIterator(data[getComplexIdxAndIncrement()]);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 76ab7e7..fab529b 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -102,6 +102,11 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
         return BTree.iterator(cells);
     }
 
+    public Iterator<Cell> reverseIterator()
+    {
+        return BTree.iterator(cells, BTree.Dir.DESC);
+    }
+
     public int dataSize()
     {
         int size = complexDeletion.dataSize();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index a80325f..8a67e9b 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -150,9 +150,10 @@ public interface Row extends Unfiltered, Collection<ColumnData>
      * legacy order. It's only ever meaningful for backward/thrift compatibility code.
      *
      * @param metadata the table this is a row of.
+     * @param reversed whether cells should be returned in reverse order.
      * @return an iterable over the cells of this row in "legacy order".
      */
-    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata);
+    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata, boolean reversed);
 
     /**
      * Whether the row stores any (non-live) complex deletion for any complex column.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index fb34ca9..28ec489 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.service.pager.QueryPager;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.transport.Server;
 
 /**
  * A View copies data from a base table into a view table which can be queried independently from the
@@ -481,7 +482,7 @@ public class View
 
             if (!rowSet.hasTombstonedExisting())
             {
-                QueryPager pager = command.getPager(null);
+                QueryPager pager = command.getPager(null, Server.CURRENT_VERSION);
 
                 // Add all of the rows which were recovered from the query to the row set
                 while (!pager.isExhausted())
@@ -538,7 +539,7 @@ public class View
         for (TemporalRow temporalRow : rowSet)
             builder.addSlice(temporalRow.baseSlice());
 
-        QueryPager pager = builder.build().getPager(null);
+        QueryPager pager = builder.build().getPager(null, Server.CURRENT_VERSION);
 
         while (!pager.isExhausted())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 62aa332..f0b01c7 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.Pair;
@@ -76,7 +77,7 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     private void buildKey(DecoratedKey key)
     {
-        QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null);
+        QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null, Server.CURRENT_VERSION);
 
         while (!pager.isExhausted())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 9991277..b92d1e1 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -32,6 +32,7 @@ abstract class AbstractQueryPager implements QueryPager
 {
     protected final ReadCommand command;
     protected final DataLimits limits;
+    protected final int protocolVersion;
 
     private int remaining;
 
@@ -43,9 +44,10 @@ abstract class AbstractQueryPager implements QueryPager
 
     private boolean exhausted;
 
-    protected AbstractQueryPager(ReadCommand command)
+    protected AbstractQueryPager(ReadCommand command, int protocolVersion)
     {
         this.command = command;
+        this.protocolVersion = protocolVersion;
         this.limits = command.limits();
 
         this.remaining = limits.count();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 5d06df7..ee2db9f 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -53,7 +53,7 @@ public class MultiPartitionPager implements QueryPager
     private int remaining;
     private int current;
 
-    public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state)
+    public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, int protocolVersion)
     {
         this.limit = group.limits();
         this.nowInSec = group.nowInSec();
@@ -74,11 +74,11 @@ public class MultiPartitionPager implements QueryPager
 
         pagers = new SinglePartitionPager[group.commands.size() - i];
         // 'i' is on the first non exhausted pager for the previous page (or the first one)
-        pagers[0] = group.commands.get(i).getPager(state);
+        pagers[0] = group.commands.get(i).getPager(state, protocolVersion);
 
         // Following ones haven't been started yet
         for (int j = i + 1; j < group.commands.size(); j++)
-            pagers[j - i] = group.commands.get(j).getPager(null);
+            pagers[j - i] = group.commands.get(j).getPager(null, protocolVersion);
 
         remaining = state == null ? limit.count() : state.remaining;
     }
@@ -90,7 +90,7 @@ public class MultiPartitionPager implements QueryPager
             return null;
 
         PagingState state = pagers[current].state();
-        return new PagingState(pagers[current].key(), state == null ? null : state.cellName, remaining, Integer.MAX_VALUE);
+        return new PagingState(pagers[current].key(), state == null ? null : state.rowMark, remaining, Integer.MAX_VALUE);
     }
 
     public boolean isExhausted()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index 685dc3f..542b6d2 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -20,44 +20,72 @@ package org.apache.cassandra.service.pager;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.*;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.LegacyLayout;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.transport.ProtocolException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class PagingState
 {
-    public final ByteBuffer partitionKey;
-    public final ByteBuffer cellName;
+    public final ByteBuffer partitionKey;  // Can be null for single partition queries.
+    public final RowMark rowMark;          // Can be null if not needed.
     public final int remaining;
     public final int remainingInPartition;
 
-    public PagingState(ByteBuffer partitionKey, ByteBuffer cellName, int remaining, int remainingInPartition)
+    public PagingState(ByteBuffer partitionKey, RowMark rowMark, int remaining, int remainingInPartition)
     {
-        this.partitionKey = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
-        this.cellName = cellName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : cellName;
+        this.partitionKey = partitionKey; // null is kept as null here; serialize() writes it as an empty buffer
+        this.rowMark = rowMark;           // likewise, a null mark is written as an empty buffer by serialize()
         this.remaining = remaining;
         this.remainingInPartition = remainingInPartition;
     }
 
-    public static PagingState deserialize(ByteBuffer bytes)
+    public static PagingState deserialize(ByteBuffer bytes, int protocolVersion)
     {
         if (bytes == null)
             return null;
 
         try
         {
-            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
-            ByteBuffer pk = ByteBufferUtil.readWithShortLength(in);
-            ByteBuffer cn = ByteBufferUtil.readWithShortLength(in);
-            int remaining = in.readInt();
-            // Note that while 'in.available()' is theoretically an estimate of how many bytes are available
-            // without blocking, we know that since we're reading a ByteBuffer it will be exactly how many
-            // bytes remain to be read. And the reason we want to condition this is for backward compatility
-            // as we used to not set this.
-            int remainingInPartition = in.available() > 0 ? in.readInt() : Integer.MAX_VALUE;
-            return new PagingState(pk, cn, remaining, remainingInPartition);
+            DataInputBuffer in = new DataInputBuffer(bytes, true);
+            ByteBuffer pk;
+            RowMark mark;
+            int remaining, remainingInPartition;
+            if (protocolVersion <= Server.VERSION_3)
+            {
+                pk = ByteBufferUtil.readWithShortLength(in);
+                mark = new RowMark(ByteBufferUtil.readWithShortLength(in), protocolVersion);
+                remaining = in.readInt();
+                // Note that while 'in.available()' is theoretically an estimate of how many bytes are available
+                // without blocking, we know that since we're reading a ByteBuffer it will be exactly how many
+                // bytes remain to be read. And the reason we want to condition this is for backward compatility
+                // as we used to not set this.
+                remainingInPartition = in.available() > 0 ? in.readInt() : Integer.MAX_VALUE;
+            }
+            else
+            {
+                pk = ByteBufferUtil.readWithVIntLength(in);
+                mark = new RowMark(ByteBufferUtil.readWithVIntLength(in), protocolVersion);
+                remaining = (int)in.readUnsignedVInt();
+                remainingInPartition = (int)in.readUnsignedVInt();
+            }
+            return new PagingState(pk.hasRemaining() ? pk : null,
+                                   mark.mark.hasRemaining() ? mark : null,
+                                   remaining,
+                                   remainingInPartition);
         }
         catch (IOException e)
         {
@@ -65,14 +93,27 @@ public class PagingState
         }
     }
 
-    public ByteBuffer serialize()
+    public ByteBuffer serialize(int protocolVersion)
     {
-        try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize()))
+        assert rowMark == null || protocolVersion == rowMark.protocolVersion;
+        try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize(protocolVersion)))
         {
-            ByteBufferUtil.writeWithShortLength(partitionKey, out);
-            ByteBufferUtil.writeWithShortLength(cellName, out);
-            out.writeInt(remaining);
-            out.writeInt(remainingInPartition);
+            ByteBuffer pk = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
+            ByteBuffer mark = rowMark == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : rowMark.mark;
+            if (protocolVersion <= Server.VERSION_3)
+            {
+                ByteBufferUtil.writeWithShortLength(pk, out);
+                ByteBufferUtil.writeWithShortLength(mark, out);
+                out.writeInt(remaining);
+                out.writeInt(remainingInPartition);
+            }
+            else
+            {
+                ByteBufferUtil.writeWithVIntLength(pk, out);
+                ByteBufferUtil.writeWithVIntLength(mark, out);
+                out.writeUnsignedVInt(remaining);
+                out.writeUnsignedVInt(remainingInPartition);
+            }
             return out.buffer();
         }
         catch (IOException e)
@@ -81,11 +122,42 @@ public class PagingState
         }
     }
 
-    private int serializedSize()
+    public int serializedSize(int protocolVersion)
     {
-        return 2 + partitionKey.remaining()
-             + 2 + cellName.remaining()
-             + 8; // remaining & remainingInPartition
+        assert rowMark == null || protocolVersion == rowMark.protocolVersion;
+        // Absent key / mark are sized as empty buffers, mirroring what serialize() writes.
+        ByteBuffer keyBytes = partitionKey != null ? partitionKey : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        ByteBuffer markBytes = rowMark != null ? rowMark.mark : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        if (protocolVersion > Server.VERSION_3)
+        {
+            // v4+: vint-length-prefixed buffers, then both counters as unsigned vints
+            return ByteBufferUtil.serializedSizeWithVIntLength(keyBytes)
+                 + ByteBufferUtil.serializedSizeWithVIntLength(markBytes)
+                 + TypeSizes.sizeofUnsignedVInt(remaining)
+                 + TypeSizes.sizeofUnsignedVInt(remainingInPartition);
+        }
+        // v3 and earlier: short-length-prefixed buffers, then both counters as fixed 4-byte ints
+        return ByteBufferUtil.serializedSizeWithShortLength(keyBytes)
+             + ByteBufferUtil.serializedSizeWithShortLength(markBytes)
+             + 8; // remaining & remainingInPartition
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        return Objects.hash(partitionKey, rowMark, remaining, remainingInPartition);
+    }
+
+    @Override
+    public final boolean equals(Object o)
+    {
+        if (!(o instanceof PagingState))
+            return false;
+        PagingState other = (PagingState) o;
+        return remaining == other.remaining // cheap int comparisons first
+            && remainingInPartition == other.remainingInPartition
+            && Objects.equals(partitionKey, other.partitionKey)
+            && Objects.equals(rowMark, other.rowMark);
     }
 
     @Override
@@ -93,8 +165,102 @@ public class PagingState
     {
         return String.format("PagingState(key=%s, cellname=%s, remaining=%d, remainingInPartition=%d",
                              ByteBufferUtil.bytesToHex(partitionKey),
-                             ByteBufferUtil.bytesToHex(cellName),
+                             rowMark,
                              remaining,
                              remainingInPartition);
     }
+
+    /**
+     * Marks the last row returned by paging, the one from which paging should continue.
+     * This class essentially holds a row clustering, but due to backward compatibility reasons,
+     * we need to actually store the cell name for the last cell of the row we're marking when
+     * the protocol v3 is in use, and this class abstracts that complication.
+     *
+     * See CASSANDRA-10254 for more details.
+     */
+    public static class RowMark
+    {
+        // This can be null for convenience if no row is marked; clustering() and toString() both tolerate it.
+        private final ByteBuffer mark;
+        private final int protocolVersion;
+
+        private RowMark(ByteBuffer mark, int protocolVersion)
+        {
+            this.mark = mark;
+            this.protocolVersion = protocolVersion;
+        }
+
+        private static List<AbstractType<?>> makeClusteringTypes(CFMetaData metadata)
+        {
+            // These are the types used when serializing the clustering in the paging state. We can't really use the actual clustering
+            // types however because we can't guarantee that there won't be a schema change between when we send the paging state and get it back,
+            // and said schema change could theoretically change one of the clustering types from a fixed width type to a non-fixed one
+            // (say timestamp -> blob). So we simply use a list of BytesTypes (for both reading and writing), which may be slightly inefficient
+            // for fixed-width types, but avoids any risk during schema changes.
+            int size = metadata.clusteringColumns().size();
+            List<AbstractType<?>> l = new ArrayList<>(size);
+            for (int i = 0; i < size; i++)
+                l.add(BytesType.instance);
+            return l;
+        }
+
+        public static RowMark create(CFMetaData metadata, Row row, int protocolVersion)
+        {
+            ByteBuffer mark;
+            if (protocolVersion <= Server.VERSION_3)
+            {
+                // We need to be backward compatible with 2.1/2.2 nodes' paging states. Which means we have to send
+                // the full cellname of the "last" cell in the row we get (since that's how 2.1/2.2 nodes will start after
+                // that last row if they get that paging state).
+                Iterator<Cell> cells = row.cellsInLegacyOrder(metadata, true).iterator();
+                if (!cells.hasNext())
+                {
+                    mark = LegacyLayout.encodeClustering(metadata, row.clustering());
+                }
+                else
+                {
+                    Cell cell = cells.next();
+                    mark = LegacyLayout.encodeCellName(metadata, row.clustering(), cell.column().name.bytes, cell.column().isComplex() ? cell.path().get(0) : null);
+                }
+            }
+            else
+            {
+                // We freeze the serialization version to 3.0 as we need to make sure this doesn't change (that is, it has to be
+                // fixed for a given version of the protocol).
+                mark = Clustering.serializer.serialize(row.clustering(), MessagingService.VERSION_30, makeClusteringTypes(metadata));
+            }
+            return new RowMark(mark, protocolVersion);
+        }
+
+        public Clustering clustering(CFMetaData metadata)
+        {
+            if (mark == null)
+                return null;
+
+            return protocolVersion <= Server.VERSION_3
+                 ? LegacyLayout.decodeClustering(metadata, mark)
+                 : Clustering.serializer.deserialize(mark, MessagingService.VERSION_30, makeClusteringTypes(metadata));
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            return Objects.hash(mark, protocolVersion);
+        }
+
+        @Override
+        public final boolean equals(Object o)
+        {
+            if(!(o instanceof RowMark))
+                return false;
+            RowMark that = (RowMark)o;
+            return Objects.equals(this.mark, that.mark) && this.protocolVersion == that.protocolVersion;
+        }
+
+        @Override
+        public String toString()
+        {
+            return mark == null ? "null" : ByteBufferUtil.bytesToHex(mark); // guard: mark is documented as nullable
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 618ca32..eee94e6 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Server;
 
 /**
  * Static utility methods for paging.
@@ -47,7 +48,7 @@ public class QueryPagers
                                  boolean isForThrift) throws RequestValidationException, RequestExecutionException
     {
         SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter);
-        final SinglePartitionPager pager = new SinglePartitionPager(command, null);
+        final SinglePartitionPager pager = new SinglePartitionPager(command, null, Server.CURRENT_VERSION);
 
         int count = 0;
         while (!pager.isExhausted())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
index e085490..9801565 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
@@ -35,9 +35,9 @@ public class RangeNamesQueryPager extends AbstractQueryPager
 {
     private volatile DecoratedKey lastReturnedKey;
 
-    public RangeNamesQueryPager(PartitionRangeReadCommand command, PagingState state)
+    public RangeNamesQueryPager(PartitionRangeReadCommand command, PagingState state, int protocolVersion)
     {
-        super(command);
+        super(command, protocolVersion);
         assert command.isNamesQuery();
 
         if (state != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 87eb018..770875a 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -39,19 +39,17 @@ public class RangeSliceQueryPager extends AbstractQueryPager
     private static final Logger logger = LoggerFactory.getLogger(RangeSliceQueryPager.class);
 
     private volatile DecoratedKey lastReturnedKey;
-    private volatile Clustering lastReturnedClustering;
+    private volatile PagingState.RowMark lastReturnedRow;
 
-    public RangeSliceQueryPager(PartitionRangeReadCommand command, PagingState state)
+    public RangeSliceQueryPager(PartitionRangeReadCommand command, PagingState state, int protocolVersion) // protocolVersion selects the RowMark encoding recorded by recordLast()
     {
-        super(command);
+        super(command, protocolVersion);
         assert !command.isNamesQuery();
 
         if (state != null)
         {
             lastReturnedKey = command.metadata().decorateKey(state.partitionKey);
-            lastReturnedClustering = state.cellName.hasRemaining()
-                                   ? LegacyLayout.decodeClustering(command.metadata(), state.cellName)
-                                   : null;
+            lastReturnedRow = state.rowMark; // may be null (empty mark in the state); NOTE(review): nextPageReadCommand calls lastReturnedRow.clustering() without a null check — confirm it is unreachable when null
             restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
         }
     }
@@ -60,7 +58,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
     {
         return lastReturnedKey == null
              ? null
-             : new PagingState(lastReturnedKey.getKey(), LegacyLayout.encodeClustering(command.metadata(), lastReturnedClustering), maxRemaining(), remainingInPartition());
+             : new PagingState(lastReturnedKey.getKey(), lastReturnedRow, maxRemaining(), remainingInPartition());
     }
 
     protected ReadCommand nextPageReadCommand(int pageSize)
@@ -81,7 +79,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
             AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey);
             if (includeLastKey)
             {
-                pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedClustering, false);
+                pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedRow.clustering(command.metadata()), false);
                 limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition());
             }
             else
@@ -101,7 +99,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
         if (last != null)
         {
             lastReturnedKey = key;
-            lastReturnedClustering = last.clustering();
+            lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 28c5206..7057e79 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -37,18 +37,16 @@ public class SinglePartitionPager extends AbstractQueryPager
 
     private final SinglePartitionReadCommand<?> command;
 
-    private volatile Clustering lastReturned;
+    private volatile PagingState.RowMark lastReturned;
 
-    public SinglePartitionPager(SinglePartitionReadCommand<?> command, PagingState state)
+    public SinglePartitionPager(SinglePartitionReadCommand<?> command, PagingState state, int protocolVersion) // protocolVersion selects the RowMark encoding recorded by recordLast()
     {
-        super(command);
+        super(command, protocolVersion);
         this.command = command;
 
         if (state != null)
         {
-            lastReturned = state.cellName.hasRemaining()
-                         ? LegacyLayout.decodeClustering(command.metadata(), state.cellName)
-                         : null;
+            lastReturned = state.rowMark; // kept encoded and decoded in nextPageReadCommand(); null when the state carries no row mark
             restoreState(command.partitionKey(), state.remaining, state.remainingInPartition);
         }
     }
@@ -67,18 +65,18 @@ public class SinglePartitionPager extends AbstractQueryPager
     {
         return lastReturned == null
              ? null
-             : new PagingState(null, LegacyLayout.encodeClustering(command.metadata(), lastReturned), maxRemaining(), remainingInPartition());
+             : new PagingState(null, lastReturned, maxRemaining(), remainingInPartition());
     }
 
     protected ReadCommand nextPageReadCommand(int pageSize)
     {
-        return command.forPaging(lastReturned, pageSize);
+        return command.forPaging(lastReturned == null ? null : lastReturned.clustering(command.metadata()), pageSize); // decode the mark against the current table metadata; no mark passes a null clustering
     }
 
     protected void recordLast(DecoratedKey key, Row last)
     {
         if (last != null)
-            lastReturned = last.clustering();
+            lastReturned = PagingState.RowMark.create(command.metadata(), last, protocolVersion); // encoded for this pager's protocol version so state() can hand it out as-is
     }
 
     protected boolean isPreviouslyReturnedPartition(DecoratedKey key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 92e2891..800a9a8 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -430,6 +430,13 @@ public abstract class CBUtil
         return 4 + (bytes == null ? 0 : bytes.remaining());
     }
 
+    // Serialized size of a value whose payload is valueSize bytes: a 4-byte length prefix plus the payload.
+    // A negative valueSize stands for a null value, which contributes no payload bytes.
+    public static int sizeOfValue(int valueSize)
+    {
+        return Math.max(valueSize, 0) + 4;
+    }
+
     public static List<ByteBuffer> readValueList(ByteBuf cb, int protocolVersion)
     {
         int size = cb.readUnsignedShort();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/test/unit/org/apache/cassandra/service/QueryPagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/QueryPagerTest.java b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
index 228b2a9..0f79e84 100644
--- a/test/unit/org/apache/cassandra/service/QueryPagerTest.java
+++ b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
@@ -37,8 +37,10 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.transport.Server;
 
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -205,10 +207,19 @@ public class QueryPagerTest
         }
     }
 
+    private QueryPager maybeRecreate(QueryPager pager, ReadQuery command, boolean testPagingState, int protocolVersion)
+    {
+        // Round-trip the pager's state through its wire format and resume paging from the decoded copy.
+        if (testPagingState)
+            return command.getPager(PagingState.deserialize(pager.state().serialize(protocolVersion), protocolVersion), protocolVersion);
+
+        return pager;
+    }
+
     @Test
     public void namesQueryTest() throws Exception
     {
-        QueryPager pager = namesQuery("k0", "c1", "c5", "c7", "c8").getPager(null);
+        QueryPager pager = namesQuery("k0", "c1", "c5", "c7", "c8").getPager(null, Server.CURRENT_VERSION);
 
         assertFalse(pager.isExhausted());
         List<FilteredPartition> partition = query(pager, 5, 4);
@@ -220,16 +231,29 @@ public class QueryPagerTest
     @Test
     public void sliceQueryTest() throws Exception
     {
-        QueryPager pager = sliceQuery("k0", "c1", "c8", 10).getPager(null);
+        sliceQueryTest(false, Server.VERSION_3);
+        sliceQueryTest(true, Server.VERSION_3); // round-trips the legacy (v3, cellname-based) paging state
+        sliceQueryTest(false, Server.VERSION_4);
+        sliceQueryTest(true, Server.VERSION_4);
+    }
+
+    public void sliceQueryTest(boolean testPagingState, int protocolVersion) throws Exception
+    {
+        ReadCommand command = sliceQuery("k0", "c1", "c8", 10);
+        QueryPager pager = command.getPager(null, protocolVersion);
 
         assertFalse(pager.isExhausted());
         List<FilteredPartition> partition = query(pager, 3);
         assertRow(partition.get(0), "k0", "c1", "c2", "c3");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partition = query(pager, 3);
         assertRow(partition.get(0), "k0", "c4", "c5", "c6");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partition = query(pager, 3, 2);
         assertRow(partition.get(0), "k0", "c7", "c8");
@@ -240,16 +264,29 @@ public class QueryPagerTest
     @Test
     public void reversedSliceQueryTest() throws Exception
     {
-        QueryPager pager = sliceQuery("k0", "c1", "c8", true, 10).getPager(null);
+        reversedSliceQueryTest(false, Server.VERSION_3);
+        reversedSliceQueryTest(true, Server.VERSION_3); // round-trips the legacy (v3, cellname-based) paging state
+        reversedSliceQueryTest(false, Server.VERSION_4);
+        reversedSliceQueryTest(true, Server.VERSION_4);
+    }
+
+    public void reversedSliceQueryTest(boolean testPagingState, int protocolVersion) throws Exception
+    {
+        ReadCommand command = sliceQuery("k0", "c1", "c8", true, 10);
+        QueryPager pager = command.getPager(null, protocolVersion);
 
         assertFalse(pager.isExhausted());
         List<FilteredPartition> partition = query(pager, 3);
         assertRow(partition.get(0), "k0", "c6", "c7", "c8");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partition = query(pager, 3);
         assertRow(partition.get(0), "k0", "c3", "c4", "c5");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partition = query(pager, 3, 2);
         assertRow(partition.get(0), "k0", "c1", "c2");
@@ -260,21 +297,34 @@ public class QueryPagerTest
     @Test
     public void multiQueryTest() throws Exception
     {
-        QueryPager pager = new SinglePartitionReadCommand.Group(new ArrayList<SinglePartitionReadCommand<?>>()
+        multiQueryTest(false, Server.VERSION_3);
+        multiQueryTest(true, Server.VERSION_3); // round-trips the legacy (v3, cellname-based) paging state
+        multiQueryTest(false, Server.VERSION_4);
+        multiQueryTest(true, Server.VERSION_4);
+    }
+
+    public void multiQueryTest(boolean testPagingState, int protocolVersion) throws Exception
+    {
+        ReadQuery command = new SinglePartitionReadCommand.Group(new ArrayList<SinglePartitionReadCommand<?>>()
         {{
             add(sliceQuery("k1", "c2", "c6", 10));
             add(sliceQuery("k4", "c3", "c5", 10));
-        }}, DataLimits.NONE).getPager(null);
+        }}, DataLimits.NONE);
+        QueryPager pager = command.getPager(null, protocolVersion);
 
         assertFalse(pager.isExhausted());
         List<FilteredPartition> partition = query(pager, 3);
         assertRow(partition.get(0), "k1", "c2", "c3", "c4");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partition = query(pager , 4);
         assertRow(partition.get(0), "k1", "c5", "c6");
         assertRow(partition.get(1), "k4", "c3", "c4");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partition = query(pager, 3, 1);
         assertRow(partition.get(0), "k4", "c5");
@@ -285,13 +335,24 @@ public class QueryPagerTest
     @Test
     public void rangeNamesQueryTest() throws Exception
     {
-        QueryPager pager = rangeNamesQuery("k0", "k5", 100, "c1", "c4", "c8").getPager(null);
+        rangeNamesQueryTest(false, Server.VERSION_3);
+        rangeNamesQueryTest(true, Server.VERSION_3); // round-trips the legacy (v3, cellname-based) paging state
+        rangeNamesQueryTest(false, Server.VERSION_4);
+        rangeNamesQueryTest(true, Server.VERSION_4);
+    }
+
+    public void rangeNamesQueryTest(boolean testPagingState, int protocolVersion) throws Exception
+    {
+        ReadCommand command = rangeNamesQuery("k0", "k5", 100, "c1", "c4", "c8");
+        QueryPager pager = command.getPager(null, protocolVersion);
 
         assertFalse(pager.isExhausted());
         List<FilteredPartition> partitions = query(pager, 3 * 3);
         for (int i = 1; i <= 3; i++)
             assertRow(partitions.get(i-1), "k" + i, "c1", "c4", "c8");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partitions = query(pager, 3 * 3, 2 * 3);
         for (int i = 4; i <= 5; i++)
@@ -303,31 +364,50 @@ public class QueryPagerTest
     @Test
     public void rangeSliceQueryTest() throws Exception
     {
-        QueryPager pager = rangeSliceQuery("k1", "k5", 100, "c1", "c7").getPager(null);
+        rangeSliceQueryTest(false, Server.VERSION_3); // cover every (recreate-from-paging-state, protocol version) pair
+        rangeSliceQueryTest(true, Server.VERSION_3);
+        rangeSliceQueryTest(false, Server.VERSION_4);
+        rangeSliceQueryTest(true, Server.VERSION_4);
+    }
+
+    public void rangeSliceQueryTest(boolean testPagingState, int protocolVersion) throws Exception
+    {
+        ReadCommand command = rangeSliceQuery("k1", "k5", 100, "c1", "c7");
+        QueryPager pager = command.getPager(null, protocolVersion);
 
         assertFalse(pager.isExhausted());
         List<FilteredPartition> partitions = query(pager, 5);
         assertRow(partitions.get(0), "k2", "c1", "c2", "c3", "c4", "c5");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partitions = query(pager, 4);
         assertRow(partitions.get(0), "k2", "c6", "c7");
         assertRow(partitions.get(1), "k3", "c1", "c2");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partitions = query(pager, 6);
         assertRow(partitions.get(0), "k3", "c3", "c4", "c5", "c6", "c7");
         assertRow(partitions.get(1), "k4", "c1");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partitions = query(pager, 5);
         assertRow(partitions.get(0), "k4", "c2", "c3", "c4", "c5", "c6");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partitions = query(pager, 5);
         assertRow(partitions.get(0), "k4", "c7");
         assertRow(partitions.get(1), "k5", "c1", "c2", "c3", "c4");
+        assertFalse(pager.isExhausted());
 
+        pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
         assertFalse(pager.isExhausted());
         partitions = query(pager, 5, 3);
         assertRow(partitions.get(0), "k5", "c5", "c6", "c7");
@@ -335,7 +415,6 @@ public class QueryPagerTest
         assertTrue(pager.isExhausted());
     }
 
-
     @Test
     public void SliceQueryWithTombstoneTest() throws Exception
     {
@@ -350,7 +429,7 @@ public class QueryPagerTest
 
         ReadCommand command = SinglePartitionSliceCommand.create(cfs.metadata, FBUtilities.nowInSeconds(), Util.dk("k0"), Slice.ALL);
 
-        QueryPager pager = command.getPager(null);
+        QueryPager pager = command.getPager(null, Server.CURRENT_VERSION);
 
         for (int i = 0; i < 5; i++)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java b/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java
new file mode 100644
index 0000000..ba82e85
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java
@@ -0,0 +1,99 @@
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PagingStateTest
+{
+    private PagingState makeSomePagingState(int protocolVersion)
+    {
+        CFMetaData metadata = CFMetaData.Builder.create("ks", "tbl")
+                                                .addPartitionKey("k", AsciiType.instance)
+                                                .addClusteringColumn("c1", AsciiType.instance)
+                                                .addClusteringColumn("c2", Int32Type.instance)
+                                                .addRegularColumn("myCol", AsciiType.instance)
+                                                .build();
+        // Key "someKey", clustering values ("c1", 42) and column "myCol" mirror the 2.1 reference state used below.
+        ByteBuffer pk = ByteBufferUtil.bytes("someKey");
+
+        ColumnDefinition def = metadata.getColumnDefinition(new ColumnIdentifier("myCol", false));
+        Clustering c = new Clustering(ByteBufferUtil.bytes("c1"), ByteBufferUtil.bytes(42));
+        Row row = BTreeRow.singleCellRow(c, BufferCell.live(metadata, def, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+        PagingState.RowMark mark = PagingState.RowMark.create(metadata, row, protocolVersion);
+        return new PagingState(pk, mark, 10, 0);
+    }
+
+    @Test
+    public void testSerializationBackwardCompatibility()
+    {
+        /*
+         * Tests that the serialized paging state for the native protocol V3 is backward compatible
+         * with what old nodes generate. For that, it compares the serialized format to the hard-coded
+         * value of the same state generated on a 2.1 node. For the curious, that hard-coded value was
+         * generated by the following code:
+         *     ByteBuffer pk = ByteBufferUtil.bytes("someKey");
+         *     CellName cn = CellNames.compositeSparse(new ByteBuffer[]{ ByteBufferUtil.bytes("c1"), ByteBufferUtil.bytes(42) },
+         *                                             new ColumnIdentifier("myCol", false),
+         *                                             false);
+         *     PagingState state = new PagingState(pk, cn.toByteBuffer(), 10);
+         *     System.out.println("PagingState = " + ByteBufferUtil.bytesToHex(state.serialize()));
+         */
+        PagingState state = makeSomePagingState(Server.VERSION_3);
+
+        String serializedState = ByteBufferUtil.bytesToHex(state.serialize(Server.VERSION_3));
+        // We don't assert exact equality because 3.0 nodes append the "remainingInPartition" count, which
+        // 2.1/2.2 nodes do not write. This is safe because 2.1/2.2 nodes ignore any trailing bytes once
+        // they have deserialized a paging state.
+        assertTrue(serializedState.startsWith("0007736f6d654b65790014000263310000040000002a0000056d79636f6c000000000a"));
+    }
+
+    @Test
+    public void testSerializeDeserializeV3()
+    {
+        PagingState state = makeSomePagingState(Server.VERSION_3);
+        ByteBuffer serialized = state.serialize(Server.VERSION_3);
+        assertEquals(state.serializedSize(Server.VERSION_3), serialized.remaining());
+        assertEquals(state, PagingState.deserialize(serialized, Server.VERSION_3));
+    }
+
+    @Test
+    public void testSerializeDeserializeV4()
+    {
+        PagingState state = makeSomePagingState(Server.VERSION_4);
+        ByteBuffer serialized = state.serialize(Server.VERSION_4);
+        assertEquals(state.serializedSize(Server.VERSION_4), serialized.remaining());
+        assertEquals(state, PagingState.deserialize(serialized, Server.VERSION_4));
+    }
+}


Mime
View raw message