cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [2/2] git commit: Allow coordinator failover for cursors
Date Tue, 02 Jul 2013 18:51:16 GMT
Allow coordinator failover for cursors

patch by slebresne; reviewed by iamaleskey for CASSANDRA-5714


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b068a9c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b068a9c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b068a9c4

Branch: refs/heads/trunk
Commit: b068a9c4fa80ed85c463c6cb95102d2233fdf161
Parents: dad2f11
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Mon Jul 1 19:05:25 2013 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Tue Jul 2 20:49:55 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 doc/native_protocol_v2.spec                     |  81 ++++++-------
 src/java/org/apache/cassandra/auth/Auth.java    |   3 +-
 .../cassandra/auth/CassandraAuthorizer.java     |   3 +-
 .../cassandra/auth/PasswordAuthenticator.java   |   3 +-
 .../org/apache/cassandra/cql3/CQLStatement.java |   5 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |  15 +--
 .../org/apache/cassandra/cql3/ResultSet.java    |  25 +++-
 .../statements/AuthenticationStatement.java     |   3 +-
 .../cql3/statements/AuthorizationStatement.java |   3 +-
 .../cql3/statements/BatchStatement.java         |   3 +-
 .../cql3/statements/ModificationStatement.java  |   3 +-
 .../statements/SchemaAlteringStatement.java     |   3 +-
 .../cql3/statements/SelectStatement.java        |  38 ++----
 .../cql3/statements/TruncateStatement.java      |   3 +-
 .../cassandra/cql3/statements/UseStatement.java |   3 +-
 .../apache/cassandra/service/QueryState.java    |  55 +--------
 .../service/pager/AbstractQueryPager.java       |   6 +
 .../service/pager/MultiPartitionPager.java      |  20 +++-
 .../service/pager/NamesQueryPager.java          |  10 ++
 .../cassandra/service/pager/PagingState.java    |  86 ++++++++++++++
 .../cassandra/service/pager/QueryPager.java     |   8 +-
 .../cassandra/service/pager/QueryPagers.java    |  27 +++--
 .../service/pager/RangeNamesQueryPager.java     |  21 +++-
 .../service/pager/RangeSliceQueryPager.java     |  22 +++-
 .../service/pager/SliceQueryPager.java          |  18 +++
 .../cassandra/thrift/CassandraServer.java       |   3 +-
 .../org/apache/cassandra/transport/Client.java  |  13 --
 .../org/apache/cassandra/transport/Message.java |   3 +-
 .../cassandra/transport/ServerConnection.java   |  15 +--
 .../transport/messages/ExecuteMessage.java      |  26 ++--
 .../transport/messages/NextMessage.java         | 118 -------------------
 .../transport/messages/QueryMessage.java        |  27 +++--
 33 files changed, 340 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a50038d..8b12243 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -70,6 +70,7 @@
  * Ensure changing column_index_size_in_kb on different nodes don't corrupt the
    sstable (CASSANDRA-5454)
  * Move resultset type information into prepare, not execute (CASSANDRA-5649)
+ * Auto paging in binary protocol (CASSANDRA-4415, 5714)
 
 
 1.2.7

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/doc/native_protocol_v2.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v2.spec b/doc/native_protocol_v2.spec
index 196b4f5..ae730eb 100644
--- a/doc/native_protocol_v2.spec
+++ b/doc/native_protocol_v2.spec
@@ -22,7 +22,6 @@ Table of Contents
       4.1.6. EXECUTE
       4.1.7. BATCH
       4.1.8. REGISTER
-      4.1.9. NEXT
     4.2. Responses
       4.2.1. ERROR
       4.2.2. READY
@@ -167,7 +166,6 @@ Table of Contents
     0x0E    AUTH_CHALLENGE
     0x0F    AUTH_RESPONSE
     0x10    AUTH_SUCCESS
-    0x11    NEXT
 
   Messages are described in Section 4.
 
@@ -279,7 +277,7 @@ Table of Contents
 4.1.4. QUERY
 
   Performs a CQL query. The body of the message must be:
-    <query><consistency><flags>[<result_page_size>][<n><value_1>...<value_n>]
+    <query><consistency><flags>[<result_page_size>][<n><value_1>...<value_n>][<paging_state>]
   where:
     - <flags> is a [byte] whose bits define the options for this query and
       in particular influence what the remainder of the message contains.
@@ -294,6 +292,12 @@ Table of Contents
         0x04: Skip_metadata. If present, the Result Set returned as a response
               to that query (if any) will have the NO_METADATA flag (see
               Section 4.2.5.2).
+        0x08: With_paging_state. If present, <paging_state> should be present.
+              <paging_state> is a [bytes] value that should have been returned
+              in a result set (Section 4.2.5.2). If provided, the query will be
+              executed but starting from a given paging state. This allows
+              paging to continue on a different node from the one on which it
+              was started (See Section 7 for more details).
     - <query> the query, [long string].
     - <consistency> is the [consistency] level for the operation.
 
@@ -316,7 +320,7 @@ Table of Contents
 4.1.6. EXECUTE
 
   Executes a prepared query. The body of the message must be:
-    <id><n><value_1>....<value_n><consistency><flags>[<result_page_size>]
+    <id><n><value_1>....<value_n><consistency><flags>[<result_page_size>][<paging_state>]
   where:
     - <id> is the prepared query ID. It's the [short bytes] returned as a
       response to a PREPARE message.
@@ -334,6 +338,12 @@ Table of Contents
         0x02: Skip metadata. If present, the Result Set returned as a response
               to that query (if any) will have the NO_METADATA flag (see
               Section 4.2.5.2).
+        0x04: With_paging_state. If present, <paging_state> should be present.
+              <paging_state> is a [bytes] value that should have been returned
+              in a result set (Section 4.2.5.2). If provided, the query will be
+              executed but starting from a given paging state. This allows
+              paging to continue on a different node from the one on which it
+              was started (See Section 7 for more details).
 
   Note that the consistency is ignored by some (prepared) queries (USE, CREATE,
   ALTER, TRUNCATE, ...).
@@ -384,17 +394,6 @@ Table of Contents
   multiple times the same event messages, wasting bandwidth.
 
 
-4.1.9. NEXT
-
-  Request the next page of result if paging was requested by a QUERY or EXECUTE
-  statement and there is more result to fetch (see Section 7 for more details).
-  The body of a NEXT message is a single [int] indicating the number of maximum
-  rows to return with the next page of results (it is equivalent to the
-  <result_page_size> in a QUERY or EXECUTE message).
-
-  The result to a NEXT message will be a RESULT message.
-
-
 4.2. Responses
 
   This section describes the content of the frame body for the different
