cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject cassandra git commit: Allow specifying now-in-seconds in native protocol
Date Thu, 30 Aug 2018 15:55:53 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 2e59ea8c7 -> f8d34d356


Allow specifying now-in-seconds in native protocol

patch by Aleksey Yeschenko; reviewed by Chris Lohfink for
CASSANDRA-14664


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8d34d35
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8d34d35
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8d34d35

Branch: refs/heads/trunk
Commit: f8d34d35646fceb76d6f747b681fe0108d7845d9
Parents: 2e59ea8
Author: Aleksey Yeshchenko <aleksey@apple.com>
Authored: Fri Aug 10 16:23:26 2018 +0100
Committer: Aleksey Yeshchenko <aleksey@apple.com>
Committed: Thu Aug 30 16:54:32 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 doc/native_protocol_v5.spec                     | 135 ++++++------
 .../cassandra/cql3/BatchQueryOptions.java       |   5 +
 .../org/apache/cassandra/cql3/QueryOptions.java |  78 +++++--
 .../apache/cassandra/cql3/UpdateParameters.java |   2 +-
 .../cql3/statements/BatchStatement.java         |   3 +-
 .../cql3/statements/ModificationStatement.java  |  37 ++--
 .../cql3/statements/SelectStatement.java        |   4 +-
 .../apache/cassandra/service/StorageProxy.java  |   3 +-
 .../cassandra/transport/ProtocolVersion.java    |   9 +
 .../cassandra/cql3/CustomNowInSecondsTest.java  | 211 +++++++++++++++++++
 .../org/apache/cassandra/cql3/ListsTest.java    |   2 +-
 .../cassandra/transport/SerDeserTest.java       |  74 ++++---
 13 files changed, 440 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5fb84a2..d95b9ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Allow specifying now-in-seconds in native protocol (CASSANDRA-14664)
  * Improve BTree build performance by avoiding data copy (CASSANDRA-9989)
  * Make monotonic read / read repair configurable (CASSANDRA-14635)
  * Refactor CompactionStrategyManager (CASSANDRA-14621)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/doc/native_protocol_v5.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec
index feea021..8091775 100644
--- a/doc/native_protocol_v5.spec
+++ b/doc/native_protocol_v5.spec
@@ -332,52 +332,57 @@ Table of Contents
     <query><query_parameters>
   where <query> is a [long string] representing the query and
   <query_parameters> must be
-    <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>][<keyspace>]
+    <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>][<keyspace>][<now_in_seconds>]
   where:
     - <consistency> is the [consistency] level for the operation.
     - <flags> is a [int] whose bits define the options for this query and
       in particular influence what the remainder of the message contains.
       A flag is set if the bit corresponding to its `mask` is set. Supported
       flags are, given their mask:
