Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 62BFB18187 for ; Wed, 24 Jun 2015 16:15:02 +0000 (UTC) Received: (qmail 9820 invoked by uid 500); 24 Jun 2015 16:14:56 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 9760 invoked by uid 500); 24 Jun 2015 16:14:56 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 8237 invoked by uid 99); 24 Jun 2015 16:14:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Jun 2015 16:14:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1A680E3651; Wed, 24 Jun 2015 16:14:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jmckenzie@apache.org To: commits@cassandra.apache.org Date: Wed, 24 Jun 2015 16:15:24 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [31/32] cassandra git commit: 2.2 commit for CASSANDRA-9160 2.2 commit for CASSANDRA-9160 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01115f72 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01115f72 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01115f72 Branch: refs/heads/trunk Commit: 01115f72fc50b603ece0a00431308abec24706b7 Parents: 20364f4 Author: Stefania Alborghetti Authored: Wed Jun 24 12:11:46 2015 -0400 Committer: Josh McKenzie Committed: Wed Jun 24 12:11:46 2015 -0400 ---------------------------------------------------------------------- .../cassandra/config/DatabaseDescriptor.java | 6 + .../org/apache/cassandra/cql3/ResultSet.java | 9 + .../apache/cassandra/cql3/UntypedResultSet.java | 2 +- .../cql3/statements/BatchStatement.java | 69 +- .../cql3/statements/CQL3CasRequest.java | 8 +- .../cql3/statements/ModificationStatement.java | 70 +- .../cql3/statements/SelectStatement.java | 84 +- .../cql3/statements/TruncateStatement.java | 13 +- .../apache/cassandra/service/StorageProxy.java | 4 +- .../org/apache/cassandra/utils/UUIDGen.java | 16 +- .../org/apache/cassandra/cql3/ManyRowsTest.java | 92 + .../apache/cassandra/cql3/AggregationTest.java | 1479 ---------- .../org/apache/cassandra/cql3/AliasTest.java | 40 - .../apache/cassandra/cql3/AlterTableTest.java | 113 - .../org/apache/cassandra/cql3/CQLTester.java | 172 +- .../apache/cassandra/cql3/CollectionsTest.java | 340 --- .../cassandra/cql3/ContainsRelationTest.java | 283 -- .../cassandra/cql3/CrcCheckChanceTest.java | 159 -- .../cql3/CreateAndAlterKeyspaceTest.java | 37 - .../cql3/CreateIndexStatementTest.java | 101 - .../apache/cassandra/cql3/CreateTableTest.java | 69 - .../cql3/CreateTriggerStatementTest.java | 121 - .../cassandra/cql3/FrozenCollectionsTest.java | 1101 -------- .../cql3/IndexedValuesValidationTest.java | 149 - .../org/apache/cassandra/cql3/JsonTest.java | 947 ------- .../apache/cassandra/cql3/ModificationTest.java | 112 - .../cassandra/cql3/MultiColumnRelationTest.java | 936 ------- .../org/apache/cassandra/cql3/PgStringTest.java | 76 - .../cassandra/cql3/RangeDeletionTest.java | 35 - .../apache/cassandra/cql3/RoleSyntaxTest.java | 51 - .../cql3/SSTableMetadataTrackingTest.java | 160 -- .../cql3/SecondaryIndexOnMapEntriesTest.java | 337 --- .../cql3/SelectWithTokenFunctionTest.java | 233 -- .../cassandra/cql3/SelectionOrderingTest.java | 233 -- .../cql3/SingleColumnRelationTest.java | 553 ---- .../SliceQueryFilterWithTombstonesTest.java | 170 -- .../cassandra/cql3/StaticColumnsQueryTest.java | 280 -- .../cassandra/cql3/ThriftCompatibilityTest.java | 1 + .../apache/cassandra/cql3/TimestampTest.java | 36 - .../apache/cassandra/cql3/TupleTypeTest.java | 114 - .../org/apache/cassandra/cql3/TypeCastTest.java | 54 - .../org/apache/cassandra/cql3/TypeTest.java | 89 - .../org/apache/cassandra/cql3/UFAuthTest.java | 724 ----- .../cassandra/cql3/UFIdentificationTest.java | 376 --- test/unit/org/apache/cassandra/cql3/UFTest.java | 2585 ----------------- .../apache/cassandra/cql3/UseStatementTest.java | 29 - .../apache/cassandra/cql3/UserTypesTest.java | 334 --- .../selection/SelectionColumnMappingTest.java | 9 + .../validation/entities/CollectionsTest.java | 588 ++++ .../cql3/validation/entities/CountersTest.java | 115 + .../cql3/validation/entities/DateTypeTest.java | 39 + .../entities/FrozenCollectionsTest.java | 1111 ++++++++ .../cql3/validation/entities/JsonTest.java | 958 +++++++ .../SecondaryIndexOnMapEntriesTest.java | 348 +++ .../validation/entities/SecondaryIndexTest.java | 645 +++++ .../validation/entities/StaticColumnsTest.java | 271 ++ .../cql3/validation/entities/TimestampTest.java | 155 ++ .../cql3/validation/entities/TimeuuidTest.java | 81 + .../cql3/validation/entities/TupleTypeTest.java | 171 ++ .../cql3/validation/entities/TypeTest.java | 92 + .../cql3/validation/entities/UFAuthTest.java | 728 +++++ .../entities/UFIdentificationTest.java | 380 +++ .../cql3/validation/entities/UFTest.java | 2596 ++++++++++++++++++ .../cql3/validation/entities/UserTypesTest.java | 404 +++ .../miscellaneous/CrcCheckChanceTest.java | 160 ++ .../validation/miscellaneous/OverflowTest.java | 331 +++ .../validation/miscellaneous/PgStringTest.java | 77 + .../miscellaneous/RoleSyntaxTest.java | 53 + .../SSTableMetadataTrackingTest.java | 161 ++ .../miscellaneous/TombstonesTest.java | 171 ++ .../validation/operations/AggregationTest.java | 1481 ++++++++++ .../cql3/validation/operations/AlterTest.java | 203 ++ .../cql3/validation/operations/BatchTest.java | 106 + .../cql3/validation/operations/CreateTest.java | 498 ++++ .../cql3/validation/operations/DeleteTest.java | 329 +++ .../cql3/validation/operations/InsertTest.java | 59 + .../operations/InsertUpdateIfCondition.java | 861 ++++++ .../validation/operations/SelectLimitTest.java | 112 + .../SelectMultiColumnRelationTest.java | 962 +++++++ .../operations/SelectOrderByTest.java | 504 ++++ .../SelectOrderedPartitionerTest.java | 481 ++++ .../SelectSingleColumnRelationTest.java | 555 ++++ .../cql3/validation/operations/SelectTest.java | 1336 +++++++++ .../cql3/validation/operations/UpdateTest.java | 86 + .../cql3/validation/operations/UseTest.java | 31 + .../cassandra/service/ClientWarningsTest.java | 5 +- .../cassandra/transport/MessagePayloadTest.java | 6 +- .../stress/generate/values/TimeUUIDs.java | 2 +- 88 files changed, 17720 insertions(+), 12543 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/01115f72/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 443246e..39a06cb 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1544,6 +1544,12 @@ public class DatabaseDescriptor return conf.row_cache_size_in_mb; } + @VisibleForTesting + public static void setRowCacheSizeInMB(long val) + { + conf.row_cache_size_in_mb = val; + } + public static int getRowCacheSavePeriod() { return conf.row_cache_save_period; http://git-wip-us.apache.org/repos/asf/cassandra/blob/01115f72/src/java/org/apache/cassandra/cql3/ResultSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java index 281923c..ea26f34 100644 --- a/src/java/org/apache/cassandra/cql3/ResultSet.java +++ b/src/java/org/apache/cassandra/cql3/ResultSet.java @@ -254,6 +254,15 @@ public class ResultSet return new ResultMetadata(EnumSet.copyOf(flags), names, columnCount, pagingState); } + /** + * Return only the column names requested by the user, excluding those added for post-query re-orderings, + * see definition of names and columnCount. + **/ + public List requestNames() + { + return names.subList(0, columnCount); + } + // The maximum number of values that the ResultSet can hold. This can be bigger than columnCount due to CASSANDRA-4911 public int valueCount() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/01115f72/src/java/org/apache/cassandra/cql3/UntypedResultSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java index bf3cbb5..49e0d86 100644 --- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java +++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java @@ -95,7 +95,7 @@ public abstract class UntypedResultSet implements Iterable public List metadata() { - return cqlRows.metadata.names; + return cqlRows.metadata.requestNames(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01115f72/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index b1751a2..0661b56 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -38,9 +38,11 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.thrift.Column; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.NoSpamLogger; +import org.apache.cassandra.utils.Pair; /** * A BATCH statement parsed from a CQL query. @@ -343,10 +345,31 @@ public class BatchStatement implements CQLStatement private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state) throws RequestExecutionException, RequestValidationException { + Pair> p = makeCasRequest(options, state); + CQL3CasRequest casRequest = p.left; + Set columnsWithConditions = p.right; + + ColumnFamily result = StorageProxy.cas(casRequest.cfm.ksName, + casRequest.cfm.cfName, + casRequest.key, + casRequest, + options.getSerialConsistency(), + options.getConsistency(), + state.getClientState()); + + return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(casRequest.cfm.ksName, + casRequest.key, + casRequest.cfm.cfName, + result, + columnsWithConditions, + true, + options.forStatement(0))); + } + + private Pair> makeCasRequest(BatchQueryOptions options, QueryState state) + { long now = state.getTimestamp(); ByteBuffer key = null; - String ksName = null; - String cfName = null; CQL3CasRequest casRequest = null; Set columnsWithConditions = new LinkedHashSet<>(); @@ -361,8 +384,6 @@ public class BatchStatement implements CQLStatement if (key == null) { key = pks.get(0); - ksName = statement.cfm.ksName; - cfName = statement.cfm.cfName; casRequest = new CQL3CasRequest(statement.cfm, key, true); } else if (!key.equals(pks.get(0))) @@ -383,23 +404,49 @@ public class BatchStatement implements CQLStatement casRequest.addRowUpdate(clusteringPrefix, statement, statementOptions, timestamp); } - ColumnFamily result = StorageProxy.cas(ksName, cfName, key, casRequest, options.getSerialConsistency(), options.getConsistency(), state.getClientState()); - - return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true, options.forStatement(0))); + return Pair.create(casRequest, columnsWithConditions); } public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException { - assert !hasConditions; + if (hasConditions) + return executeInternalWithConditions(BatchQueryOptions.withoutPerStatementVariables(options), queryState); + + executeInternalWithoutCondition(queryState, options); + return new ResultMessage.Void(); + } + + private ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException + { for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp())) { - // We don't use counters internally. - assert mutation instanceof Mutation; - ((Mutation) mutation).apply(); + assert mutation instanceof Mutation || mutation instanceof CounterMutation; + + if (mutation instanceof Mutation) + ((Mutation) mutation).apply(); + else if (mutation instanceof CounterMutation) + ((CounterMutation) mutation).apply(); } return null; } + private ResultMessage executeInternalWithConditions(BatchQueryOptions options, QueryState state) throws RequestExecutionException, RequestValidationException + { + Pair> p = makeCasRequest(options, state); + CQL3CasRequest request = p.left; + Set columnsWithConditions = p.right; + + ColumnFamily result = ModificationStatement.casInternal(request, state); + + return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(request.cfm.ksName, + request.key, + request.cfm.cfName, + result, + columnsWithConditions, + true, + options.forStatement(0))); + } + public interface BatchVariables { public List getVariablesForStatement(int statementInBatch); http://git-wip-us.apache.org/repos/asf/cassandra/blob/01115f72/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index 4ff9c27..081a14e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@ -36,10 +36,10 @@ import org.apache.cassandra.utils.Pair; */ public class CQL3CasRequest implements CASRequest { - private final CFMetaData cfm; - private final ByteBuffer key; - private final long now; - private final boolean isBatch; + final CFMetaData cfm; + final ByteBuffer key; + final long now; + final boolean isBatch; // We index RowCondition by the prefix of the row they applied to for 2 reasons: // 1) this allows to keep things sorted to build the ColumnSlice array below http://git-wip-us.apache.org/repos/asf/cassandra/blob/01115f72/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 0862a9f..aac94be 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.functions.Function; import org.apache.cassandra.cql3.restrictions.Restriction; @@ -41,9 +42,12 @@ import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.thrift.ThriftValidation; import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.triggers.TriggerExecutor; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.UUIDGen; import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull; @@ -486,6 +490,20 @@ public abstract class ModificationStatement implements CQLStatement public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException { + CQL3CasRequest request = makeCasRequest(queryState, options); + + ColumnFamily result = StorageProxy.cas(keyspace(), + columnFamily(), + request.key, + request, + options.getSerialConsistency(), + options.getConsistency(), + queryState.getClientState()); + return new ResultMessage.Rows(buildCasResultSet(request.key, result, options)); + } + + private CQL3CasRequest makeCasRequest(QueryState queryState, QueryOptions options) + { List keys = buildPartitionKeyNames(options); // We don't support IN for CAS operation so far if (keys.size() > 1) @@ -498,15 +516,7 @@ public abstract class ModificationStatement implements CQLStatement CQL3CasRequest request = new CQL3CasRequest(cfm, key, false); addConditions(prefix, request, options); request.addRowUpdate(prefix, this, options, now); - - ColumnFamily result = StorageProxy.cas(keyspace(), - columnFamily(), - key, - request, - options.getSerialConsistency(), - options.getConsistency(), - queryState.getClientState()); - return new ResultMessage.Rows(buildCasResultSet(key, result, options)); + return request; } public void addConditions(Composite clusteringPrefix, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException @@ -608,9 +618,13 @@ public abstract class ModificationStatement implements CQLStatement public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException { - if (hasConditions()) - throw new UnsupportedOperationException(); + return hasConditions() + ? executeInternalWithCondition(queryState, options) + : executeInternalWithoutCondition(queryState, options); + } + public ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException + { for (IMutation mutation : getMutations(options, true, queryState.getTimestamp())) { assert mutation instanceof Mutation || mutation instanceof CounterMutation; @@ -623,6 +637,40 @@ public abstract class ModificationStatement implements CQLStatement return null; } + public ResultMessage executeInternalWithCondition(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException + { + CQL3CasRequest request = makeCasRequest(state, options); + ColumnFamily result = casInternal(request, state); + return new ResultMessage.Rows(buildCasResultSet(request.key, result, options)); + } + + static ColumnFamily casInternal(CQL3CasRequest request, QueryState state) + { + long millis = state.getTimestamp() / 1000; + long nanos = ((state.getTimestamp() - (millis * 1000)) + 1) * 10; + UUID ballot = UUIDGen.getTimeUUID(millis, nanos); + CFMetaData metadata = Schema.instance.getCFMetaData(request.cfm.ksName, request.cfm.cfName); + + ReadCommand readCommand = ReadCommand.create(request.cfm.ksName, request.key, request.cfm.cfName, request.now, request.readFilter()); + Keyspace keyspace = Keyspace.open(request.cfm.ksName); + + Row row = readCommand.getRow(keyspace); + ColumnFamily current = row.cf; + if (!request.appliesTo(current)) + { + if (current == null) + current = ArrayBackedSortedColumns.factory.create(metadata); + return current; + } + + ColumnFamily updates = request.makeUpdates(current); + updates = TriggerExecutor.instance.execute(request.key, updates); + + Commit proposal = Commit.newProposal(request.key, ballot, updates); + proposal.makeMutation().apply(); + return null; + } + /** * Convert statement into a list of mutations to apply on the server * http://git-wip-us.apache.org/repos/asf/cassandra/blob/01115f72/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 8ce555f..e2708cd 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -164,37 +164,13 @@ public class SelectStatement implements CQLStatement int limit = getLimit(options); long now = System.currentTimeMillis(); Pageable command = getPageableCommand(options, limit, now); - - int pageSize = options.getPageSize(); - - // An aggregation query will never be paged for the user, but we always page it internally to avoid OOM. - // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default - // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707). - if (selection.isAggregate() && pageSize <= 0) - pageSize = DEFAULT_COUNT_PAGE_SIZE; + int pageSize = getPageSize(options); if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) - { return execute(command, options, limit, now, state); - } QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState()); - - if (selection.isAggregate()) - return pageAggregateQuery(pager, options, pageSize, now); - - // We can't properly do post-query ordering if we page (see #6722) - checkFalse(needsPostQueryOrdering(), - "Cannot page queries with both ORDER BY and a IN restriction on the partition key;" - + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query"); - - List page = pager.fetchPage(pageSize); - ResultMessage.Rows msg = processResults(page, options, limit, now); - - if (!pager.isExhausted()) - msg.result.metadata.setHasMorePages(pager.state()); - - return msg; + return execute(pager, options, limit, now, pageSize); } private Pageable getPageableCommand(QueryOptions options, int limit, long now) throws RequestValidationException @@ -212,7 +188,21 @@ public class SelectStatement implements CQLStatement return getPageableCommand(options, getLimit(options), System.currentTimeMillis()); } - private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state) throws RequestValidationException, RequestExecutionException + private int getPageSize(QueryOptions options) + { + int pageSize = options.getPageSize(); + + // An aggregation query will never be paged for the user, but we always page it internally to avoid OOM. + // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default + // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707). + if (selection.isAggregate() && pageSize <= 0) + pageSize = DEFAULT_COUNT_PAGE_SIZE; + + return pageSize; + } + + private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state) + throws RequestValidationException, RequestExecutionException { List rows; if (command == null) @@ -229,6 +219,26 @@ public class SelectStatement implements CQLStatement return processResults(rows, options, limit, now); } + private ResultMessage.Rows execute(QueryPager pager, QueryOptions options, int limit, long now, int pageSize) + throws RequestValidationException, RequestExecutionException + { + if (selection.isAggregate()) + return pageAggregateQuery(pager, options, pageSize, now); + + // We can't properly do post-query ordering if we page (see #6722) + checkFalse(needsPostQueryOrdering(), + "Cannot page queries with both ORDER BY and a IN restriction on the partition key;" + + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query"); + + List page = pager.fetchPage(pageSize); + ResultMessage.Rows msg = processResults(page, options, limit, now); + + if (!pager.isExhausted()) + msg.result.metadata.setHasMorePages(pager.state()); + + return msg; + } + private ResultMessage.Rows pageAggregateQuery(QueryPager pager, QueryOptions options, int pageSize, long now) throws RequestValidationException, RequestExecutionException { @@ -267,13 +277,21 @@ public class SelectStatement implements CQLStatement int limit = getLimit(options); long now = System.currentTimeMillis(); Pageable command = getPageableCommand(options, limit, now); - List rows = command == null - ? Collections.emptyList() - : (command instanceof Pageable.ReadCommands - ? readLocally(keyspace(), ((Pageable.ReadCommands)command).commands) - : ((RangeSliceCommand)command).executeLocally()); + int pageSize = getPageSize(options); - return processResults(rows, options, limit, now); + if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) + { + List rows = command == null + ? Collections.emptyList() + : (command instanceof Pageable.ReadCommands + ? readLocally(keyspace(), ((Pageable.ReadCommands)command).commands) + : ((RangeSliceCommand)command).executeLocally()); + + return processResults(rows, options, limit, now); + } + + QueryPager pager = QueryPagers.localPager(command); + return execute(pager, options, limit, now, pageSize); } public ResultSet process(List rows) throws InvalidRequestException http://git-wip-us.apache.org/repos/asf/cassandra/blob/01115f72/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java index 16c531c..9234a79 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java @@ -22,6 +22,8 @@ import java.util.concurrent.TimeoutException; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.cql3.*; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.service.ClientState; @@ -71,6 +73,15 @@ public class TruncateStatement extends CFStatement implements CQLStatement public ResultMessage executeInternal(QueryState state, QueryOptions options) { - throw new UnsupportedOperationException(); + try + { + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily()); + cfs.truncateBlocking(); + } + catch (Exception e) + { + throw new TruncateException(e); + } + return null; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01115f72/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 7801c3e..ac42eb0 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -228,7 +228,9 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("Reading existing values for CAS precondition"); long timestamp = System.currentTimeMillis(); ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, request.readFilter()); - List rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM); + List rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL + ? ConsistencyLevel.LOCAL_QUORUM + : ConsistencyLevel.QUORUM); ColumnFamily current = rows.get(0).cf; if (!request.appliesTo(current)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/01115f72/src/java/org/apache/cassandra/utils/UUIDGen.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java index 33f14a4..16190e2 100644 --- a/src/java/org/apache/cassandra/utils/UUIDGen.java +++ b/src/java/org/apache/cassandra/utils/UUIDGen.java @@ -82,10 +82,15 @@ public class UUIDGen return new UUID(createTime(fromUnixTimestamp(when)), clockSeqAndNode); } + public static UUID getTimeUUID(long when, long nanos) + { + return new UUID(createTime(fromUnixTimestamp(when, nanos)), clockSeqAndNode); + } + @VisibleForTesting - public static UUID getTimeUUID(long when, long clockSeqAndNode) + public static UUID getTimeUUID(long when, long nanos, long clockSeqAndNode) { - return new UUID(createTime(fromUnixTimestamp(when)), clockSeqAndNode); + return new UUID(createTime(fromUnixTimestamp(when, nanos)), clockSeqAndNode); } /** creates a type 1 uuid from raw bytes. */ @@ -169,7 +174,12 @@ public class UUIDGen * @return */ private static long fromUnixTimestamp(long timestamp) { - return (timestamp - START_EPOCH) * 10000; + return fromUnixTimestamp(timestamp, 0L); + } + + private static long fromUnixTimestamp(long timestamp, long nanos) + { + return ((timestamp - START_EPOCH) * 10000) + nanos; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/01115f72/test/long/org/apache/cassandra/cql3/ManyRowsTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/cql3/ManyRowsTest.java b/test/long/org/apache/cassandra/cql3/ManyRowsTest.java new file mode 100644 index 0000000..82eeabd --- /dev/null +++ b/test/long/org/apache/cassandra/cql3/ManyRowsTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.junit.Test; + +public class ManyRowsTest extends CQLTester +{ + /** + * Migrated from cql_tests.py:TestCQL.large_count_test() + */ + @Test + public void testLargeCount() throws Throwable + { + createTable("CREATE TABLE %s (k int, v int, PRIMARY KEY (k))"); + + // We know we page at 10K, so test counting just before, at 10K, just after and + // a bit after that. + for (int k = 1; k < 10000; k++) + execute("INSERT INTO %s (k) VALUES (?)", k); + + assertRows(execute("SELECT COUNT(*) FROM %s"), row(9999L)); + + execute("INSERT INTO %s (k) VALUES (?)", 10000); + + assertRows(execute("SELECT COUNT(*) FROM %s"), row(10000L)); + + execute("INSERT INTO %s (k) VALUES (?)", 10001); + + assertRows(execute("SELECT COUNT(*) FROM %s"), row(10001L)); + + for (int k = 10002; k < 15001; k++) + execute("INSERT INTO %s (k) VALUES (?)", k); + + assertRows(execute("SELECT COUNT(*) FROM %s"), row(15000L)); + } + + /** + * Test for CASSANDRA-8410, + * migrated from cql_tests.py:TestCQL.large_clustering_in_test() + */ + @Test + public void testLargeClustering() throws Throwable + { + createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c) )"); + + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 0, 0, 0); + + // try to fetch one existing row and 9999 non-existing rows + List inValues = new ArrayList(10000); + for (int i = 0; i < 10000; i++) + inValues.add(i); + + assertRows(execute("SELECT * FROM %s WHERE k=? AND c IN ?", 0, inValues), + row(0, 0, 0)); + + // insert approximately 1000 random rows between 0 and 10k + Random rnd = new Random(); + Set clusteringValues = new HashSet<>(); + for (int i = 0; i < 1000; i++) + clusteringValues.add(rnd.nextInt(10000)); + + clusteringValues.add(0); + + for (int i : clusteringValues) // TODO - this was done in parallel by dtests + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 0, i, i); + + assertRowCount(execute("SELECT * FROM %s WHERE k=? AND c IN ?", 0, inValues), clusteringValues.size()); + } +}