@@ -476,7 +475,7 @@ Table of Contents
     <metadata><rows_count><rows_content>
   where:
     - <metadata> is composed of:
-        <flags><columns_count>[<global_table_spec>?<col_spec_1>...<col_spec_n>]
+        <flags><columns_count>[<paging_state>][<global_table_spec>?<col_spec_1>...<col_spec_n>]
       where:
         - <flags> is an [int]. The bits of <flags> provides information on the
           formatting of the remaining informations. A flag is set if the bit
@@ -486,14 +485,15 @@ Table of Contents
                       and table name) is provided as <global_table_spec>. If not
                       set, <global_table_spec> is not present.
             0x0002    Has_more_pages: indicates whether this is not the last
-                      page of results and more should be retrieve using a NEXT
-                      message. If not set, this is the laste "page" of result
-                      and NEXT cannot and should not be used. If no result
-                      paging has been requested in the QUERY/EXECUTE/BATCH
-                      message, this will never be set.
+                      page of results and more should be retrieved. If set, the
+                      <paging_state> will be present. The <paging_state> is a
+                      [bytes] value that should be used in QUERY/EXECUTE to
+                      continue paging and retrieve the remainder of the result for
+                      this query (See Section 7 for more details).
             0x0003    No_metadata: if set, the <metadata> is only composed of
-                      these <flags> and the <column_count> but no other
-                      information (so no <global_table_spec> nor <col_spec_i>).
+                      these <flags>, the <column_count> and optionally the
+                      <paging_state> (depending on the Has_more_pages flag) but
+                      no other information (so no <global_table_spec> nor <col_spec_i>).
                       This will only ever be the case if this was requested
                       during the query (see QUERY and RESULT messages).
         - <columns_count> is an [int] representing the number of columns selected
@@ -555,7 +555,8 @@ Table of Contents
     <id><metadata><result_metadata>
   where:
     - <id> is [short bytes] representing the prepared query ID.
-    - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2) and
+    - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2; you
+      can however assume that the Has_more_pages flag is always off) and
       is the specification for the variable bound in this prepare statement.
     - <result_metadata> is defined exactly as <metadata> but correspond to the
       metadata for the resultSet that execute this query will yield. Note that
@@ -691,33 +692,22 @@ Table of Contents
   <result_page_size> first rows of the query result. If that first page of result
   contains the full result set for the query, the RESULT message (of kind `Rows`)
   will have the Has_more_pages flag *not* set. However, if some results are not
-  part of the first response, the Has_more_pages flag will be set. In that latter
-  case, more rows of the result can be retrieved by sending a NEXT message *with the
-  same stream id than the initial query*. The NEXT message also contains its own
-  <result_page_size> that control how many of the remaining result rows will be
-  sent in response. If the response to this NEXT message still does not contains
-  the full remainder of the query result set (the Has_more_pages is set once more),
-  another NEXT message can be send for more result, etc...
-
-  If a RESULT message has the Has_more_pages flag set and any other message than
-  a NEXT message is send on the same stream id, the query is cancelled and no more
-  of its result can be retrieved.
+  part of the first response, the Has_more_pages flag will be set and the result
+  will contain a <paging_state> value. In that case, the <paging_state> value
+  should be used in a QUERY or EXECUTE message (that has the *same* query as
+  the original one or the behavior is undefined) to retrieve the next page of
+  results.
 
   Only CQL3 queries that return a result set (RESULT message with a Rows `kind`)
   support paging. For other type of queries, the <result_page_size> value is
   ignored.
 
-  The <result_page_size> can be set to a negative value to disable paging (in
-  which case the whole result set will be retuned in the first RESULT message,
-  message that will not have the Has_more_pages flag set). The
-  <result_page_size> value cannot be 0.
-
   Note to client implementors:
   - While <result_page_size> can be as low as 1, it will likely be detrimental
     to performance to pick a value too low. A value below 100 is probably too
     low for most use cases.
   - Clients should not rely on the actual size of the result set returned to
-    decide if a NEXT message should be issued. Instead, they should always
+    decide if there are more results to fetch or not. Instead, they should always
     check the Has_more_pages flag (unless they did not enabled paging for the query
     obviously). Clients should also not assert that no result will have more than
     <result_page_size> results. While the current implementation always respect
@@ -830,7 +820,8 @@ Table of Contents
     through the new AUTH_RESPONSE/AUTH_CHALLENGE messages). See Section 4.2.3 for
     details.
   * Query paging has been added (Section 7): QUERY and EXECUTE message have an
-    additional <result_page_size> [int], a new NEXT message has been added and
-    the Rows kind of RESULT message has an additional flag. Note that paging is
-    optional, and a client that don't want to handle it can always pass -1 for
-    the <result_page_size> in QUERY and EXECUTE.
+    additional <result_page_size> [int] and <paging_state> [bytes], and
+    the Rows kind of RESULT message has an additional flag and <paging_state>
+    value. Note that paging is optional, and a client that does not want to handle
+    it can simply avoid including the Page_size flag and parameter in QUERY and
+    EXECUTE.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index 559e10c..53debce 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -233,7 +233,8 @@ public class Auth
             ResultMessage.Rows rows = selectUserStatement.execute(consistencyForUser(username),
                                                                   new QueryState(new ClientState(true)),
                                                                   Lists.newArrayList(ByteBufferUtil.bytes(username)),
-                                                                  -1);
+                                                                  -1,
+                                                                  null);
             return new UntypedResultSet(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index 6f490f8..d07c2c8 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -75,7 +75,8 @@ public class CassandraAuthorizer implements IAuthorizer
                                                                  new QueryState(new ClientState(true)),
                                                                  Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
                                                                                     ByteBufferUtil.bytes(resource.getName())),
