cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [1/2] cassandra git commit: Add option to only purge tombstones from repaired sstables
Date Tue, 11 Aug 2015 06:31:46 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk e8ac7edb7 -> b1c739874


Add option to only purge tombstones from repaired sstables

Patch by marcuse; reviewed by yukim for CASSANDRA-6434


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f0c12f3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f0c12f3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f0c12f3

Branch: refs/heads/trunk
Commit: 6f0c12f3a4668a5dcae162969843f02498ee7e6d
Parents: 76ca697
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Tue Jul 7 07:50:11 2015 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Tue Aug 11 08:25:55 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   6 +
 pylib/cqlshlib/cql3handling.py                  |   2 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  93 ++++--
 .../cassandra/db/PartitionRangeReadCommand.java |  13 +-
 .../org/apache/cassandra/db/ReadCommand.java    |   4 +-
 .../db/SinglePartitionNamesCommand.java         |  16 +-
 .../db/SinglePartitionSliceCommand.java         |  15 +-
 .../compaction/AbstractCompactionStrategy.java  |   4 +-
 .../db/compaction/CompactionController.java     |  20 +-
 .../db/compaction/CompactionIterator.java       |   2 +-
 .../compaction/CompactionStrategyManager.java   |   5 +
 .../db/partitions/PurgingPartitionIterator.java |   5 +-
 .../db/RepairedDataTombstonesTest.java          | 292 +++++++++++++++++++
 14 files changed, 436 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b23c2d1..b882c23 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta1
+ * Add option to only purge repaired tombstones (CASSANDRA-6434)
  * Change authorization handling for MVs (CASSANDRA-9927)
  * Add custom JMX enabled executor for UDF sandbox (CASSANDRA-10026)
  * Fix row deletion bug for Materialized Views (CASSANDRA-10014)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 65df769..26fe902 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -23,6 +23,12 @@ New features
      for non-primary key queries, and perform much better for indexing high
      cardinality columns.
      See http://www.datastax.com/dev/blog/new-in-cassandra-3-0-materialized-views
+   - Option to not purge unrepaired tombstones. To avoid users having data resurrected
+     if repair has not been run within gc_grace_seconds, an option has been added to
+     only allow tombstones from repaired sstables to be purged. To enable, set the
+     compaction option 'only_purge_repaired_tombstones':true but keep in mind that if
+     you do not run repair for a long time, you will keep all tombstones around which
+     can cause other problems.
 
 
 Upgrading

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index a46da91..44a1e23 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -69,7 +69,7 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet):
         # (CQL3 option name, schema_columnfamilies column name (or None if same),
         #  list of known map keys)
         ('compaction', 'compaction_strategy_options',
-            ('class', 'max_threshold', 'tombstone_compaction_interval', 'tombstone_threshold', 'enabled', 'unchecked_tombstone_compaction')),
+            ('class', 'max_threshold', 'tombstone_compaction_interval', 'tombstone_threshold', 'enabled', 'unchecked_tombstone_compaction', 'only_purge_repaired_tombstones')),
         ('compression', 'compression_parameters',
             ('sstable_compression', 'chunk_length_kb', 'crc_check_chance')),
         ('caching', None,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 2db0ce9..1b30fc7 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -260,7 +260,7 @@ public class Memtable implements Comparable<Memtable>
                              100 * allocator.onHeap().ownershipRatio(), 100 * allocator.offHeap().ownershipRatio());
     }
 
