From: paulo@apache.org
To: commits@cassandra.apache.org
Reply-To: dev@cassandra.apache.org
Date: Mon, 25 Sep 2017 06:30:20 -0000
Subject: [5/8] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e3d56ec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e3d56ec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e3d56ec

Branch: refs/heads/trunk
Commit: 3e3d56ecd41f04a31a60a4811702873d9aa57aad
Parents: 0e5c84a 68bdf45
Author: Paulo Motta
Authored: Mon Sep 25 01:01:31 2017 -0500
Committer: Paulo Motta
Committed: Mon Sep 25 01:01:47 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 +-
 .../org/apache/cassandra/db/ReadCommand.java    |   3 +-
 .../db/SinglePartitionReadCommand.java          |   9 +-
 .../apache/cassandra/db/filter/DataLimits.java  | 111 ++++++++++++-------
 .../db/partitions/CachedBTreePartition.java     |   3 +-
 .../apache/cassandra/db/rows/AbstractRow.java   |   6 +-
 src/java/org/apache/cassandra/db/rows/Row.java  |   7 +-
 .../cassandra/db/view/ViewUpdateGenerator.java  |   6 +-
 .../composites/ClusteringColumnIndex.java       |   5 +-
 .../internal/composites/PartitionKeyIndex.java  |   4 +-
 .../apache/cassandra/service/DataResolver.java  |   6 +-
 .../apache/cassandra/service/StorageProxy.java  |  12 +-
 .../service/pager/AbstractQueryPager.java       |   4 +-
 .../cassandra/service/pager/QueryPagers.java    |   2 +-
 .../org/apache/cassandra/cql3/CachingBench.java |   3 +-
 .../cassandra/cql3/GcCompactionBench.java       |   3 +-
 .../apache/cassandra/cql3/GcCompactionTest.java |   3 +-
 .../apache/cassandra/cql3/ViewComplexTest.java  |  65 ++++++++++-
 .../org/apache/cassandra/cql3/ViewTest.java     |   2 +
 .../apache/cassandra/db/RangeTombstoneTest.java |  35 ++--
 .../db/compaction/CompactionsPurgeTest.java     |
3 +-
 22 files changed, 226 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index eb2ccc0,9cba02b..8fc72fc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,15 -1,6 +1,16 @@@
-3.0.15
+3.11.1
+ * Handle limit correctly on tables with strict liveness (CASSANDRA-13883)
- * Fix missing original update in TriggerExecutor (CASSANDRA-13894)
+ * AbstractTokenTreeBuilder#serializedSize returns wrong value when there is a single leaf and overflow collisions (CASSANDRA-13869)
+ * Add a compaction option to TWCS to ignore sstables overlapping checks (CASSANDRA-13418)
+ * BTree.Builder memory leak (CASSANDRA-13754)
+ * Revert CASSANDRA-10368 of supporting non-pk column filtering due to correctness (CASSANDRA-13798)
+ * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
+ * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
+ * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
+ * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
+ * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
+ * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
+Merged from 3.0:
 * Remove non-rpc-ready nodes from counter leader candidates (CASSANDRA-13043)
 * Improve short read protection performance (CASSANDRA-13794)
 * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index bb4d5e8,160b104..4f3c66f
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -462,7 -449,8 +462,8 @@@ public abstract class ReadCommand exten
         private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
         private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
-        private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
+        private final boolean respectTombstoneThresholds = !SchemaConstants.isSystemKeyspace(ReadCommand.this.metadata().ksName);
+        private final boolean enforceStrictLiveness = metadata.enforceStrictLiveness();

         private int liveRows = 0;
         private int tombstones = 0;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index c7080e7,7a66eca..7564571
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -574,9 -556,10 +574,10 @@@ public class SinglePartitionReadComman
         try
         {
             final int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
+            final boolean enforceStrictLiveness = metadata().enforceStrictLiveness();
@SuppressWarnings("resource") // we close on exception or upon closing the result of this method - UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp); + UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, executionController); try { // Use a custom iterator instead of DataLimits to avoid stopping the original iterator @@@ -1195,45 -1153,26 +1196,49 @@@ { // Note that the only difference between the command in a group must be the partition key on which // they applied. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one. - return commands.get(0).startOrderGroup(); + return commands.get(0).executionController(); } - public PartitionIterator executeInternal(ReadOrderGroup orderGroup) + public PartitionIterator executeInternal(ReadExecutionController controller) { - List partitions = new ArrayList<>(commands.size()); - for (SinglePartitionReadCommand cmd : commands) - partitions.add(cmd.executeInternal(orderGroup)); - + // Note that the only difference between the command in a group must be the partition key on which + // they applied. + boolean enforceStrictLiveness = commands.get(0).metadata().enforceStrictLiveness(); - // Because we only have enforce the limit per command, we need to enforce it globally. - return limits.filter(PartitionIterators.concat(partitions), + return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec), nowInSec, - selectsFullPartitions); + selectsFullPartitions, + enforceStrictLiveness); } - public QueryPager getPager(PagingState pagingState, int protocolVersion) + public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController) + { + return executeLocally(executionController, true); + } + + /** + * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}. + * + * @param executionController - the {@code ReadExecutionController} protecting the read. + * @param sort - whether to sort the inner commands by partition key, required for merging the iterator + * later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)} + * because in this case it is safe to do so as there is no merging involved and we don't want to + * change the old behavior which was to not sort by partition. + * + * @return - the iterator that can be used to retrieve the query result. 
+ */ + private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort) + { + List> partitions = new ArrayList<>(commands.size()); + for (SinglePartitionReadCommand cmd : commands) + partitions.add(Pair.of(cmd.partitionKey, cmd.executeLocally(executionController))); + + if (sort) + Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft())); + + return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList())); + } + + public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion) { if (commands.size() == 1) return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/filter/DataLimits.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/filter/DataLimits.java index 0c8cd37,6b74293..6d7df16 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@@ -20,10 -20,8 +20,11 @@@ package org.apache.cassandra.db.filter import java.io.IOException; import java.nio.ByteBuffer; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.aggregation.GroupMaker; +import org.apache.cassandra.db.aggregation.GroupingState; +import org.apache.cassandra.db.aggregation.AggregationSpecification; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.transform.BasePartitions; @@@ -139,18 -110,10 +140,21 @@@ public abstract class DataLimit public abstract DataLimits forShortReadRetry(int toFetch); + /** + * Creates a DataLimits instance to be used for paginating internally GROUP BY queries. + * + * @param state the GroupMaker state + * @return a DataLimits instance to be used for paginating internally GROUP BY queries + */ + public DataLimits forGroupByInternalPaging(GroupingState state) + { + throw new UnsupportedOperationException(); + } + - public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData); + public abstract boolean hasEnoughLiveData(CachedPartition cached, + int nowInSec, + boolean countPartitionsWithOnlyStaticData, + boolean enforceStrictLiveness); /** * Returns a new {@code Counter} for this limits. 
@@@ -210,18 -180,9 +227,20 @@@ public static abstract class Counter extends StoppingTransformation> { + protected final int nowInSec; + protected final boolean assumeLiveData; ++ private final boolean enforceStrictLiveness; + // false means we do not propagate our stop signals onto the iterator, we only count private boolean enforceLimits = true; - protected Counter(int nowInSec, boolean assumeLiveData) ++ protected Counter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness) + { + this.nowInSec = nowInSec; + this.assumeLiveData = assumeLiveData; ++ this.enforceStrictLiveness = enforceStrictLiveness; + } + public Counter onlyCount() { this.enforceLimits = false; @@@ -276,11 -222,6 +295,11 @@@ public abstract boolean isDone(); public abstract boolean isDoneForPartition(); + protected boolean isLive(Row row) + { - return assumeLiveData || row.hasLiveData(nowInSec); ++ return assumeLiveData || row.hasLiveData(nowInSec, enforceStrictLiveness); + } + @Override protected BaseRowIterator applyToPartition(BaseRowIterator partition) { @@@ -439,16 -372,25 +461,19 @@@ protected class CQLCounter extends Counter { - protected final int nowInSec; - protected final boolean assumeLiveData; - protected final boolean countPartitionsWithOnlyStaticData; - protected int rowCounted; protected int rowInCurrentPartition; + protected final boolean countPartitionsWithOnlyStaticData; protected boolean hasLiveStaticRow; - private final boolean enforceStrictLiveness; - public CQLCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + public CQLCounter(int nowInSec, + boolean assumeLiveData, + boolean countPartitionsWithOnlyStaticData, + boolean enforceStrictLiveness) { - super(nowInSec, assumeLiveData); - this.nowInSec = nowInSec; - this.assumeLiveData = assumeLiveData; ++ super(nowInSec, assumeLiveData, enforceStrictLiveness); this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData; - this.enforceStrictLiveness = enforceStrictLiveness; } @Override @@@ -566,15 -498,9 +591,15 @@@ } @Override + public DataLimits withoutState() + { + return new CQLLimits(rowLimit, perPartitionLimit, isDistinct); + } + + @Override - public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness) { - return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); + return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness); } private class PagingAwareCounter extends CQLCounter @@@ -605,502 -534,6 +633,508 @@@ } /** + * CQLLimits used for GROUP BY queries or queries with aggregates. + *

<p>
+ * Internally, GROUP BY queries are always paginated by number of rows to avoid OOMExceptions. By consequence,
+ * the limits keep track of the number of rows as well as the number of groups.
+ * <p>
+ * A group can only be counted if the next group or the end of the data is reached.
+ */ + private static class CQLGroupByLimits extends CQLLimits + { + /** + * The GroupMaker state + */ + protected final GroupingState state; + + /** + * The GROUP BY specification + */ + protected final AggregationSpecification groupBySpec; + + /** + * The limit on the number of groups + */ + protected final int groupLimit; + + /** + * The limit on the number of groups per partition + */ + protected final int groupPerPartitionLimit; + + public CQLGroupByLimits(int groupLimit, + int groupPerPartitionLimit, + int rowLimit, + AggregationSpecification groupBySpec) + { + this(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec, GroupingState.EMPTY_STATE); + } + + private CQLGroupByLimits(int groupLimit, + int groupPerPartitionLimit, + int rowLimit, + AggregationSpecification groupBySpec, + GroupingState state) + { + super(rowLimit, NO_LIMIT, false); + this.groupLimit = groupLimit; + this.groupPerPartitionLimit = groupPerPartitionLimit; + this.groupBySpec = groupBySpec; + this.state = state; + } + + @Override + public Kind kind() + { + return Kind.CQL_GROUP_BY_LIMIT; + } + + @Override + public boolean isGroupByLimit() + { + return true; + } + + public boolean isUnlimited() + { + return groupLimit == NO_LIMIT && groupPerPartitionLimit == NO_LIMIT && rowLimit == NO_LIMIT; + } + + public DataLimits forShortReadRetry(int toFetch) + { + return new CQLLimits(toFetch); + } + + @Override + public float estimateTotalResults(ColumnFamilyStore cfs) + { + // For the moment, we return the estimated number of rows as we have no good way of estimating + // the number of groups that will be returned. Hopefully, we should be able to fix + // that problem at some point. + return super.estimateTotalResults(cfs); + } + + @Override + public DataLimits forPaging(int pageSize) + { + return new CQLGroupByLimits(pageSize, + groupPerPartitionLimit, + rowLimit, + groupBySpec, + state); + } + + @Override + public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + return new CQLGroupByPagingLimits(pageSize, + groupPerPartitionLimit, + rowLimit, + groupBySpec, + state, + lastReturnedKey, + lastReturnedKeyRemaining); + } + + @Override + public DataLimits forGroupByInternalPaging(GroupingState state) + { + return new CQLGroupByLimits(rowLimit, + groupPerPartitionLimit, + rowLimit, + groupBySpec, + state); + } + + @Override - public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) ++ public Counter newCounter(int nowInSec, ++ boolean assumeLiveData, ++ boolean countPartitionsWithOnlyStaticData, ++ boolean enforceStrictLiveness) + { - return new GroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); ++ return new GroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness); + } + + @Override + public int count() + { + return groupLimit; + } + + @Override + public int perPartitionCount() + { + return groupPerPartitionLimit; + } + + @Override + public DataLimits withoutState() + { + return state == GroupingState.EMPTY_STATE + ? 
this + : new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + + if (groupLimit != NO_LIMIT) + { + sb.append("GROUP LIMIT ").append(groupLimit); + if (groupPerPartitionLimit != NO_LIMIT || rowLimit != NO_LIMIT) + sb.append(' '); + } + + if (groupPerPartitionLimit != NO_LIMIT) + { + sb.append("GROUP PER PARTITION LIMIT ").append(groupPerPartitionLimit); + if (rowLimit != NO_LIMIT) + sb.append(' '); + } + + if (rowLimit != NO_LIMIT) + { + sb.append("LIMIT ").append(rowLimit); + } + + return sb.toString(); + } + + @Override + public boolean isExhausted(Counter counter) + { + return ((GroupByAwareCounter) counter).rowCounted < rowLimit + && counter.counted() < groupLimit; + } + + protected class GroupByAwareCounter extends Counter + { + private final GroupMaker groupMaker; + + protected final boolean countPartitionsWithOnlyStaticData; + + /** + * The key of the partition being processed. + */ + protected DecoratedKey currentPartitionKey; + + /** + * The number of rows counted so far. + */ + protected int rowCounted; + + /** + * The number of rows counted so far in the current partition. + */ + protected int rowCountedInCurrentPartition; + + /** + * The number of groups counted so far. A group is counted only once it is complete + * (e.g the next one has been reached). + */ + protected int groupCounted; + + /** + * The number of groups in the current partition. + */ + protected int groupInCurrentPartition; + + protected boolean hasGroupStarted; + + protected boolean hasLiveStaticRow; + + protected boolean hasReturnedRowsFromCurrentPartition; + - private GroupByAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) ++ private GroupByAwareCounter(int nowInSec, ++ boolean assumeLiveData, ++ boolean countPartitionsWithOnlyStaticData, ++ boolean enforceStrictLiveness) + { - super(nowInSec, assumeLiveData); ++ super(nowInSec, assumeLiveData, enforceStrictLiveness); + this.groupMaker = groupBySpec.newGroupMaker(state); + this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData; + + // If the end of the partition was reached at the same time than the row limit, the last group might + // not have been counted yet. Due to that we need to guess, based on the state, if the previous group + // is still open. + hasGroupStarted = state.hasClustering(); + } + + @Override + public void applyToPartition(DecoratedKey partitionKey, Row staticRow) + { + if (partitionKey.getKey().equals(state.partitionKey())) + { + // The only case were we could have state.partitionKey() equals to the partition key + // is if some of the partition rows have been returned in the previous page but the + // partition was not exhausted (as the state partition key has not been updated yet). + // Since we know we have returned rows, we know we have accounted for + // the static row if any already, so force hasLiveStaticRow to false so we make sure to not count it + // once more. + hasLiveStaticRow = false; + hasReturnedRowsFromCurrentPartition = true; + hasGroupStarted = true; + } + else + { + // We need to increment our count of groups if we have reached a new one and unless we had no new + // content added since we closed our last group (that is, if hasGroupStarted). 
Note that we may get + // here with hasGroupStarted == false in the following cases: + // * the partition limit was reached for the previous partition + // * the previous partition was containing only one static row + // * the rows of the last group of the previous partition were all marked as deleted + if (hasGroupStarted && groupMaker.isNewGroup(partitionKey, Clustering.STATIC_CLUSTERING)) + { + incrementGroupCount(); + // If we detect, before starting the new partition, that we are done, we need to increase + // the per partition group count of the previous partition as the next page will start from + // there. + if (isDone()) + incrementGroupInCurrentPartitionCount(); + hasGroupStarted = false; + } + hasReturnedRowsFromCurrentPartition = false; + hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow); + } + currentPartitionKey = partitionKey; + // If we are done we need to preserve the groupInCurrentPartition and rowCountedInCurrentPartition + // because the pager need to retrieve the count associated to the last value it has returned. + if (!isDone()) + { + groupInCurrentPartition = 0; + rowCountedInCurrentPartition = 0; + } + } + + @Override + protected Row applyToStatic(Row row) + { + // It's possible that we're "done" if the partition we just started bumped the number of groups (in + // applyToPartition() above), in which case Transformation will still call this method. In that case, we + // want to ignore the static row, it should (and will) be returned with the next page/group if needs be. + if (isDone()) + { + hasLiveStaticRow = false; // The row has not been returned + return Rows.EMPTY_STATIC_ROW; + } + return row; + } + + @Override + public Row applyToRow(Row row) + { + // We want to check if the row belongs to a new group even if it has been deleted. The goal being + // to minimize the chances of having to go through the same data twice if we detect on the next + // non deleted row that we have reached the limit. + if (groupMaker.isNewGroup(currentPartitionKey, row.clustering())) + { + if (hasGroupStarted) + { + incrementGroupCount(); + incrementGroupInCurrentPartitionCount(); + } + hasGroupStarted = false; + } + + // That row may have made us increment the group count, which may mean we're done for this partition, in + // which case we shouldn't count this row (it won't be returned). 
+ if (isDoneForPartition()) + { + hasGroupStarted = false; + return null; + } + + if (isLive(row)) + { + hasGroupStarted = true; + incrementRowCount(); + hasReturnedRowsFromCurrentPartition = true; + } + + return row; + } + + @Override + public int counted() + { + return groupCounted; + } + + @Override + public int countedInCurrentPartition() + { + return groupInCurrentPartition; + } + + @Override + public int rowCounted() + { + return rowCounted; + } + + @Override + public int rowCountedInCurrentPartition() + { + return rowCountedInCurrentPartition; + } + + protected void incrementRowCount() + { + rowCountedInCurrentPartition++; + if (++rowCounted >= rowLimit) + stop(); + } + + private void incrementGroupCount() + { + groupCounted++; + if (groupCounted >= groupLimit) + stop(); + } + + private void incrementGroupInCurrentPartitionCount() + { + groupInCurrentPartition++; + if (groupInCurrentPartition >= groupPerPartitionLimit) + stopInPartition(); + } + + @Override + public boolean isDoneForPartition() + { + return isDone() || groupInCurrentPartition >= groupPerPartitionLimit; + } + + @Override + public boolean isDone() + { + return groupCounted >= groupLimit; + } + + @Override + public void onPartitionClose() + { + // Normally, we don't count static rows as from a CQL point of view, it will be merge with other + // rows in the partition. However, if we only have the static row, it will be returned as one group + // so count it. + if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && !hasReturnedRowsFromCurrentPartition) + { + incrementRowCount(); + incrementGroupCount(); + incrementGroupInCurrentPartitionCount(); + hasGroupStarted = false; + } + super.onPartitionClose(); + } + + @Override + public void onClose() + { + // Groups are only counted when the end of the group is reached. + // The end of a group is detected by 2 ways: + // 1) a new group is reached + // 2) the end of the data is reached + // We know that the end of the data is reached if the group limit has not been reached + // and the number of rows counted is smaller than the internal page size. 
+ if (hasGroupStarted && groupCounted < groupLimit && rowCounted < rowLimit) + { + incrementGroupCount(); + incrementGroupInCurrentPartitionCount(); + } + + super.onClose(); + } + } + } + + private static class CQLGroupByPagingLimits extends CQLGroupByLimits + { + private final ByteBuffer lastReturnedKey; + + private final int lastReturnedKeyRemaining; + + public CQLGroupByPagingLimits(int groupLimit, + int groupPerPartitionLimit, + int rowLimit, + AggregationSpecification groupBySpec, + GroupingState state, + ByteBuffer lastReturnedKey, + int lastReturnedKeyRemaining) + { + super(groupLimit, + groupPerPartitionLimit, + rowLimit, + groupBySpec, + state); + + this.lastReturnedKey = lastReturnedKey; + this.lastReturnedKeyRemaining = lastReturnedKeyRemaining; + } + + @Override + public Kind kind() + { + return Kind.CQL_GROUP_BY_PAGING_LIMIT; + } + + @Override + public DataLimits forPaging(int pageSize) + { + throw new UnsupportedOperationException(); + } + + @Override + public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + throw new UnsupportedOperationException(); + } + + @Override + public DataLimits forGroupByInternalPaging(GroupingState state) + { + throw new UnsupportedOperationException(); + } + + @Override - public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) ++ public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness) + { + assert state == GroupingState.EMPTY_STATE || lastReturnedKey.equals(state.partitionKey()); - return new PagingGroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); ++ return new PagingGroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness); + } + + @Override + public DataLimits withoutState() + { + return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec); + } + + private class PagingGroupByAwareCounter extends GroupByAwareCounter + { - private PagingGroupByAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) ++ private PagingGroupByAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness) + { - super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); ++ super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness); + } + + @Override + public void applyToPartition(DecoratedKey partitionKey, Row staticRow) + { + if (partitionKey.getKey().equals(lastReturnedKey)) + { + currentPartitionKey = partitionKey; + groupInCurrentPartition = groupPerPartitionLimit - lastReturnedKeyRemaining; + hasReturnedRowsFromCurrentPartition = true; + hasLiveStaticRow = false; + hasGroupStarted = state.hasClustering(); + } + else + { + super.applyToPartition(partitionKey, staticRow); + } + } + } + } + + /** * Limits used by thrift; this count partition and cells. 
*/ private static class ThriftLimits extends DataLimits @@@ -1176,9 -609,9 +1210,9 @@@ } } - public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness) { -- return new ThriftCounter(nowInSec, assumeLiveData); ++ return new ThriftCounter(nowInSec, assumeLiveData, enforceStrictLiveness); } public int count() @@@ -1209,9 -640,10 +1243,9 @@@ protected int cellsCounted; protected int cellsInCurrentPartition; -- public ThriftCounter(int nowInSec, boolean assumeLiveData) ++ public ThriftCounter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness) { - super(nowInSec, assumeLiveData); - this.nowInSec = nowInSec; - this.assumeLiveData = assumeLiveData; ++ super(nowInSec, assumeLiveData, enforceStrictLiveness); } @Override @@@ -1324,9 -746,12 +1358,12 @@@ protected class SuperColumnCountingCounter extends ThriftCounter { - public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData) + private final boolean enforceStrictLiveness; + + public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness) { -- super(nowInSec, assumeLiveData); ++ super(nowInSec, assumeLiveData, enforceStrictLiveness); + this.enforceStrictLiveness = enforceStrictLiveness; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/rows/AbstractRow.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/DataResolver.java index d5c0566,61bffe5..84e8685 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@@ -43,12 -43,13 +43,14 @@@ public class DataResolver extends Respo { @VisibleForTesting final List repairResults = Collections.synchronizedList(new ArrayList<>()); - + private final long queryStartNanoTime; + private final boolean enforceStrictLiveness; - DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) + DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime) { super(keyspace, command, consistency, maxResponseCount); + this.queryStartNanoTime = queryStartNanoTime; + this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); } public PartitionIterator getData() @@@ -127,10 -128,10 +129,10 @@@ for (int i = 0; i < results.size(); i++) { DataLimits.Counter singleResultCounter = - command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount(); + 
command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount(); ShortReadResponseProtection protection = - new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter); + new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter, queryStartNanoTime); /* * The order of transformations is important here. See ShortReadResponseProtection.applyToPartition() http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 12d7e19,5af2ad0..c57b691 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -1678,7 -1597,10 +1678,10 @@@ public class StorageProxy implements St long start = System.nanoTime(); try { - PartitionIterator result = fetchRows(group.commands, consistencyLevel); + PartitionIterator result = fetchRows(group.commands, consistencyLevel, queryStartNanoTime); + // Note that the only difference between the command in a group must be the partition key on which + // they applied. + boolean enforceStrictLiveness = group.commands.get(0).metadata().enforceStrictLiveness(); // If we have more than one command, then despite each read command honoring the limit, the total result // might not honor it and so we should enforce it if (group.commands.size() > 1) @@@ -2103,9 -1998,9 +2106,10 @@@ private final PartitionRangeReadCommand command; private final Keyspace keyspace; private final ConsistencyLevel consistency; + private final boolean enforceStrictLiveness; private final long startTime; + private final long queryStartNanoTime; private DataLimits.Counter counter; private PartitionIterator sentQueryIterator; @@@ -2124,7 -2019,7 +2128,8 @@@ this.totalRangeCount = ranges.rangeCount(); this.consistency = consistency; this.keyspace = keyspace; + this.queryStartNanoTime = queryStartNanoTime; + this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); } public RowIterator computeNext() @@@ -2280,9 -2166,10 +2285,10 @@@ // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally. 
- return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), + return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel, queryStartNanoTime)), command.nowInSec(), - command.selectsFullPartition()); + command.selectsFullPartition(), + command.metadata().enforceStrictLiveness()); } public Map> getSchemaVersions() http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index ef38242,f44aa24..f5134fa --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@@ -30,7 -31,8 +30,8 @@@ abstract class AbstractQueryPager imple { protected final ReadCommand command; protected final DataLimits limits; - protected final int protocolVersion; + protected final ProtocolVersion protocolVersion; + private final boolean enforceStrictLiveness; private int remaining; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/service/pager/QueryPagers.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/pager/QueryPagers.java index 1b56417,6bc1f80..1a70864 --- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java +++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java @@@ -54,9 -53,9 +54,9 @@@ public class QueryPager int count = 0; while (!pager.isExhausted()) { - try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state)) + try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state, queryStartNanoTime)) { - DataLimits.Counter counter = limits.newCounter(nowInSec, true, command.selectsFullPartition()); + DataLimits.Counter counter = limits.newCounter(nowInSec, true, command.selectsFullPartition(), metadata.enforceStrictLiveness()); PartitionIterators.consume(counter.applyTo(iter)); count += counter.counted(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/test/long/org/apache/cassandra/cql3/CachingBench.java ---------------------------------------------------------------------- diff --cc test/long/org/apache/cassandra/cql3/CachingBench.java index 370b3ff,0000000..25f746b mode 100644,000000..100644 --- a/test/long/org/apache/cassandra/cql3/CachingBench.java +++ b/test/long/org/apache/cassandra/cql3/CachingBench.java @@@ -1,375 -1,0 +1,376 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; + +import com.google.common.collect.Iterables; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.config.Config.CommitLogSync; +import org.apache.cassandra.config.Config.DiskAccessMode; +import org.apache.cassandra.cache.ChunkCache; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.FBUtilities; + +public class CachingBench extends CQLTester +{ + private static final String STRATEGY = "LeveledCompactionStrategy"; + + private static final int DEL_SECTIONS = 1000; + private static final int FLUSH_FREQ = 10000; + private static final int SCAN_FREQUENCY_INV = 12000; + static final int COUNT = 29000; + static final int ITERS = 9; + + static final int KEY_RANGE = 30; + static final int CLUSTERING_RANGE = 210000; + + static final int EXTRA_SIZE = 1025; + static final boolean CONCURRENT_COMPACTIONS = true; + + // The name of this method is important! + // CommitLog settings must be applied before CQLTester sets up; by using the same name as its @BeforeClass method we + // are effectively overriding it. + @BeforeClass + public static void setUpClass() + { + DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogSyncPeriod(100); + CQLTester.setUpClass(); + } + + String hashQuery; + + @Before + public void before() throws Throwable + { + createTable("CREATE TABLE %s(" + + " key int," + + " column int," + + " data int," + + " extra text," + + " PRIMARY KEY(key, column)" + + ")" + ); + + String hashIFunc = parseFunctionName(createFunction(KEYSPACE, "int, int", + " CREATE FUNCTION %s (state int, val int)" + + " CALLED ON NULL INPUT" + + " RETURNS int" + + " LANGUAGE java" + + " AS 'return val != null ? state * 17 + val : state;'")).name; + String hashTFunc = parseFunctionName(createFunction(KEYSPACE, "int, text", + " CREATE FUNCTION %s (state int, val text)" + + " CALLED ON NULL INPUT" + + " RETURNS int" + + " LANGUAGE java" + + " AS 'return val != null ? 
state * 17 + val.hashCode() : state;'")).name; + + String hashInt = createAggregate(KEYSPACE, "int", + " CREATE AGGREGATE %s (int)" + + " SFUNC " + hashIFunc + + " STYPE int" + + " INITCOND 1"); + String hashText = createAggregate(KEYSPACE, "text", + " CREATE AGGREGATE %s (text)" + + " SFUNC " + hashTFunc + + " STYPE int" + + " INITCOND 1"); + + hashQuery = String.format("SELECT count(column), %s(key), %s(column), %s(data), %s(extra), avg(key), avg(column), avg(data) FROM %%s", + hashInt, hashInt, hashInt, hashText); + } + AtomicLong id = new AtomicLong(); + long compactionTimeNanos = 0; + + void pushData(Random rand, int count) throws Throwable + { + for (int i = 0; i < count; ++i) + { + long ii = id.incrementAndGet(); + if (ii % 1000 == 0) + System.out.print('.'); + int key = rand.nextInt(KEY_RANGE); + int column = rand.nextInt(CLUSTERING_RANGE); + execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", key, column, (int) ii, genExtra(rand)); + maybeCompact(ii); + } + } + + private String genExtra(Random rand) + { + StringBuilder builder = new StringBuilder(EXTRA_SIZE); + for (int i = 0; i < EXTRA_SIZE; ++i) + builder.append((char) ('a' + rand.nextInt('z' - 'a' + 1))); + return builder.toString(); + } + + void readAndDelete(Random rand, int count) throws Throwable + { + for (int i = 0; i < count; ++i) + { + int key; + UntypedResultSet res; + long ii = id.incrementAndGet(); + if (ii % 1000 == 0) + System.out.print('-'); + if (rand.nextInt(SCAN_FREQUENCY_INV) != 1) + { + do + { + key = rand.nextInt(KEY_RANGE); + long cid = rand.nextInt(DEL_SECTIONS); + int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS); + int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS); + res = execute("SELECT column FROM %s WHERE key = ? AND column >= ? AND column < ? LIMIT 1", key, cstart, cend); + } while (res.size() == 0); + UntypedResultSet.Row r = Iterables.get(res, rand.nextInt(res.size())); + int clustering = r.getInt("column"); + execute("DELETE FROM %s WHERE key = ? 
AND column = ?", key, clustering); + } + else + { + execute(hashQuery); + } + maybeCompact(ii); + } + } + + private void maybeCompact(long ii) + { + if (ii % FLUSH_FREQ == 0) + { + System.out.print("F"); + flush(); + if (ii % (FLUSH_FREQ * 10) == 0) + { + System.out.println("C"); + long startTime = System.nanoTime(); + getCurrentColumnFamilyStore().enableAutoCompaction(!CONCURRENT_COMPACTIONS); + long endTime = System.nanoTime(); + compactionTimeNanos += endTime - startTime; + getCurrentColumnFamilyStore().disableAutoCompaction(); + } + } + } + + public void testSetup(String compactionClass, String compressorClass, DiskAccessMode mode, boolean cacheEnabled) throws Throwable + { + id.set(0); + compactionTimeNanos = 0; + ChunkCache.instance.enable(cacheEnabled); + DatabaseDescriptor.setDiskAccessMode(mode); + alterTable("ALTER TABLE %s WITH compaction = { 'class' : '" + compactionClass + "' };"); + alterTable("ALTER TABLE %s WITH compression = { 'sstable_compression' : '" + compressorClass + "' };"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + cfs.disableAutoCompaction(); + + long onStartTime = System.currentTimeMillis(); + ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + List> tasks = new ArrayList<>(); + for (int ti = 0; ti < 1; ++ti) + { + Random rand = new Random(ti); + tasks.add(es.submit(() -> + { + for (int i = 0; i < ITERS; ++i) + try + { + pushData(rand, COUNT); + readAndDelete(rand, COUNT / 3); + } + catch (Throwable e) + { + throw new AssertionError(e); + } + })); + } + for (Future task : tasks) + task.get(); + + flush(); + long onEndTime = System.currentTimeMillis(); + int startRowCount = countRows(cfs); + int startTombCount = countTombstoneMarkers(cfs); + int startRowDeletions = countRowDeletions(cfs); + int startTableCount = cfs.getLiveSSTables().size(); + long startSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables()); + System.out.println("\nCompession: " + cfs.getCompressionParameters().toString()); + System.out.println("Reader " + cfs.getLiveSSTables().iterator().next().getFileDataInput(0).toString()); + if (cacheEnabled) + System.out.format("Cache size %s requests %,d hit ratio %f\n", + FileUtils.stringifyFileSize(ChunkCache.instance.metrics.size.getValue()), + ChunkCache.instance.metrics.requests.getCount(), + ChunkCache.instance.metrics.hitRate.getValue()); + else + { + Assert.assertTrue("Chunk cache had requests: " + ChunkCache.instance.metrics.requests.getCount(), ChunkCache.instance.metrics.requests.getCount() < COUNT); + System.out.println("Cache disabled"); + } + System.out.println(String.format("Operations completed in %.3fs", (onEndTime - onStartTime) * 1e-3)); + if (!CONCURRENT_COMPACTIONS) + System.out.println(String.format(", out of which %.3f for non-concurrent compaction", compactionTimeNanos * 1e-9)); + else + System.out.println(); + + String hashesBefore = getHashes(); + long startTime = System.currentTimeMillis(); + CompactionManager.instance.performMaximal(cfs, true); + long endTime = System.currentTimeMillis(); + + int endRowCount = countRows(cfs); + int endTombCount = countTombstoneMarkers(cfs); + int endRowDeletions = countRowDeletions(cfs); + int endTableCount = cfs.getLiveSSTables().size(); + long endSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables()); + + System.out.println(String.format("Major compaction completed in %.3fs", + (endTime - startTime) * 1e-3)); + System.out.println(String.format("At start: %,12d tables %12s %,12d rows %,12d deleted rows %,12d tombstone markers", 
+ startTableCount, FileUtils.stringifyFileSize(startSize), startRowCount, startRowDeletions, startTombCount)); + System.out.println(String.format("At end: %,12d tables %12s %,12d rows %,12d deleted rows %,12d tombstone markers", + endTableCount, FileUtils.stringifyFileSize(endSize), endRowCount, endRowDeletions, endTombCount)); + String hashesAfter = getHashes(); + + Assert.assertEquals(hashesBefore, hashesAfter); + } + + private String getHashes() throws Throwable + { + long startTime = System.currentTimeMillis(); + String hashes = Arrays.toString(getRows(execute(hashQuery))[0]); + long endTime = System.currentTimeMillis(); + System.out.println(String.format("Hashes: %s, retrieved in %.3fs", hashes, (endTime - startTime) * 1e-3)); + return hashes; + } + + @Test + public void testWarmup() throws Throwable + { + testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, false); + } + + @Test + public void testLZ4CachedMmap() throws Throwable + { + testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, true); + } + + @Test + public void testLZ4CachedStandard() throws Throwable + { + testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.standard, true); + } + + @Test + public void testLZ4UncachedMmap() throws Throwable + { + testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, false); + } + + @Test + public void testLZ4UncachedStandard() throws Throwable + { + testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.standard, false); + } + + @Test + public void testCachedStandard() throws Throwable + { + testSetup(STRATEGY, "", DiskAccessMode.standard, true); + } + + @Test + public void testUncachedStandard() throws Throwable + { + testSetup(STRATEGY, "", DiskAccessMode.standard, false); + } + + @Test + public void testMmapped() throws Throwable + { + testSetup(STRATEGY, "", DiskAccessMode.mmap, false /* doesn't matter */); + } + + int countTombstoneMarkers(ColumnFamilyStore cfs) + { + return count(cfs, x -> x.isRangeTombstoneMarker()); + } + + int countRowDeletions(ColumnFamilyStore cfs) + { + return count(cfs, x -> x.isRow() && !((Row) x).deletion().isLive()); + } + + int countRows(ColumnFamilyStore cfs) + { ++ boolean enforceStrictLiveness = cfs.metadata.enforceStrictLiveness(); + int nowInSec = FBUtilities.nowInSeconds(); - return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec)); ++ return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec, enforceStrictLiveness)); + } + + private int count(ColumnFamilyStore cfs, Predicate predicate) + { + int count = 0; + for (SSTableReader reader : cfs.getLiveSSTables()) + count += count(reader, predicate); + return count; + } + + int count(SSTableReader reader, Predicate predicate) + { + int instances = 0; + try (ISSTableScanner partitions = reader.getScanner()) + { + while (partitions.hasNext()) + { + try (UnfilteredRowIterator iter = partitions.next()) + { + while (iter.hasNext()) + { + Unfiltered atom = iter.next(); + if (predicate.test(atom)) + ++instances; + } + } + } + } + return instances; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/test/long/org/apache/cassandra/cql3/GcCompactionBench.java ---------------------------------------------------------------------- diff --cc test/long/org/apache/cassandra/cql3/GcCompactionBench.java index ca39b55,0000000..84c0384 mode 100644,000000..100644 --- a/test/long/org/apache/cassandra/cql3/GcCompactionBench.java +++ b/test/long/org/apache/cassandra/cql3/GcCompactionBench.java @@@ -1,374 -1,0 +1,375 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; + +import com.google.common.collect.Iterables; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.config.Config.CommitLogSync; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.CompactionParams.TombstoneOption; +import org.apache.cassandra.utils.FBUtilities; + +public class GcCompactionBench extends CQLTester +{ + private static final String SIZE_TIERED_STRATEGY = "SizeTieredCompactionStrategy', 'min_sstable_size' : '0"; + private static final String LEVELED_STRATEGY = "LeveledCompactionStrategy', 'sstable_size_in_mb' : '16"; + + private static final int DEL_SECTIONS = 1000; + private static final int FLUSH_FREQ = 10000; + private static final int RANGE_FREQUENCY_INV = 16; + static final int COUNT = 90000; + static final int ITERS = 9; + + static final int KEY_RANGE = 10; + static final int CLUSTERING_RANGE = 210000; + + static final int EXTRA_SIZE = 1025; + + // The name of this method is important! + // CommitLog settings must be applied before CQLTester sets up; by using the same name as its @BeforeClass method we + // are effectively overriding it. + @BeforeClass + public static void setUpClass() // overrides CQLTester.setUpClass() + { + DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogSyncPeriod(100); + CQLTester.setUpClass(); + } + + String hashQuery; + + @Before + public void before() throws Throwable + { + createTable("CREATE TABLE %s(" + + " key int," + + " column int," + + " data int," + + " extra text," + + " PRIMARY KEY(key, column)" + + ")" + ); + + String hashIFunc = parseFunctionName(createFunction(KEYSPACE, "int, int", + " CREATE FUNCTION %s (state int, val int)" + + " CALLED ON NULL INPUT" + + " RETURNS int" + + " LANGUAGE java" + + " AS 'return val != null ? 
state * 17 + val : state;'")).name; + String hashTFunc = parseFunctionName(createFunction(KEYSPACE, "int, text", + " CREATE FUNCTION %s (state int, val text)" + + " CALLED ON NULL INPUT" + + " RETURNS int" + + " LANGUAGE java" + + " AS 'return val != null ? state * 17 + val.hashCode() : state;'")).name; + + String hashInt = createAggregate(KEYSPACE, "int", + " CREATE AGGREGATE %s (int)" + + " SFUNC " + hashIFunc + + " STYPE int" + + " INITCOND 1"); + String hashText = createAggregate(KEYSPACE, "text", + " CREATE AGGREGATE %s (text)" + + " SFUNC " + hashTFunc + + " STYPE int" + + " INITCOND 1"); + + hashQuery = String.format("SELECT count(column), %s(key), %s(column), %s(data), %s(extra), avg(key), avg(column), avg(data) FROM %%s", + hashInt, hashInt, hashInt, hashText); + } + AtomicLong id = new AtomicLong(); + long compactionTimeNanos = 0; + + void pushData(Random rand, int count) throws Throwable + { + for (int i = 0; i < count; ++i) + { + long ii = id.incrementAndGet(); + if (ii % 1000 == 0) + System.out.print('.'); + int key = rand.nextInt(KEY_RANGE); + int column = rand.nextInt(CLUSTERING_RANGE); + execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", key, column, (int) ii, genExtra(rand)); + maybeCompact(ii); + } + } + + private String genExtra(Random rand) + { + StringBuilder builder = new StringBuilder(EXTRA_SIZE); + for (int i = 0; i < EXTRA_SIZE; ++i) + builder.append((char) ('a' + rand.nextInt('z' - 'a' + 1))); + return builder.toString(); + } + + void deleteData(Random rand, int count) throws Throwable + { + for (int i = 0; i < count; ++i) + { + int key; + UntypedResultSet res; + long ii = id.incrementAndGet(); + if (ii % 1000 == 0) + System.out.print('-'); + if (rand.nextInt(RANGE_FREQUENCY_INV) != 1) + { + do + { + key = rand.nextInt(KEY_RANGE); + long cid = rand.nextInt(DEL_SECTIONS); + int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS); + int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS); + res = execute("SELECT column FROM %s WHERE key = ? AND column >= ? AND column < ? LIMIT 1", key, cstart, cend); + } while (res.size() == 0); + UntypedResultSet.Row r = Iterables.get(res, rand.nextInt(res.size())); + int clustering = r.getInt("column"); + execute("DELETE FROM %s WHERE key = ? AND column = ?", key, clustering); + } + else + { + key = rand.nextInt(KEY_RANGE); + long cid = rand.nextInt(DEL_SECTIONS); + int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS); + int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS); + res = execute("DELETE FROM %s WHERE key = ? AND column >= ? 
AND column < ?", key, cstart, cend); + } + maybeCompact(ii); + } + } + + private void maybeCompact(long ii) + { + if (ii % FLUSH_FREQ == 0) + { + System.out.print("F"); + flush(); + if (ii % (FLUSH_FREQ * 10) == 0) + { + System.out.println("C"); + long startTime = System.nanoTime(); + getCurrentColumnFamilyStore().enableAutoCompaction(true); + long endTime = System.nanoTime(); + compactionTimeNanos += endTime - startTime; + getCurrentColumnFamilyStore().disableAutoCompaction(); + } + } + } + + public void testGcCompaction(TombstoneOption tombstoneOption, TombstoneOption backgroundTombstoneOption, String compactionClass) throws Throwable + { + id.set(0); + compactionTimeNanos = 0; + alterTable("ALTER TABLE %s WITH compaction = { 'class' : '" + compactionClass + "', 'provide_overlapping_tombstones' : '" + backgroundTombstoneOption + "' };"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + cfs.disableAutoCompaction(); + + long onStartTime = System.currentTimeMillis(); + ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + List> tasks = new ArrayList<>(); + for (int ti = 0; ti < 1; ++ti) + { + Random rand = new Random(ti); + tasks.add(es.submit(() -> + { + for (int i = 0; i < ITERS; ++i) + try + { + pushData(rand, COUNT); + deleteData(rand, COUNT / 3); + } + catch (Throwable e) + { + throw new AssertionError(e); + } + })); + } + for (Future task : tasks) + task.get(); + + flush(); + long onEndTime = System.currentTimeMillis(); + int startRowCount = countRows(cfs); + int startTombCount = countTombstoneMarkers(cfs); + int startRowDeletions = countRowDeletions(cfs); + int startTableCount = cfs.getLiveSSTables().size(); + long startSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables()); + System.out.println(); + + String hashesBefore = getHashes(); + + long startTime = System.currentTimeMillis(); + CompactionManager.instance.performGarbageCollection(cfs, tombstoneOption, 0); + long endTime = System.currentTimeMillis(); + + int endRowCount = countRows(cfs); + int endTombCount = countTombstoneMarkers(cfs); + int endRowDeletions = countRowDeletions(cfs); + int endTableCount = cfs.getLiveSSTables().size(); + long endSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables()); + + System.out.println(cfs.getCompactionParametersJson()); + System.out.println(String.format("%s compactions completed in %.3fs", + tombstoneOption.toString(), (endTime - startTime) * 1e-3)); + System.out.println(String.format("Operations completed in %.3fs, out of which %.3f for ongoing " + backgroundTombstoneOption + " background compactions", + (onEndTime - onStartTime) * 1e-3, compactionTimeNanos * 1e-9)); + System.out.println(String.format("At start: %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers", + startTableCount, startSize, startRowCount, startRowDeletions, startTombCount)); + System.out.println(String.format("At end: %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers", + endTableCount, endSize, endRowCount, endRowDeletions, endTombCount)); + + String hashesAfter = getHashes(); + Assert.assertEquals(hashesBefore, hashesAfter); + } + + private String getHashes() throws Throwable + { + long startTime = System.currentTimeMillis(); + String hashes = Arrays.toString(getRows(execute(hashQuery))[0]); + long endTime = System.currentTimeMillis(); + System.out.println(String.format("Hashes: %s, retrieved in %.3fs", hashes, (endTime - startTime) * 1e-3)); + return hashes; + } + + @Test + public void testCellAtEnd() throws 
Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testRowAtEnd() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testCellThroughout() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testRowThroughout() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testCopyCompaction() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testCellAtEndSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testRowAtEndSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.ROW, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testCellThroughoutSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testRowThroughoutSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testCopyCompactionSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+    }
+
+    int countTombstoneMarkers(ColumnFamilyStore cfs)
+    {
+        return count(cfs, x -> x.isRangeTombstoneMarker());
+    }
+
+    int countRowDeletions(ColumnFamilyStore cfs)
+    {
+        return count(cfs, x -> x.isRow() && !((Row) x).deletion().isLive());
+    }
+
+    int countRows(ColumnFamilyStore cfs)
+    {
++        boolean enforceStrictLiveness = cfs.metadata.enforceStrictLiveness();
+        int nowInSec = FBUtilities.nowInSeconds();
-        return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec));
++        return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec, enforceStrictLiveness));
+    }
+
+    private int count(ColumnFamilyStore cfs, Predicate<Unfiltered> predicate)
+    {
+        int count = 0;
+        for (SSTableReader reader : cfs.getLiveSSTables())
+            count += count(reader, predicate);
+        return count;
+    }
+
+    int count(SSTableReader reader, Predicate<Unfiltered> predicate)
+    {
+        int instances = 0;
+        try (ISSTableScanner partitions = reader.getScanner())
+        {
+            while (partitions.hasNext())
+            {
+                try (UnfilteredRowIterator iter = partitions.next())
+                {
+                    while (iter.hasNext())
+                    {
+                        Unfiltered atom = iter.next();
+                        if (predicate.test(atom))
+                            ++instances;
+                    }
+                }
+            }
+        }
+        return instances;
+    }
+}
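----------------------------------------------------------------------
The common thread in this merge is that strict liveness is now read once from
the table metadata and passed explicitly into hasLiveData(), newCounter() and
filter(), instead of being implied by those methods. A minimal sketch of the
resulting calling convention, using only APIs that appear in this diff; the
helper class itself is hypothetical, and the note that materialized views are
the strict-liveness tables in 3.11 is our reading of CASSANDRA-13883 rather
than something the patch states:

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.rows.Row;
    import org.apache.cassandra.db.rows.Unfiltered;
    import org.apache.cassandra.utils.FBUtilities;

    // Hypothetical helper (not part of this patch) mirroring the countRows()
    // change in CachingBench and GcCompactionBench above.
    final class StrictLivenessExample
    {
        static boolean isLiveRow(ColumnFamilyStore cfs, Unfiltered atom)
        {
            // Assumption: only tables that require it (materialized views in
            // 3.11) enforce strict liveness, under which a row counts as live
            // only if its required liveness info is itself live.
            boolean enforceStrictLiveness = cfs.metadata.enforceStrictLiveness();
            int nowInSec = FBUtilities.nowInSeconds();
            return atom.isRow() && ((Row) atom).hasLiveData(nowInSec, enforceStrictLiveness);
        }
    }

The same flag flows into DataLimits.newCounter(...) in QueryPagers and
DataResolver, so a single metadata lookup governs every liveness decision made
for one query.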
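----------------------------------------------------------------------
The CQLGroupByLimits javadoc above says a group can only be counted once the
next group or the end of the data is reached; that deferred rule is why
GroupByAwareCounter carries hasGroupStarted and why onClose() must close the
last open group. A self-contained toy model of the rule, with every name
invented for illustration:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;

    public class DeferredGroupCount
    {
        // A group is counted only when the first row of the NEXT group is
        // seen, or when the input ends; the final increment below mirrors
        // what onClose() does for the last open group.
        static int countGroups(List<String> groupKeysInOrder, int groupLimit)
        {
            int counted = 0;
            String openGroup = null; // plays the role of hasGroupStarted
            for (String key : groupKeysInOrder)
            {
                if (openGroup != null && !Objects.equals(openGroup, key))
                {
                    counted++; // the previous group is now complete
                    if (counted >= groupLimit)
                        return counted; // like Counter.stop()
                }
                openGroup = key;
            }
            if (openGroup != null)
                counted++; // end of data closes the last group
            return counted;
        }

        public static void main(String[] args)
        {
            List<String> keys = Arrays.asList("a", "a", "b", "b", "c");
            System.out.println(countGroups(keys, 2));  // 2: stops before c
            System.out.println(countGroups(keys, 10)); // 3: end of data closes c
        }
    }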
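----------------------------------------------------------------------
SinglePartitionReadCommand.Group.executeLocally(executionController, sort)
above sorts the per-command iterators by partition key before concatenating
them, so that later merging sees one globally ordered stream. A generic sketch
of that sort-then-concat step (all names hypothetical):

    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class SortedConcat
    {
        // Order per-partition results by key, then concatenate; the output
        // is globally sorted provided each inner list is already sorted.
        static <K extends Comparable<K>, V> List<V> concatInKeyOrder(List<Map.Entry<K, List<V>>> parts)
        {
            parts.sort(Map.Entry.comparingByKey());
            return parts.stream()
                        .flatMap(e -> e.getValue().stream())
                        .collect(Collectors.toList());
        }

        public static void main(String[] args)
        {
            List<Map.Entry<Integer, List<String>>> parts = new ArrayList<>();
            parts.add(new AbstractMap.SimpleEntry<>(2, Arrays.asList("b1", "b2")));
            parts.add(new AbstractMap.SimpleEntry<>(1, Arrays.asList("a1")));
            System.out.println(concatInKeyOrder(parts)); // [a1, b1, b2]
        }
    }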