-                                                                 -1);
+                                                                 -1,
+                                                                 null);
             result = new UntypedResultSet(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 4d37b7e..87539f2 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -109,7 +109,8 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator
             ResultMessage.Rows rows = authenticateStatement.execute(consistencyForUser(username),
                                                                     new QueryState(new ClientState(true)),
                                                                     Lists.newArrayList(ByteBufferUtil.bytes(username)),
-                                                                    -1);
+                                                                    -1,
+                                                                    null);
             result = new UntypedResultSet(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index a4abaf1..2f4dd65 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.exceptions.*;
 
 public interface CQLStatement
@@ -57,8 +58,10 @@ public interface CQLStatement
      * can assume that each bound term have a corresponding value.
      * @param pageSize the initial page size for the result set potentially returned. A negative value
      * means no paging needs to be done. Statements that do not return result sets can ignore this value.
+     * @param pageState the paging state for paged query. All statement except Select should ignore
+     * that value.
      */
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestValidationException, RequestExecutionException;
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize, PagingState pageState) throws RequestValidationException, RequestExecutionException;
 
     /**
      * Variante of execute used for internal query against the system tables, and thus only query the local node.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 476ca88..75c5f67 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MD5Digest;
@@ -106,30 +107,30 @@ public class QueryProcessor
         }
     }
 
-    private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
+    private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize, PagingState pageState)
     throws RequestExecutionException, RequestValidationException
     {
         logger.trace("Process {} @CL.{}", statement, cl);
         ClientState clientState = queryState.getClientState();
         statement.checkAccess(clientState);
         statement.validate(clientState);
-        ResultMessage result = statement.execute(cl, queryState, variables, pageSize);
+        ResultMessage result = statement.execute(cl, queryState, variables, pageSize, pageState);
         return result == null ? new ResultMessage.Void() : result;
     }
 
     public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
     throws RequestExecutionException, RequestValidationException
     {
-        return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState, -1);
+        return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState, -1, null);
     }
 
-    public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState, int pageSize)
+    public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState, int pageSize, PagingState pageState)
     throws RequestExecutionException, RequestValidationException
     {
         CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
         if (prepared.getBoundsTerms() != variables.size())
             throw new InvalidRequestException("Invalid amount of bind variables");
-        return processStatement(prepared, cl, queryState, variables, pageSize);
+        return processStatement(prepared, cl, queryState, variables, pageSize, pageState);
     }
 
     public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
@@ -228,7 +229,7 @@ public class QueryProcessor
         }
     }
 
-    public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
+    public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize, PagingState pageState)
     throws RequestExecutionException, RequestValidationException
     {
         // Check to see if there are any bound variables to verify
@@ -246,7 +247,7 @@ public class QueryProcessor
                     logger.trace("[{}] '{}'", i+1, variables.get(i));
         }
 
-        return processStatement(statement, cl, queryState, variables, pageSize);
+        return processStatement(statement, cl, queryState, variables, pageSize, pageState);
     }
 
     public static ResultMessage processBatch(BatchStatement batch, ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 8d6995f..21ef8e8 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.CqlResultType;
 import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.service.pager.PagingState;
 
 public class ResultSet
 {
@@ -222,6 +223,7 @@ public class ResultSet
         public final EnumSet<Flag> flags;
         public final List<ColumnSpecification> names;
         public final int columnCount;
+        public PagingState pagingState;
 
         public Metadata(List<ColumnSpecification> names)
         {
@@ -262,9 +264,11 @@ public class ResultSet
             return true;
         }
 
-        public void setHasMorePages()
+        public Metadata setHasMorePages(PagingState pagingState)
         {
             flags.add(Flag.HAS_MORE_PAGES);
+            this.pagingState = pagingState;
+            return this;
         }
 
         @Override
@@ -299,8 +303,13 @@ public class ResultSet
                 int columnCount = body.readInt();
 
                 EnumSet<Flag> flags = Flag.deserialize(iflags);
+
+                PagingState state = null;
+                if (flags.contains(Flag.HAS_MORE_PAGES))
+                    state = PagingState.deserialize(CBUtil.readValue(body));
+
                 if (flags.contains(Flag.NO_METADATA))
-                    return new Metadata(flags, columnCount);
+                    return new Metadata(flags, columnCount).setHasMorePages(state);
 
                 boolean globalTablesSpec = flags.contains(Flag.GLOBAL_TABLES_SPEC);
 
@@ -322,26 +331,30 @@ public class ResultSet
                     AbstractType type = DataType.toType(DataType.codec.decodeOne(body));
                     names.add(new ColumnSpecification(ksName, cfName, colName, type));
                 }
-                return new Metadata(flags, names);
+                return new Metadata(flags, names).setHasMorePages(state);
             }
 
             public ChannelBuffer encode(Metadata m, int version)
             {
                 boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
                 boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
+                boolean hasMorePages = m.flags.contains(Flag.HAS_MORE_PAGES);
 
-                int stringCount = noMetadata ? 0 : (globalTablesSpec ? 2 + columnCount : 3 * columnCount);
-                CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(1 + (noMetadata : 0 ? columnCount), stringCount, 0);
+                int stringCount = noMetadata ? 0 : (globalTablesSpec ? 2 + m.columnCount : 3 * m.columnCount);
+                CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(1 + (noMetadata ? 0 : m.columnCount), stringCount, hasMorePages ? 1 : 0);
 
                 ChannelBuffer header = ChannelBuffers.buffer(8);
 
                 assert version > 1 || (!m.flags.contains(Flag.HAS_MORE_PAGES) && !noMetadata);
 
                 header.writeInt(Flag.serialize(m.flags));
-                header.writeInt(columnCount);
+                header.writeInt(m.columnCount);
 
                 builder.add(header);
 
+                if (hasMorePages)
+                    builder.addValue(m.pagingState == null ? null : m.pagingState.serialize());
+
                 if (!noMetadata)
                 {
                     if (globalTablesSpec)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
index 97d7be5..eecc73a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
 public abstract class AuthenticationStatement extends ParsedStatement implements CQLStatement
@@ -40,7 +41,7 @@ public abstract class AuthenticationStatement extends ParsedStatement implements
         return 0;
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize)
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize, PagingState pagingState)
     throws RequestExecutionException, RequestValidationException
     {
         return execute(state.getClientState());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
index 5e317aa..cb3baa3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
 public abstract class AuthorizationStatement extends ParsedStatement implements CQLStatement
@@ -41,7 +42,7 @@ public abstract class AuthorizationStatement extends ParsedStatement implements
         return 0;
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize)
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize, PagingState pagingState)
     throws RequestValidationException, RequestExecutionException
     {
         return execute(state.getClientState());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index dd15270..90dc764 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.Pair;
 
@@ -133,7 +134,7 @@ public class BatchStatement implements CQLStatement
         }
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize) throws RequestExecutionException, RequestValidationException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize, PagingState pagingState) throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 85728bc..b373cd5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.Pair;
@@ -332,7 +333,7 @@ public abstract class ModificationStatement implements CQLStatement
         return ifNotExists || (columnConditions != null && !columnConditions.isEmpty());
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
+    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize, PagingState pagingState)
     throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 1c5f051..99daffa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
 /**
@@ -68,7 +69,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
     public void validate(ClientState state) throws RequestValidationException
     {}
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestValidationException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize, PagingState pagingState) throws RequestValidationException
     {
         announceMigration();
         String tableName = cfName == null || columnFamily() == null ? "" : columnFamily();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 994c331..d21bbd1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -39,9 +39,7 @@ import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.RangeSliceVerbHandler;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.pager.Pageable;
-import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.service.pager.QueryPagers;
+import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
@@ -140,7 +138,7 @@ public class SelectStatement implements CQLStatement
         // Nothing to do, all validation has been done by RawStatement.prepare()
     }
 
-    public ResultMessage.Rows execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize, PagingState pagingState) throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");
@@ -165,10 +163,14 @@ public class SelectStatement implements CQLStatement
         }
         else
         {
-            QueryPager pager = QueryPagers.pager(command, cl);
-            return parameters.isCount
-                 ? pageCountQuery(pager, variables, pageSize)
-                 : setupPaging(pager, state, variables, limit, pageSize);
+            QueryPager pager = QueryPagers.pager(command, cl, pagingState);
+            if (parameters.isCount)
+                return pageCountQuery(pager, variables, pageSize, now);
+
+            List<Row> page = pager.fetchPage(pageSize);
+            ResultMessage.Rows msg = processResults(page, variables, limit, now);
+            msg.result.metadata.setHasMorePages(pager.state());
+            return msg;
         }
     }
 
@@ -181,29 +183,13 @@ public class SelectStatement implements CQLStatement
         return processResults(rows, variables, limit, now);
     }
 
-    // TODO: we could probably refactor processResults so it doesn't needs the variables, so we don't have to keep around. But that can wait.
-    private ResultMessage.Rows setupPaging(QueryPager pager, QueryState state, List<ByteBuffer> variables, int limit, int pageSize) throws RequestValidationException, RequestExecutionException
-    {
-        List<Row> page = pager.fetchPage(pageSize);
-
-        ResultMessage.Rows msg = processResults(page, variables, limit, pager.timestamp());
-
-        // Don't bother setting up the pager if we actually don't need to.
-        if (pager.isExhausted())
-            return msg;
-
-        state.attachPager(pager, this, variables);
-        msg.result.metadata.setHasMorePages();
-        return msg;
-    }
-
-    private ResultMessage.Rows pageCountQuery(QueryPager pager, List<ByteBuffer> variables, int pageSize) throws RequestValidationException, RequestExecutionException
+    private ResultMessage.Rows pageCountQuery(QueryPager pager, List<ByteBuffer> variables, int pageSize, long now) throws RequestValidationException, RequestExecutionException
     {
         int count = 0;
         while (!pager.isExhausted())
         {
             int maxLimit = pager.maxRemaining();
-            ResultSet rset = process(pager.fetchPage(pageSize), variables, maxLimit, pager.timestamp());
+            ResultSet rset = process(pager.fetchPage(pageSize), variables, maxLimit, now);
             count += rset.rows.size();
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index a10415a..c66608b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.thrift.ThriftValidation;
 
 public class TruncateStatement extends CFStatement implements CQLStatement
@@ -54,7 +55,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
         ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws InvalidRequestException, TruncateException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize, PagingState pagingState) throws InvalidRequestException, TruncateException
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index 4806314..c2e3c34 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.PagingState;
 
 public class UseStatement extends ParsedStatement implements CQLStatement
 {
@@ -51,7 +52,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement
     {
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws InvalidRequestException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize, PagingState pagingState) throws InvalidRequestException
     {
         state.getClientState().setKeyspace(keyspace);
         return new ResultMessage.SetKeyspace(keyspace);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 38d103d..c8e2e8d 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -38,7 +38,6 @@ public class QueryState
     private final ClientState clientState;
     private volatile long clock;
     private volatile UUID preparedTracingSession;
-    private volatile Pager pager;
 
     public QueryState(ClientState clientState)
     {
@@ -77,13 +76,6 @@ public class QueryState
         this.preparedTracingSession = sessionId;
     }
 
-    public UUID getAndResetCurrentTracingSession()
-    {
-        UUID previous = preparedTracingSession;
-        preparedTracingSession = null;
-        return previous;
-    }
-
     public void createTracingSession()
     {
         if (this.preparedTracingSession == null)
@@ -93,53 +85,8 @@ public class QueryState
         else
         {
             UUID session = this.preparedTracingSession;
+            this.preparedTracingSession = null;
             Tracing.instance.newSession(session);
         }
     }
-
-    public void attachPager(QueryPager queryPager, SelectStatement statement, List<ByteBuffer> variables)
-    {
-        pager = new Pager(queryPager, statement, variables);
-    }
-
-    public boolean hasPager()
-    {
-        return pager != null;
-    }
-
-    public void dropPager()
-    {
-        pager = null;
-    }
-
-    public ResultMessage.Rows getNextPage(int pageSize) throws RequestValidationException, RequestExecutionException
-    {
-        assert pager != null; // We've already validated (in ServerConnection) that this should not be null
-
-        int currentLimit = pager.queryPager.maxRemaining();
-        List<Row> page = pager.queryPager.fetchPage(pageSize);
-        ResultMessage.Rows msg = pager.statement.processResults(page, pager.variables, currentLimit, pager.queryPager.timestamp());
-
-        if (pager.queryPager.isExhausted())
-            dropPager();
-        else
-            msg.result.metadata.setHasMorePages();
-
-        return msg;
-    }
-
-    // Groups the actual query pager with the Select Query
-    private static class Pager
-    {
-        private final QueryPager queryPager;
-        private final SelectStatement statement;
-        private final List<ByteBuffer> variables;
-
-        private Pager(QueryPager queryPager, SelectStatement statement, List<ByteBuffer> variables)
-        {
-            this.queryPager = queryPager;
-            this.statement = statement;
-            this.variables = variables;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 49a2c1e..e69a58f 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -127,6 +127,19 @@ abstract class AbstractQueryPager implements QueryPager
         return result;
     }
 
+    /**
+     * Restores the {@code remaining} count and {@code lastWasRecorded} flag of a pager
+     * that resumes paging from a {@link PagingState}.
+     *
+     * @param remaining new value of {@code remaining} (callers pass {@code PagingState.remaining})
+     * @param lastWasRecorded new value of {@code lastWasRecorded}
+     */
+    protected void restoreState(int remaining, boolean lastWasRecorded)
+    {
+        this.remaining = remaining;
+        this.lastWasRecorded = lastWasRecorded;
+    }
+
     public boolean isExhausted()
     {
         return exhausted || remaining == 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index ef82535..b9870dc 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -47,7 +47,21 @@ class MultiPartitionPager implements QueryPager
 
     private volatile int current;
 
-    MultiPartitionPager(List<ReadCommand> commands, final ConsistencyLevel consistencyLevel, final boolean localQuery)
+    /**
+     * Creates a pager with no paging state to resume from.
+     */
+    MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, boolean localQuery)
+    {
+        this(commands, consistencyLevel, localQuery, null);
+    }
+
+    /**
+     * Creates a pager resuming from {@code state} (may be null). The state is only handed
+     * to the sub-pager whose command key equals {@code state.partitionKey}; the others get null.
+     * NOTE(review): sub-pagers preceding the matching one then seem to restart from the
+     * beginning of their partition on resumption; confirm this is handled elsewhere.
+     */
+    MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
     {
         this.pagers = new SinglePartitionPager[commands.size()];
 
@@ -60,13 +65,38 @@ class MultiPartitionPager implements QueryPager
             else if (tstamp != command.timestamp)
                 throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
 
+            PagingState tmpState = state != null && command.key.equals(state.partitionKey) ? state : null;
             pagers[i] = command instanceof SliceFromReadCommand
-                      ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery)
-                      : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery);
+                      ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery, tmpState)
+                      : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery, tmpState);
         }
         timestamp = tstamp;
     }
 