-        0x01: Values. If set, a [short] <n> followed by <n> [value]
-              values are provided. Those values are used for bound variables in
-              the query. Optionally, if the 0x40 flag is present, each value
-              will be preceded by a [string] name, representing the name of
-              the marker the value must be bound to.
-        0x02: Skip_metadata. If set, the Result Set returned as a response
-              to the query (if any) will have the NO_METADATA flag (see
-              Section 4.2.5.2).
-        0x04: Page_size. If set, <result_page_size> is an [int]
-              controlling the desired page size of the result (in CQL3 rows).
-              See the section on paging (Section 8) for more details.
-        0x08: With_paging_state. If set, <paging_state> should be present.
-              <paging_state> is a [bytes] value that should have been returned
-              in a result set (Section 4.2.5.2). The query will be
-              executed but starting from a given paging state. This is also to
-              continue paging on a different node than the one where it
-              started (See Section 8 for more details).
-        0x10: With serial consistency. If set, <serial_consistency> should be
-              present. <serial_consistency> is the [consistency] level for the
-              serial phase of conditional updates. That consitency can only be
-              either SERIAL or LOCAL_SERIAL and if not present, it defaults to
-              SERIAL. This option will be ignored for anything else other than a
-              conditional update/insert.
-        0x20: With default timestamp. If set, <timestamp> should be present.
-              <timestamp> is a [long] representing the default timestamp for the query
-              in microseconds (negative values are forbidden). This will
-              replace the server side assigned timestamp as default timestamp.
-              Note that a timestamp in the query itself will still override
-              this timestamp. This is entirely optional.
-        0x40: With names for values. This only makes sense if the 0x01 flag is set and
-              is ignored otherwise. If present, the values from the 0x01 flag will
-              be preceded by a name (see above). Note that this is only useful for
-              QUERY requests where named bind markers are used; for EXECUTE statements,
-              since the names for the expected values was returned during preparation,
-              a client can always provide values in the right order without any names
-              and using this flag, while supported, is almost surely inefficient.
-        0x80: With keyspace. If set, <keyspace> must be present. <keyspace> is a
-              [string] indicating the keyspace that the query should be executed in.
-              It supercedes the keyspace that the connection is bound to, if any.
+        0x0001: Values. If set, a [short] <n> followed by <n> [value]
+                values are provided. Those values are used for bound variables in
+                the query. Optionally, if the 0x40 flag is present, each value
+                will be preceded by a [string] name, representing the name of
+                the marker the value must be bound to.
+        0x0002: Skip_metadata. If set, the Result Set returned as a response
+                to the query (if any) will have the NO_METADATA flag (see
+                Section 4.2.5.2).
+        0x0004: Page_size. If set, <result_page_size> is an [int]
+                controlling the desired page size of the result (in CQL3 rows).
+                See the section on paging (Section 8) for more details.
+        0x0008: With_paging_state. If set, <paging_state> should be present.
+                <paging_state> is a [bytes] value that should have been returned
+                in a result set (Section 4.2.5.2). The query will be
+                executed but starting from a given paging state. This is also to
+                continue paging on a different node than the one where it
+                started (See Section 8 for more details).
+        0x0010: With serial consistency. If set, <serial_consistency> should be
+                present. <serial_consistency> is the [consistency] level for the
+                serial phase of conditional updates. That consitency can only be
+                either SERIAL or LOCAL_SERIAL and if not present, it defaults to
+                SERIAL. This option will be ignored for anything else other than a
+                conditional update/insert.
+        0x0020: With default timestamp. If set, <timestamp> must be present.
+                <timestamp> is a [long] representing the default timestamp for the query
+                in microseconds (negative values are forbidden). This will
+                replace the server side assigned timestamp as default timestamp.
+                Note that a timestamp in the query itself will still override
+                this timestamp. This is entirely optional.
+        0x0040: With names for values. This only makes sense if the 0x01 flag is set and
+                is ignored otherwise. If present, the values from the 0x01 flag will
+                be preceded by a name (see above). Note that this is only useful for
+                QUERY requests where named bind markers are used; for EXECUTE statements,
+                since the names for the expected values was returned during preparation,
+                a client can always provide values in the right order without any names
+                and using this flag, while supported, is almost surely inefficient.
+        0x0080: With keyspace. If set, <keyspace> must be present. <keyspace> is a
+                [string] indicating the keyspace that the query should be executed in.
+                It supercedes the keyspace that the connection is bound to, if any.
+        0x0100: With now in seconds. If set, <now_in_seconds> must be present.
+                <now_in_seconds> is an [int] representing the current time (now) for
+                the query. Affects TTL cell liveness in read queries and local deletion
+                time for tombstones and TTL cells in update requests. It's intended
+                for testing purposes and is optional.
 
   Note that the consistency is ignored by some queries (USE, CREATE, ALTER,
   TRUNCATE, ...).
@@ -423,7 +428,7 @@ Table of Contents
   Allows executing a list of queries (prepared or not) as a batch (note that
   only DML statements are accepted in a batch). The body of the message must
   be:
-    <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>][<keyspace>]
+    <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>][<keyspace>][<now_in_seconds>]
   where:
     - <type> is a [byte] indicating the type of batch to use:
         - If <type> == 0, the batch will be "logged". This is equivalent to a
@@ -437,27 +442,32 @@ Table of Contents
       bits must always be 0 as their corresponding options do not make sense for
       Batch. A flag is set if the bit corresponding to its `mask` is set. Supported
       flags are, given their mask:
-        0x10: With serial consistency. If set, <serial_consistency> should be
-              present. <serial_consistency> is the [consistency] level for the
-              serial phase of conditional updates. That consistency can only be
-              either SERIAL or LOCAL_SERIAL and if not present, it defaults to
-              SERIAL. This option will be ignored for anything else other than a
-              conditional update/insert.
-        0x20: With default timestamp. If set, <timestamp> should be present.
-              <timestamp> is a [long] representing the default timestamp for the query
-              in microseconds. This will replace the server side assigned
-              timestamp as default timestamp. Note that a timestamp in the query itself
-              will still override this timestamp. This is entirely optional.
-        0x40: With names for values. If set, then all values for all <query_i> must be
-              preceded by a [string] <name_i> that have the same meaning as in QUERY
-              requests [IMPORTANT NOTE: this feature does not work and should not be
-              used. It is specified in a way that makes it impossible for the server
-              to implement. This will be fixed in a future version of the native
-              protocol. See https://issues.apache.org/jira/browse/CASSANDRA-10246 for
-              more details].
-        0x80: With keyspace. If set, <keyspace> must be present. <keyspace> is a
-              [string] indicating the keyspace that the query should be executed in.
-              It supercedes the keyspace that the connection is bound to, if any.
+        0x0010: With serial consistency. If set, <serial_consistency> should be
+                present. <serial_consistency> is the [consistency] level for the
+                serial phase of conditional updates. That consistency can only be
+                either SERIAL or LOCAL_SERIAL and if not present, it defaults to
+                SERIAL. This option will be ignored for anything else other than a
+                conditional update/insert.
+        0x0020: With default timestamp. If set, <timestamp> should be present.
+                <timestamp> is a [long] representing the default timestamp for the query
+                in microseconds. This will replace the server side assigned
+                timestamp as default timestamp. Note that a timestamp in the query itself
+                will still override this timestamp. This is entirely optional.
+        0x0040: With names for values. If set, then all values for all <query_i> must be
+                preceded by a [string] <name_i> that have the same meaning as in QUERY
+                requests [IMPORTANT NOTE: this feature does not work and should not be
+                used. It is specified in a way that makes it impossible for the server
+                to implement. This will be fixed in a future version of the native
+                protocol. See https://issues.apache.org/jira/browse/CASSANDRA-10246 for
+                more details].
+        0x0080: With keyspace. If set, <keyspace> must be present. <keyspace> is a
+                [string] indicating the keyspace that the query should be executed in.
+                It supercedes the keyspace that the connection is bound to, if any.
+        0x0100: With now in seconds. If set, <now_in_seconds> must be present.
+                <now_in_seconds> is an [int] representing the current time (now) for
+                the query. Affects TTL cell liveness in read queries and local deletion
+                time for tombstones and TTL cells in update requests. It's intended
+                for testing purposes and is optional.
     - <n> is a [short] indicating the number of following queries.
     - <query_1>...<query_n> are the queries to execute. A <query_i> must be of the
       form:
