cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [1/2] git commit: Add DateTieredCompactionStrategy
Date Thu, 16 Oct 2014 16:41:34 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 1ef7d056d -> 1dea98396


Add DateTieredCompactionStrategy

Patch by Bjorn Hegerfors; reviewed by marcuse for CASSANDRA-6602


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/77dae508
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/77dae508
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/77dae508

Branch: refs/heads/cassandra-2.1
Commit: 77dae508cf65d847e56dafdb77059c72448711a6
Parents: 98bcf40
Author: Bjorn Hegerfors <bj0rn@spotify.com>
Authored: Fri Oct 10 18:01:24 2014 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Thu Oct 16 18:17:21 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   8 +
 pylib/cqlshlib/cql3handling.py                  |   4 +
 pylib/cqlshlib/cqlhandling.py                   |   3 +-
 .../DateTieredCompactionStrategy.java           | 374 +++++++++++++++++++
 .../DateTieredCompactionStrategyOptions.java    | 100 +++++
 .../DateTieredCompactionStrategyTest.java       | 242 ++++++++++++
 7 files changed, 731 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 158a48b..cd4b6bb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,7 @@
  * Fix hint replay with many accumulated expired hints (CASSANDRA-6998)
  * Fix duplicate results in DISTINCT queries on static columns with query
    paging (CASSANDRA-8108)
+ * Add DateTieredCompactionStrategy (CASSANDRA-6602)
  * Properly validate ascii and utf8 string literals in CQL queries (CASSANDRA-8101)
  * (cqlsh) Fix autocompletion for alter keyspace (CASSANDRA-8021)
  * Create backup directories for commitlog archiving during startup (CASSANDRA-8111)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 7fa8be9..102a87b 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,14 @@ restore snapshots created with the previous major version using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+2.0.11
+======
+New features
+------------
+    - DateTieredCompactionStrategy added. It is optimized for time series data and
+      groups data that is written closely in time (see CASSANDRA-6602 for details).
+      Consider this experimental for now.
+
 2.0.10
 ======
 New features

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index c08088a..0b7863c 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -479,6 +479,10 @@ def cf_prop_val_mapkey_completer(ctxt, cass):
             opts.add('cold_reads_to_omit')
         elif csc == 'LeveledCompactionStrategy':
             opts.add('sstable_size_in_mb')
+        elif csc == 'DateTieredCompactionStrategy':
+            opts.add('base_time_seconds')
+            opts.add('max_sstable_age_days')
+            opts.add('timestamp_resolution')
         return map(escape_value, opts)
     return ()
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/pylib/cqlshlib/cqlhandling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py
index 86abf02..0d54630 100644
--- a/pylib/cqlshlib/cqlhandling.py
+++ b/pylib/cqlshlib/cqlhandling.py
@@ -35,7 +35,8 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet):
 
     available_compaction_classes = (
         'LeveledCompactionStrategy',
-        'SizeTieredCompactionStrategy'
+        'SizeTieredCompactionStrategy',
+        'DateTieredCompactionStrategy'
     )
 
     replication_strategies = (

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
new file mode 100644
index 0000000..9c708db
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.util.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.statements.CFPropDefs;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.Pair;
+
+public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = LoggerFactory.getLogger(DateTieredCompactionStrategy.class);
+
+    protected DateTieredCompactionStrategyOptions options;
+    protected volatile int estimatedRemainingTasks;
+
+    public DateTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String>
options)
+    {
+        super(cfs, options);
+        this.estimatedRemainingTasks = 0;
+        this.options = new DateTieredCompactionStrategyOptions(options);
+    }
+
+    @Override
+    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    {
+        if (!isEnabled())
+            return null;
+
+        while (true)
+        {
+            List<SSTableReader> latestBucket = getNextBackgroundSStables(gcBefore);
+
+            if (latestBucket.isEmpty())
+                return null;
+
+            if (cfs.getDataTracker().markCompacting(latestBucket))
+                return new CompactionTask(cfs, latestBucket, gcBefore);
+        }
+    }
+
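+    /**
+     * Picks the sstables for the next background compaction.
+     * @param gcBefore passed to worthDroppingTombstones when looking for a tombstone-heavy sstable
+     * @return the newest bucket within the compaction thresholds; failing that, a single sstable worth compacting for its droppable tombstones (the minimum by SSTableReader.SizeComparator); otherwise an empty list
+     */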
+    private List<SSTableReader> getNextBackgroundSStables(final int gcBefore)
+    {
+        if (!isEnabled() || cfs.getSSTables().isEmpty())
+            return Collections.emptyList();
+
+        int base = cfs.getMinimumCompactionThreshold();
+        long now = getNow();
+
+        Iterable<SSTableReader> candidates = filterSuspectSSTables(cfs.getUncompactingSSTables());
+
+        List<SSTableReader> mostInteresting = getCompactionCandidates(candidates, now,
base);
+        if (mostInteresting != null)
+            return mostInteresting;
+
+        // if there is no sstable to compact in the standard way, try compacting a single sstable
whose droppable tombstone
+        // ratio is greater than the threshold.
+        List<SSTableReader> sstablesWithTombstones = Lists.newArrayList();
+        for (SSTableReader sstable : candidates)
+        {
+            if (worthDroppingTombstones(sstable, gcBefore))
+                sstablesWithTombstones.add(sstable);
+        }
+        if (sstablesWithTombstones.isEmpty())
+            return Collections.emptyList();
+
+        return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+    }
+
+    private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader>
candidateSSTables, long now, int base)
+    {
+        Iterable<SSTableReader> candidates = filterOldSSTables(Lists.newArrayList(candidateSSTables),
options.maxSSTableAge, now);
+
+        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndMinTimestampPairs(candidates),
options.baseTime, base, now);
+        logger.debug("Compaction buckets are {}", buckets);
+        updateEstimatedCompactionsByTasks(buckets);
+        List<SSTableReader> mostInteresting = newestBucket(buckets, cfs.getMinimumCompactionThreshold(),
cfs.getMaximumCompactionThreshold());
+        if (!mostInteresting.isEmpty())
+            return mostInteresting;
+        return null;
+    }
+
+    /**
+     * Gets the timestamp that DateTieredCompactionStrategy considers to be the "current
time".
+     * @return the maximum timestamp across all SSTables.
+     * @throws java.util.NoSuchElementException if there are no SSTables.
+     */
+    private long getNow()
+    {
+        return Collections.max(cfs.getSSTables(), new Comparator<SSTableReader>()
+        {
+            public int compare(SSTableReader o1, SSTableReader o2)
+            {
+                return Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp());
+            }
+        }).getMaxTimestamp();
+    }
+
+    /**
+     * Removes all sstables with max timestamp older than maxSSTableAge.
+     * @param sstables all sstables to consider
+     * @param maxSSTableAge the age, in the same unit as the sstable timestamps (not necessarily milliseconds), at which an SSTable stops participating in
compactions; 0 disables the filtering
+     * @param now current time. SSTables with max timestamp less than (now - maxSSTableAge)
are filtered.
+     * @return a list of sstables with the oldest sstables excluded
+     */
+    @VisibleForTesting
+    static Iterable<SSTableReader> filterOldSSTables(List<SSTableReader> sstables,
long maxSSTableAge, long now)
+    {
+        if (maxSSTableAge == 0)
+            return sstables;
+        final long cutoff = now - maxSSTableAge;
+        return Iterables.filter(sstables, new Predicate<SSTableReader>()
+        {
+            @Override
+            public boolean apply(SSTableReader sstable)
+            {
+                return sstable.getMaxTimestamp() >= cutoff;
+            }
+        });
+    }
+
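+    /**
+     * Pairs every sstable with its minimum timestamp.
+     * @param sstables the sstables to pair up
+     * @return a list of (sstable, min timestamp) pairs, in the iteration order of the input
+     */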
+    public static List<Pair<SSTableReader, Long>> createSSTableAndMinTimestampPairs(Iterable<SSTableReader>
sstables)
+    {
+        List<Pair<SSTableReader, Long>> sstableMinTimestampPairs = Lists.newArrayListWithCapacity(Iterables.size(sstables));
+        for (SSTableReader sstable : sstables)
+            sstableMinTimestampPairs.add(Pair.create(sstable, sstable.getMinTimestamp()));
+        return sstableMinTimestampPairs;
+    }
+
+
+    /**
+     * A target time span used for bucketing SSTables based on timestamps.
+     */
+    private static class Target
+    {
+        // How big a range of timestamps fit inside the target.
+        public final long size;
+        // A timestamp t hits the target iff t / size == divPosition.
+        public final long divPosition;
+
+        public Target(long size, long divPosition)
+        {
+            this.size = size;
+            this.divPosition = divPosition;
+        }
+
+        /**
+         * Compares the target to a timestamp.
+         * @param timestamp the timestamp to compare.
+         * @return a negative integer, zero, or a positive integer as the target lies before,
covers, or lies after the timestamp.
+         */
+        public int compareToTimestamp(long timestamp)
+        {
+            return Long.compare(divPosition, timestamp / size);
+        }
+
+        /**
+         * Tells if the timestamp hits the target.
+         * @param timestamp the timestamp to test.
+         * @return <code>true</code> iff timestamp / size == divPosition.
+         */
+        public boolean onTarget(long timestamp)
+        {
+            return compareToTimestamp(timestamp) == 0;
+        }
+
+        /**
+         * Gets the next target, which represents an earlier time span.
+         * @param base The number of contiguous targets that will have the same size. Targets
following those will be <code>base</code> times as big.
+         * @return the target for the time span immediately preceding this one.
+         */
+        public Target nextTarget(int base)
+        {
+            if (divPosition % base > 0)
+                return new Target(size, divPosition - 1);
+            else
+                return new Target(size * base, divPosition / base - 1);
+        }
+    }
+
+
+    /**
+     * Group files with similar min timestamp into buckets. Files with recent min timestamps
are grouped together into
+     * buckets designated to short timespans while files with older timestamps are grouped
into buckets representing
+     * longer timespans.
+     * @param files pairs consisting of a file and its min timestamp
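+     * @param timeUnit the size of the initial (newest) target, in the same unit as the timestamps
+     * @param base the number of contiguous targets that share a size before targets become base times as big
+     * @param now the timestamp that positions the initial target; files with a min timestamp too new for it are skipped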
+     * @return a list of buckets of files. The list is ordered such that the files with newest
timestamps come first.
+     *         Each bucket is also a list of files ordered from newest to oldest.
+     */
+    @VisibleForTesting
+    static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>>
files, long timeUnit, int base, long now)
+    {
+        // Sort files by age. Newest first.
+        final List<Pair<T, Long>> sortedFiles = Lists.newArrayList(files);
+        Collections.sort(sortedFiles, Collections.reverseOrder(new Comparator<Pair<T,
Long>>()
+        {
+            public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
+            {
+                return p1.right.compareTo(p2.right);
+            }
+        }));
+
+        List<List<T>> buckets = Lists.newArrayList();
+        Target target = getInitialTarget(now, timeUnit);
+        PeekingIterator<Pair<T, Long>> it = Iterators.peekingIterator(sortedFiles.iterator());
+
+        outerLoop:
+        while (it.hasNext())
+        {
+            while (!target.onTarget(it.peek().right))
+            {
+                // If the file is too new for the target, skip it.
+                if (target.compareToTimestamp(it.peek().right) < 0)
+                {
+                    it.next();
+
+                    if (!it.hasNext())
+                        break outerLoop;
+                }
+                else // If the file is too old for the target, switch targets.
+                    target = target.nextTarget(base);
+            }
+
+            List<T> bucket = Lists.newArrayList();
+            while (target.onTarget(it.peek().right))
+            {
+                bucket.add(it.next().left);
+
+                if (!it.hasNext())
+                    break;
+            }
+            buckets.add(bucket);
+        }
+
+        return buckets;
+    }
+
+    @VisibleForTesting
+    static Target getInitialTarget(long now, long timeUnit)
+    {
+        return new Target(timeUnit, now / timeUnit);
+    }
+
+
+    private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>>
tasks)
+    {
+        int n = 0;
+        for (List<SSTableReader> bucket : tasks)
+        {
+            if (bucket.size() >= cfs.getMinimumCompactionThreshold())
+                n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold());
+        }
+        estimatedRemainingTasks = n;
+    }
+
+
+    /**
+     * @param buckets list of buckets, sorted from newest to oldest, from which to return
the newest bucket within thresholds.
+     * @param minThreshold minimum number of sstables in a bucket to qualify.
+     * @param maxThreshold maximum number of sstables to compact at once (the returned bucket
will be trimmed down to this).
+     * @return a bucket (list) of sstables to compact.
+     */
+    @VisibleForTesting
+    static List<SSTableReader> newestBucket(List<List<SSTableReader>> buckets,
int minThreshold, int maxThreshold)
+    {
+        // Skip buckets containing less than minThreshold sstables, and limit other buckets
to maxThreshold sstables.
+        for (List<SSTableReader> bucket : buckets)
+            if (bucket.size() >= minThreshold)
+                return trimToThreshold(bucket, maxThreshold);
+        return Collections.emptyList();
+    }
+
+    /**
+     * @param bucket list of sstables, ordered from newest to oldest by getMinTimestamp().
+     * @param maxThreshold maximum number of sstables in a single compaction task.
+     * @return A bucket trimmed to the <code>maxThreshold</code> newest sstables.
+     */
+    @VisibleForTesting
+    static List<SSTableReader> trimToThreshold(List<SSTableReader> bucket, int
maxThreshold)
+    {
+        // Trim the oldest sstables off the end to meet the maxThreshold
+        return bucket.subList(0, Math.min(bucket.size(), maxThreshold));
+    }
+
+    @Override
+    public synchronized AbstractCompactionTask getMaximalTask(int gcBefore)
+    {
+        Iterable<SSTableReader> sstables = cfs.markAllCompacting();
+        if (sstables == null)
+            return null;
+
+        return new CompactionTask(cfs, sstables, gcBefore);
+    }
+
+    @Override
+    public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader>
sstables, int gcBefore)
+    {
+        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+        if (!cfs.getDataTracker().markCompacting(sstables))
+        {
+            logger.debug("Unable to mark {} for compaction; probably a background compaction
got to it first.  You can disable background compactions temporarily if this is a problem",
sstables);
+            return null;
+        }
+
+        return new CompactionTask(cfs, sstables, gcBefore).setUserDefined(true);
+    }
+
+    public int getEstimatedRemainingTasks()
+    {
+        return estimatedRemainingTasks;
+    }
+
+    public long getMaxSSTableBytes()
+    {
+        return Long.MAX_VALUE;
+    }
+
+
+    public static Map<String, String> validateOptions(Map<String, String> options)
throws ConfigurationException
+    {
+        Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
+        uncheckedOptions = DateTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+        uncheckedOptions.remove(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD);
+        uncheckedOptions.remove(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD);
+
+        return uncheckedOptions;
+    }
+
+    public String toString()
+    {
+        return String.format("DateTieredCompactionStrategy[%s/%s]",
+                cfs.getMinimumCompactionThreshold(),
+                cfs.getMaximumCompactionThreshold());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
new file mode 100644
index 0000000..9fed3e0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+public final class DateTieredCompactionStrategyOptions
+{
+    protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = TimeUnit.MICROSECONDS;
+    protected static final long DEFAULT_MAX_SSTABLE_AGE_DAYS = 365;
+    protected static final long DEFAULT_BASE_TIME_SECONDS = 60 * 60;
+    protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
+    protected static final String MAX_SSTABLE_AGE_KEY = "max_sstable_age_days";
+    protected static final String BASE_TIME_KEY = "base_time_seconds";
+
+    protected final long maxSSTableAge;
+    protected final long baseTime;
+
+    public DateTieredCompactionStrategyOptions(Map<String, String> options)
+    {
+        String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+        TimeUnit timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION
: TimeUnit.valueOf(optionValue);
+        optionValue = options.get(MAX_SSTABLE_AGE_KEY);
+        maxSSTableAge = timestampResolution.convert(optionValue == null ? DEFAULT_MAX_SSTABLE_AGE_DAYS
: Long.parseLong(optionValue), TimeUnit.DAYS);
+        optionValue = options.get(BASE_TIME_KEY);
+        baseTime = timestampResolution.convert(optionValue == null ? DEFAULT_BASE_TIME_SECONDS
: Long.parseLong(optionValue), TimeUnit.SECONDS);
+    }
+
+    public DateTieredCompactionStrategyOptions()
+    {
+        maxSSTableAge = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_MAX_SSTABLE_AGE_DAYS,
TimeUnit.DAYS);
+        baseTime = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS);
+    }
+
+    public static Map<String, String> validateOptions(Map<String, String> options,
Map<String, String> uncheckedOptions) throws  ConfigurationException
+    {
+        String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+        try
+        {
+            if (optionValue != null)
+                TimeUnit.valueOf(optionValue);
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new ConfigurationException(String.format("timestamp_resolution %s is not
valid", optionValue));
+        }
+
+        optionValue = options.get(MAX_SSTABLE_AGE_KEY);
+        try
+        {
+            long maxSStableAge = optionValue == null ? DEFAULT_MAX_SSTABLE_AGE_DAYS : Long.parseLong(optionValue);
+            if (maxSStableAge < 0)
+            {
+                throw new ConfigurationException(String.format("%s must be non-negative:
%d", MAX_SSTABLE_AGE_KEY, maxSStableAge));
+            }
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable int (base10)
for %s", optionValue, MAX_SSTABLE_AGE_KEY), e);
+        }
+
+        optionValue = options.get(BASE_TIME_KEY);
+        try
+        {
+            long baseTime = optionValue == null ? DEFAULT_BASE_TIME_SECONDS : Long.parseLong(optionValue);
+            if (baseTime <= 0)
+            {
+                throw new ConfigurationException(String.format("%s must be greater than 0,
but was %d", BASE_TIME_KEY, baseTime));
+            }
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable int (base10)
for %s", optionValue, BASE_TIME_KEY), e);
+        }
+
+        uncheckedOptions.remove(MAX_SSTABLE_AGE_KEY);
+        uncheckedOptions.remove(BASE_TIME_KEY);
+        uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY);
+
+        return uncheckedOptions;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77dae508/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
new file mode 100644
index 0000000..299e1af
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.getBuckets;
+import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.newestBucket;
+import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.trimToThreshold;
+import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.filterOldSSTables;
+import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.validateOptions;
+
+import static org.junit.Assert.*;
+
+public class DateTieredCompactionStrategyTest extends SchemaLoader
+{
+    public static final String KEYSPACE1 = "Keyspace1";
+    private static final String CF_STANDARD1 = "Standard1";
+
+    @Test
+    public void testOptionsValidation() throws ConfigurationException
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "30");
+        options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "1825");
+        Map<String, String> unvalidated = validateOptions(options);
+        assertTrue(unvalidated.isEmpty());
+
+        try
+        {
+            options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "0");
+            validateOptions(options);
+            fail(String.format("%s == 0 should be rejected", DateTieredCompactionStrategyOptions.BASE_TIME_KEY));
+        }
+        catch (ConfigurationException e) {}
+
+        try
+        {
+            options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "-1337");
+            validateOptions(options);
+            fail(String.format("%Negative %s should be rejected", DateTieredCompactionStrategyOptions.BASE_TIME_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "1");
+        }
+
+        try
+        {
+            options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "-1337");
+            validateOptions(options);
+            fail(String.format("%Negative %s should be rejected", DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "0");
+        }
+
+        options.put("bad_option", "1.0");
+        unvalidated = validateOptions(options);
+        assertTrue(unvalidated.containsKey("bad_option"));
+    }
+
+    @Test
+    public void testGetBuckets()
+    {
+        List<Pair<String, Long>> pairs = Lists.newArrayList(
+                Pair.create("a", 199L),
+                Pair.create("b", 299L),
+                Pair.create("a", 1L),
+                Pair.create("b", 201L)
+        );
+        List<List<String>> buckets = getBuckets(pairs, 100L, 2, 200L);
+        assertEquals(2, buckets.size());
+
+        for (List<String> bucket : buckets)
+        {
+            assertEquals(2, bucket.size());
+            assertEquals(bucket.get(0), bucket.get(1));
+        }
+
+
+        pairs = Lists.newArrayList(
+                Pair.create("a", 2000L),
+                Pair.create("b", 3600L),
+                Pair.create("a", 200L),
+                Pair.create("c", 3950L),
+                Pair.create("too new", 4125L),
+                Pair.create("b", 3899L),
+                Pair.create("c", 3900L)
+        );
+        buckets = getBuckets(pairs, 100L, 3, 4050L);
+        // targets (divPosition, size): (40, 100), (39, 100), (12, 300), (3, 900), (0, 2700)
+        // in other words: 0 - 2699, 2700 - 3599, 3600 - 3899, 3900 - 3999, 4000 - 4099
+        assertEquals(3, buckets.size());
+
+        for (List<String> bucket : buckets)
+        {
+            assertEquals(2, bucket.size());
+            assertEquals(bucket.get(0), bucket.get(1));
+        }
+
+
+        // Test base 1.
+        pairs = Lists.newArrayList(
+                Pair.create("a", 200L),
+                Pair.create("a", 299L),
+                Pair.create("b", 2000L),
+                Pair.create("b", 2014L),
+                Pair.create("c", 3610L),
+                Pair.create("c", 3690L),
+                Pair.create("d", 3898L),
+                Pair.create("d", 3899L),
+                Pair.create("e", 3900L),
+                Pair.create("e", 3950L),
+                Pair.create("too new", 4125L)
+        );
+        buckets = getBuckets(pairs, 100L, 1, 4050L);
+
+        assertEquals(5, buckets.size());
+
+        for (List<String> bucket : buckets)
+        {
+            assertEquals(2, bucket.size());
+            assertEquals(bucket.get(0), bucket.get(1));
+        }
+    }
+
+    @Test
+    public void testPrepBucket()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        // create 3 sstables
+        int numSSTables = 3;
+        for (int r = 0; r < numSSTables; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            RowMutation rm = new RowMutation(KEYSPACE1, key.key);
+            rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, r);
+            rm.apply();
+            cfs.forceBlockingFlush();
+        }
+        cfs.forceBlockingFlush();
+
+        List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+
+        List<SSTableReader> newBucket = newestBucket(Collections.singletonList(sstrs.subList(0,
2)), 4, 32);
+        assertTrue("nothing should be returned when all buckets are below the min threshold",
newBucket.isEmpty());
+
+        assertEquals("an sstable with a single value should have equal min/max timestamps",
sstrs.get(0).getMinTimestamp(), sstrs.get(0).getMaxTimestamp());
+        assertEquals("an sstable with a single value should have equal min/max timestamps",
sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp());
+        assertEquals("an sstable with a single value should have equal min/max timestamps",
sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp());
+
+        // if we have more than the max threshold, the oldest should be dropped
+        Collections.sort(sstrs, Collections.reverseOrder(new Comparator<SSTableReader>()
{
+            public int compare(SSTableReader o1, SSTableReader o2) {
+                return Long.compare(o1.getMinTimestamp(), o2.getMinTimestamp()) ;
+            }
+        }));
+
+        List<SSTableReader> bucket = trimToThreshold(sstrs, 2);
+        assertEquals("one bucket should have been dropped", 2, bucket.size());
+        for (SSTableReader sstr : bucket)
+            assertFalse("the oldest sstable should be dropped", sstr.getMinTimestamp() ==
0);
+    }
+
+    @Test
+    public void testFilterOldSSTables()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        // create 3 sstables
+        int numSSTables = 3;
+        for (int r = 0; r < numSSTables; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            RowMutation rm = new RowMutation(KEYSPACE1, key.key);
+            rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, r);
+            rm.apply();
+            cfs.forceBlockingFlush();
+        }
+        cfs.forceBlockingFlush();
+
+        Iterable<SSTableReader> filtered;
+        List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+
+        filtered = filterOldSSTables(sstrs, 0, 2);
+        assertEquals("when maxSSTableAge is zero, no sstables should be filtered", sstrs.size(),
Iterables.size(filtered));
+
+        filtered = filterOldSSTables(sstrs, 1, 2);
+        assertEquals("only the newest 2 sstables should remain", 2, Iterables.size(filtered));
+
+        filtered = filterOldSSTables(sstrs, 1, 3);
+        assertEquals("only the newest sstable should remain", 1, Iterables.size(filtered));
+
+        filtered = filterOldSSTables(sstrs, 1, 4);
+        assertEquals("no sstables should remain when all are too old", 0, Iterables.size(filtered));
+    }
+}


Mime
View raw message