+    /**
+     * Returns the position of the current sub-pager, with its remaining count replaced by
+     * this pager's overall maxRemaining().
+     *
+     * @return the current state, or null if the current sub-pager has none or if
+     * {@code current} has moved past the last sub-pager
+     */
+    public PagingState state()
+    {
+        // isExhausted() loops while current < pagers.length, so current is expected to reach
+        // pagers.length once every sub-pager is exhausted; pagers[current] would then throw
+        // ArrayIndexOutOfBoundsException. Read the volatile field once.
+        int idx = current;
+        if (idx >= pagers.length)
+            return null;
+
+        // NOTE(review): resuming relies on this state carrying the sub-pager's partition key,
+        // since the constructor matches sub-pagers against state.partitionKey.
+        PagingState state = pagers[idx].state();
+        return state == null
+             ? null
+             : new PagingState(state.partitionKey, state.cellName, maxRemaining());
+    }
+
     public boolean isExhausted()
     {
         while (current < pagers.length)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
index 0ac7079..ede1e91 100644
--- a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
@@ -55,12 +55,29 @@ public class NamesQueryPager implements SinglePartitionPager
         this.localQuery = localQuery;
     }
 
+    /**
+     * Equivalent to the 3-argument constructor: {@code state} is ignored, since this pager
+     * never produces a paging state of its own (see {@link #state()}).
+     */
+    NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+    {
+        this(command, consistencyLevel, localQuery);
+    }
+
     public ColumnCounter columnCounter()
     {
         // We know NamesQueryFilter.columnCounter don't care about his argument
         return command.filter.columnCounter(null, command.timestamp);
     }
 