@@ -1252,4 +1262,5 @@ Table of Contents
     (Sections 4.1.4, 4.1.6 and 4.1.7).
   * Add the duration data type
   * Added keyspace field in QUERY, PREPARE, and BATCH messages (Sections 4.1.4, 4.1.5, and 4.1.7).
+  * Added now_in_seconds field in QUERY, EXECUTE, and BATCH messages (Sections 4.1.4, 4.1.6, and 4.1.7).
   * Added [int] flags field in PREPARE message (Section 4.1.5).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
index 59c35a5..ac0d148 100644
--- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
@@ -84,6 +84,11 @@ public abstract class BatchQueryOptions
         return wrapped.getTimestamp(state);
     }
 
+    public int getNowInSeconds()
+    {
+        return wrapped.getNowInSeconds();
+    }
+
     private static class WithoutPerStatementVariables extends BatchQueryOptions
     {
         private WithoutPerStatementVariables(QueryOptions wrapped, List<Object> queryOrIdList)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 49ab9b4..e546304 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.transport.CBCodec;
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.ProtocolException;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
@@ -69,9 +70,34 @@ public abstract class QueryOptions
         return new DefaultQueryOptions(null, null, true, null, protocolVersion);
     }
 
-    public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency, ProtocolVersion version, String keyspace)
+    public static QueryOptions create(ConsistencyLevel consistency,
+                                      List<ByteBuffer> values,
+                                      boolean skipMetadata,
+                                      int pageSize,
+                                      PagingState pagingState,
+                                      ConsistencyLevel serialConsistency,
+                                      ProtocolVersion version,
+                                      String keyspace)
     {
-        return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L, keyspace), version);
+        return create(consistency, values, skipMetadata, pageSize, pagingState, serialConsistency, version, keyspace, Long.MIN_VALUE, Integer.MIN_VALUE);
+    }
+
+    public static QueryOptions create(ConsistencyLevel consistency,
+                                      List<ByteBuffer> values,
+                                      boolean skipMetadata,
+                                      int pageSize,
+                                      PagingState pagingState,
+                                      ConsistencyLevel serialConsistency,
+                                      ProtocolVersion version,
+                                      String keyspace,
+                                      long timestamp,
+                                      int nowInSeconds)
+    {
+        return new DefaultQueryOptions(consistency,
+                                       values,
+                                       skipMetadata,
+                                       new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp, keyspace, nowInSeconds),
+                                       version);
     }
 
     public static QueryOptions addColumnSpecifications(QueryOptions options, List<ColumnSpecification> columnSpecs)
@@ -174,6 +200,12 @@ public abstract class QueryOptions
         return tstamp != Long.MIN_VALUE ? tstamp : state.getTimestamp();
     }
 
+    public int getNowInSeconds()
+    {
+        int nowInSeconds = getSpecificOptions().nowInSeconds;
+        return Integer.MIN_VALUE == nowInSeconds ? FBUtilities.nowInSeconds() : nowInSeconds;
+    }
+
     /** The keyspace that this query is bound to, or null if not relevant. */
     public String getKeyspace() { return getSpecificOptions().keyspace; }
 
@@ -346,21 +378,28 @@ public abstract class QueryOptions
     // Options that are likely to not be present in most queries
     static class SpecificOptions
     {
-        private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, Long.MIN_VALUE, null);
+        private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, Long.MIN_VALUE, null, Integer.MIN_VALUE);
 
         private final int pageSize;
         private final PagingState state;
         private final ConsistencyLevel serialConsistency;
         private final long timestamp;
         private final String keyspace;
