cassandra-commits mailing list archives

From slebre...@apache.org
Subject git commit: Track maximum ttl and use to expire entire sstables
Date Fri, 22 Mar 2013 10:37:28 GMT
Updated Branches:
  refs/heads/trunk 480a1a8f5 -> 4937ac7e5


Track maximum ttl and use to expire entire sstables

patch by krummas; reviewed by slebresne for CASSANDRA-5528


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4937ac7e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4937ac7e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4937ac7e

Branch: refs/heads/trunk
Commit: 4937ac7e5776cf72e4c3b266fbffb29ff97c807a
Parents: 480a1a8
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Fri Mar 22 10:27:43 2013 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Fri Mar 22 11:35:52 2013 +0100

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/ColumnFamily.java |    4 +-
 .../db/compaction/AbstractCompactionStrategy.java  |    4 +
 .../db/compaction/CompactionController.java        |   89 ++++++++--
 .../cassandra/db/compaction/CompactionManager.java |    2 +-
 .../cassandra/db/compaction/CompactionTask.java    |   14 +-
 .../db/compaction/LazilyCompactedRow.java          |    3 +
 .../apache/cassandra/db/compaction/Scrubber.java   |    2 +-
 .../apache/cassandra/io/sstable/ColumnStats.java   |    5 +-
 .../apache/cassandra/io/sstable/Descriptor.java    |   15 +-
 .../cassandra/io/sstable/SSTableMetadata.java      |   23 ++-
 .../apache/cassandra/io/sstable/SSTableWriter.java |    4 +
 .../cassandra/streaming/IncomingStreamReader.java  |    2 +-
 .../cassandra/tools/SSTableMetadataViewer.java     |    1 +
 .../compaction/LeveledCompactionStrategyTest.java  |    7 +-
 .../cassandra/db/compaction/TTLExpiryTest.java     |  132 +++++++++++++
 .../cassandra/io/LazilyCompactedRowTest.java       |    7 +-
 .../cassandra/io/sstable/SSTableMetadataTest.java  |  143 +++++++++++++++
 .../apache/cassandra/streaming/BootstrapTest.java  |    2 +-
 18 files changed, 419 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
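
The gist of the change: each column's local deletion time (the point, in seconds since the
epoch, at which it expires or was deleted) is now folded into a per-sstable maximum and
written to the sstable metadata. Compaction can then drop a whole sstable once that maximum
is older than gcBefore, provided the sstable does not shadow data elsewhere. What follows is
a minimal, self-contained sketch of the writer-side bookkeeping; CellLike and
MetadataCollectorSketch are hypothetical stand-ins for illustration, not the
ColumnFamily/ColumnStats/SSTableMetadata/SSTableWriter classes this patch actually touches.

import java.util.Arrays;

// Hypothetical types for illustration only; not the classes touched by this patch.
final class CellLike
{
    final long timestamp;        // client-supplied write timestamp
    final int localDeletionTime; // seconds since the epoch at which the cell expires or was deleted;
                                 // Integer.MAX_VALUE means "live, no TTL"

    CellLike(long timestamp, int localDeletionTime)
    {
        this.timestamp = timestamp;
        this.localDeletionTime = localDeletionTime;
    }
}

final class MetadataCollectorSketch
{
    long maxTimestamp = Long.MIN_VALUE;
    int maxLocalDeletionTime = Integer.MIN_VALUE;

    // Called once per cell while writing an sstable: alongside the existing max
    // timestamp, remember the latest local deletion time seen so far.
    void update(CellLike cell)
    {
        maxTimestamp = Math.max(maxTimestamp, cell.timestamp);
        maxLocalDeletionTime = Math.max(maxLocalDeletionTime, cell.localDeletionTime);
    }

    // If every cell's deletion time is before gcBefore, the sstable holds nothing but
    // expired data and purgeable tombstones. Whether it can actually be dropped also
    // depends on overlap with other sstables; see the CompactionController change below.
    boolean fullyExpired(int gcBefore)
    {
        return maxLocalDeletionTime < gcBefore;
    }

    public static void main(String[] args)
    {
        MetadataCollectorSketch stats = new MetadataCollectorSketch();
        for (CellLike cell : Arrays.asList(new CellLike(1L, 1363947448), new CellLike(2L, 1363947458)))
            stats.update(cell);
        System.out.println(stats.fullyExpired(1363950000)); // true: everything expired before gcBefore
    }
}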


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index fd255d3..1509f9e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -306,15 +306,17 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
         long minTimestampSeen = deletionInfo() == DeletionInfo.LIVE ? Long.MAX_VALUE : deletionInfo().minTimestamp();
         long maxTimestampSeen = deletionInfo().maxTimestamp();
         StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
+        int maxLocalDeletionTime = Integer.MIN_VALUE;
 
         for (Column column : columns)
         {
             minTimestampSeen = Math.min(minTimestampSeen, column.minTimestamp());
             maxTimestampSeen = Math.max(maxTimestampSeen, column.maxTimestamp());
+            maxLocalDeletionTime = Math.max(maxLocalDeletionTime, column.getLocalDeletionTime());
             int deletionTime = column.getLocalDeletionTime();
             if (deletionTime < Integer.MAX_VALUE)
                 tombstones.update(deletionTime);
         }
-        return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, tombstones);
+        return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, maxLocalDeletionTime, tombstones);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 81b3a8f..e8472ea 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -219,6 +219,10 @@ public abstract class AbstractCompactionStrategy
             // there is no overlap, tombstones are safely droppable
             return true;
         }
+        else if (CompactionController.getFullyExpiredSSTables(cfs, Collections.singleton(sstable), overlaps, gcBefore).size() > 0)
+        {
+            return true;
+        }
         else
         {
             // what percentage of columns do we expect to compact outside of overlap?

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e4dca56..b81d6cb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -17,8 +17,10 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -30,6 +32,7 @@ import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.CacheService;
@@ -47,9 +50,11 @@ public class CompactionController
     public final ColumnFamilyStore cfs;
     private final DataTracker.SSTableIntervalTree overlappingTree;
     private final Set<SSTableReader> overlappingSSTables;
+    private final Set<SSTableReader> compacting;
 
     public final int gcBefore;
     public final int mergeShardBefore;
+
     private final Throttle throttle = new Throttle("Cassandra_Throttle", new Throttle.ThroughputFunction()
     {
         /** @return Instantaneous throughput target in bytes per millisecond. */
@@ -65,35 +70,91 @@ public class CompactionController
         }
     });
 
-    public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
-    {
-        this(cfs,
-             gcBefore,
-             cfs.getAndReferenceOverlappingSSTables(sstables));
-    }
-
     /**
      * Constructor that subclasses may use when overriding shouldPurge to not need overlappingTree
      */
     protected CompactionController(ColumnFamilyStore cfs, int maxValue)
     {
-        this(cfs, maxValue, null);
+        this(cfs, null, maxValue);
     }
 
-    private CompactionController(ColumnFamilyStore cfs,
-                                   int gcBefore,
-                                   Set<SSTableReader> overlappingSSTables)
+    public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting,  int gcBefore)
     {
         assert cfs != null;
         this.cfs = cfs;
         this.gcBefore = gcBefore;
+        this.compacting = compacting;
         // If we merge an old CounterId id, we must make sure that no further increment for that id are in an active memtable.
         // For that, we must make sure that this id was renewed before the creation of the oldest unflushed memtable. We
         // add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our
         // current 'stop all write during memtable switch' situation).
         this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
-        this.overlappingSSTables = overlappingSSTables == null ? Collections.<SSTableReader>emptySet() : overlappingSSTables;
-        overlappingTree = overlappingSSTables == null ? null : DataTracker.buildIntervalTree(overlappingSSTables);
+        Set<SSTableReader> overlapping = compacting == null ? null : cfs.getAndReferenceOverlappingSSTables(compacting);
+        this.overlappingSSTables = overlapping == null ? Collections.<SSTableReader>emptySet() : overlapping;
+        this.overlappingTree = overlapping == null ? null : DataTracker.buildIntervalTree(overlapping);
+    }
+
+    public Set<SSTableReader> getFullyExpiredSSTables()
+    {
+        return getFullyExpiredSSTables(cfs, compacting, overlappingSSTables, gcBefore);
+    }
+
+    /**
+     * Finds expired sstables
+     *
+     * works something like this:
+     * 1. find the "global" minTimestamp of the overlapping sstables (excluding the possibly droppable ones)
+     * 2. build a list of candidates to be dropped
+     * 3. sort the candidate list, biggest maxTimestamp first
+     * 4. check whether each candidate actually can be dropped (maxTimestamp < global minTimestamp) and is included in the compaction
+     *    - if not droppable, update the global minTimestamp and remove it from the candidates
+     * 5. return the remaining candidates.
+     *
+     * @param cfStore the column family store the sstables belong to
+     * @param compacting we take the drop-candidates from this set; it is usually the sstables included in the compaction
+     * @param overlapping the sstables that overlap the ones in compacting
+     * @param gcBefore timestamp (seconds since the epoch) below which expired data and tombstones may be purged
+     * @return the sstables from compacting that are fully expired and can be dropped
+     */
+    public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore cfStore, Set<SSTableReader> compacting, Set<SSTableReader> overlapping, int gcBefore)
+    {
+        logger.debug("Checking droppable sstables in {}", cfStore);
+        List<SSTableReader> candidates = new ArrayList<SSTableReader>();
+
+        long minTimestamp = Integer.MAX_VALUE;
+
+        for (SSTableReader sstable : overlapping)
+            minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
+
+        for (SSTableReader candidate : compacting)
+        {
+            if (candidate.getSSTableMetadata().maxLocalDeletionTime < gcBefore)
+                candidates.add(candidate);
+            else
+                minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp());
+        }
+
+        // we still need to keep candidates that might shadow something in a
+        // non-candidate sstable. And if we remove an sstable from the candidates, we
+        // must take its timestamp into account (hence the sorting below).
+        Collections.sort(candidates, SSTable.maxTimestampComparator);
+
+        Iterator<SSTableReader> iterator = candidates.iterator();
+        while (iterator.hasNext())
+        {
+            SSTableReader candidate = iterator.next();
+            if (candidate.getMaxTimestamp() >= minTimestamp)
+            {
+                minTimestamp = Math.min(candidate.getMinTimestamp(), minTimestamp);
+                iterator.remove();
+            }
+            else
+            {
+               logger.debug("Dropping expired SSTable {} (maxLocalDeletionTime={}, gcBefore={})",
+                        candidate, candidate.getSSTableMetadata().maxLocalDeletionTime, gcBefore);
+            }
+        }
+        return new HashSet<SSTableReader>(candidates);
     }
 
     public String getKeyspace()
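
The candidate selection described in the javadoc above can be restated in isolation. The
following is a simplified, self-contained sketch of the same five steps, assuming a
hypothetical SSTableInfo value type in place of SSTableReader and its new
maxLocalDeletionTime metadata; it is not the committed code.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

final class ExpiredSSTableFinderSketch
{
    // Hypothetical minimal view of an sstable's metadata, for illustration only.
    static final class SSTableInfo
    {
        final long minTimestamp;
        final long maxTimestamp;
        final int maxLocalDeletionTime;

        SSTableInfo(long minTimestamp, long maxTimestamp, int maxLocalDeletionTime)
        {
            this.minTimestamp = minTimestamp;
            this.maxTimestamp = maxTimestamp;
            this.maxLocalDeletionTime = maxLocalDeletionTime;
        }
    }

    static Set<SSTableInfo> fullyExpired(Set<SSTableInfo> compacting, Set<SSTableInfo> overlapping, int gcBefore)
    {
        // 1. global minTimestamp of the overlapping sstables that must be kept
        long minTimestamp = Long.MAX_VALUE;
        for (SSTableInfo sstable : overlapping)
            minTimestamp = Math.min(minTimestamp, sstable.minTimestamp);

        // 2. candidates: sstables in the compaction whose every cell and tombstone is past gcBefore
        List<SSTableInfo> candidates = new ArrayList<SSTableInfo>();
        for (SSTableInfo sstable : compacting)
        {
            if (sstable.maxLocalDeletionTime < gcBefore)
                candidates.add(sstable);
            else
                minTimestamp = Math.min(minTimestamp, sstable.minTimestamp);
        }

        // 3. biggest maxTimestamp first
        Collections.sort(candidates, new Comparator<SSTableInfo>()
        {
            public int compare(SSTableInfo a, SSTableInfo b)
            {
                return Long.compare(b.maxTimestamp, a.maxTimestamp);
            }
        });

        // 4. a candidate that might still shadow data in a non-candidate sstable must stay in the compaction
        Iterator<SSTableInfo> iterator = candidates.iterator();
        while (iterator.hasNext())
        {
            SSTableInfo candidate = iterator.next();
            if (candidate.maxTimestamp >= minTimestamp)
            {
                minTimestamp = Math.min(minTimestamp, candidate.minTimestamp);
                iterator.remove();
            }
        }

        // 5. whatever remains can be dropped wholesale instead of being compacted
        return new HashSet<SSTableInfo>(candidates);
    }
}

The descending sort matters: candidates are examined newest-first, so when one is rejected
its own minTimestamp tightens the bound against which the remaining, older candidates are
checked.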

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b2633cb..335f903 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -446,7 +446,7 @@ public class CompactionManager implements CompactionManagerMBean
                 continue;
             }
 
-            CompactionController controller = new CompactionController(cfs, Collections.singletonList(sstable), getDefaultGcBefore(cfs));
+            CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs));
             long startTime = System.currentTimeMillis();
 
             long totalkeysWritten = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index d1361c1..748556a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -21,8 +21,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,6 +106,8 @@ public class CompactionTask extends AbstractCompactionTask
         UUID taskId = SystemTable.startCompaction(cfs, toCompact);
 
         CompactionController controller = new CompactionController(cfs, toCompact, gcBefore);
+        Set<SSTableReader> actuallyCompact = Sets.difference(toCompact, controller.getFullyExpiredSSTables());
+
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
@@ -114,15 +116,15 @@ public class CompactionTask extends AbstractCompactionTask
         long startTime = System.currentTimeMillis();
         long totalkeysWritten = 0;
 
-        long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact, cfs.metadata));
-        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / strategy.getMaxSSTableSize());
+        long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
+        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableSize());
         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : " + keysPerSSTable);
 
         AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
-                                      ? new ParallelCompactionIterable(compactionType, strategy.getScanners(toCompact), controller)
-                                      : new CompactionIterable(compactionType, strategy.getScanners(toCompact), controller);
+                                      ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
+                                      : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
 
@@ -170,7 +172,7 @@ public class CompactionTask extends AbstractCompactionTask
 
                 if (DatabaseDescriptor.getPreheatKeyCache())
                 {
-                    for (SSTableReader sstable : toCompact)
+                    for (SSTableReader sstable : actuallyCompact)
                     {
                         if (sstable.getCachedPosition(row.key, false) != null)
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index aa4c899..7418a18 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -99,6 +99,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns, 
                                       reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen, 
                                       reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
+                                      reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones
         );
         columnSerializedSize = reducer == null ? 0 : reducer.serializedSize;
@@ -242,6 +243,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         int columns = 0;
         long minTimestampSeen = Long.MAX_VALUE;
         long maxTimestampSeen = Long.MIN_VALUE;
+        int maxLocalDeletionTimeSeen = Integer.MIN_VALUE;
         StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
 
         public void reduce(OnDiskAtom current)
@@ -297,6 +299,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 columns++;
                 minTimestampSeen = Math.min(minTimestampSeen, reduced.minTimestamp());
                 maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp());
+                maxLocalDeletionTimeSeen = Math.max(maxLocalDeletionTimeSeen, reduced.getLocalDeletionTime());
                 int deletionTime = reduced.getLocalDeletionTime();
                 if (deletionTime < Integer.MAX_VALUE)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 9a3bd08..5e77a46 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -86,7 +86,7 @@ public class Scrubber implements Closeable
         // If we run scrub offline, we should never purge tombstone, as we cannot know if other sstable have data that the tombstone deletes.
         this.controller = isOffline
                         ? new ScrubController(cfs)
-                        : new CompactionController(cfs, Collections.singletonList(sstable), CompactionManager.getDefaultGcBefore(cfs));
+                        : new CompactionController(cfs, Collections.singleton(sstable), CompactionManager.getDefaultGcBefore(cfs));
         this.isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
         this.expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub,cfs.metadata)));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
index 12ef534..3107e2e 100644
--- a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
+++ b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
@@ -30,14 +30,15 @@ public class ColumnStats
     /** the largest (client-supplied) timestamp in the row */
     public final long minTimestamp;
     public final long maxTimestamp;
-
+    public final int maxLocalDeletionTime;
     /** histogram of tombstone drop time */
     public final StreamingHistogram tombstoneHistogram;
 
-    public ColumnStats(int columnCount, long minTimestamp, long maxTimestamp, StreamingHistogram tombstoneHistogram)
+    public ColumnStats(int columnCount, long minTimestamp, long maxTimestamp, int maxLocalDeletionTime, StreamingHistogram tombstoneHistogram)
     {
         this.minTimestamp = minTimestamp;
         this.maxTimestamp = maxTimestamp;
+        this.maxLocalDeletionTime = maxLocalDeletionTime;
         this.columnCount = columnCount;
         this.tombstoneHistogram = tombstoneHistogram;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index f6b8c13..e2c7d8b 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -47,7 +47,7 @@ public class Descriptor
     public static class Version
     {
         // This needs to be at the begining for initialization sake
-        private static final String current_version = "ic";
+        private static final String current_version = "ja";
 
         public static final Version LEGACY = new Version("a"); // "pre-history"
         // b (0.7.0): added version to sstable filenames
@@ -66,10 +66,11 @@ public class Descriptor
         //             records estimated histogram of deletion times in tombstones
         //             bloom filter (keys and columns) upgraded to Murmur3
         // ib (1.2.1): tracks min client timestamp in metadata component
-        // ja (1.3.0): super columns are serialized as composites
-        //             (note that there is no real format change, this is mostly a marker to know if we should expect super
-        //             columns or not. We do need a major version bump however, because we should not allow streaming of
-        //             super columns into this new format)
+        // ja (2.0.0): super columns are serialized as composites (note that there is no real format change,
+        //               this is mostly a marker to know if we should expect super columns or not. We do need
+        //               a major version bump however, because we should not allow streaming of super columns
+        //               into this new format)
+        //             tracks max local deletion time in sstable metadata
 
         public static final Version CURRENT = new Version(current_version);
 
@@ -90,6 +91,7 @@ public class Descriptor
         public final FilterFactory.Type filterType;
         public final boolean hasAncestors;
         public final boolean hasSuperColumns;
+        public final boolean tracksMaxLocalDeletionTime;
 
         public Version(String version)
         {
@@ -102,6 +104,7 @@ public class Descriptor
             hasPartitioner = version.compareTo("hc") >= 0;
             tracksMaxTimestamp = version.compareTo("hd") >= 0;
             tracksMinTimestamp = version.compareTo("ib") >= 0;
+            tracksMaxLocalDeletionTime = version.compareTo("ja") >= 0;
             hasAncestors = version.compareTo("he") >= 0;
             metadataIncludesModernReplayPosition = version.compareTo("hf") >= 0;
             tracksTombstones = version.compareTo("ia") >= 0;
@@ -113,7 +116,7 @@ public class Descriptor
                 filterType = FilterFactory.Type.MURMUR2;
             else
                 filterType = FilterFactory.Type.MURMUR3;
-            hasSuperColumns = version.compareTo("ib") < 0;
+            hasSuperColumns = version.compareTo("ja") < 0;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
index bd28ed1..e7a4534 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.utils.EstimatedHistogram;
  *  - estimated column count histogram
  *  - replay position
  *  - max column timestamp
+ *  - max local deletion time
  *  - compression ratio
  *  - partitioner
  *  - generations of sstables from which this sstable was compacted, if any
@@ -53,6 +54,8 @@ public class SSTableMetadata
     public final ReplayPosition replayPosition;
     public final long minTimestamp;
     public final long maxTimestamp;
+    public final int maxLocalDeletionTime;
+
     public final double compressionRatio;
     public final String partitioner;
     public final Set<Integer> ancestors;
@@ -66,6 +69,7 @@ public class SSTableMetadata
              ReplayPosition.NONE,
              Long.MAX_VALUE,
              Long.MIN_VALUE,
+             Integer.MAX_VALUE,
              NO_COMPRESSION_RATIO,
              null,
              Collections.<Integer>emptySet(),
@@ -74,13 +78,14 @@ public class SSTableMetadata
     }
 
     private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long minTimestamp,
-            long maxTimestamp, double cr, String partitioner, Set<Integer> ancestors, StreamingHistogram estimatedTombstoneDropTime, int sstableLevel)
+            long maxTimestamp, int maxLocalDeletionTime, double cr, String partitioner, Set<Integer> ancestors, StreamingHistogram estimatedTombstoneDropTime, int sstableLevel)
     {
         this.estimatedRowSize = rowSizes;
         this.estimatedColumnCount = columnCounts;
         this.replayPosition = replayPosition;
         this.minTimestamp = minTimestamp;
         this.maxTimestamp = maxTimestamp;
+        this.maxLocalDeletionTime = maxLocalDeletionTime;
         this.compressionRatio = cr;
         this.partitioner = partitioner;
         this.ancestors = ancestors;
@@ -112,6 +117,7 @@ public class SSTableMetadata
                                    metadata.replayPosition,
                                    metadata.minTimestamp,
                                    metadata.maxTimestamp,
+                                   metadata.maxLocalDeletionTime,
                                    metadata.compressionRatio,
                                    metadata.partitioner,
                                    metadata.ancestors,
@@ -169,6 +175,7 @@ public class SSTableMetadata
         protected ReplayPosition replayPosition = ReplayPosition.NONE;
         protected long minTimestamp = Long.MAX_VALUE;
         protected long maxTimestamp = Long.MIN_VALUE;
+        protected int maxLocalDeletionTime = Integer.MIN_VALUE;
         protected double compressionRatio = NO_COMPRESSION_RATIO;
         protected Set<Integer> ancestors = new HashSet<Integer>();
         protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
@@ -208,6 +215,11 @@ public class SSTableMetadata
             maxTimestamp = Math.max(maxTimestamp, potentialMax);
         }
 
+        public void updateMaxLocalDeletionTime(int maxLocalDeletionTime)
+        {
+            this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
+        }
+
         public SSTableMetadata finalizeMetadata(String partitioner)
         {
             return new SSTableMetadata(estimatedRowSize,
@@ -215,6 +227,7 @@ public class SSTableMetadata
                                        replayPosition,
                                        minTimestamp,
                                        maxTimestamp,
+                                       maxLocalDeletionTime,
                                        compressionRatio,
                                        partitioner,
                                        ancestors,
@@ -257,6 +270,7 @@ public class SSTableMetadata
              * that in this case we will not use EchoedRow, since CompactionControler.needsDeserialize() will be true).
             */
             updateMaxTimestamp(stats.maxTimestamp);
+            updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
             addRowSize(size);
             addColumnCount(stats.columnCount);
             mergeTombstoneHistogram(stats.tombstoneHistogram);
@@ -283,6 +297,7 @@ public class SSTableMetadata
             ReplayPosition.serializer.serialize(sstableStats.replayPosition, dos);
             dos.writeLong(sstableStats.minTimestamp);
             dos.writeLong(sstableStats.maxTimestamp);
+            dos.writeInt(sstableStats.maxLocalDeletionTime);
             dos.writeDouble(sstableStats.compressionRatio);
             dos.writeUTF(sstableStats.partitioner);
             dos.writeInt(sstableStats.ancestors.size());
@@ -314,6 +329,8 @@ public class SSTableMetadata
                 dos.writeLong(sstableStats.minTimestamp);
             if (legacyDesc.version.tracksMaxTimestamp)
                 dos.writeLong(sstableStats.maxTimestamp);
+            if (legacyDesc.version.tracksMaxLocalDeletionTime)
+                dos.writeInt(sstableStats.maxLocalDeletionTime);
             if (legacyDesc.version.hasCompressionRatio)
                 dos.writeDouble(sstableStats.compressionRatio);
             if (legacyDesc.version.hasPartitioner)
@@ -379,6 +396,8 @@ public class SSTableMetadata
             long maxTimestamp = desc.version.containsTimestamp() ? dis.readLong() : Long.MIN_VALUE;
             if (!desc.version.tracksMaxTimestamp) // see javadoc to Descriptor.containsTimestamp
                 maxTimestamp = Long.MAX_VALUE;
+            int maxLocalDeletionTime = desc.version.tracksMaxLocalDeletionTime ? dis.readInt() : Integer.MAX_VALUE;
+
             double compressionRatio = desc.version.hasCompressionRatio
                                     ? dis.readDouble()
                                     : NO_COMPRESSION_RATIO;
@@ -395,7 +414,7 @@ public class SSTableMetadata
             if (loadSSTableLevel && dis.available() > 0)
                 sstableLevel = dis.readInt();
 
-            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, minTimestamp, maxTimestamp, compressionRatio, partitioner, ancestors, tombstoneHistogram, sstableLevel);
+            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, minTimestamp, maxTimestamp, maxLocalDeletionTime, compressionRatio, partitioner, ancestors, tombstoneHistogram, sstableLevel);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 80316e8..d4943e1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -240,6 +240,7 @@ public class SSTableWriter extends SSTable
         // deserialize each column to obtain maxTimestamp and immediately serialize it.
         long minTimestamp = Long.MAX_VALUE;
         long maxTimestamp = Long.MIN_VALUE;
+        int maxLocalDeletionTime = Integer.MIN_VALUE;
         StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
         ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory());
         cf.delete(deletionInfo);
@@ -261,6 +262,8 @@ public class SSTableWriter extends SSTable
             }
             minTimestamp = Math.min(minTimestamp, atom.minTimestamp());
             maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
+            maxLocalDeletionTime = Math.max(maxLocalDeletionTime, atom.getLocalDeletionTime());
+
             try
             {
                 columnIndexer.add(atom); // This write the atom on disk too
@@ -275,6 +278,7 @@ public class SSTableWriter extends SSTable
                 : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
         sstableMetadataCollector.updateMinTimestamp(minTimestamp);
         sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
+        sstableMetadataCollector.updateMaxLocalDeletionTime(maxLocalDeletionTime);
         sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
         sstableMetadataCollector.addColumnCount(columnCount);
         sstableMetadataCollector.mergeTombstoneHistogram(tombstones);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index 8dca4d6..de69fc6 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -134,7 +134,7 @@ public class IncomingStreamReader
         ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
         DecoratedKey key;
         SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
-        CompactionController controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MIN_VALUE);
+        CompactionController controller = new CompactionController(cfs, Collections.<SSTableReader>emptySet(), Integer.MIN_VALUE);
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 03f4887..ddfe69c 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -49,6 +49,7 @@ public class SSTableMetadataViewer
             out.printf("SSTable: %s%n", descriptor);
             out.printf("Partitioner: %s%n", metadata.partitioner);
             out.printf("Maximum timestamp: %s%n", metadata.maxTimestamp);
+            out.printf("SSTable max local deletion time: %s%n", metadata.maxLocalDeletionTime);
             out.printf("Compression ratio: %s%n", metadata.compressionRatio);
             out.printf("Estimated droppable tombstones: %s%n", metadata.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
             out.printf("SSTable Level: %d%n", metadata.sstableLevel);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index f9dff11..0373862 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.db.compaction;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 import com.google.common.collect.Iterables;
 import org.junit.Test;
@@ -168,19 +170,20 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
             store.forceMajorCompaction();
             Thread.sleep(200);
         }
-
+        Set<SSTableReader> changedSSTables = new HashSet<SSTableReader>();
         for (SSTableReader s : table.getColumnFamilyStore(cfname).getSSTables())
         {
             assertTrue(s.getSSTableLevel() != 6);
             strat.manifest.remove(s);
             LeveledManifest.mutateLevel(s.getSSTableMetadata(), s.descriptor, s.descriptor.filenameFor(Component.STATS), 6);
             s.reloadSSTableMetadata();
+            changedSSTables.add(s);
             strat.manifest.add(s);
         }
 
         for(SSTableReader s : table.getColumnFamilyStore(cfname).getSSTables())
         {
-            assertEquals(6, s.getSSTableLevel());
+            assertTrue(changedSSTables.contains(s) && s.getSSTableLevel() == 6);
         }
 
         int [] levels = strat.manifest.getAllLevelSize();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
new file mode 100644
index 0000000..b01b806
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@ -0,0 +1,132 @@
+package org.apache.cassandra.db.compaction;
+
+
+import java.util.concurrent.ExecutionException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class TTLExpiryTest extends SchemaLoader
+{
+    @Test
+    public void testSimpleExpire() throws ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Standard1");
+        cfs.disableAutoCompaction();
+        cfs.metadata.gcGraceSeconds(0);
+        long timestamp = System.currentTimeMillis();
+        RowMutation rm = new RowMutation("Keyspace1", Util.dk("ttl").key);
+        rm.add("Standard1", ByteBufferUtil.bytes("col"),
+               ByteBufferUtil.EMPTY_BYTE_BUFFER,
+               timestamp,
+               1);
+        rm.add("Standard1", ByteBufferUtil.bytes("col7"),
+               ByteBufferUtil.EMPTY_BYTE_BUFFER,
+               timestamp,
+               1);
+
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        rm = new RowMutation("Keyspace1", Util.dk("ttl").key);
+                rm.add("Standard1", ByteBufferUtil.bytes("col2"),
+                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                       timestamp,
+                       1);
+                rm.apply();
+        cfs.forceBlockingFlush();
+        rm = new RowMutation("Keyspace1", Util.dk("ttl").key);
+        rm.add("Standard1", ByteBufferUtil.bytes("col3"),
+                   ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                   timestamp,
+                   1);
+        rm.apply();
+        cfs.forceBlockingFlush();
+        rm = new RowMutation("Keyspace1", Util.dk("ttl").key);
+        rm.add("Standard1", ByteBufferUtil.bytes("col311"),
+                   ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                   timestamp,
+                   1);
+        rm.apply();
+
+        cfs.forceBlockingFlush();
+        Thread.sleep(2000); // wait for ttl to expire
+        assertEquals(4, cfs.getSSTables().size());
+        cfs.enableAutoCompaction();
+        FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
+        assertEquals(0, cfs.getSSTables().size());
+    }
+
+    @Test
+    public void testNoExpire() throws ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Standard1");
+        cfs.disableAutoCompaction();
+        cfs.metadata.gcGraceSeconds(0);
+        long timestamp = System.currentTimeMillis();
+        RowMutation rm = new RowMutation("Keyspace1", Util.dk("ttl").key);
+        rm.add("Standard1", ByteBufferUtil.bytes("col"),
+               ByteBufferUtil.EMPTY_BYTE_BUFFER,
+               timestamp,
+               1);
+        rm.add("Standard1", ByteBufferUtil.bytes("col7"),
+               ByteBufferUtil.EMPTY_BYTE_BUFFER,
+               timestamp,
+               1);
+
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        rm = new RowMutation("Keyspace1", Util.dk("ttl").key);
+                rm.add("Standard1", ByteBufferUtil.bytes("col2"),
+                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                       timestamp,
+                       1);
+                rm.apply();
+        cfs.forceBlockingFlush();
+        rm = new RowMutation("Keyspace1", Util.dk("ttl").key);
+        rm.add("Standard1", ByteBufferUtil.bytes("col3"),
+                   ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                   timestamp,
+                   1);
+        rm.apply();
+        cfs.forceBlockingFlush();
+        DecoratedKey noTTLKey = Util.dk("nottl");
+        rm = new RowMutation("Keyspace1", noTTLKey.key);
+        rm.add("Standard1", ByteBufferUtil.bytes("col311"),
+                   ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                   timestamp);
+        rm.apply();
+        cfs.forceBlockingFlush();
+        Thread.sleep(2000); // wait for ttl to expire
+        assertEquals(4, cfs.getSSTables().size());
+        cfs.enableAutoCompaction();
+        FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
+        assertEquals(1, cfs.getSSTables().size());
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+        SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Standard1", new IdentityQueryFilter()));
+        assertTrue(scanner.hasNext());
+        while(scanner.hasNext())
+        {
+            OnDiskAtomIterator iter = scanner.next();
+            assertEquals(noTTLKey, iter.getKey());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
index eff932c..820565a 100644
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
@@ -71,7 +72,7 @@ public class LazilyCompactedRowTest extends SchemaLoader
                                        new PreCompactingController(cfs, sstables, gcBefore, false));
         AbstractCompactionIterable parallel = new ParallelCompactionIterable(OperationType.UNKNOWN,
                                                                              strategy.getScanners(sstables),
-                                                                             new CompactionController(cfs, sstables, gcBefore),
+                                                                             new CompactionController(cfs, new HashSet<SSTableReader>(sstables), gcBefore),
                                                                              0);
         assertBytes(cfs, sstables, eager, parallel);
     }
@@ -297,7 +298,7 @@ public class LazilyCompactedRowTest extends SchemaLoader
     {
         public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
         {
-            super(cfs, sstables, gcBefore);
+            super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
         }
 
         @Override
@@ -311,7 +312,7 @@ public class LazilyCompactedRowTest extends SchemaLoader
     {
         public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
         {
-            super(cfs, sstables, gcBefore);
+            super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
new file mode 100644
index 0000000..6ab995f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
@@ -0,0 +1,143 @@
+package org.apache.cassandra.io.sstable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableMetadataTest extends SchemaLoader
+{
+    @Test
+    public void testTrackMaxDeletionTime() throws ExecutionException, InterruptedException
+    {
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+        long timestamp = System.currentTimeMillis();
+        for(int i = 0; i < 10; i++)
+        {
+            DecoratedKey key = Util.dk(Integer.toString(i));
+            RowMutation rm = new RowMutation("Keyspace1", key.key);
+            for (int j = 0; j < 10; j++)
+                rm.add("Standard1", ByteBufferUtil.bytes(Integer.toString(j)),
+                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                       timestamp,
+                       10 + j);
+            rm.apply();
+        }
+        RowMutation rm = new RowMutation("Keyspace1", Util.dk("longttl").key);
+        rm.add("Standard1", ByteBufferUtil.bytes("col"),
+               ByteBufferUtil.EMPTY_BYTE_BUFFER,
+               timestamp,
+               10000);
+        rm.apply();
+        store.forceBlockingFlush();
+        assertEquals(1, store.getSSTables().size());
+        int ttltimestamp = (int)(System.currentTimeMillis()/1000);
+        int firstDelTime = 0;
+        for(SSTableReader sstable : store.getSSTables())
+        {
+            firstDelTime = sstable.getSSTableMetadata().maxLocalDeletionTime;
+            assertEquals(ttltimestamp + 10000, firstDelTime, 10);
+
+        }
+        rm = new RowMutation("Keyspace1", Util.dk("longttl2").key);
+        rm.add("Standard1", ByteBufferUtil.bytes("col"),
+               ByteBufferUtil.EMPTY_BYTE_BUFFER,
+               timestamp,
+               20000);
+        rm.apply();
+        ttltimestamp = (int) (System.currentTimeMillis()/1000);
+        store.forceBlockingFlush();
+        assertEquals(2, store.getSSTables().size());
+        List<SSTableReader> sstables = new ArrayList<SSTableReader>(store.getSSTables());
+        if(sstables.get(0).getSSTableMetadata().maxLocalDeletionTime < sstables.get(1).getSSTableMetadata().maxLocalDeletionTime)
+        {
+            assertEquals(sstables.get(0).getSSTableMetadata().maxLocalDeletionTime, firstDelTime);
+            assertEquals(sstables.get(1).getSSTableMetadata().maxLocalDeletionTime, ttltimestamp + 20000, 10);
+        }
+        else
+        {
+            assertEquals(sstables.get(1).getSSTableMetadata().maxLocalDeletionTime, firstDelTime);
+            assertEquals(sstables.get(0).getSSTableMetadata().maxLocalDeletionTime, ttltimestamp + 20000, 10);
+        }
+
+        Util.compact(store, store.getSSTables());
+        assertEquals(1, store.getSSTables().size());
+        for(SSTableReader sstable : store.getSSTables())
+        {
+            assertEquals(sstable.getSSTableMetadata().maxLocalDeletionTime, ttltimestamp + 20000, 10);
+        }
+    }
+
+    /**
+     * 1. create a row with columns with ttls, 5x100 and 1x1000
+     * 2. flush, verify (maxLocalDeletionTime = time+1000)
+     * 3. delete column with ttl=1000
+     * 4. flush, verify the new sstable (maxLocalDeletionTime = ~now)
+     * 5. compact
+     * 6. verify resulting sstable has maxLocalDeletionTime = time + 100.
+     *
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWithDeletes() throws ExecutionException, InterruptedException
+    {
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard2");
+        long timestamp = System.currentTimeMillis();
+        DecoratedKey key = Util.dk("deletetest");
+        RowMutation rm = new RowMutation("Keyspace1", key.key);
+        for (int i = 0; i<5; i++)
+            rm.add("Standard2", ByteBufferUtil.bytes("deletecolumn"+i),
+                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                       timestamp,
+                       100);
+        rm.add("Standard2", ByteBufferUtil.bytes("todelete"),
+                   ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                   timestamp,
+                   1000);
+        rm.apply();
+        store.forceBlockingFlush();
+        assertEquals(1,store.getSSTables().size());
+        int ttltimestamp = (int) (System.currentTimeMillis()/1000);
+        int firstMaxDelTime = 0;
+        for(SSTableReader sstable : store.getSSTables())
+        {
+            firstMaxDelTime = sstable.getSSTableMetadata().maxLocalDeletionTime;
+            assertEquals(ttltimestamp + 1000, firstMaxDelTime, 10);
+        }
+        rm = new RowMutation("Keyspace1", key.key);
+        rm.delete("Standard2", ByteBufferUtil.bytes("todelete"), timestamp + 1);
+        rm.apply();
+        store.forceBlockingFlush();
+        assertEquals(2,store.getSSTables().size());
+        boolean foundDelete = false;
+        for(SSTableReader sstable : store.getSSTables())
+        {
+            if(sstable.getSSTableMetadata().maxLocalDeletionTime != firstMaxDelTime)
+            {
+                assertEquals(sstable.getSSTableMetadata().maxLocalDeletionTime, ttltimestamp, 10);
+                foundDelete = true;
+            }
+        }
+        assertTrue(foundDelete);
+        Util.compact(store, store.getSSTables());
+        assertEquals(1,store.getSSTables().size());
+        for(SSTableReader sstable : store.getSSTables())
+        {
+            assertEquals(ttltimestamp + 100, sstable.getSSTableMetadata().maxLocalDeletionTime, 10);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4937ac7e/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/BootstrapTest.java b/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
index ab7296a..1ecdf2d 100644
--- a/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
+++ b/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
@@ -36,7 +36,7 @@ public class BootstrapTest extends SchemaLoader
     @Test
     public void testGetNewNames() throws IOException
     {
-        Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Keyspace1-Standard1-ic-500-Data.db").toString());
+        Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Keyspace1-Standard1-ja-500-Data.db").toString());
         // assert !desc.isLatestVersion; // minimum compatible version -- for now it is the latest as well
         PendingFile inContext = new PendingFile(null, desc, "Data.db", Arrays.asList(Pair.create(0L, 1L)), OperationType.BOOTSTRAP);
 