+    /**
+     * Always returns null: this pager exposes no position to resume from.
+     */
+    public PagingState state()
+    {
+        return null;
+    }
+
     public boolean isExhausted()
     {
         return queried;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
new file mode 100644
index 0000000..1c542a0
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.util.ByteBufferOutputStream;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Position of a query pager, serialized and handed to the client so that paging can be
+ * restarted from that point later, possibly on another host.
+ * <p>
+ * Serialized form: the partition key and the cell name, each preceded by a 2-byte length,
+ * followed by the 4-byte {@code remaining} count.
+ */
+public class PagingState
+{
+    // Partition key of the paging position; empty (never null) when not set.
+    public final ByteBuffer partitionKey;
+    // Name of the last cell returned; empty (never null) when not set.
+    public final ByteBuffer cellName;
+    // The pager's maxRemaining() when this state was captured.
+    public final int remaining;
+
+    public PagingState(ByteBuffer partitionKey, ByteBuffer cellName, int remaining)
+    {
+        this.partitionKey = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
+        this.cellName = cellName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : cellName;
+        this.remaining = remaining;
+    }
+
+    /**
+     * Decodes a state produced by {@link #serialize()}.
+     *
+     * @param bytes the serialized state; may be null
+     * @return the decoded state, or null if {@code bytes} is null
+     * @throws ProtocolException if {@code bytes} cannot be read as a paging state
+     */
+    public static PagingState deserialize(ByteBuffer bytes)
+    {
+        if (bytes == null)
+            return null;
+
+        try
+        {
+            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
+            ByteBuffer pk = ByteBufferUtil.readWithShortLength(in);
+            ByteBuffer cn = ByteBufferUtil.readWithShortLength(in);
+            int remaining = in.readInt();
+            return new PagingState(pk, cn, remaining);
+        }
+        catch (IOException e)
+        {
+            throw new ProtocolException("Invalid value for the paging state");
+        }
+    }
+
+    /**
+     * Encodes this state in the format read by {@link #deserialize(ByteBuffer)}.
+     *
+     * @return a buffer whose remaining bytes (from position 0) are the encoded state
+     */
+    public ByteBuffer serialize()
+    {
+        try
+        {
+            ByteBuffer result = ByteBuffer.allocate(serializedSize());
+            DataOutput out = new DataOutputStream(new ByteBufferOutputStream(result));
+            ByteBufferUtil.writeWithShortLength(partitionKey, out);
+            ByteBufferUtil.writeWithShortLength(cellName, out);
+            out.writeInt(remaining);
+            // Writing through the stream advances result's position to its limit; without
+            // this flip the returned buffer would have nothing remaining to read.
+            result.flip();
+            return result;
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    // 2-byte length plus content for each of the two buffers, then the 4-byte int.
+    private int serializedSize()
+    {
+        return 2 + partitionKey.remaining()
+             + 2 + cellName.remaining()
+             + 4;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
index a390859..218ade3 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -69,7 +69,11 @@ public interface QueryPager
     public int maxRemaining();
 
     /**
-     * The timestamp used by the last page.
+     * Get the current state of the pager. The state can allow to restart the
+     * paging on another host from where we are at this point.
+     *
+     * @return the current paging state. Will return null if paging is at the
+     * beginning. If the pager is exhausted, the result is undefined.
      */
-    public long timestamp();
+    public PagingState state();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 2920317..5b97461 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -77,47 +77,56 @@ public class QueryPagers
         }
     }
 
-    private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean local)
+    private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean local, PagingState state)
     {
         if (command instanceof SliceByNamesReadCommand)
-            return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local);
+            return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local, state);
         else
-            return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, local);
+            return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, local, state);
     }
 