-
-        private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp, String keyspace)
+        private final int nowInSeconds;
+
+        private SpecificOptions(int pageSize,
+                                PagingState state,
+                                ConsistencyLevel serialConsistency,
+                                long timestamp,
+                                String keyspace,
+                                int nowInSeconds)
         {
             this.pageSize = pageSize;
             this.state = state;
             this.serialConsistency = serialConsistency == null ? ConsistencyLevel.SERIAL : serialConsistency;
             this.timestamp = timestamp;
             this.keyspace = keyspace;
+            this.nowInSeconds = nowInSeconds;
         }
     }
 
@@ -376,7 +415,8 @@ public abstract class QueryOptions
             SERIAL_CONSISTENCY,
             TIMESTAMP,
             NAMES_FOR_VALUES,
-            KEYSPACE;
+            KEYSPACE,
+            NOW_IN_SECONDS;
 
             private static final Flag[] ALL_VALUES = values();
 
@@ -442,8 +482,10 @@ public abstract class QueryOptions
                     timestamp = ts;
                 }
                 String keyspace = flags.contains(Flag.KEYSPACE) ? CBUtil.readString(body) : null;
-                options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp, keyspace);
+                int nowInSeconds = flags.contains(Flag.NOW_IN_SECONDS) ? body.readInt() : Integer.MIN_VALUE;
+                options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp, keyspace, nowInSeconds);
             }
+
             DefaultQueryOptions opts = new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
             return names == null ? opts : new OptionsWithNames(opts, names);
         }
@@ -452,7 +494,7 @@ public abstract class QueryOptions
         {
             CBUtil.writeConsistencyLevel(options.getConsistency(), dest);
 
-            EnumSet<Flag> flags = gatherFlags(options);
+            EnumSet<Flag> flags = gatherFlags(options, version);
             if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
                 dest.writeInt(Flag.serialize(flags));
             else
@@ -470,6 +512,8 @@ public abstract class QueryOptions
                 dest.writeLong(options.getSpecificOptions().timestamp);
             if (flags.contains(Flag.KEYSPACE))
                 CBUtil.writeString(options.getSpecificOptions().keyspace, dest);
+            if (flags.contains(Flag.NOW_IN_SECONDS))
+                dest.writeInt(options.getSpecificOptions().nowInSeconds);
 
             // Note that we don't really have to bother with NAMES_FOR_VALUES server side,
             // and in fact we never really encode QueryOptions, only decode them, so we
@@ -482,7 +526,7 @@ public abstract class QueryOptions
 
             size += CBUtil.sizeOfConsistencyLevel(options.getConsistency());
 
-            EnumSet<Flag> flags = gatherFlags(options);
+            EnumSet<Flag> flags = gatherFlags(options, version);
             size += (version.isGreaterOrEqualTo(ProtocolVersion.V5) ? 4 : 1);
 
             if (flags.contains(Flag.VALUES))
@@ -497,10 +541,13 @@ public abstract class QueryOptions
                 size += 8;
             if (flags.contains(Flag.KEYSPACE))
                 size += CBUtil.sizeOfString(options.getSpecificOptions().keyspace);
+            if (flags.contains(Flag.NOW_IN_SECONDS))
+                size += 4;
+
             return size;
         }
 
-        private EnumSet<Flag> gatherFlags(QueryOptions options)
+        private EnumSet<Flag> gatherFlags(QueryOptions options, ProtocolVersion version)
         {
             EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
             if (options.getValues().size() > 0)
@@ -515,8 +562,15 @@ public abstract class QueryOptions
                 flags.add(Flag.SERIAL_CONSISTENCY);
             if (options.getSpecificOptions().timestamp != Long.MIN_VALUE)
                 flags.add(Flag.TIMESTAMP);
-            if (options.getSpecificOptions().keyspace != null)
-                flags.add(Flag.KEYSPACE);
+
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+            {
+                if (options.getSpecificOptions().keyspace != null)
+                    flags.add(Flag.KEYSPACE);
+                if (options.getSpecificOptions().nowInSeconds != Integer.MIN_VALUE)
+                    flags.add(Flag.NOW_IN_SECONDS);
+            }
+
             return flags;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 9d6f2e9..500862e 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -66,7 +66,7 @@ public class UpdateParameters
         this.updatedColumns = updatedColumns;
         this.options = options;
 
-        this.nowInSec = FBUtilities.nowInSeconds();
+        this.nowInSec = options.getNowInSeconds();
         this.timestamp = timestamp;
         this.ttl = ttl;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 9ed150c..089c532 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -443,6 +443,7 @@ public class BatchStatement implements CQLStatement
                                                    options.getSerialConsistency(),
                                                    options.getConsistency(),
                                                    state.getClientState(),
+                                                   options.getNowInSeconds(),
                                                    queryStartNanoTime))
         {
 
@@ -551,7 +552,7 @@ public class BatchStatement implements CQLStatement
         String ksName = request.metadata.keyspace;
         String tableName = request.metadata.name;
 
-        try (RowIterator result = ModificationStatement.casInternal(request, state))
+        try (RowIterator result = ModificationStatement.casInternal(request, state, options.getNowInSeconds()))
         {
             return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, result, columnsWithConditions, true, options.forStatement(0)));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index c388c48..5f3d07f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -375,6 +375,7 @@ public abstract class ModificationStatement implements CQLStatement
                                                            DataLimits limits,
                                                            boolean local,
                                                            ConsistencyLevel cl,