-    public UnfilteredPartitionIterator makePartitionIterator(final ColumnFilter columnFilter, final DataRange dataRange, final boolean isForThrift)
+    public MemtableUnfilteredPartitionIterator makePartitionIterator(final ColumnFilter columnFilter, final DataRange dataRange, final boolean isForThrift)
     {
         AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
 
@@ -278,35 +278,26 @@ public class Memtable implements Comparable<Memtable>
                    ? partitions.tailMap(keyRange.left, includeStart)
                    : partitions.subMap(keyRange.left, includeStart, keyRange.right, includeStop);
 
-        final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter = subMap.entrySet().iterator();
+        int minLocalDeletionTime = Integer.MAX_VALUE;
 
-        return new AbstractUnfilteredPartitionIterator()
-        {
-            public boolean isForThrift()
-            {
-                return isForThrift;
-            }
+        // avoid iterating over the memtable if we purge all tombstones
+        if (cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
+            minLocalDeletionTime = findMinLocalDeletionTime(subMap.entrySet().iterator());
 
-            public CFMetaData metadata()
-            {
-                return cfs.metadata;
-            }
+        final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter = subMap.entrySet().iterator();
 
-            public boolean hasNext()
-            {
-                return iter.hasNext();
-            }
+        return new MemtableUnfilteredPartitionIterator(cfs, iter, isForThrift, minLocalDeletionTime, columnFilter, dataRange);
+    }
 
-            public UnfilteredRowIterator next()
-            {
-                Map.Entry<PartitionPosition, AtomicBTreePartition> entry = iter.next();
-                // Actual stored key should be true DecoratedKey
-                assert entry.getKey() instanceof DecoratedKey;
-                DecoratedKey key = (DecoratedKey)entry.getKey();
-                ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key);
-                return filter.getUnfilteredRowIterator(columnFilter, entry.getValue());
-            }
-        };
+    private int findMinLocalDeletionTime(Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iterator)
+    {
+        int minLocalDeletionTime = Integer.MAX_VALUE;
+        while (iterator.hasNext())
+        {
+            Map.Entry<PartitionPosition, AtomicBTreePartition> entry = iterator.next();
+            minLocalDeletionTime = Math.min(minLocalDeletionTime, entry.getValue().stats().minLocalDeletionTime);
+        }
+        return minLocalDeletionTime;
     }
 
     public Partition getPartition(DecoratedKey key)
@@ -463,6 +454,56 @@ public class Memtable implements Comparable<Memtable>
         }
     }
 
+    public static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator
+    {
+        private final ColumnFamilyStore cfs;
+        private final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter;
+        private final boolean isForThrift;
+        private final int minLocalDeletionTime;
+        private final ColumnFilter columnFilter;
+        private final DataRange dataRange;
+
+        public MemtableUnfilteredPartitionIterator(ColumnFamilyStore cfs, Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter, boolean isForThrift, int minLocalDeletionTime, ColumnFilter columnFilter, DataRange dataRange)
+        {
+            this.cfs = cfs;
+            this.iter = iter;
+            this.isForThrift = isForThrift;
+            this.minLocalDeletionTime = minLocalDeletionTime;
+            this.columnFilter = columnFilter;
+            this.dataRange = dataRange;
+        }
+
+        public boolean isForThrift()
+        {
+            return isForThrift;
+        }
+
+        public int getMinLocalDeletionTime()
+        {
+            return minLocalDeletionTime;
+        }
+
+        public CFMetaData metadata()
+        {
+            return cfs.metadata;
+        }
+
+        public boolean hasNext()
+        {
+            return iter.hasNext();
+        }
+
+        public UnfilteredRowIterator next()
+        {
+            Map.Entry<PartitionPosition, AtomicBTreePartition> entry = iter.next();
+            // Actual stored key should be true DecoratedKey
+            assert entry.getKey() instanceof DecoratedKey;
+            DecoratedKey key = (DecoratedKey)entry.getKey();
+            ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key);
+            return filter.getUnfilteredRowIterator(columnFilter, entry.getValue());
+        }
+    }
+
     private static class ColumnsCollector
     {
         private final HashMap<ColumnDefinition, AtomicBoolean> predefined = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 2219a84..e7288cc 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -53,6 +53,7 @@ public class PartitionRangeReadCommand extends ReadCommand
     protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 
     private final DataRange dataRange;
+    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 
     public PartitionRangeReadCommand(boolean isDigest,
                                      boolean isForThrift,
@@ -172,7 +173,8 @@ public class PartitionRangeReadCommand extends ReadCommand
             for (Memtable memtable : view.memtables)
             {
                 @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
-                UnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
+                Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
+                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
                 iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
             }
 
@@ -181,8 +183,9 @@ public class PartitionRangeReadCommand extends ReadCommand
                 @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
                 UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift());
                 iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
+                if (!sstable.isRepaired())
+                    oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
             }
-
             return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs);
         }
         catch (RuntimeException | Error e)