-    private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, boolean local)
+    private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, boolean local, PagingState state)
     {
         if (command instanceof Pageable.ReadCommands)
         {
             List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
             if (commands.size() == 1)
-                return pager(commands.get(0), consistencyLevel, local);
+                return pager(commands.get(0), consistencyLevel, local, state);
 
-            return new MultiPartitionPager(commands, consistencyLevel, local);
+            return new MultiPartitionPager(commands, consistencyLevel, local, state);
         }
         else if (command instanceof ReadCommand)
         {
-            return pager((ReadCommand)command, consistencyLevel, local);
+            return pager((ReadCommand)command, consistencyLevel, local, state);
         }
         else
         {
             assert command instanceof RangeSliceCommand;
             RangeSliceCommand rangeCommand = (RangeSliceCommand)command;
             if (rangeCommand.predicate instanceof NamesQueryFilter)
-                return new RangeNamesQueryPager(rangeCommand, consistencyLevel, local);
+                return new RangeNamesQueryPager(rangeCommand, consistencyLevel, local, state);
             else
-                return new RangeSliceQueryPager(rangeCommand, consistencyLevel, local);
+                return new RangeSliceQueryPager(rangeCommand, consistencyLevel, local, state);
         }
     }
 
     public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel)
     {
-        return pager(command, consistencyLevel, false);
+        return pager(command, consistencyLevel, false, null);
+    }
+
+    /**
+     * Same as {@link #pager(Pageable, ConsistencyLevel)}, but the returned pager is
+     * initialized from {@code state} when it is non-null.
+     */
+    public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, PagingState state)
+    {
+        return pager(command, consistencyLevel, false, state);
     }
 
     public static QueryPager localPager(Pageable command)
     {
-        return pager(command, null, true);
+        return pager(command, null, true, null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
index e4d4295..7d222ed 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
 
 /**
  * Pages a RangeSliceCommand whose predicate is a name query.
@@ -38,7 +39,7 @@ import org.apache.cassandra.service.StorageProxy;
 public class RangeNamesQueryPager extends AbstractQueryPager
 {
     private final RangeSliceCommand command;
-    private volatile RowPosition lastReturnedKey;
+    private volatile DecoratedKey lastReturnedKey;
 
     // Don't use directly, use QueryPagers method instead
     RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
@@ -48,6 +49,31 @@ public class RangeNamesQueryPager extends AbstractQueryPager
         assert columnFilter instanceof NamesQueryFilter && ((NamesQueryFilter)columnFilter).countCQL3Rows();
     }
 
+    /**
+     * Creates a pager that, if {@code state} is non-null, restores the last returned key
+     * and the remaining count recorded in it.
+     */
+    RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+    {
+        this(command, consistencyLevel, localQuery);
+
+        if (state != null)
+        {
+            lastReturnedKey = StorageService.getPartitioner().decorateKey(state.partitionKey);
+            restoreState(state.remaining, true);
+        }
+    }
+
+    /**
+     * Returns the last returned partition key (without cell name), or null if none is set yet.
+     */
+    public PagingState state()
+    {
+        return lastReturnedKey == null
+             ? null
+             : new PagingState(lastReturnedKey.key, null, maxRemaining());
+    }
+
     protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
     throws RequestExecutionException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 578d5c9..a316ef8 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
 
 /**
  * Pages a RangeSliceCommand whose predicate is a slice query.
@@ -36,7 +37,7 @@ import org.apache.cassandra.service.StorageProxy;
 public class RangeSliceQueryPager extends AbstractQueryPager
 {
     private final RangeSliceCommand command;
-    private volatile RowPosition lastReturnedKey;
+    private volatile DecoratedKey lastReturnedKey;
     private volatile ByteBuffer lastReturnedName;
 
     // Don't use directly, use QueryPagers method instead
@@ -47,6 +48,32 @@ public class RangeSliceQueryPager extends AbstractQueryPager
         assert columnFilter instanceof SliceQueryFilter;
     }
 
+    /**
+     * Creates a pager that, if {@code state} is non-null, restores the last returned key,
+     * cell name and remaining count recorded in it.
+     */
+    RangeSliceQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+    {
+        this(command, consistencyLevel, localQuery);
+
+        if (state != null)
+        {
+            lastReturnedKey = StorageService.getPartitioner().decorateKey(state.partitionKey);
+            lastReturnedName = state.cellName;
+            restoreState(state.remaining, true);
+        }
+    }
+
+    /**
+     * Returns the last returned partition key and cell name, or null if no key is set yet.
+     */
+    public PagingState state()
+    {
+        return lastReturnedKey == null
+             ? null
+             : new PagingState(lastReturnedKey.key, lastReturnedName, maxRemaining());
+    }
+
     protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
     throws RequestExecutionException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
index 1be1aed..dee9190 100644
--- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
@@ -46,6 +46,34 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti
         this.command = command;
     }
 
+    /**
+     * Creates a pager that, if {@code state} is non-null, restores the last returned cell
+     * name and the remaining count recorded in it.
+     */
+    SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+    {
+        this(command, consistencyLevel, localQuery);
+
+        if (state != null)
+        {
+            lastReturned = state.cellName;
+            restoreState(state.remaining, true);
+        }
+    }
+
+    /**
+     * Returns this partition's key and the last returned cell name, or null if no cell
+     * has been returned yet.
+     */
+    public PagingState state()
+    {
+        // Include the partition key: MultiPartitionPager only hands a resumed state to the
+        // sub-pager whose command key equals state.partitionKey.
+        return lastReturned == null
+             ? null
+             : new PagingState(command.key, lastReturned, maxRemaining());
+    }
+
     protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
     throws RequestValidationException, RequestExecutionException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 0139e1f..8c6abb1 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -2017,7 +2017,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                             ThriftConversion.fromThrift(cLevel),
                                                                             cState.getQueryState(),
                                                                             bindVariables,
-                                                                            -1).toThriftResult();
+                                                                            -1,
+                                                                            null).toThriftResult();
         }
         catch (RequestExecutionException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 013de9f..12c46dd 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -128,19 +128,6 @@ public class Client extends SimpleClient
             }
             return new QueryMessage(query, ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), pageSize);
         }