+                                                           int nowInSeconds,
                                                            long queryStartNanoTime)
     {
         if (!requiresRead())
@@ -390,10 +391,9 @@ public abstract class ModificationStatement implements CQLStatement
         }
 
         List<SinglePartitionReadCommand> commands = new ArrayList<>(partitionKeys.size());
-        int nowInSec = FBUtilities.nowInSeconds();
         for (ByteBuffer key : partitionKeys)
             commands.add(SinglePartitionReadCommand.create(metadata(),
-                                                           nowInSec,
+                                                           nowInSeconds,
                                                            ColumnFilter.selection(this.requiresRead),
                                                            RowFilter.NONE,
                                                            limits,
@@ -484,6 +484,7 @@ public abstract class ModificationStatement implements CQLStatement
                                                    options.getSerialConsistency(),
                                                    options.getConsistency(),
                                                    queryState.getClientState(),
+                                                   options.getNowInSeconds(),
                                                    queryStartNanoTime))
         {
             return new ResultMessage.Rows(buildCasResultSet(result, options));
@@ -600,7 +601,7 @@ public abstract class ModificationStatement implements CQLStatement
         SelectStatement.forSelection(metadata, selection).processPartition(partition,
                                                                       options,
                                                                       builder,
-                                                                      FBUtilities.nowInSeconds());
+                                                                      options.getNowInSeconds());
 
         return builder.build();
     }
@@ -622,17 +623,17 @@ public abstract class ModificationStatement implements CQLStatement
     public ResultMessage executeInternalWithCondition(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException
     {
         CQL3CasRequest request = makeCasRequest(state, options);
-        try (RowIterator result = casInternal(request, state))
+        try (RowIterator result = casInternal(request, state, options.getNowInSeconds()))
         {
             return new ResultMessage.Rows(buildCasResultSet(result, options));
         }
     }
 
-    static RowIterator casInternal(CQL3CasRequest request, QueryState state)
+    static RowIterator casInternal(CQL3CasRequest request, QueryState state, int nowInSeconds)
     {
         UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp());
 
-        SinglePartitionReadQuery readCommand = request.readCommand(FBUtilities.nowInSeconds());
+        SinglePartitionReadQuery readCommand = request.readCommand(nowInSeconds);
         FilteredPartition current;
         try (ReadExecutionController executionController = readCommand.executionController();
              PartitionIterator iter = readCommand.executeInternal(executionController))
@@ -656,21 +657,21 @@ public abstract class ModificationStatement implements CQLStatement
      *
      * @param options value for prepared statement markers
      * @param local if true, any requests (for collections) performed by getMutation should be done locally only.
-     * @param now the current timestamp in microseconds to use if no timestamp is user provided.
+     * @param timestamp the current timestamp in microseconds to use if no timestamp is user provided.
      *
      * @return list of the mutations
      */
-    private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, long queryStartNanoTime)
+    private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long timestamp, long queryStartNanoTime)
     {
         UpdatesCollector collector = new SingleTableUpdatesCollector(metadata, updatedColumns, 1);
-        addUpdates(collector, options, local, now, queryStartNanoTime);
+        addUpdates(collector, options, local, timestamp, queryStartNanoTime);
         return collector.toMutations();
     }
 
     final void addUpdates(UpdatesCollector collector,
                           QueryOptions options,
                           boolean local,
-                          long now,
+                          long timestamp,
                           long queryStartNanoTime)
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(options);
@@ -688,7 +689,7 @@ public abstract class ModificationStatement implements CQLStatement
                                                            options,
                                                            DataLimits.NONE,
                                                            local,
-                                                           now,
+                                                           timestamp,
                                                            queryStartNanoTime);
             for (ByteBuffer key : keys)
             {
@@ -709,7 +710,7 @@ public abstract class ModificationStatement implements CQLStatement
             if (restrictions.hasClusteringColumnsRestrictions() && clusterings.isEmpty())
                 return;
 
-            UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, now, queryStartNanoTime);
+            UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, timestamp, queryStartNanoTime);
 
             for (ByteBuffer key : keys)
             {
@@ -752,7 +753,7 @@ public abstract class ModificationStatement implements CQLStatement
                                                   NavigableSet<Clustering> clusterings,
                                                   QueryOptions options,
                                                   boolean local,
-                                                  long now,
+                                                  long timestamp,
                                                   long queryStartNanoTime)
     {
         if (clusterings.contains(Clustering.STATIC_CLUSTERING))
@@ -761,7 +762,7 @@ public abstract class ModificationStatement implements CQLStatement
                                         options,
                                         DataLimits.cqlLimits(1),
                                         local,
-                                        now,
+                                        timestamp,
                                         queryStartNanoTime);
 
         return makeUpdateParameters(keys,
@@ -769,7 +770,7 @@ public abstract class ModificationStatement implements CQLStatement
                                     options,
                                     DataLimits.NONE,
                                     local,
-                                    now,
+                                    timestamp,
                                     queryStartNanoTime);
     }
 
