From commits-return-213608-archive-asf-public=cust-asf.ponee.io@cassandra.apache.org Thu Aug 30 17:56:01 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 859EC180656 for ; Thu, 30 Aug 2018 17:55:59 +0200 (CEST) Received: (qmail 71523 invoked by uid 500); 30 Aug 2018 15:55:53 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 71512 invoked by uid 99); 30 Aug 2018 15:55:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Aug 2018 15:55:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8203BE04A3; Thu, 30 Aug 2018 15:55:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aleksey@apache.org To: commits@cassandra.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: cassandra git commit: Allow specifying now-in-seconds in native protocol Date: Thu, 30 Aug 2018 15:55:53 +0000 (UTC) Repository: cassandra Updated Branches: refs/heads/trunk 2e59ea8c7 -> f8d34d356 Allow specifying now-in-seconds in native protocol patch by Aleksey Yeschenko; reviewed by Chris Lohfink for CASSANDRA-14664 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8d34d35 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8d34d35 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8d34d35 Branch: refs/heads/trunk Commit: f8d34d35646fceb76d6f747b681fe0108d7845d9 Parents: 2e59ea8 Author: Aleksey Yeshchenko Authored: Fri Aug 10 16:23:26 2018 +0100 Committer: Aleksey Yeshchenko Committed: Thu Aug 30 16:54:32 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/native_protocol_v5.spec | 135 ++++++------ .../cassandra/cql3/BatchQueryOptions.java | 5 + .../org/apache/cassandra/cql3/QueryOptions.java | 78 +++++-- .../apache/cassandra/cql3/UpdateParameters.java | 2 +- .../cql3/statements/BatchStatement.java | 3 +- .../cql3/statements/ModificationStatement.java | 37 ++-- .../cql3/statements/SelectStatement.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 3 +- .../cassandra/transport/ProtocolVersion.java | 9 + .../cassandra/cql3/CustomNowInSecondsTest.java | 211 +++++++++++++++++++ .../org/apache/cassandra/cql3/ListsTest.java | 2 +- .../cassandra/transport/SerDeserTest.java | 74 ++++--- 13 files changed, 440 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5fb84a2..d95b9ed 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Allow specifying now-in-seconds in native protocol (CASSANDRA-14664) * Improve BTree build performance by avoiding data copy (CASSANDRA-9989) * Make monotonic read / read repair configurable (CASSANDRA-14635) * Refactor CompactionStrategyManager (CASSANDRA-14621) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/doc/native_protocol_v5.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec index feea021..8091775 100644 --- a/doc/native_protocol_v5.spec +++ b/doc/native_protocol_v5.spec @@ -332,52 +332,57 @@ Table of Contents where is a [long string] representing the query and must be - [[name_1]...[name_n]][][][][][] + [[name_1]...[name_n]][][][][][][] where: - is the [consistency] level for the operation. - is a [int] whose bits define the options for this query and in particular influence what the remainder of the message contains. A flag is set if the bit corresponding to its `mask` is set. Supported flags are, given their mask: - 0x01: Values. If set, a [short] followed by [value] - values are provided. Those values are used for bound variables in - the query. Optionally, if the 0x40 flag is present, each value - will be preceded by a [string] name, representing the name of - the marker the value must be bound to. - 0x02: Skip_metadata. If set, the Result Set returned as a response - to the query (if any) will have the NO_METADATA flag (see - Section 4.2.5.2). - 0x04: Page_size. If set, is an [int] - controlling the desired page size of the result (in CQL3 rows). - See the section on paging (Section 8) for more details. - 0x08: With_paging_state. If set, should be present. - is a [bytes] value that should have been returned - in a result set (Section 4.2.5.2). The query will be - executed but starting from a given paging state. This is also to - continue paging on a different node than the one where it - started (See Section 8 for more details). - 0x10: With serial consistency. If set, should be - present. is the [consistency] level for the - serial phase of conditional updates. That consitency can only be - either SERIAL or LOCAL_SERIAL and if not present, it defaults to - SERIAL. This option will be ignored for anything else other than a - conditional update/insert. - 0x20: With default timestamp. If set, should be present. - is a [long] representing the default timestamp for the query - in microseconds (negative values are forbidden). This will - replace the server side assigned timestamp as default timestamp. - Note that a timestamp in the query itself will still override - this timestamp. This is entirely optional. - 0x40: With names for values. This only makes sense if the 0x01 flag is set and - is ignored otherwise. If present, the values from the 0x01 flag will - be preceded by a name (see above). Note that this is only useful for - QUERY requests where named bind markers are used; for EXECUTE statements, - since the names for the expected values was returned during preparation, - a client can always provide values in the right order without any names - and using this flag, while supported, is almost surely inefficient. - 0x80: With keyspace. If set, must be present. is a - [string] indicating the keyspace that the query should be executed in. - It supercedes the keyspace that the connection is bound to, if any. + 0x0001: Values. If set, a [short] followed by [value] + values are provided. Those values are used for bound variables in + the query. Optionally, if the 0x40 flag is present, each value + will be preceded by a [string] name, representing the name of + the marker the value must be bound to. + 0x0002: Skip_metadata. If set, the Result Set returned as a response + to the query (if any) will have the NO_METADATA flag (see + Section 4.2.5.2). + 0x0004: Page_size. If set, is an [int] + controlling the desired page size of the result (in CQL3 rows). + See the section on paging (Section 8) for more details. + 0x0008: With_paging_state. If set, should be present. + is a [bytes] value that should have been returned + in a result set (Section 4.2.5.2). The query will be + executed but starting from a given paging state. This is also to + continue paging on a different node than the one where it + started (See Section 8 for more details). + 0x0010: With serial consistency. If set, should be + present. is the [consistency] level for the + serial phase of conditional updates. That consitency can only be + either SERIAL or LOCAL_SERIAL and if not present, it defaults to + SERIAL. This option will be ignored for anything else other than a + conditional update/insert. + 0x0020: With default timestamp. If set, must be present. + is a [long] representing the default timestamp for the query + in microseconds (negative values are forbidden). This will + replace the server side assigned timestamp as default timestamp. + Note that a timestamp in the query itself will still override + this timestamp. This is entirely optional. + 0x0040: With names for values. This only makes sense if the 0x01 flag is set and + is ignored otherwise. If present, the values from the 0x01 flag will + be preceded by a name (see above). Note that this is only useful for + QUERY requests where named bind markers are used; for EXECUTE statements, + since the names for the expected values was returned during preparation, + a client can always provide values in the right order without any names + and using this flag, while supported, is almost surely inefficient. + 0x0080: With keyspace. If set, must be present. is a + [string] indicating the keyspace that the query should be executed in. + It supercedes the keyspace that the connection is bound to, if any. + 0x0100: With now in seconds. If set, must be present. + is an [int] representing the current time (now) for + the query. Affects TTL cell liveness in read queries and local deletion + time for tombstones and TTL cells in update requests. It's intended + for testing purposes and is optional. Note that the consistency is ignored by some queries (USE, CREATE, ALTER, TRUNCATE, ...). @@ -423,7 +428,7 @@ Table of Contents Allows executing a list of queries (prepared or not) as a batch (note that only DML statements are accepted in a batch). The body of the message must be: - ...[][][] + ...[][][][] where: - is a [byte] indicating the type of batch to use: - If == 0, the batch will be "logged". This is equivalent to a @@ -437,27 +442,32 @@ Table of Contents bits must always be 0 as their corresponding options do not make sense for Batch. A flag is set if the bit corresponding to its `mask` is set. Supported flags are, given their mask: - 0x10: With serial consistency. If set, should be - present. is the [consistency] level for the - serial phase of conditional updates. That consistency can only be - either SERIAL or LOCAL_SERIAL and if not present, it defaults to - SERIAL. This option will be ignored for anything else other than a - conditional update/insert. - 0x20: With default timestamp. If set, should be present. - is a [long] representing the default timestamp for the query - in microseconds. This will replace the server side assigned - timestamp as default timestamp. Note that a timestamp in the query itself - will still override this timestamp. This is entirely optional. - 0x40: With names for values. If set, then all values for all must be - preceded by a [string] that have the same meaning as in QUERY - requests [IMPORTANT NOTE: this feature does not work and should not be - used. It is specified in a way that makes it impossible for the server - to implement. This will be fixed in a future version of the native - protocol. See https://issues.apache.org/jira/browse/CASSANDRA-10246 for - more details]. - 0x80: With keyspace. If set, must be present. is a - [string] indicating the keyspace that the query should be executed in. - It supercedes the keyspace that the connection is bound to, if any. + 0x0010: With serial consistency. If set, should be + present. is the [consistency] level for the + serial phase of conditional updates. That consistency can only be + either SERIAL or LOCAL_SERIAL and if not present, it defaults to + SERIAL. This option will be ignored for anything else other than a + conditional update/insert. + 0x0020: With default timestamp. If set, should be present. + is a [long] representing the default timestamp for the query + in microseconds. This will replace the server side assigned + timestamp as default timestamp. Note that a timestamp in the query itself + will still override this timestamp. This is entirely optional. + 0x0040: With names for values. If set, then all values for all must be + preceded by a [string] that have the same meaning as in QUERY + requests [IMPORTANT NOTE: this feature does not work and should not be + used. It is specified in a way that makes it impossible for the server + to implement. This will be fixed in a future version of the native + protocol. See https://issues.apache.org/jira/browse/CASSANDRA-10246 for + more details]. + 0x0080: With keyspace. If set, must be present. is a + [string] indicating the keyspace that the query should be executed in. + It supercedes the keyspace that the connection is bound to, if any. + 0x0100: With now in seconds. If set, must be present. + is an [int] representing the current time (now) for + the query. Affects TTL cell liveness in read queries and local deletion + time for tombstones and TTL cells in update requests. It's intended + for testing purposes and is optional. - is a [short] indicating the number of following queries. - ... are the queries to execute. A must be of the form: @@ -1252,4 +1262,5 @@ Table of Contents (Sections 4.1.4, 4.1.6 and 4.1.7). * Add the duration data type * Added keyspace field in QUERY, PREPARE, and BATCH messages (Sections 4.1.4, 4.1.5, and 4.1.7). + * Added now_in_seconds field in QUERY, EXECUTE, and BATCH messages (Sections 4.1.4, 4.1.6, and 4.1.7). * Added [int] flags field in PREPARE message (Section 4.1.5). http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java index 59c35a5..ac0d148 100644 --- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java +++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java @@ -84,6 +84,11 @@ public abstract class BatchQueryOptions return wrapped.getTimestamp(state); } + public int getNowInSeconds() + { + return wrapped.getNowInSeconds(); + } + private static class WithoutPerStatementVariables extends BatchQueryOptions { private WithoutPerStatementVariables(QueryOptions wrapped, List queryOrIdList) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/QueryOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java index 49ab9b4..e546304 100644 --- a/src/java/org/apache/cassandra/cql3/QueryOptions.java +++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java @@ -34,6 +34,7 @@ import org.apache.cassandra.transport.CBCodec; import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.transport.ProtocolException; import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -69,9 +70,34 @@ public abstract class QueryOptions return new DefaultQueryOptions(null, null, true, null, protocolVersion); } - public static QueryOptions create(ConsistencyLevel consistency, List values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency, ProtocolVersion version, String keyspace) + public static QueryOptions create(ConsistencyLevel consistency, + List values, + boolean skipMetadata, + int pageSize, + PagingState pagingState, + ConsistencyLevel serialConsistency, + ProtocolVersion version, + String keyspace) { - return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L, keyspace), version); + return create(consistency, values, skipMetadata, pageSize, pagingState, serialConsistency, version, keyspace, Long.MIN_VALUE, Integer.MIN_VALUE); + } + + public static QueryOptions create(ConsistencyLevel consistency, + List values, + boolean skipMetadata, + int pageSize, + PagingState pagingState, + ConsistencyLevel serialConsistency, + ProtocolVersion version, + String keyspace, + long timestamp, + int nowInSeconds) + { + return new DefaultQueryOptions(consistency, + values, + skipMetadata, + new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp, keyspace, nowInSeconds), + version); } public static QueryOptions addColumnSpecifications(QueryOptions options, List columnSpecs) @@ -174,6 +200,12 @@ public abstract class QueryOptions return tstamp != Long.MIN_VALUE ? tstamp : state.getTimestamp(); } + public int getNowInSeconds() + { + int nowInSeconds = getSpecificOptions().nowInSeconds; + return Integer.MIN_VALUE == nowInSeconds ? FBUtilities.nowInSeconds() : nowInSeconds; + } + /** The keyspace that this query is bound to, or null if not relevant. */ public String getKeyspace() { return getSpecificOptions().keyspace; } @@ -346,21 +378,28 @@ public abstract class QueryOptions // Options that are likely to not be present in most queries static class SpecificOptions { - private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, Long.MIN_VALUE, null); + private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, Long.MIN_VALUE, null, Integer.MIN_VALUE); private final int pageSize; private final PagingState state; private final ConsistencyLevel serialConsistency; private final long timestamp; private final String keyspace; - - private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp, String keyspace) + private final int nowInSeconds; + + private SpecificOptions(int pageSize, + PagingState state, + ConsistencyLevel serialConsistency, + long timestamp, + String keyspace, + int nowInSeconds) { this.pageSize = pageSize; this.state = state; this.serialConsistency = serialConsistency == null ? ConsistencyLevel.SERIAL : serialConsistency; this.timestamp = timestamp; this.keyspace = keyspace; + this.nowInSeconds = nowInSeconds; } } @@ -376,7 +415,8 @@ public abstract class QueryOptions SERIAL_CONSISTENCY, TIMESTAMP, NAMES_FOR_VALUES, - KEYSPACE; + KEYSPACE, + NOW_IN_SECONDS; private static final Flag[] ALL_VALUES = values(); @@ -442,8 +482,10 @@ public abstract class QueryOptions timestamp = ts; } String keyspace = flags.contains(Flag.KEYSPACE) ? CBUtil.readString(body) : null; - options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp, keyspace); + int nowInSeconds = flags.contains(Flag.NOW_IN_SECONDS) ? body.readInt() : Integer.MIN_VALUE; + options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp, keyspace, nowInSeconds); } + DefaultQueryOptions opts = new DefaultQueryOptions(consistency, values, skipMetadata, options, version); return names == null ? opts : new OptionsWithNames(opts, names); } @@ -452,7 +494,7 @@ public abstract class QueryOptions { CBUtil.writeConsistencyLevel(options.getConsistency(), dest); - EnumSet flags = gatherFlags(options); + EnumSet flags = gatherFlags(options, version); if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) dest.writeInt(Flag.serialize(flags)); else @@ -470,6 +512,8 @@ public abstract class QueryOptions dest.writeLong(options.getSpecificOptions().timestamp); if (flags.contains(Flag.KEYSPACE)) CBUtil.writeString(options.getSpecificOptions().keyspace, dest); + if (flags.contains(Flag.NOW_IN_SECONDS)) + dest.writeInt(options.getSpecificOptions().nowInSeconds); // Note that we don't really have to bother with NAMES_FOR_VALUES server side, // and in fact we never really encode QueryOptions, only decode them, so we @@ -482,7 +526,7 @@ public abstract class QueryOptions size += CBUtil.sizeOfConsistencyLevel(options.getConsistency()); - EnumSet flags = gatherFlags(options); + EnumSet flags = gatherFlags(options, version); size += (version.isGreaterOrEqualTo(ProtocolVersion.V5) ? 4 : 1); if (flags.contains(Flag.VALUES)) @@ -497,10 +541,13 @@ public abstract class QueryOptions size += 8; if (flags.contains(Flag.KEYSPACE)) size += CBUtil.sizeOfString(options.getSpecificOptions().keyspace); + if (flags.contains(Flag.NOW_IN_SECONDS)) + size += 4; + return size; } - private EnumSet gatherFlags(QueryOptions options) + private EnumSet gatherFlags(QueryOptions options, ProtocolVersion version) { EnumSet flags = EnumSet.noneOf(Flag.class); if (options.getValues().size() > 0) @@ -515,8 +562,15 @@ public abstract class QueryOptions flags.add(Flag.SERIAL_CONSISTENCY); if (options.getSpecificOptions().timestamp != Long.MIN_VALUE) flags.add(Flag.TIMESTAMP); - if (options.getSpecificOptions().keyspace != null) - flags.add(Flag.KEYSPACE); + + if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) + { + if (options.getSpecificOptions().keyspace != null) + flags.add(Flag.KEYSPACE); + if (options.getSpecificOptions().nowInSeconds != Integer.MIN_VALUE) + flags.add(Flag.NOW_IN_SECONDS); + } + return flags; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/UpdateParameters.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java index 9d6f2e9..500862e 100644 --- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java +++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java @@ -66,7 +66,7 @@ public class UpdateParameters this.updatedColumns = updatedColumns; this.options = options; - this.nowInSec = FBUtilities.nowInSeconds(); + this.nowInSec = options.getNowInSeconds(); this.timestamp = timestamp; this.ttl = ttl; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 9ed150c..089c532 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -443,6 +443,7 @@ public class BatchStatement implements CQLStatement options.getSerialConsistency(), options.getConsistency(), state.getClientState(), + options.getNowInSeconds(), queryStartNanoTime)) { @@ -551,7 +552,7 @@ public class BatchStatement implements CQLStatement String ksName = request.metadata.keyspace; String tableName = request.metadata.name; - try (RowIterator result = ModificationStatement.casInternal(request, state)) + try (RowIterator result = ModificationStatement.casInternal(request, state, options.getNowInSeconds())) { return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, result, columnsWithConditions, true, options.forStatement(0))); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index c388c48..5f3d07f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -375,6 +375,7 @@ public abstract class ModificationStatement implements CQLStatement DataLimits limits, boolean local, ConsistencyLevel cl, + int nowInSeconds, long queryStartNanoTime) { if (!requiresRead()) @@ -390,10 +391,9 @@ public abstract class ModificationStatement implements CQLStatement } List commands = new ArrayList<>(partitionKeys.size()); - int nowInSec = FBUtilities.nowInSeconds(); for (ByteBuffer key : partitionKeys) commands.add(SinglePartitionReadCommand.create(metadata(), - nowInSec, + nowInSeconds, ColumnFilter.selection(this.requiresRead), RowFilter.NONE, limits, @@ -484,6 +484,7 @@ public abstract class ModificationStatement implements CQLStatement options.getSerialConsistency(), options.getConsistency(), queryState.getClientState(), + options.getNowInSeconds(), queryStartNanoTime)) { return new ResultMessage.Rows(buildCasResultSet(result, options)); @@ -600,7 +601,7 @@ public abstract class ModificationStatement implements CQLStatement SelectStatement.forSelection(metadata, selection).processPartition(partition, options, builder, - FBUtilities.nowInSeconds()); + options.getNowInSeconds()); return builder.build(); } @@ -622,17 +623,17 @@ public abstract class ModificationStatement implements CQLStatement public ResultMessage executeInternalWithCondition(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException { CQL3CasRequest request = makeCasRequest(state, options); - try (RowIterator result = casInternal(request, state)) + try (RowIterator result = casInternal(request, state, options.getNowInSeconds())) { return new ResultMessage.Rows(buildCasResultSet(result, options)); } } - static RowIterator casInternal(CQL3CasRequest request, QueryState state) + static RowIterator casInternal(CQL3CasRequest request, QueryState state, int nowInSeconds) { UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp()); - SinglePartitionReadQuery readCommand = request.readCommand(FBUtilities.nowInSeconds()); + SinglePartitionReadQuery readCommand = request.readCommand(nowInSeconds); FilteredPartition current; try (ReadExecutionController executionController = readCommand.executionController(); PartitionIterator iter = readCommand.executeInternal(executionController)) @@ -656,21 +657,21 @@ public abstract class ModificationStatement implements CQLStatement * * @param options value for prepared statement markers * @param local if true, any requests (for collections) performed by getMutation should be done locally only. - * @param now the current timestamp in microseconds to use if no timestamp is user provided. + * @param timestamp the current timestamp in microseconds to use if no timestamp is user provided. * * @return list of the mutations */ - private Collection getMutations(QueryOptions options, boolean local, long now, long queryStartNanoTime) + private Collection getMutations(QueryOptions options, boolean local, long timestamp, long queryStartNanoTime) { UpdatesCollector collector = new SingleTableUpdatesCollector(metadata, updatedColumns, 1); - addUpdates(collector, options, local, now, queryStartNanoTime); + addUpdates(collector, options, local, timestamp, queryStartNanoTime); return collector.toMutations(); } final void addUpdates(UpdatesCollector collector, QueryOptions options, boolean local, - long now, + long timestamp, long queryStartNanoTime) { List keys = buildPartitionKeyNames(options); @@ -688,7 +689,7 @@ public abstract class ModificationStatement implements CQLStatement options, DataLimits.NONE, local, - now, + timestamp, queryStartNanoTime); for (ByteBuffer key : keys) { @@ -709,7 +710,7 @@ public abstract class ModificationStatement implements CQLStatement if (restrictions.hasClusteringColumnsRestrictions() && clusterings.isEmpty()) return; - UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, now, queryStartNanoTime); + UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, timestamp, queryStartNanoTime); for (ByteBuffer key : keys) { @@ -752,7 +753,7 @@ public abstract class ModificationStatement implements CQLStatement NavigableSet clusterings, QueryOptions options, boolean local, - long now, + long timestamp, long queryStartNanoTime) { if (clusterings.contains(Clustering.STATIC_CLUSTERING)) @@ -761,7 +762,7 @@ public abstract class ModificationStatement implements CQLStatement options, DataLimits.cqlLimits(1), local, - now, + timestamp, queryStartNanoTime); return makeUpdateParameters(keys, @@ -769,7 +770,7 @@ public abstract class ModificationStatement implements CQLStatement options, DataLimits.NONE, local, - now, + timestamp, queryStartNanoTime); } @@ -778,12 +779,12 @@ public abstract class ModificationStatement implements CQLStatement QueryOptions options, DataLimits limits, boolean local, - long now, + long timestamp, long queryStartNanoTime) { // Some lists operation requires reading - Map lists = readRequiredLists(keys, filter, limits, local, options.getConsistency(), queryStartNanoTime); - return new UpdateParameters(metadata(), updatedColumns(), options, getTimestamp(now, options), getTimeToLive(options), lists); + Map lists = readRequiredLists(keys, filter, limits, local, options.getConsistency(), options.getNowInSeconds(), queryStartNanoTime); + return new UpdateParameters(metadata(), updatedColumns(), options, getTimestamp(timestamp, options), getTimeToLive(options), lists); } private Slices toSlices(SortedSet startBounds, SortedSet endBounds) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index a5105f2..61715b9 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -234,7 +234,7 @@ public class SelectStatement implements CQLStatement cl.validateForRead(keyspace()); - int nowInSec = FBUtilities.nowInSeconds(); + int nowInSec = options.getNowInSeconds(); int userLimit = getLimit(options); int userPerPartitionLimit = getPerPartitionLimit(options); int pageSize = options.getPageSize(); @@ -428,7 +428,7 @@ public class SelectStatement implements CQLStatement public ResultMessage.Rows executeLocally(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException { - return executeInternal(state, options, FBUtilities.nowInSeconds(), System.nanoTime()); + return executeInternal(state, options, options.getNowInSeconds(), System.nanoTime()); } public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 58f08d4..a011841 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -235,6 +235,7 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, ClientState state, + int nowInSeconds, long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException { @@ -261,7 +262,7 @@ public class StorageProxy implements StorageProxyMBean // read the current values and check they validate the conditions Tracing.trace("Reading existing values for CAS precondition"); - SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) request.readCommand(FBUtilities.nowInSeconds()); + SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) request.readCommand(nowInSeconds); ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; FilteredPartition current; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/src/java/org/apache/cassandra/transport/ProtocolVersion.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ProtocolVersion.java b/src/java/org/apache/cassandra/transport/ProtocolVersion.java index cd73c86..9a13903 100644 --- a/src/java/org/apache/cassandra/transport/ProtocolVersion.java +++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java @@ -84,6 +84,15 @@ public enum ProtocolVersion implements Comparable return ret; } + public static List supportedVersionsStartingWith(ProtocolVersion smallestVersion) + { + ArrayList versions = new ArrayList<>(SUPPORTED_VERSIONS.length); + for (ProtocolVersion version : SUPPORTED_VERSIONS) + if (version.isGreaterOrEqualTo(smallestVersion)) + versions.add(version); + return versions; + } + public static ProtocolVersion decode(int versionNum) { ProtocolVersion ret = versionNum >= MIN_SUPPORTED_VERSION.num && versionNum <= MAX_SUPPORTED_VERSION.num http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java b/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java new file mode 100644 index 0000000..8bb0bf4 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.google.common.collect.ImmutableList; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.cql3.statements.BatchStatement; +import org.apache.cassandra.cql3.statements.ModificationStatement; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.transport.messages.ResultMessage; + +import static java.lang.String.format; +import static org.junit.Assert.assertEquals; + +public class CustomNowInSecondsTest extends CQLTester +{ + @BeforeClass + public static void setUpClass() + { + prepareServer(); + requireNetwork(); + } + + @Test + public void testSelectQuery() + { + testSelectQuery(false); + testSelectQuery(true); + } + + private void testSelectQuery(boolean prepared) + { + int now = (int) (System.currentTimeMillis() / 1000); + int day = 86400; + + String ks = createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"); + String tbl = createTable(ks, "CREATE TABLE %s (id int primary key, val int)"); + + // insert a row with TTL = 1 day. + executeModify(format("INSERT INTO %s.%s (id, val) VALUES (0, 0) USING TTL %d", ks, tbl, day), Integer.MIN_VALUE, prepared); + + // execute a SELECT query without overriding nowInSeconds - make sure we observe one row. + assertEquals(1, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), Integer.MIN_VALUE, prepared).size()); + + // execute a SELECT query with nowInSeconds set to [now + 1 day + 1], when the row should have expired. + assertEquals(0, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + day + 1, prepared).size()); + } + + @Test + public void testModifyQuery() + { + testModifyQuery(false); + testModifyQuery(true); + } + + private void testModifyQuery(boolean prepared) + { + int now = (int) (System.currentTimeMillis() / 1000); + int day = 86400; + + String ks = createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"); + String tbl = createTable(ks, "CREATE TABLE %s (id int primary key, val int)"); + + // execute an INSERT query with now set to [now + 1 day], with ttl = 1, making its effective ttl = 1 day + 1. + executeModify(format("INSERT INTO %s.%s (id, val) VALUES (0, 0) USING TTL %d", ks, tbl, 1), now + day, prepared); + + // verify that despite TTL having passed (if not for nowInSeconds override) the row is still there. + assertEquals(1, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + 1, prepared).size()); + + // jump in time by one day, make sure the row expired + assertEquals(0, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + day + 1, prepared).size()); + } + + @Test + public void testBatchQuery() + { + testBatchQuery(false); + testBatchQuery(true); + } + + private void testBatchQuery(boolean prepared) + { + int now = (int) (System.currentTimeMillis() / 1000); + int day = 86400; + + String ks = createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"); + String tbl = createTable(ks, "CREATE TABLE %s (id int primary key, val int)"); + + // execute an BATCH query with now set to [now + 1 day], with ttl = 1, making its effective ttl = 1 day + 1. + String batch = format("BEGIN BATCH " + + "INSERT INTO %s.%s (id, val) VALUES (0, 0) USING TTL %d; " + + "INSERT INTO %s.%s (id, val) VALUES (1, 1) USING TTL %d; " + + "APPLY BATCH;", + ks, tbl, 1, + ks, tbl, 1); + executeModify(batch, now + day, prepared); + + // verify that despite TTL having passed at now + 1 the rows are still there. + assertEquals(2, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + 1, prepared).size()); + + // jump in time by one day, make sure the row expired. + assertEquals(0, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + day + 1, prepared).size()); + } + + @Test + public void testBatchMessage() + { + // test BatchMessage path + + int now = (int) (System.currentTimeMillis() / 1000); + int day = 86400; + + String ks = createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"); + String tbl = createTable(ks, "CREATE TABLE %s (id int primary key, val int)"); + + List queries = ImmutableList.of( + format("INSERT INTO %s.%s (id, val) VALUES (0, 0) USING TTL %d;", ks, tbl, 1), + format("INSERT INTO %s.%s (id, val) VALUES (1, 1) USING TTL %d;", ks, tbl, 1) + ); + + ClientState cs = ClientState.forInternalCalls(); + QueryState qs = new QueryState(cs); + + List statements = new ArrayList<>(queries.size()); + for (String query : queries) + statements.add((ModificationStatement) QueryProcessor.parseStatement(query, cs)); + + BatchStatement batch = + new BatchStatement(BatchStatement.Type.UNLOGGED, VariableSpecifications.empty(), statements, Attributes.none()); + + // execute an BATCH message with now set to [now + 1 day], with ttl = 1, making its effective ttl = 1 day + 1. + QueryProcessor.instance.processBatch(batch, qs, batchQueryOptions(now + day), Collections.emptyMap(), System.nanoTime()); + + // verify that despite TTL having passed at now + 1 the rows are still there. + assertEquals(2, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + 1, false).size()); + + // jump in time by one day, make sure the row expired. + assertEquals(0, executeSelect(format("SELECT * FROM %s.%s", ks, tbl), now + day + 1, false).size()); + } + + private static ResultSet executeSelect(String query, int nowInSeconds, boolean prepared) + { + ResultMessage message = execute(query, nowInSeconds, prepared); + return ((ResultMessage.Rows) message).result; + } + + private static void executeModify(String query, int nowInSeconds, boolean prepared) + { + execute(query, nowInSeconds, prepared); + } + + // prepared = false tests QueryMessage path, prepared = true tests ExecuteMessage path + private static ResultMessage execute(String query, int nowInSeconds, boolean prepared) + { + ClientState cs = ClientState.forInternalCalls(); + QueryState qs = new QueryState(cs); + + if (prepared) + { + CQLStatement statement = QueryProcessor.parseStatement(query, cs); + return QueryProcessor.instance.processPrepared(statement, qs, queryOptions(nowInSeconds), Collections.emptyMap(), System.nanoTime()); + } + else + { + return QueryProcessor.instance.process(query, qs, queryOptions(nowInSeconds), Collections.emptyMap(), System.nanoTime()); + } + } + + private static QueryOptions queryOptions(int nowInSeconds) + { + return QueryOptions.create(ConsistencyLevel.ONE, + Collections.emptyList(), + false, + Integer.MAX_VALUE, + null, + null, + ProtocolVersion.CURRENT, + null, + Long.MIN_VALUE, + nowInSeconds); + } + + private static BatchQueryOptions batchQueryOptions(int nowInSeconds) + { + return BatchQueryOptions.withoutPerStatementVariables(queryOptions(nowInSeconds)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/test/unit/org/apache/cassandra/cql3/ListsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ListsTest.java b/test/unit/org/apache/cassandra/cql3/ListsTest.java index 63c496c..a377b96 100644 --- a/test/unit/org/apache/cassandra/cql3/ListsTest.java +++ b/test/unit/org/apache/cassandra/cql3/ListsTest.java @@ -140,7 +140,7 @@ public class ListsTest extends CQLTester ByteBuffer keyBuf = ByteBufferUtil.bytes("key"); DecoratedKey key = Murmur3Partitioner.instance.decorateKey(keyBuf); - UpdateParameters parameters = new UpdateParameters(metaData, null, null, System.currentTimeMillis(), 1000, Collections.emptyMap()); + UpdateParameters parameters = new UpdateParameters(metaData, null, QueryOptions.DEFAULT, System.currentTimeMillis(), 1000, Collections.emptyMap()); Clustering clustering = Clustering.make(ByteBufferUtil.bytes(1)); parameters.newRow(clustering); prepender.execute(key, parameters); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8d34d35/test/unit/org/apache/cassandra/transport/SerDeserTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java index d3b9282..9d07321 100644 --- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java +++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java @@ -42,7 +42,6 @@ import static org.junit.Assert.assertEquals; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; /** * Serialization/deserialization tests for protocol objects and messages. @@ -308,34 +307,56 @@ public class SerDeserTest } @Test - public void queryOptionsSerDeserTest() throws Exception + public void queryOptionsSerDeserTest() { for (ProtocolVersion version : ProtocolVersion.SUPPORTED) - queryOptionsSerDeserTest(version); - } + { + queryOptionsSerDeserTest( + version, + QueryOptions.create(ConsistencyLevel.ALL, + Collections.singletonList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 })), + false, + 5000, + Util.makeSomePagingState(version), + ConsistencyLevel.SERIAL, + version, + null) + ); + } - private void queryOptionsSerDeserTest(ProtocolVersion version) throws Exception - { - queryOptionsSerDeserTest(version, QueryOptions.create(ConsistencyLevel.ALL, - Collections.singletonList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 })), - false, - 5000, - Util.makeSomePagingState(version), - ConsistencyLevel.SERIAL, - version, - null - )); - - queryOptionsSerDeserTest(version, QueryOptions.create(ConsistencyLevel.LOCAL_ONE, - Arrays.asList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 }), - ByteBuffer.wrap(new byte[] { 0x03, 0x04, 0x05, 0x03, 0x04, 0x05 })), - true, - 10, - Util.makeSomePagingState(version), - ConsistencyLevel.SERIAL, - version, - "some_keyspace" - )); + for (ProtocolVersion version : ProtocolVersion.supportedVersionsStartingWith(ProtocolVersion.V5)) + { + queryOptionsSerDeserTest( + version, + QueryOptions.create(ConsistencyLevel.LOCAL_ONE, + Arrays.asList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 }), + ByteBuffer.wrap(new byte[] { 0x03, 0x04, 0x05, 0x03, 0x04, 0x05 })), + true, + 10, + Util.makeSomePagingState(version), + ConsistencyLevel.SERIAL, + version, + "some_keyspace") + ); + } + + for (ProtocolVersion version : ProtocolVersion.supportedVersionsStartingWith(ProtocolVersion.V5)) + { + queryOptionsSerDeserTest( + version, + QueryOptions.create(ConsistencyLevel.LOCAL_ONE, + Arrays.asList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 }), + ByteBuffer.wrap(new byte[] { 0x03, 0x04, 0x05, 0x03, 0x04, 0x05 })), + true, + 10, + Util.makeSomePagingState(version), + ConsistencyLevel.SERIAL, + version, + "some_keyspace", + FBUtilities.timestampMicros(), + FBUtilities.nowInSeconds()) + ); + } } private void queryOptionsSerDeserTest(ProtocolVersion version, QueryOptions options) @@ -353,5 +374,6 @@ public class SerDeserTest assertEquals(options.getPagingState(), decodedOptions.getPagingState()); assertEquals(options.skipMetadata(), decodedOptions.skipMetadata()); assertEquals(options.getKeyspace(), decodedOptions.getKeyspace()); + assertEquals(options.getNowInSeconds(), decodedOptions.getNowInSeconds()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org