-        else if (msgType.equals("NEXT"))
-        {
-            line = line.substring(5);
-            try
-            {
-                int pageSize = Integer.parseInt(line);
-                return new NextMessage(pageSize);
-            }
-            catch (NumberFormatException e)
-            {
-                return null;
-            }
-        }
         else if (msgType.equals("PREPARE"))
         {
             String query = line.substring(8);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index eca3697..8c46f9b 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -73,8 +73,7 @@ public abstract class Message
         BATCH          (13, Direction.REQUEST,  BatchMessage.codec),
         AUTH_CHALLENGE (14, Direction.RESPONSE, AuthChallenge.codec),
         AUTH_RESPONSE  (15, Direction.REQUEST,  AuthResponse.codec),
-        AUTH_SUCCESS   (16, Direction.RESPONSE, AuthSuccess.codec),
-        NEXT           (17, Direction.REQUEST,  NextMessage.codec);
+        AUTH_SUCCESS   (16, Direction.RESPONSE, AuthSuccess.codec);
 
         public final int opcode;
         public final Direction direction;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java
index 538258d..7ec4a00 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -86,20 +86,7 @@ public class ServerConnection extends Connection
             default:
                 throw new AssertionError();
         }
-
-        QueryState qstate = getQueryState(streamId);
-        if (qstate.hasPager())
-        {
-            if (type != Message.Type.NEXT)
-                qstate.dropPager();
-        }
-        else
-        {
-            if (type == Message.Type.NEXT)
-                throw new ProtocolException("Unexpected NEXT message, paging is not set (or is done)");
-        }
-
-        return qstate;
+        return getQueryState(streamId);
     }
 
     public void applyStateTransition(Message.Type requestType, Message.Type responseType)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 8082e23..5da1b12 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.*;
 import org.apache.cassandra.utils.MD5Digest;
@@ -43,7 +44,8 @@ public class ExecuteMessage extends Message.Request
     {
         // The order of that enum matters!!
         PAGE_SIZE,
-        SKIP_METADATA;
+        SKIP_METADATA,
+        PAGING_STATE;
 
         public static EnumSet<Flag> deserialize(int flags)
         {
@@ -81,14 +83,17 @@ public class ExecuteMessage extends Message.Request
 
             int resultPageSize = -1;
             boolean skipMetadata = false;
+            PagingState pagingState = null;
             if (version >= 2)
             {
                 EnumSet<Flag> flags = Flag.deserialize((int)body.readByte());
                 if (flags.contains(Flag.PAGE_SIZE))
                     resultPageSize = body.readInt();
                 skipMetadata = flags.contains(Flag.SKIP_METADATA);
+                if (flags.contains(Flag.PAGING_STATE))
+                    pagingState = PagingState.deserialize(CBUtil.readValue(body));
             }
-            return new ExecuteMessage(MD5Digest.wrap(id), values, consistency, resultPageSize, skipMetadata);
+            return new ExecuteMessage(MD5Digest.wrap(id), values, consistency, resultPageSize, skipMetadata, pagingState);
         }
 
         public ChannelBuffer encode(ExecuteMessage msg, int version)
@@ -105,6 +110,8 @@ public class ExecuteMessage extends Message.Request
                 flags.add(Flag.PAGE_SIZE);
             if (msg.skipMetadata)
                 flags.add(Flag.SKIP_METADATA);
+            if (msg.pagingState != null)
+                flags.add(Flag.PAGING_STATE);
 
             assert flags.isEmpty() || version >= 2;
 
@@ -115,7 +122,7 @@ public class ExecuteMessage extends Message.Request
                 if (flags.contains(Flag.PAGE_SIZE))
                     nbBuff++;
             }
-            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(nbBuff, 0, vs);
+            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(nbBuff, 0, vs + (flags.contains(Flag.PAGING_STATE) ? 1 : 0));
             builder.add(CBUtil.bytesToCB(msg.statementId.bytes));
             builder.add(CBUtil.shortToCB(vs));
 
@@ -130,6 +137,8 @@ public class ExecuteMessage extends Message.Request
                 builder.add(CBUtil.byteToCB((byte)Flag.serialize(flags)));
                 if (flags.contains(Flag.PAGE_SIZE))
                     builder.add(CBUtil.intToCB(msg.resultPageSize));
+                if (flags.contains(Flag.PAGING_STATE))
+                    builder.addValue(msg.pagingState == null ? null : msg.pagingState.serialize());
             }
             return builder.build();
         }
@@ -140,13 +149,14 @@ public class ExecuteMessage extends Message.Request
     public final ConsistencyLevel consistency;
     public final int resultPageSize;
     public final boolean skipMetadata;
+    public final PagingState pagingState;
 
     public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize)
     {
-        this(MD5Digest.wrap(statementId), values, consistency, resultPageSize, false);
+        this(MD5Digest.wrap(statementId), values, consistency, resultPageSize, false, null);
     }
 
-    public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize, boolean skipMetadata)
+    public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize, boolean skipMetadata, PagingState pagingState)
     {
         super(Message.Type.EXECUTE);
         this.statementId = statementId;
@@ -154,6 +164,7 @@ public class ExecuteMessage extends Message.Request
         this.consistency = consistency;
         this.resultPageSize = resultPageSize;
         this.skipMetadata = skipMetadata;
+        this.pagingState = pagingState;
     }
 
     public ChannelBuffer encode()
@@ -192,7 +203,7 @@ public class ExecuteMessage extends Message.Request
                 Tracing.instance.begin("Execute CQL3 prepared query", builder.build());
             }
 
-            Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values, resultPageSize);
+            Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values, resultPageSize, pagingState);
 
             if (tracingId != null)
                 response.setTracingId(tracingId);