@@ -778,12 +779,12 @@ public abstract class ModificationStatement implements CQLStatement
                                                   QueryOptions options,
                                                   DataLimits limits,
                                                   boolean local,
-                                                  long now,
+                                                  long timestamp,
                                                   long queryStartNanoTime)
     {
         // Some lists operation requires reading
-        Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, limits, local, options.getConsistency(), queryStartNanoTime);
-        return new UpdateParameters(metadata(), updatedColumns(), options, getTimestamp(now, options), getTimeToLive(options), lists);
+        Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, limits, local, options.getConsistency(), options.getNowInSeconds(), queryStartNanoTime);
+        return new UpdateParameters(metadata(), updatedColumns(), options, getTimestamp(timestamp, options), getTimeToLive(options), lists);
     }
 
     private Slices toSlices(SortedSet<ClusteringBound> startBounds, SortedSet<ClusteringBound> endBounds)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index a5105f2..61715b9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -234,7 +234,7 @@ public class SelectStatement implements CQLStatement
 
         cl.validateForRead(keyspace());
 
-        int nowInSec = FBUtilities.nowInSeconds();
+        int nowInSec = options.getNowInSeconds();
         int userLimit = getLimit(options);
         int userPerPartitionLimit = getPerPartitionLimit(options);
         int pageSize = options.getPageSize();
@@ -428,7 +428,7 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeLocally(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
-        return executeInternal(state, options, FBUtilities.nowInSeconds(), System.nanoTime());
+        return executeInternal(state, options, options.getNowInSeconds(), System.nanoTime());
     }
 
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 58f08d4..a011841 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -235,6 +235,7 @@ public class StorageProxy implements StorageProxyMBean
                                   ConsistencyLevel consistencyForPaxos,
                                   ConsistencyLevel consistencyForCommit,
                                   ClientState state,
+                                  int nowInSeconds,
                                   long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
     {
@@ -261,7 +262,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 // read the current values and check they validate the conditions
                 Tracing.trace("Reading existing values for CAS precondition");
-                SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) request.readCommand(FBUtilities.nowInSeconds());
+                SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) request.readCommand(nowInSeconds);
                 ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
 
                 FilteredPartition current;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/transport/ProtocolVersion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ProtocolVersion.java b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
index cd73c86..9a13903 100644
--- a/src/java/org/apache/cassandra/transport/ProtocolVersion.java
+++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
@@ -84,6 +84,15 @@ public enum ProtocolVersion implements Comparable<ProtocolVersion>
         return ret;
     }
 