@@ -200,6 +203,12 @@ public class PartitionRangeReadCommand extends ReadCommand
         }
     }
 
+    @Override
+    protected int oldestUnrepairedTombstone()
+    {
+        return oldestUnrepairedTombstone;
+    }
+
     private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
     {
         return new WrappingUnfilteredPartitionIterator(iter)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 913a1de..5c40492 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -249,6 +249,8 @@ public abstract class ReadCommand implements ReadQuery
 
     protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
 
+    protected abstract int oldestUnrepairedTombstone();
+
     public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
     {
         return isDigestQuery()
@@ -426,7 +428,7 @@ public abstract class ReadCommand implements ReadQuery
     // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
     protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
     {
-        return new PurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec()))
+        return new PurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
         {
             protected long getMaxPurgeableTimestamp()
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index b0958fc..518e299 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.utils.memory.HeapAllocator;
  */
 public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<ClusteringIndexNamesFilter>
 {
+    private int oldestUnrepairedDeletionTime = Integer.MAX_VALUE;
     protected SinglePartitionNamesCommand(boolean isDigest,
                                           boolean isForThrift,
                                           CFMetaData metadata,
@@ -84,6 +85,12 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
         return new SinglePartitionNamesCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
     }
 
+    @Override
+    protected int oldestUnrepairedTombstone()
+    {
+        return oldestUnrepairedDeletionTime;
+    }
+
     protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
     {
         Tracing.trace("Acquiring sstable references");
@@ -107,7 +114,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
                 UnfilteredRowIterator clonedFilter = copyOnHeap
                                                    ? UnfilteredRowIterators.cloningIterator(iter,
HeapAllocator.instance)
                                                    : iter;
-                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result);
+                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, false);
             }
         }
 
@@ -137,7 +144,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
                     continue;
 
                 sstablesIterated++;
-                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result);
+                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, sstable.isRepaired());
             }
         }
 
@@ -175,8 +182,11 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
         return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
     }
 