@@ -206,9 +217,6 @@ public class ExecuteMessage extends Message.Request
         finally
         {
             Tracing.instance.stopSession();
-            // Trash the current session id if we won't need it anymore
-            if (!state.hasPager())
-                state.getAndResetCurrentTracingSession();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/transport/messages/NextMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/NextMessage.java b/src/java/org/apache/cassandra/transport/messages/NextMessage.java
deleted file mode 100644
index d68f603..0000000
--- a/src/java/org/apache/cassandra/transport/messages/NextMessage.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.transport.messages;
-
-import java.util.UUID;
-
-import com.google.common.collect.ImmutableMap;
-import org.jboss.netty.buffer.ChannelBuffer;
-
-import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
-import org.apache.cassandra.utils.UUIDGen;
-
-public class NextMessage extends Message.Request
-{
-    public static final Message.Codec<NextMessage> codec = new Message.Codec<NextMessage>()
-    {
-        public NextMessage decode(ChannelBuffer body, int version)
-        {
-            int resultPageSize = body.readInt();
-            return new NextMessage(resultPageSize);
-        }
-
-        public ChannelBuffer encode(NextMessage msg, int version)
-        {
-            return CBUtil.intToCB(msg.resultPageSize);
-        }
-    };
-
-    public final int resultPageSize;
-
-    public NextMessage(int resultPageSize)
-    {
-        super(Type.NEXT);
-        this.resultPageSize = resultPageSize;
-    }
-
-    public ChannelBuffer encode()
-    {
-        return codec.encode(this, getVersion());
-    }
-
-    public Message.Response execute(QueryState state)
-    {
-        try
-        {
-            if (resultPageSize == 0)
-                throw new ProtocolException("The page size cannot be 0");
-
-            /*
-             * If we had traced the previous page and we are asked to trace this one,
-             * record the previous id to allow linking the trace together.
-             */
-            UUID previousTracingId = state.getAndResetCurrentTracingSession();
-
-            UUID tracingId = null;
-            if (isTracingRequested())
-            {
-                tracingId = UUIDGen.getTimeUUID();
-                state.prepareTracingSession(tracingId);
-            }
-
-            if (state.traceNextQuery())
-            {
-                state.createTracingSession();
-                ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-                if (resultPageSize > 0)
-                    builder.put("page_size", Integer.toString(resultPageSize));
-                if (previousTracingId != null)
-                    builder.put("previous_trace", previousTracingId.toString());
-                Tracing.instance.begin("Continue paged CQL3 query", builder.build());
-            }
-
-            Message.Response response = state.getNextPage(resultPageSize < 0 ? Integer.MAX_VALUE : resultPageSize);
-
-            if (tracingId != null)
-                response.setTracingId(tracingId);
-
-            return response;
-        }
-        catch (Exception e)
-        {
-            if (!((e instanceof RequestValidationException) || (e instanceof RequestExecutionException)))
-                logger.error("Unexpected error during query", e);
-            return ErrorMessage.fromException(e);
-        }
-        finally
-        {
-            Tracing.instance.stopSession();
-            // Trash the current session id if we won't need it anymore
-            if (!state.hasPager())
-                state.getAndResetCurrentTracingSession();
-        }
-    }
-
-    @Override
-    public String toString()
-    {
-        return "NEXT " + resultPageSize;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b068a9c4/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index c3f9d4c..dae08fe 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.*;
 import org.apache.cassandra.utils.UUIDGen;
@@ -45,7 +46,8 @@ public class QueryMessage extends Message.Request
         // The order of that enum matters!!
         PAGE_SIZE,
         VALUES,
-        SKIP_METADATA;
+        SKIP_METADATA,
+        PAGING_STATE;
 
         public static EnumSet<Flag> deserialize(int flags)
         {
@@ -78,6 +80,7 @@ public class QueryMessage extends Message.Request
             int resultPageSize = -1;
             List<ByteBuffer> values = Collections.emptyList();
             boolean skipMetadata = false;
+            PagingState pagingState = null;
             if (version >= 2)
             {
                 EnumSet<Flag> flags = Flag.deserialize((int)body.readByte());
@@ -94,8 +97,11 @@ public class QueryMessage extends Message.Request
                 }
 
                 skipMetadata = flags.contains(Flag.SKIP_METADATA);
+
+                if (flags.contains(Flag.PAGING_STATE))
+                    pagingState = PagingState.deserialize(CBUtil.readValue(body));
             }
-            return new QueryMessage(query, consistency, values, resultPageSize, skipMetadata);
+            return new QueryMessage(query, consistency, values, resultPageSize, skipMetadata, pagingState);
         }
 
         public ChannelBuffer encode(QueryMessage msg, int version)
@@ -115,6 +121,8 @@ public class QueryMessage extends Message.Request
                 flags.add(Flag.VALUES);
             if (msg.skipMetadata)
                 flags.add(Flag.SKIP_METADATA);
+            if (msg.pagingState != null)
+                flags.add(Flag.PAGING_STATE);
 
             assert flags.isEmpty() || version >= 2 : "Version 1 of the protocol supports no option after the consistency level";
 
@@ -127,7 +135,7 @@ public class QueryMessage extends Message.Request
                 if (flags.contains(Flag.VALUES))
                     nbBuff++;
             }
-            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(nbBuff, 0, vs);
+            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(nbBuff, 0, vs + (flags.contains(Flag.PAGING_STATE) ? 1 : 0));
             builder.add(CBUtil.longStringToCB(msg.query));
             builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
             if (version >= 2)
@@ -141,6 +149,8 @@ public class QueryMessage extends Message.Request
                     for (ByteBuffer value : msg.values)
                         builder.addValue(value);
                 }
+                if (flags.contains(Flag.PAGING_STATE))
+                    builder.addValue(msg.pagingState == null ? null : msg.pagingState.serialize());
             }
             return builder.build();
         }
@@ -151,6 +161,7 @@ public class QueryMessage extends Message.Request
     public final int resultPageSize;
     public final List<ByteBuffer> values;
     public final boolean skipMetadata;
+    public final PagingState pagingState;
 
     public QueryMessage(String query, ConsistencyLevel consistency)
     {
@@ -159,10 +170,10 @@ public class QueryMessage extends Message.Request
 
     public QueryMessage(String query, ConsistencyLevel consistency, List<ByteBuffer> values, int resultPageSize)
     {
-        this(query, consistency, values, resultPageSize, false);
+        this(query, consistency, values, resultPageSize, false, null);
     }
 
-    public QueryMessage(String query, ConsistencyLevel consistency, List<ByteBuffer> values, int resultPageSize, boolean skipMetadata)
+    public QueryMessage(String query, ConsistencyLevel consistency, List<ByteBuffer> values, int resultPageSize, boolean skipMetadata, PagingState pagingState)
     {
         super(Type.QUERY);
         this.query = query;
@@ -170,6 +181,7 @@ public class QueryMessage extends Message.Request
         this.resultPageSize = resultPageSize;
         this.values = values;
         this.skipMetadata = skipMetadata;
+        this.pagingState = pagingState;
     }
 
     public ChannelBuffer encode()
@@ -203,7 +215,7 @@ public class QueryMessage extends Message.Request
                 Tracing.instance.begin("Execute CQL3 query", builder.build());
             }
 
-            Message.Response response = QueryProcessor.process(query, values, consistency, state, resultPageSize);
+            Message.Response response = QueryProcessor.process(query, values, consistency, state, resultPageSize, pagingState);
 
             if (tracingId != null)
                 response.setTracingId(tracingId);
@@ -219,9 +231,6 @@ public class QueryMessage extends Message.Request
         finally
         {
             Tracing.instance.stopSession();
-            // Trash the current session id if we won't need it anymore
-            if (!state.hasPager())
-                state.getAndResetCurrentTracingSession();
         }
     }
 


Mime
View raw message