+    public static List<ProtocolVersion> supportedVersionsStartingWith(ProtocolVersion smallestVersion)
+    {
+        ArrayList<ProtocolVersion> versions = new ArrayList<>(SUPPORTED_VERSIONS.length);
+        for (ProtocolVersion version : SUPPORTED_VERSIONS)
+            if (version.isGreaterOrEqualTo(smallestVersion))
+                versions.add(version);
+        return versions;
+    }
+
     public static ProtocolVersion decode(int versionNum)
     {
         ProtocolVersion ret = versionNum >= MIN_SUPPORTED_VERSION.num && versionNum <= MAX_SUPPORTED_VERSION.num

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java b/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java
new file mode 100644
index 0000000..8bb0bf4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static java.lang.String.format;
+import static org.junit.Assert.assertEquals;
+
+public class CustomNowInSecondsTest extends CQLTester
+{
+    @BeforeClass
+    public static void setUpClass()
+    {
+        prepareServer();
+        requireNetwork();
+    }
+
+    @Test
+    public void testSelectQuery()
+    {
+        testSelectQuery(false);
+        testSelectQuery(true);
+    }
+
+    private void testSelectQuery(boolean prepared)
+    {
+        int now = (int) (System.currentTimeMillis() / 1000);
+        int day = 86400;
+
+        String ks = createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+        String tbl = createTable(ks, "CREATE TABLE %s (id int primary key, val int)");
+
+        // insert a row with TTL = 1 day.
+        executeModify(format("INSERT INTO %s.%s (id, val) VALUES (0, 0) USING TTL %d", ks, tbl, day), Integer.MIN_VALUE, prepared);
+
+        // execute a SELECT query without overriding nowInSeconds - make sure we observe one row.
+        assertEquals(1, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), Integer.MIN_VALUE, prepared).size());
+
+        // execute a SELECT query with nowInSeconds set to [now + 1 day + 1], when the row should have expired.
+        assertEquals(0, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + day + 1, prepared).size());
+    }
+
+    @Test
+    public void testModifyQuery()
+    {
+        testModifyQuery(false);
+        testModifyQuery(true);
+    }
+
+    private void testModifyQuery(boolean prepared)
+    {
+        int now = (int) (System.currentTimeMillis() / 1000);
+        int day = 86400;
+
+        String ks = createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+        String tbl = createTable(ks, "CREATE TABLE %s (id int primary key, val int)");
+
+        // execute an INSERT query with now set to [now + 1 day], with ttl = 1, making its effective ttl = 1 day + 1.
+        executeModify(format("INSERT INTO %s.%s (id, val) VALUES (0, 0) USING TTL %d", ks, tbl, 1), now + day, prepared);
+
+        // verify that despite TTL having passed (if not for nowInSeconds override) the row is still there.
+        assertEquals(1, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + 1, prepared).size());
+
+        // jump in time by one day, make sure the row expired
+        assertEquals(0, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + day + 1, prepared).size());
+    }
+
+    @Test
+    public void testBatchQuery()
+    {
+        testBatchQuery(false);
+        testBatchQuery(true);
+    }
+
+    private void testBatchQuery(boolean prepared)
+    {
+        int now = (int) (System.currentTimeMillis() / 1000);
+        int day = 86400;
+
+        String ks = createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+        String tbl = createTable(ks, "CREATE TABLE %s (id int primary key, val int)");
+
+        // execute an BATCH query with now set to [now + 1 day], with ttl = 1, making its effective ttl = 1 day + 1.
+        String batch = format("BEGIN BATCH " +
+                              "INSERT INTO %s.%s (id, val) VALUES (0, 0) USING TTL %d; " +
+                              "INSERT INTO %s.%s (id, val) VALUES (1, 1) USING TTL %d; " +
+                              "APPLY BATCH;",
+                              ks, tbl, 1,
+                              ks, tbl, 1);
+        executeModify(batch, now + day, prepared);
+
+        // verify that despite TTL having passed at now + 1 the rows are still there.
+        assertEquals(2, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + 1, prepared).size());
+
+        // jump in time by one day, make sure the row expired.
+        assertEquals(0, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + day + 1, prepared).size());
+    }
+
+    @Test
+    public void testBatchMessage()
+    {
+        // test BatchMessage path
+
+        int now = (int) (System.currentTimeMillis() / 1000);
+        int day = 86400;
+
+        String ks = createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+        String tbl = createTable(ks, "CREATE TABLE %s (id int primary key, val int)");
+
+        List<String> queries = ImmutableList.of(
+            format("INSERT INTO %s.%s (id, val) VALUES (0, 0) USING TTL %d;", ks, tbl, 1),
+            format("INSERT INTO %s.%s (id, val) VALUES (1, 1) USING TTL %d;", ks, tbl, 1)
+        );
+
+        ClientState cs = ClientState.forInternalCalls();
+        QueryState qs = new QueryState(cs);
+
+        List<ModificationStatement> statements = new ArrayList<>(queries.size());
+        for (String query : queries)
+            statements.add((ModificationStatement) QueryProcessor.parseStatement(query, cs));
+
+        BatchStatement batch =
+            new BatchStatement(BatchStatement.Type.UNLOGGED, VariableSpecifications.empty(), statements, Attributes.none());
+
+        // execute an BATCH message with now set to [now + 1 day], with ttl = 1, making its effective ttl = 1 day + 1.
+        QueryProcessor.instance.processBatch(batch, qs, batchQueryOptions(now + day), Collections.emptyMap(), System.nanoTime());
+
+        // verify that despite TTL having passed at now + 1 the rows are still there.
+        assertEquals(2, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + 1, false).size());
+
+        // jump in time by one day, make sure the row expired.
+        assertEquals(0, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + day + 1, false).size());
+    }
+
+    private static ResultSet executeSelect(String query, int nowInSeconds, boolean prepared)
+    {
+        ResultMessage message = execute(query, nowInSeconds, prepared);
+        return ((ResultMessage.Rows) message).result;
+    }
+
+    private static void executeModify(String query, int nowInSeconds, boolean prepared)
+    {
+        execute(query, nowInSeconds, prepared);
+    }
+
+    // prepared = false tests QueryMessage path, prepared = true tests ExecuteMessage path
+    private static ResultMessage execute(String query, int nowInSeconds, boolean prepared)
+    {
+        ClientState cs = ClientState.forInternalCalls();
+        QueryState qs = new QueryState(cs);
+
+        if (prepared)
+        {
+            CQLStatement statement = QueryProcessor.parseStatement(query, cs);
+            return QueryProcessor.instance.processPrepared(statement, qs, queryOptions(nowInSeconds), Collections.emptyMap(), System.nanoTime());
+        }
+        else
+        {
+            return QueryProcessor.instance.process(query, qs, queryOptions(nowInSeconds), Collections.emptyMap(), System.nanoTime());
+        }
+    }
+
+    private static QueryOptions queryOptions(int nowInSeconds)
+    {
+        return QueryOptions.create(ConsistencyLevel.ONE,
+                                   Collections.emptyList(),
+                                   false,
+                                   Integer.MAX_VALUE,
+                                   null,
+                                   null,
+                                   ProtocolVersion.CURRENT,
+                                   null,
+                                   Long.MIN_VALUE,
+                                   nowInSeconds);
+    }
+
+    private static BatchQueryOptions batchQueryOptions(int nowInSeconds)
+    {
+        return BatchQueryOptions.withoutPerStatementVariables(queryOptions(nowInSeconds));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/test/unit/org/apache/cassandra/cql3/ListsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ListsTest.java b/test/unit/org/apache/cassandra/cql3/ListsTest.java
index 63c496c..a377b96 100644
--- a/test/unit/org/apache/cassandra/cql3/ListsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ListsTest.java
@@ -140,7 +140,7 @@ public class ListsTest extends CQLTester
 
         ByteBuffer keyBuf = ByteBufferUtil.bytes("key");
         DecoratedKey key = Murmur3Partitioner.instance.decorateKey(keyBuf);
-        UpdateParameters parameters = new UpdateParameters(metaData, null, null, System.currentTimeMillis(), 1000, Collections.emptyMap());
+        UpdateParameters parameters = new UpdateParameters(metaData, null, QueryOptions.DEFAULT, System.currentTimeMillis(), 1000, Collections.emptyMap());
         Clustering clustering = Clustering.make(ByteBufferUtil.bytes(1));
         parameters.newRow(clustering);
         prepender.execute(key, parameters);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
index d3b9282..9d07321 100644
--- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -42,7 +42,6 @@ import static org.junit.Assert.assertEquals;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Serialization/deserialization tests for protocol objects and messages.
@@ -308,34 +307,56 @@ public class SerDeserTest
     }
 
     @Test
