Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 10ABF17E77 for ; Thu, 16 Oct 2014 16:41:17 +0000 (UTC) Received: (qmail 90153 invoked by uid 500); 16 Oct 2014 16:41:16 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 90117 invoked by uid 500); 16 Oct 2014 16:41:16 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 90106 invoked by uid 99); 16 Oct 2014 16:41:16 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Oct 2014 16:41:16 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 739FD9CC2B7; Thu, 16 Oct 2014 16:41:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marcuse@apache.org To: commits@cassandra.apache.org Message-Id: <2881973245e0473191fba2656a4112be@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: Add DateTieredCompactionStrategy Date: Thu, 16 Oct 2014 16:41:16 +0000 (UTC) Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 98bcf4022 -> 77dae508c Add DateTieredCompactionStrategy Patch by Bjorn Hegerfors; reviewed by marcuse for CASSANDRA-6602 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/77dae508 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/77dae508 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/77dae508 Branch: refs/heads/cassandra-2.0 Commit: 77dae508cf65d847e56dafdb77059c72448711a6 Parents: 98bcf40 Author: Bjorn Hegerfors Authored: Fri Oct 10 18:01:24 2014 +0200 Committer: Marcus Eriksson Committed: Thu Oct 16 18:17:21 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 8 + pylib/cqlshlib/cql3handling.py | 4 + pylib/cqlshlib/cqlhandling.py | 3 +- .../DateTieredCompactionStrategy.java | 374 +++++++++++++++++++ .../DateTieredCompactionStrategyOptions.java | 100 +++++ .../DateTieredCompactionStrategyTest.java | 242 ++++++++++++ 7 files changed, 731 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 158a48b..cd4b6bb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,6 +2,7 @@ * Fix hint replay with many accumulated expired hints (CASSANDRA-6998) * Fix duplicate results in DISTINCT queries on static columns with query paging (CASSANDRA-8108) + * Add DateTieredCompactionStrategy (CASSANDRA-6602) * Properly validate ascii and utf8 string literals in CQL queries (CASSANDRA-8101) * (cqlsh) Fix autocompletion for alter keyspace (CASSANDRA-8021) * Create backup directories for commitlog archiving during startup (CASSANDRA-8111) http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 7fa8be9..102a87b 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -13,6 +13,14 @@ restore snapshots created with the previous major version using the 'sstableloader' tool. You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. +2.0.11 +====== +New features +------------ + - DateTieredCompactionStrategy added, optimized for time series data and groups + data that is written closely in time (CASSANDRA-6602 for details). Consider + this experimental for now. + 2.0.10 ====== New features http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/pylib/cqlshlib/cql3handling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index c08088a..0b7863c 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -479,6 +479,10 @@ def cf_prop_val_mapkey_completer(ctxt, cass): opts.add('cold_reads_to_omit') elif csc == 'LeveledCompactionStrategy': opts.add('sstable_size_in_mb') + elif csc == 'DateTieredCompactionStrategy': + opts.add('base_time_seconds') + opts.add('max_sstable_age_days') + opts.add('timestamp_resolution') return map(escape_value, opts) return () http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/pylib/cqlshlib/cqlhandling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py index 86abf02..0d54630 100644 --- a/pylib/cqlshlib/cqlhandling.py +++ b/pylib/cqlshlib/cqlhandling.py @@ -35,7 +35,8 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet): available_compaction_classes = ( 'LeveledCompactionStrategy', - 'SizeTieredCompactionStrategy' + 'SizeTieredCompactionStrategy', + 'DateTieredCompactionStrategy' ) replication_strategies = ( http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java new file mode 100644 index 0000000..9c708db --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import java.util.*; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.cql3.statements.CFPropDefs; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.utils.Pair; + +public class DateTieredCompactionStrategy extends AbstractCompactionStrategy +{ + private static final Logger logger = LoggerFactory.getLogger(DateTieredCompactionStrategy.class); + + protected DateTieredCompactionStrategyOptions options; + protected volatile int estimatedRemainingTasks; + + public DateTieredCompactionStrategy(ColumnFamilyStore cfs, Map options) + { + super(cfs, options); + this.estimatedRemainingTasks = 0; + this.options = new DateTieredCompactionStrategyOptions(options); + } + + @Override + public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) + { + if (!isEnabled()) + return null; + + while (true) + { + List latestBucket = getNextBackgroundSStables(gcBefore); + + if (latestBucket.isEmpty()) + return null; + + if (cfs.getDataTracker().markCompacting(latestBucket)) + return new CompactionTask(cfs, latestBucket, gcBefore); + } + } + + /** + * + * @param gcBefore + * @return + */ + private List getNextBackgroundSStables(final int gcBefore) + { + if (!isEnabled() || cfs.getSSTables().isEmpty()) + return Collections.emptyList(); + + int base = cfs.getMinimumCompactionThreshold(); + long now = getNow(); + + Iterable candidates = filterSuspectSSTables(cfs.getUncompactingSSTables()); + + List mostInteresting = getCompactionCandidates(candidates, now, base); + if (mostInteresting != null) + return mostInteresting; + + // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone + // ratio is greater than threshold. + List sstablesWithTombstones = Lists.newArrayList(); + for (SSTableReader sstable : candidates) + { + if (worthDroppingTombstones(sstable, gcBefore)) + sstablesWithTombstones.add(sstable); + } + if (sstablesWithTombstones.isEmpty()) + return Collections.emptyList(); + + return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator())); + } + + private List getCompactionCandidates(Iterable candidateSSTables, long now, int base) + { + Iterable candidates = filterOldSSTables(Lists.newArrayList(candidateSSTables), options.maxSSTableAge, now); + + List> buckets = getBuckets(createSSTableAndMinTimestampPairs(candidates), options.baseTime, base, now); + logger.debug("Compaction buckets are {}", buckets); + updateEstimatedCompactionsByTasks(buckets); + List mostInteresting = newestBucket(buckets, cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold()); + if (!mostInteresting.isEmpty()) + return mostInteresting; + return null; + } + + /** + * Gets the timestamp that DateTieredCompactionStrategy considers to be the "current time". + * @return the maximum timestamp across all SSTables. + * @throws java.util.NoSuchElementException if there are no SSTables. + */ + private long getNow() + { + return Collections.max(cfs.getSSTables(), new Comparator() + { + public int compare(SSTableReader o1, SSTableReader o2) + { + return Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp()); + } + }).getMaxTimestamp(); + } + + /** + * Removes all sstables with max timestamp older than maxSSTableAge. + * @param sstables all sstables to consider + * @param maxSSTableAge the age in milliseconds when an SSTable stops participating in compactions + * @param now current time. SSTables with max timestamp less than (now - maxSSTableAge) are filtered. + * @return a list of sstables with the oldest sstables excluded + */ + @VisibleForTesting + static Iterable filterOldSSTables(List sstables, long maxSSTableAge, long now) + { + if (maxSSTableAge == 0) + return sstables; + final long cutoff = now - maxSSTableAge; + return Iterables.filter(sstables, new Predicate() + { + @Override + public boolean apply(SSTableReader sstable) + { + return sstable.getMaxTimestamp() >= cutoff; + } + }); + } + + /** + * + * @param sstables + * @return + */ + public static List> createSSTableAndMinTimestampPairs(Iterable sstables) + { + List> sstableMinTimestampPairs = Lists.newArrayListWithCapacity(Iterables.size(sstables)); + for (SSTableReader sstable : sstables) + sstableMinTimestampPairs.add(Pair.create(sstable, sstable.getMinTimestamp())); + return sstableMinTimestampPairs; + } + + + /** + * A target time span used for bucketing SSTables based on timestamps. + */ + private static class Target + { + // How big a range of timestamps fit inside the target. + public final long size; + // A timestamp t hits the target iff t / size == divPosition. + public final long divPosition; + + public Target(long size, long divPosition) + { + this.size = size; + this.divPosition = divPosition; + } + + /** + * Compares the target to a timestamp. + * @param timestamp the timestamp to compare. + * @return a negative integer, zero, or a positive integer as the target lies before, covering, or after than the timestamp. + */ + public int compareToTimestamp(long timestamp) + { + return Long.compare(divPosition, timestamp / size); + } + + /** + * Tells if the timestamp hits the target. + * @param timestamp the timestamp to test. + * @return true iff timestamp / size == divPosition. + */ + public boolean onTarget(long timestamp) + { + return compareToTimestamp(timestamp) == 0; + } + + /** + * Gets the next target, which represents an earlier time span. + * @param base The number of contiguous targets that will have the same size. Targets following those will be base times as big. + * @return + */ + public Target nextTarget(int base) + { + if (divPosition % base > 0) + return new Target(size, divPosition - 1); + else + return new Target(size * base, divPosition / base - 1); + } + } + + + /** + * Group files with similar min timestamp into buckets. Files with recent min timestamps are grouped together into + * buckets designated to short timespans while files with older timestamps are grouped into buckets representing + * longer timespans. + * @param files pairs consisting of a file and its min timestamp + * @param timeUnit + * @param base + * @param now + * @return a list of buckets of files. The list is ordered such that the files with newest timestamps come first. + * Each bucket is also a list of files ordered from newest to oldest. + */ + @VisibleForTesting + static List> getBuckets(Collection> files, long timeUnit, int base, long now) + { + // Sort files by age. Newest first. + final List> sortedFiles = Lists.newArrayList(files); + Collections.sort(sortedFiles, Collections.reverseOrder(new Comparator>() + { + public int compare(Pair p1, Pair p2) + { + return p1.right.compareTo(p2.right); + } + })); + + List> buckets = Lists.newArrayList(); + Target target = getInitialTarget(now, timeUnit); + PeekingIterator> it = Iterators.peekingIterator(sortedFiles.iterator()); + + outerLoop: + while (it.hasNext()) + { + while (!target.onTarget(it.peek().right)) + { + // If the file is too new for the target, skip it. + if (target.compareToTimestamp(it.peek().right) < 0) + { + it.next(); + + if (!it.hasNext()) + break outerLoop; + } + else // If the file is too old for the target, switch targets. + target = target.nextTarget(base); + } + + List bucket = Lists.newArrayList(); + while (target.onTarget(it.peek().right)) + { + bucket.add(it.next().left); + + if (!it.hasNext()) + break; + } + buckets.add(bucket); + } + + return buckets; + } + + @VisibleForTesting + static Target getInitialTarget(long now, long timeUnit) + { + return new Target(timeUnit, now / timeUnit); + } + + + private void updateEstimatedCompactionsByTasks(List> tasks) + { + int n = 0; + for (List bucket : tasks) + { + if (bucket.size() >= cfs.getMinimumCompactionThreshold()) + n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold()); + } + estimatedRemainingTasks = n; + } + + + /** + * @param buckets list of buckets, sorted from newest to oldest, from which to return the newest bucket within thresholds. + * @param minThreshold minimum number of sstables in a bucket to qualify. + * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this). + * @return a bucket (list) of sstables to compact. + */ + @VisibleForTesting + static List newestBucket(List> buckets, int minThreshold, int maxThreshold) + { + // Skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold sstables. + for (List bucket : buckets) + if (bucket.size() >= minThreshold) + return trimToThreshold(bucket, maxThreshold); + return Collections.emptyList(); + } + + /** + * @param bucket list of sstables, ordered from newest to oldest by getMinTimestamp(). + * @param maxThreshold maximum number of sstables in a single compaction task. + * @return A bucket trimmed to the maxThreshold newest sstables. + */ + @VisibleForTesting + static List trimToThreshold(List bucket, int maxThreshold) + { + // Trim the oldest sstables off the end to meet the maxThreshold + return bucket.subList(0, Math.min(bucket.size(), maxThreshold)); + } + + @Override + public synchronized AbstractCompactionTask getMaximalTask(int gcBefore) + { + Iterable sstables = cfs.markAllCompacting(); + if (sstables == null) + return null; + + return new CompactionTask(cfs, sstables, gcBefore); + } + + @Override + public synchronized AbstractCompactionTask getUserDefinedTask(Collection sstables, int gcBefore) + { + assert !sstables.isEmpty(); // checked for by CM.submitUserDefined + + if (!cfs.getDataTracker().markCompacting(sstables)) + { + logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables); + return null; + } + + return new CompactionTask(cfs, sstables, gcBefore).setUserDefined(true); + } + + public int getEstimatedRemainingTasks() + { + return estimatedRemainingTasks; + } + + public long getMaxSSTableBytes() + { + return Long.MAX_VALUE; + } + + + public static Map validateOptions(Map options) throws ConfigurationException + { + Map uncheckedOptions = AbstractCompactionStrategy.validateOptions(options); + uncheckedOptions = DateTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions); + + uncheckedOptions.remove(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD); + uncheckedOptions.remove(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD); + + return uncheckedOptions; + } + + public String toString() + { + return String.format("DateTieredCompactionStrategy[%s/%s]", + cfs.getMinimumCompactionThreshold(), + cfs.getMaximumCompactionThreshold()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java new file mode 100644 index 0000000..9fed3e0 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.exceptions.ConfigurationException; + +public final class DateTieredCompactionStrategyOptions +{ + protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = TimeUnit.MICROSECONDS; + protected static final long DEFAULT_MAX_SSTABLE_AGE_DAYS = 365; + protected static final long DEFAULT_BASE_TIME_SECONDS = 60 * 60; + protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution"; + protected static final String MAX_SSTABLE_AGE_KEY = "max_sstable_age_days"; + protected static final String BASE_TIME_KEY = "base_time_seconds"; + + protected final long maxSSTableAge; + protected final long baseTime; + + public DateTieredCompactionStrategyOptions(Map options) + { + String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY); + TimeUnit timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue); + optionValue = options.get(MAX_SSTABLE_AGE_KEY); + maxSSTableAge = timestampResolution.convert(optionValue == null ? DEFAULT_MAX_SSTABLE_AGE_DAYS : Long.parseLong(optionValue), TimeUnit.DAYS); + optionValue = options.get(BASE_TIME_KEY); + baseTime = timestampResolution.convert(optionValue == null ? DEFAULT_BASE_TIME_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS); + } + + public DateTieredCompactionStrategyOptions() + { + maxSSTableAge = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_MAX_SSTABLE_AGE_DAYS, TimeUnit.DAYS); + baseTime = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS); + } + + public static Map validateOptions(Map options, Map uncheckedOptions) throws ConfigurationException + { + String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY); + try + { + if (optionValue != null) + TimeUnit.valueOf(optionValue); + } + catch (IllegalArgumentException e) + { + throw new ConfigurationException(String.format("timestamp_resolution %s is not valid", optionValue)); + } + + optionValue = options.get(MAX_SSTABLE_AGE_KEY); + try + { + long maxSStableAge = optionValue == null ? DEFAULT_MAX_SSTABLE_AGE_DAYS : Long.parseLong(optionValue); + if (maxSStableAge < 0) + { + throw new ConfigurationException(String.format("%s must be non-negative: %d", MAX_SSTABLE_AGE_KEY, maxSStableAge)); + } + } + catch (NumberFormatException e) + { + throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, MAX_SSTABLE_AGE_KEY), e); + } + + optionValue = options.get(BASE_TIME_KEY); + try + { + long baseTime = optionValue == null ? DEFAULT_BASE_TIME_SECONDS : Long.parseLong(optionValue); + if (baseTime <= 0) + { + throw new ConfigurationException(String.format("%s must be greater than 0, but was %d", BASE_TIME_KEY, baseTime)); + } + } + catch (NumberFormatException e) + { + throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, BASE_TIME_KEY), e); + } + + uncheckedOptions.remove(MAX_SSTABLE_AGE_KEY); + uncheckedOptions.remove(BASE_TIME_KEY); + uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY); + + return uncheckedOptions; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java new file mode 100644 index 0000000..299e1af --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import java.nio.ByteBuffer; +import java.util.*; + +import org.junit.Test; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; + +import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.getBuckets; +import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.newestBucket; +import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.trimToThreshold; +import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.filterOldSSTables; +import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.validateOptions; + +import static org.junit.Assert.*; + +public class DateTieredCompactionStrategyTest extends SchemaLoader +{ + public static final String KEYSPACE1 = "Keyspace1"; + private static final String CF_STANDARD1 = "Standard1"; + + @Test + public void testOptionsValidation() throws ConfigurationException + { + Map options = new HashMap<>(); + options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "30"); + options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "1825"); + Map unvalidated = validateOptions(options); + assertTrue(unvalidated.isEmpty()); + + try + { + options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "0"); + validateOptions(options); + fail(String.format("%s == 0 should be rejected", DateTieredCompactionStrategyOptions.BASE_TIME_KEY)); + } + catch (ConfigurationException e) {} + + try + { + options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "-1337"); + validateOptions(options); + fail(String.format("%Negative %s should be rejected", DateTieredCompactionStrategyOptions.BASE_TIME_KEY)); + } + catch (ConfigurationException e) + { + options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "1"); + } + + try + { + options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "-1337"); + validateOptions(options); + fail(String.format("%Negative %s should be rejected", DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY)); + } + catch (ConfigurationException e) + { + options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "0"); + } + + options.put("bad_option", "1.0"); + unvalidated = validateOptions(options); + assertTrue(unvalidated.containsKey("bad_option")); + } + + @Test + public void testGetBuckets() + { + List> pairs = Lists.newArrayList( + Pair.create("a", 199L), + Pair.create("b", 299L), + Pair.create("a", 1L), + Pair.create("b", 201L) + ); + List> buckets = getBuckets(pairs, 100L, 2, 200L); + assertEquals(2, buckets.size()); + + for (List bucket : buckets) + { + assertEquals(2, bucket.size()); + assertEquals(bucket.get(0), bucket.get(1)); + } + + + pairs = Lists.newArrayList( + Pair.create("a", 2000L), + Pair.create("b", 3600L), + Pair.create("a", 200L), + Pair.create("c", 3950L), + Pair.create("too new", 4125L), + Pair.create("b", 3899L), + Pair.create("c", 3900L) + ); + buckets = getBuckets(pairs, 100L, 3, 4050L); + // targets (divPosition, size): (40, 100), (39, 100), (12, 300), (3, 900), (0, 2700) + // in other words: 0 - 2699, 2700 - 3599, 3600 - 3899, 3900 - 3999, 4000 - 4099 + assertEquals(3, buckets.size()); + + for (List bucket : buckets) + { + assertEquals(2, bucket.size()); + assertEquals(bucket.get(0), bucket.get(1)); + } + + + // Test base 1. + pairs = Lists.newArrayList( + Pair.create("a", 200L), + Pair.create("a", 299L), + Pair.create("b", 2000L), + Pair.create("b", 2014L), + Pair.create("c", 3610L), + Pair.create("c", 3690L), + Pair.create("d", 3898L), + Pair.create("d", 3899L), + Pair.create("e", 3900L), + Pair.create("e", 3950L), + Pair.create("too new", 4125L) + ); + buckets = getBuckets(pairs, 100L, 1, 4050L); + + assertEquals(5, buckets.size()); + + for (List bucket : buckets) + { + assertEquals(2, bucket.size()); + assertEquals(bucket.get(0), bucket.get(1)); + } + } + + @Test + public void testPrepBucket() + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100]); + + // create 3 sstables + int numSSTables = 3; + for (int r = 0; r < numSSTables; r++) + { + DecoratedKey key = Util.dk(String.valueOf(r)); + RowMutation rm = new RowMutation(KEYSPACE1, key.key); + rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, r); + rm.apply(); + cfs.forceBlockingFlush(); + } + cfs.forceBlockingFlush(); + + List sstrs = new ArrayList<>(cfs.getSSTables()); + + List newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32); + assertTrue("nothing should be returned when all buckets are below the min threshold", newBucket.isEmpty()); + + assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(0).getMinTimestamp(), sstrs.get(0).getMaxTimestamp()); + assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp()); + assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp()); + + // if we have more than the max threshold, the oldest should be dropped + Collections.sort(sstrs, Collections.reverseOrder(new Comparator() { + public int compare(SSTableReader o1, SSTableReader o2) { + return Long.compare(o1.getMinTimestamp(), o2.getMinTimestamp()) ; + } + })); + + List bucket = trimToThreshold(sstrs, 2); + assertEquals("one bucket should have been dropped", 2, bucket.size()); + for (SSTableReader sstr : bucket) + assertFalse("the oldest sstable should be dropped", sstr.getMinTimestamp() == 0); + } + + @Test + public void testFilterOldSSTables() + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100]); + + // create 3 sstables + int numSSTables = 3; + for (int r = 0; r < numSSTables; r++) + { + DecoratedKey key = Util.dk(String.valueOf(r)); + RowMutation rm = new RowMutation(KEYSPACE1, key.key); + rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, r); + rm.apply(); + cfs.forceBlockingFlush(); + } + cfs.forceBlockingFlush(); + + Iterable filtered; + List sstrs = new ArrayList<>(cfs.getSSTables()); + + filtered = filterOldSSTables(sstrs, 0, 2); + assertEquals("when maxSSTableAge is zero, no sstables should be filtered", sstrs.size(), Iterables.size(filtered)); + + filtered = filterOldSSTables(sstrs, 1, 2); + assertEquals("only the newest 2 sstables should remain", 2, Iterables.size(filtered)); + + filtered = filterOldSSTables(sstrs, 1, 3); + assertEquals("only the newest sstable should remain", 1, Iterables.size(filtered)); + + filtered = filterOldSSTables(sstrs, 1, 4); + assertEquals("no sstables should remain when all are too old", 0, Iterables.size(filtered)); + } +}