-    private ArrayBackedPartition add(UnfilteredRowIterator iter, ArrayBackedPartition result)
+    private ArrayBackedPartition add(UnfilteredRowIterator iter, ArrayBackedPartition result, boolean isRepaired)
     {
+        if (!isRepaired)
+            oldestUnrepairedDeletionTime = Math.min(oldestUnrepairedDeletionTime, iter.stats().minLocalDeletionTime);
+
         int maxRows = Math.max(clusteringIndexFilter().requestedRows().size(), 1);
         if (result == null)
             return ArrayBackedPartition.create(iter, maxRows);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
index bb9a35e..2dbf7b1 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
@@ -41,6 +41,8 @@ import org.apache.cassandra.utils.memory.HeapAllocator;
  */
 public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<ClusteringIndexSliceFilter>
 {
+    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
     public SinglePartitionSliceCommand(boolean isDigest,
                                        boolean isForThrift,
                                        CFMetaData metadata,
@@ -119,6 +121,12 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
         return new SinglePartitionSliceCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
     }
 
+    @Override
+    protected int oldestUnrepairedTombstone()
+    {
+        return oldestUnrepairedTombstone;
+    }
+
     protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
     {
         Tracing.trace("Acquiring sstable references");
@@ -139,9 +147,9 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
                 UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
                 @SuppressWarnings("resource") // same as above
                 UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
+                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
                 iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
             }
-
             /*
              * We can't eliminate full sstables based on the timestamp of what we've already read like
              * in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone
@@ -185,6 +193,9 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
                 sstable.incrementReadCount();
                 @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
                 UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+                if (!sstable.isRepaired())
+                    oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+
                 iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
                 mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
                 sstablesIterated++;
@@ -205,6 +216,8 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
                     if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
                     {
                         iterators.add(iter);
+                        if (!sstable.isRepaired())
+                            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
                         includedDueToTombstones++;
                         sstablesIterated++;
                     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 4279f6e..d9c9ea3 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +38,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -64,6 +62,7 @@ public abstract class AbstractCompactionStrategy
     // disable range overlap check when deciding if an SSTable is candidate for tombstone compaction (CASSANDRA-6563)
     protected static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction";
     protected static final String COMPACTION_ENABLED = "enabled";
+    public static final String ONLY_PURGE_REPAIRED_TOMBSTONES = "only_purge_repaired_tombstones";
 
     protected Map<String, String> options;
 
@@ -453,6 +452,7 @@ public abstract class AbstractCompactionStrategy
         uncheckedOptions.remove(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
         uncheckedOptions.remove(UNCHECKED_TOMBSTONE_COMPACTION_OPTION);
         uncheckedOptions.remove(COMPACTION_ENABLED);
+        uncheckedOptions.remove(ONLY_PURGE_REPAIRED_TOMBSTONES);
         return uncheckedOptions;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 1e91dca..179d12d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -20,14 +20,13 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 
 import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
+import com.google.common.collect.Iterables;
+
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
@@ -45,6 +44,7 @@ public class CompactionController implements AutoCloseable
     private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);
 
     public final ColumnFamilyStore cfs;
+    private final boolean compactingRepaired;
     private Refs<SSTableReader> overlappingSSTables;
     private OverlapIterator<PartitionPosition, SSTableReader> overlapIterator;
     private final Iterable<SSTableReader> compacting;
@@ -56,12 +56,13 @@ public class CompactionController implements AutoCloseable
         this(cfs, null, maxValue);
     }
 
-    public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting,  int gcBefore)
+    public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore)
     {
         assert cfs != null;
         this.cfs = cfs;
         this.gcBefore = gcBefore;
         this.compacting = compacting;
+        compactingRepaired = compacting != null && compacting.stream().allMatch(SSTableReader::isRepaired);
         refreshOverlaps();
     }
 
@@ -117,6 +118,9 @@ public class CompactionController implements AutoCloseable
         if (compacting == null)
             return Collections.<SSTableReader>emptySet();
 
+        if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones() && !Iterables.all(compacting, SSTableReader::isRepaired))
+            return Collections.emptySet();
+
         List<SSTableReader> candidates = new ArrayList<>();
 
         long minTimestamp = Long.MAX_VALUE;
@@ -177,6 +181,9 @@ public class CompactionController implements AutoCloseable
      */
     public long maxPurgeableTimestamp(DecoratedKey key)
     {
+        if (!compactingRepaired())
+            return Long.MIN_VALUE;
+
         long min = Long.MAX_VALUE;
         overlapIterator.update(key);
         for (SSTableReader sstable : overlapIterator.overlaps())
@@ -201,4 +208,9 @@ public class CompactionController implements AutoCloseable
         overlappingSSTables.release();
     }
 
+    public boolean compactingRepaired()
+    {
+        return !cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones() || compactingRepaired;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index a1a9d25..cab96fb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -252,7 +252,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
 
         private PurgeIterator(UnfilteredPartitionIterator toPurge, CompactionController controller)
         {
-            super(toPurge, controller.gcBefore);
+            super(toPurge, controller.gcBefore, controller.compactingRepaired() ? Integer.MIN_VALUE : Integer.MAX_VALUE, controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
             this.controller = controller;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 7204da0..f5097af 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -479,4 +479,9 @@ public class CompactionStrategyManager implements INotificationConsumer
     {
         return params;
     }
+
+    public boolean onlyPurgeRepairedTombstones()
+    {
+        return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
index 492fe1d..e53e17b 100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
@@ -27,7 +27,7 @@ public abstract class PurgingPartitionIterator extends WrappingUnfilteredPartiti
 
     private UnfilteredRowIterator next;
 
-    public PurgingPartitionIterator(UnfilteredPartitionIterator iterator, int gcBefore)
+    public PurgingPartitionIterator(UnfilteredPartitionIterator iterator, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones)
     {
         super(iterator);
         this.gcBefore = gcBefore;
@@ -35,6 +35,9 @@ public abstract class PurgingPartitionIterator extends WrappingUnfilteredPartiti
         {
             public boolean shouldPurge(long timestamp, int localDeletionTime)
             {
+                if (onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
+                    return false;
+
                 return timestamp < getMaxPurgeableTimestamp() && localDeletionTime < gcBefore;
             }
         };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
new file mode 100644
index 0000000..3a74029
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.AbstractRow;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class RepairedDataTombstonesTest extends CQLTester
+{
+    @Test
+    public void compactionTest() throws Throwable
+    {
+        createTable("create table %s (id int, id2 int, t text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+        // insert a live row to make sure that the sstables are not dropped (we test dropping in compactionDropExpiredSSTableTest() below)
+        execute("insert into %s (id, id2, t) values (999,999,'live')");
+        for (int i = 0; i < 10; i++)
+        {
+            execute("delete from %s where id=? and id2=?", 1, i);
+        }
+        flush();
+        SSTableReader repairedSSTable = getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next();
+        repair(getCurrentColumnFamilyStore(), repairedSSTable);
+        Thread.sleep(2000);
+        execute("insert into %s (id, id2, t) values (999,999,'live')");
+        for (int i = 10; i < 20; i++)
+        {
+            execute("delete from %s where id=? and id2=?", 1, i);
+        }
+        flush();
+        Thread.sleep(1000);
+        // at this point we have 2 sstables, one repaired and one unrepaired. Both sstables contain expired tombstones, but we should only drop the tombstones from the repaired sstable.
+        getCurrentColumnFamilyStore().forceMajorCompaction();
+        verify();
+        verify2(1);
+        assertEquals(2, Iterables.size(getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE)));
+
+    }
+
+    @Test
+    public void compactionDropExpiredSSTableTest() throws Throwable
+    {
+        createTable("create table %s (id int, id2 int, t text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+        for (int i = 0; i < 10; i++)
+        {
+            execute("delete from %s where id=? and id2=?", 1, i);
+        }
+        flush();
+        SSTableReader repairedSSTable = getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next();
+        repair(getCurrentColumnFamilyStore(), repairedSSTable);
+        Thread.sleep(2000);
+        for (int i = 10; i < 20; i++)
+        {
+            execute("delete from %s where id=? and id2=?", 1, i);
+        }
+        flush();
+        Thread.sleep(1000);
+        getCurrentColumnFamilyStore().forceMajorCompaction();
+        verify();
+        verify2(1);
+        assertEquals(1, Iterables.size(getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE)));
+        assertFalse(getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next().isRepaired());
+
+    }
+
+    @Test
+    public void readTest() throws Throwable
+    {
+        createTable("create table %s (id int, id2 int, t text, t2 text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+        for (int i = 0; i < 10; i++)
+        {
+            execute("update %s set t2=null where id=? and id2=?", 123, i);
+        }
+        flush();
+        SSTableReader repairedSSTable = getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next();
+        repair(getCurrentColumnFamilyStore(), repairedSSTable);
+        Thread.sleep(2000);
+        for (int i = 10; i < 20; i++)
+        {
+            execute("update %s set t2=null where id=? and id2=?", 123, i);
+        }
+        flush();
+        // allow gcgrace to properly expire:
+        Thread.sleep(1000);
+        // make sure we only see the unrepaired tombstones, the other ones are expired and can be purged
+        verify();
+        verify2(123);
+    }
+
+    @Test
+    public void readOnlyUnrepairedTest() throws Throwable
+    {
+        // make sure we keep all tombstones if we only have unrepaired data
+        createTable("create table %s (id int, id2 int, t text, t2 text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+        for (int i = 10; i < 20; i++)
+        {
+            execute("update %s set t2=null where id=? and id2=?", 123, i);
+        }
+        flush();
+
+        // allow gcgrace to properly expire:
+        Thread.sleep(1000);
+        verify();
+        verify2(123);
+    }
+
+
+    @Test
+    public void readTestRowTombstones() throws Throwable
+    {
+        createTable("create table %s (id int, id2 int, t text, t2 text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+        for (int i = 0; i < 10; i++)
+        {
+            execute("delete from %s where id=? and id2=?", 1, i);
+        }
+        flush();
+        SSTableReader repairedSSTable = getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next();
+        repair(getCurrentColumnFamilyStore(), repairedSSTable);
+        Thread.sleep(2000);
+        for (int i = 10; i < 20; i++)
+        {
+            execute("delete from %s where id=? and id2=?", 1, i);
+        }
+        flush();
+        Thread.sleep(1000);
+        verify();
+        verify2(1);
+    }
+
+    @Test
+    public void readTestPartitionTombstones() throws Throwable
+    {
+        createTable("create table %s (id int, id2 int, t text, t2 text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+        for (int i = 0; i < 10; i++)
+        {
+            execute("delete from %s where id=?", i);
+        }
+        flush();
+        SSTableReader repairedSSTable = getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next();
+        repair(getCurrentColumnFamilyStore(), repairedSSTable);
+        Thread.sleep(2000);
+        for (int i = 10; i < 20; i++)
+        {
+            execute("delete from %s where id=?", i);
+        }
+        flush();
+
+        Thread.sleep(1000);
+        ReadCommand cmd = Util.cmd(getCurrentColumnFamilyStore()).build();
+        int partitionsFound = 0;
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
+        {
+            while (iterator.hasNext())
+            {
+                partitionsFound++;
+                UnfilteredRowIterator rowIter = iterator.next();
+                int val = ByteBufferUtil.toInt(rowIter.partitionKey().getKey());
+                assertTrue("val=" + val, val >= 10 && val < 20);
+            }
+        }
+        assertEquals(10, partitionsFound);
+    }
+
+    @Test
+    public void readTestOldUnrepaired() throws Throwable
+    {
+        createTable("create table %s (id int, id2 int, t text, t2 text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 10; i++)
+        {
+            execute("delete from %s where id=1 and id2=?", i);
+        }
+        flush();
+        SSTableReader oldSSTable = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+        Thread.sleep(2000);
+        for (int i = 10; i < 20; i++)
+        {
+            execute("delete from %s where id=1 and id2=?", i);
+        }
+        flush();
+        for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+            if (sstable != oldSSTable)
+                repair(getCurrentColumnFamilyStore(), sstable);
+        Thread.sleep(2000);
+        for (int i = 20; i < 30; i++)
+        {
+            execute("delete from %s where id=1 and id2=?", i);
+        }
+        flush();
+
+        Thread.sleep(2000);
+        // we will keep all tombstones since the oldest tombstones are unrepaired:
+        verify(30, 0, 30);
+        verify2(1, 30, 0, 30);
+    }
+
+    private void verify()
+    {
+        verify(10, 10, 20);
+    }
+
+    private void verify(int expectedRows, int minVal, int maxVal)
+    {
+        ReadCommand cmd = Util.cmd(getCurrentColumnFamilyStore()).build();
+        int foundRows = 0;
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
+        {
+            while (iterator.hasNext())
+            {
+                UnfilteredRowIterator rowIter = iterator.next();
+                if (!rowIter.partitionKey().equals(Util.dk(ByteBufferUtil.bytes(999)))) // partition key 999 is 'live' and used to avoid sstables from being dropped
+                {
+                    while (rowIter.hasNext())
+                    {
+                        AbstractRow row = (AbstractRow) rowIter.next();
+                        for (int i = 0; i < row.clustering().size(); i++)
+                        {
+                            foundRows++;
+                            int val = ByteBufferUtil.toInt(row.clustering().get(i));
+                            assertTrue("val=" + val, val >= minVal && val < maxVal);
+                        }
+                    }
+                }
+            }
+        }
+        assertEquals(expectedRows, foundRows);
+    }
+    private void verify2(int key)
+    {
+        verify2(key, 10, 10, 20);
+    }
+
+    private void verify2(int key, int expectedRows, int minVal, int maxVal)
+    {
+        ReadCommand cmd = Util.cmd(getCurrentColumnFamilyStore(), Util.dk(ByteBufferUtil.bytes(key))).build();
+        int foundRows = 0;
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
+        {
+            while (iterator.hasNext())
+            {
+                UnfilteredRowIterator rowIter = iterator.next();
+                while (rowIter.hasNext())
+                {
+                    AbstractRow row = (AbstractRow) rowIter.next();
+                    for (int i = 0; i < row.clustering().size(); i++)
+                    {
+                        foundRows++;
+                        int val = ByteBufferUtil.toInt(row.clustering().get(i));
+                        assertTrue("val=" + val, val >= minVal && val < maxVal);
+                    }
+                }
+            }
+        }
+        assertEquals(expectedRows, foundRows);
+    }
+
+    public static void repair(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
+    {
+        sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 1);
+        sstable.reloadSSTableMetadata();
+        cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
+    }
+}


Mime
View raw message