-    public void queryOptionsSerDeserTest() throws Exception
+    public void queryOptionsSerDeserTest()
     {
         for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
-            queryOptionsSerDeserTest(version);
-    }
+        {
+            queryOptionsSerDeserTest(
+                version,
+                QueryOptions.create(ConsistencyLevel.ALL,
+                                    Collections.singletonList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 })),
+                                    false,
+                                    5000,
+                                    Util.makeSomePagingState(version),
+                                    ConsistencyLevel.SERIAL,
+                                    version,
+                                    null)
+            );
+        }
 
-    private void queryOptionsSerDeserTest(ProtocolVersion version) throws Exception
-    {
-        queryOptionsSerDeserTest(version, QueryOptions.create(ConsistencyLevel.ALL,
-                                                              Collections.singletonList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 })),
-                                                              false,
-                                                              5000,
-                                                              Util.makeSomePagingState(version),
-                                                              ConsistencyLevel.SERIAL,
-                                                              version,
-                                                              null
-        ));
-
-        queryOptionsSerDeserTest(version, QueryOptions.create(ConsistencyLevel.LOCAL_ONE,
-                                                              Arrays.asList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 }),
-                                                                            ByteBuffer.wrap(new byte[] { 0x03, 0x04, 0x05, 0x03, 0x04, 0x05 })),
-                                                              true,
-                                                              10,
-                                                              Util.makeSomePagingState(version),
-                                                              ConsistencyLevel.SERIAL,
-                                                              version,
-                                                              "some_keyspace"
-        ));
+        for (ProtocolVersion version : ProtocolVersion.supportedVersionsStartingWith(ProtocolVersion.V5))
+        {
+            queryOptionsSerDeserTest(
+                version,
+                QueryOptions.create(ConsistencyLevel.LOCAL_ONE,
+                                    Arrays.asList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 }),
+                                                  ByteBuffer.wrap(new byte[] { 0x03, 0x04, 0x05, 0x03, 0x04, 0x05 })),
+                                    true,
+                                    10,
+                                    Util.makeSomePagingState(version),
+                                    ConsistencyLevel.SERIAL,
+                                    version,
+                                    "some_keyspace")
+            );
+        }
+
+        for (ProtocolVersion version : ProtocolVersion.supportedVersionsStartingWith(ProtocolVersion.V5))
+        {
+            queryOptionsSerDeserTest(
+                version,
+                QueryOptions.create(ConsistencyLevel.LOCAL_ONE,
+                                    Arrays.asList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 }),
+                                                  ByteBuffer.wrap(new byte[] { 0x03, 0x04, 0x05, 0x03, 0x04, 0x05 })),
+                                    true,
+                                    10,
+                                    Util.makeSomePagingState(version),
+                                    ConsistencyLevel.SERIAL,
+                                    version,
+                                    "some_keyspace",
+                                    FBUtilities.timestampMicros(),
+                                    FBUtilities.nowInSeconds())
+            );
+        }
     }
 
     private void queryOptionsSerDeserTest(ProtocolVersion version, QueryOptions options)
@@ -353,5 +374,6 @@ public class SerDeserTest
         assertEquals(options.getPagingState(), decodedOptions.getPagingState());
         assertEquals(options.skipMetadata(), decodedOptions.skipMetadata());
         assertEquals(options.getKeyspace(), decodedOptions.getKeyspace());
+        assertEquals(options.getNowInSeconds(), decodedOptions.getNowInSeconds());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message