cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [08/20] cassandra git commit: Fix commit log replay after out-of-order flush completion.
Date Thu, 12 May 2016 13:23:13 GMT
Fix commit log replay after out-of-order flush completion.

Patch by Benedict Elliott Smith; reviewed by Branimir Lambov for
CASSANDRA-9669


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/849a4386
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/849a4386
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/849a4386

Branch: refs/heads/trunk
Commit: 849a438690aa97a361227781108cc90355dcbcd9
Parents: e614433
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Wed Sep 9 15:25:59 2015 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Thu May 12 15:17:15 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 258 +++++++++++--------
 .../org/apache/cassandra/db/Directories.java    |   2 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 218 +++++++---------
 .../db/commitlog/CommitLogReplayer.java         |  58 +++--
 .../cassandra/db/commitlog/ReplayPosition.java  |  93 ++++---
 .../compaction/AbstractCompactionStrategy.java  |   3 -
 .../cassandra/db/compaction/Scrubber.java       |   2 -
 .../apache/cassandra/db/lifecycle/Tracker.java  |  47 +++-
 .../org/apache/cassandra/db/lifecycle/View.java |  38 ++-
 .../io/sstable/format/SSTableReader.java        |   5 -
 .../cassandra/io/sstable/format/Version.java    |   2 +
 .../io/sstable/format/big/BigFormat.java        |  10 +-
 .../io/sstable/format/big/BigTableWriter.java   |   2 +-
 .../io/sstable/metadata/CompactionMetadata.java |   4 +-
 .../metadata/IMetadataComponentSerializer.java  |   4 +-
 .../sstable/metadata/IMetadataSerializer.java   |   3 +-
 .../metadata/LegacyMetadataSerializer.java      |  15 +-
 .../io/sstable/metadata/MetadataCollector.java  |  36 ++-
 .../io/sstable/metadata/MetadataSerializer.java |   9 +-
 .../io/sstable/metadata/StatsMetadata.java      |  41 ++-
 .../io/sstable/metadata/ValidationMetadata.java |   4 +-
 .../cassandra/io/util/DiskAwareRunnable.java    |  42 ---
 .../cassandra/streaming/StreamReceiveTask.java  |   3 -
 .../cassandra/tools/SSTableMetadataViewer.java  |   3 +-
 .../db/commitlog/CommitLogStressTest.java       |   2 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   2 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |  19 +-
 .../apache/cassandra/db/lifecycle/ViewTest.java |   2 +-
 .../CompressedRandomAccessReaderTest.java       |   6 +-
 .../CompressedSequentialWriterTest.java         |   2 +-
 .../metadata/MetadataSerializerTest.java        |  74 +++++-
 .../cassandra/utils/IntervalTreeTest.java       |   2 +-
 33 files changed, 600 insertions(+), 413 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2bb3518..dfcad10 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,5 @@
+ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
+
 2.2.7
  * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
  * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 513a138..88e22c0 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -146,6 +146,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    @VisibleForTesting
+    public static volatile ColumnFamilyStore discardFlushResults;
+
     public final Keyspace keyspace;
     public final String name;
     public final CFMetaData metadata;
@@ -867,14 +870,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      *
      * @param memtable
      */
-    public Future<?> switchMemtableIfCurrent(Memtable memtable)
+    public ListenableFuture<ReplayPosition> switchMemtableIfCurrent(Memtable memtable)
     {
         synchronized (data)
         {
             if (data.getView().getCurrentMemtable() == memtable)
                 return switchMemtable();
         }
-        return Futures.immediateFuture(null);
+        return waitForFlushes();
     }
 
     /*
@@ -884,14 +887,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * not complete until the Memtable (and all prior Memtables) have been successfully flushed, and the CL
      * marked clean up to the position owned by the Memtable.
      */
-    public ListenableFuture<?> switchMemtable()
+    public ListenableFuture<ReplayPosition> switchMemtable()
     {
         synchronized (data)
         {
             logFlush();
             Flush flush = new Flush(false);
             flushExecutor.execute(flush);
-            ListenableFutureTask<?> task = ListenableFutureTask.create(flush.postFlush, null);
+            ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(flush.postFlush);
             postFlushExecutor.submit(task);
             return task;
         }
@@ -926,77 +929,93 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
 
-    public ListenableFuture<?> forceFlush()
+    /**
+     * Flush if there is unflushed data in the memtables
+     *
+     * @return a Future yielding the commit log position that can be guaranteed to have been successfully written
+     *         to sstables for this table once the future completes
+     */
+    public ListenableFuture<ReplayPosition> forceFlush()
     {
-        return forceFlush(null);
+        Memtable current = data.getView().getCurrentMemtable();
+        for (ColumnFamilyStore cfs : concatWithIndexes())
+            if (!cfs.data.getView().getCurrentMemtable().isClean())
+                return switchMemtableIfCurrent(current);
+        return waitForFlushes();
     }
 
     /**
      * Flush if there is unflushed data that was written to the CommitLog before @param flushIfDirtyBefore
-     * (inclusive).  If @param flushIfDirtyBefore is null, flush if there is any unflushed data.
+     * (inclusive).
      *
-     * @return a Future such that when the future completes, all data inserted before forceFlush was called,
-     * will be flushed.
+     * @return a Future yielding the commit log position that can be guaranteed to have been successfully written
+     *         to sstables for this table once the future completes
      */
-    public ListenableFuture<?> forceFlush(ReplayPosition flushIfDirtyBefore)
+    public ListenableFuture<ReplayPosition> forceFlush(ReplayPosition flushIfDirtyBefore)
     {
-        // we synchronize on the data tracker to ensure we don't race against other calls to switchMemtable(),
-        // unnecessarily queueing memtables that are about to be made clean
-        synchronized (data)
-        {
-            // during index build, 2ary index memtables can be dirty even if parent is not.  if so,
-            // we want to flush the 2ary index ones too.
-            boolean clean = true;
-            for (ColumnFamilyStore cfs : concatWithIndexes())
-                clean &= cfs.data.getView().getCurrentMemtable().isCleanAfter(flushIfDirtyBefore);
+        // we don't loop through the remaining memtables since here we only care about commit log dirtiness
+        // and this does not vary between a table and its table-backed indexes
+        Memtable current = data.getView().getCurrentMemtable();
+        if (current.mayContainDataBefore(flushIfDirtyBefore))
+            return switchMemtableIfCurrent(current);
+        return waitForFlushes();
+    }
 
-            if (clean)
+    /**
+     * @return a Future yielding the commit log position that can be guaranteed to have been successfully written
+     *         to sstables for this table once the future completes
+     */
+    private ListenableFuture<ReplayPosition> waitForFlushes()
+    {
+        // we grab the current memtable; once any preceding memtables have flushed, we know its
+        // commitLogLowerBound has been set (as this it is set with the upper bound of the preceding memtable)
+        final Memtable current = data.getView().getCurrentMemtable();
+        ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(new Callable<ReplayPosition>()
+        {
+            public ReplayPosition call()
             {
-                // We could have a memtable for this column family that is being
-                // flushed. Make sure the future returned wait for that so callers can
-                // assume that any data inserted prior to the call are fully flushed
-                // when the future returns (see #5241).
-                ListenableFutureTask<?> task = ListenableFutureTask.create(new Runnable()
-                {
-                    public void run()
-                    {
-                        logger.trace("forceFlush requested but everything is clean in {}", name);
-                    }
-                }, null);
-                postFlushExecutor.execute(task);
-                return task;
+                logger.debug("forceFlush requested but everything is clean in {}", name);
+                return current.getCommitLogLowerBound();
             }
-
-            return switchMemtable();
-        }
+        });
+        postFlushExecutor.execute(task);
+        return task;
     }
 
-    public void forceBlockingFlush()
+    public ReplayPosition forceBlockingFlush()
     {
-        FBUtilities.waitOnFuture(forceFlush());
+        return FBUtilities.waitOnFuture(forceFlush());
     }
 
     /**
      * Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
      * etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
      */
-    private final class PostFlush implements Runnable
+    private final class PostFlush implements Callable<ReplayPosition>
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
-        final ReplayPosition lastReplayPosition;
+        final ReplayPosition commitLogUpperBound;
+        final List<Memtable> memtables;
+        final List<SSTableReader> readers;
         volatile FSWriteError flushFailure = null;
 
-        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
+        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound,
+                          List<Memtable> memtables, List<SSTableReader> readers)
         {
             this.writeBarrier = writeBarrier;
             this.flushSecondaryIndexes = flushSecondaryIndexes;
-            this.lastReplayPosition = lastReplayPosition;
+            this.commitLogUpperBound = commitLogUpperBound;
+            this.memtables = memtables;
+            this.readers = readers;
         }
 
-        public void run()
+        public ReplayPosition call()
         {
+            if (discardFlushResults == ColumnFamilyStore.this)
+                return commitLogUpperBound;
+
             writeBarrier.await();
 
             /**
@@ -1018,7 +1037,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             try
             {
-                // we wait on the latch for the lastReplayPosition to be set, and so that waiters
+                // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
                 // on this task can rely on all prior flushes being complete
                 latch.await();
             }
@@ -1027,18 +1046,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 throw new IllegalStateException();
             }
 
-            // must check lastReplayPosition != null because Flush may find that all memtables are clean
-            // and so not set a lastReplayPosition
             // If a flush errored out but the error was ignored, make sure we don't discard the commit log.
-            if (lastReplayPosition != null && flushFailure == null)
+            if (flushFailure == null)
             {
-                CommitLog.instance.discardCompletedSegments(metadata.cfId, lastReplayPosition);
+                CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound);
+                for (int i = 0 ; i < memtables.size() ; i++)
+                {
+                    Memtable memtable = memtables.get(i);
+                    SSTableReader reader = readers.get(i);
+                    memtable.cfs.data.permitCompactionOfFlushed(reader);
+                    memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, reader);
+                }
             }
-
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
                 throw flushFailure;
+            return commitLogUpperBound;
         }
     }
 
@@ -1053,7 +1077,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     private final class Flush implements Runnable
     {
         final OpOrder.Barrier writeBarrier;
-        final List<Memtable> memtables;
+        final List<Memtable> memtables = new ArrayList<>();
+        final List<SSTableReader> readers = new ArrayList<>();
         final PostFlush postFlush;
         final boolean truncate;
 
@@ -1069,43 +1094,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              * that all write operations register themselves with, and assigning this barrier to the memtables,
              * after which we *.issue()* the barrier. This barrier is used to direct write operations started prior
              * to the barrier.issue() into the memtable we have switched out, and any started after to its replacement.
-             * In doing so it also tells the write operations to update the lastReplayPosition of the memtable, so
+             * In doing so it also tells the write operations to update the commitLogUpperBound of the memtable, so
              * that we know the CL position we are dirty to, which can be marked clean when we complete.
              */
             writeBarrier = keyspace.writeOrder.newBarrier();
-            memtables = new ArrayList<>();
 
             // submit flushes for the memtable for any indexed sub-cfses, and our own
-            AtomicReference<ReplayPosition> lastReplayPositionHolder = new AtomicReference<>();
+            AtomicReference<ReplayPosition> commitLogUpperBound = new AtomicReference<>();
             for (ColumnFamilyStore cfs : concatWithIndexes())
             {
                 // switch all memtables, regardless of their dirty status, setting the barrier
                 // so that we can reach a coordinated decision about cleanliness once they
                 // are no longer possible to be modified
-                Memtable mt = cfs.data.switchMemtable(truncate);
-                mt.setDiscarding(writeBarrier, lastReplayPositionHolder);
-                memtables.add(mt);
+                Memtable newMemtable = new Memtable(commitLogUpperBound, cfs);
+                Memtable oldMemtable = cfs.data.switchMemtable(truncate, newMemtable);
+                oldMemtable.setDiscarding(writeBarrier, commitLogUpperBound);
+                memtables.add(oldMemtable);
             }
 
-            // we now attempt to define the lastReplayPosition; we do this by grabbing the current limit from the CL
-            // and attempting to set the holder to this value. at the same time all writes to the memtables are
-            // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
-            // so that we know all operations prior to the position have not reached it yet
-            ReplayPosition lastReplayPosition;
-            while (true)
-            {
-                lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext());
-                ReplayPosition currentLast = lastReplayPositionHolder.get();
-                if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
-                    && lastReplayPositionHolder.compareAndSet(currentLast, lastReplayPosition))
-                    break;
-            }
+            // we then ensure an atomic decision is made about the upper bound of the continuous range of commit log
+            // records owned by this memtable
+            setCommitLogUpperBound(commitLogUpperBound);
 
             // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
-            // since this happens after wiring up the lastReplayPosition, we also know all operations with earlier
+            // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
             // replay positions have also completed, i.e. the memtables are done and ready to flush
             writeBarrier.issue();
-            postFlush = new PostFlush(!truncate, writeBarrier, lastReplayPosition);
+            postFlush = new PostFlush(!truncate, writeBarrier, commitLogUpperBound.get(), memtables, readers);
         }
 
         public void run()
@@ -1124,7 +1139,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 memtable.cfs.data.markFlushing(memtable);
                 if (memtable.isClean() || truncate)
                 {
-                    memtable.cfs.replaceFlushed(memtable, null);
+                    memtable.cfs.data.replaceFlushed(memtable, null);
+                    memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, null);
                     reclaim(memtable);
                     iter.remove();
                 }
@@ -1143,8 +1159,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 for (Memtable memtable : memtables)
                 {
                     // flush the memtable
-                    MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
+                    SSTableReader reader = memtable.flush();
+                    memtable.cfs.data.replaceFlushed(memtable, reader);
                     reclaim(memtable);
+                    readers.add(reader);
                 }
             }
             catch (FSWriteError e)
@@ -1174,6 +1192,38 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    // atomically set the upper bound for the commit log
+    private static void setCommitLogUpperBound(AtomicReference<ReplayPosition> commitLogUpperBound)
+    {
+        // we attempt to set the holder to the current commit log context. at the same time all writes to the memtables are
+        // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
+        // so that we know all operations prior to the position have not reached it yet
+        ReplayPosition lastReplayPosition;
+        while (true)
+        {
+            lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext());
+            ReplayPosition currentLast = commitLogUpperBound.get();
+            if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
+                && commitLogUpperBound.compareAndSet(currentLast, lastReplayPosition))
+                break;
+        }
+    }
+
+    @VisibleForTesting
+    // this method should ONLY be used for testing commit log behaviour; it discards the current memtable
+    // contents without marking the commit log clean, and prevents any proceeding flushes from marking
+    // the commit log as done, however they *will* terminate (unlike under typical failures) to ensure progress is made
+    public void simulateFailedFlush()
+    {
+        discardFlushResults = this;
+        data.markFlushing(data.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), this)));
+    }
+
+    public void resumeFlushing()
+    {
+        discardFlushResults = null;
+    }
+
     /**
      * Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately
      * queues it for flushing. If the memtable selected is flushed before this completes, no work is done.
@@ -1589,11 +1639,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         maybeFail(data.dropSSTables(Predicates.in(sstables), compactionType, null));
     }
 
-    void replaceFlushed(Memtable memtable, SSTableReader sstable)
-    {
-        compactionStrategyWrapper.replaceFlushed(memtable, sstable);
-    }
-
     public boolean isValid()
     {
         return valid;
@@ -1615,6 +1660,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return data.getSSTables();
     }
 
+    public Iterable<SSTableReader> getPermittedToCompactSSTables()
+    {
+        return data.getPermittedToCompact();
+    }
+
     public Set<SSTableReader> getUncompactingSSTables()
     {
         return data.getUncompacting();
@@ -2670,6 +2720,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 public Void call()
                 {
                     cfs.data.reset();
+                    cfs.getCompactionStrategy().shutdown();
+                    cfs.getCompactionStrategy().startup();
                     return null;
                 }
             }, true);
@@ -2707,39 +2759,42 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // position in the System keyspace.
         logger.trace("truncating {}", name);
 
-        if (keyspace.getMetadata().durableWrites || takeSnapshot)
-        {
-            // flush the CF being truncated before forcing the new segment
-            forceBlockingFlush();
+        final long truncatedAt;
+        final ReplayPosition replayAfter;
 
-            // sleep a little to make sure that our truncatedAt comes after any sstable
-            // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
-        }
-        else
+        synchronized (data)
         {
-            // just nuke the memtable data w/o writing to disk first
-            synchronized (data)
+            if (keyspace.getMetadata().durableWrites || takeSnapshot)
             {
+                replayAfter = forceBlockingFlush();
+            }
+            else
+            {
+                // just nuke the memtable data w/o writing to disk first
                 final Flush flush = new Flush(true);
                 flushExecutor.execute(flush);
-                postFlushExecutor.submit(flush.postFlush);
+                replayAfter = FBUtilities.waitOnFuture(postFlushExecutor.submit(flush.postFlush));
             }
+
+            long now = System.currentTimeMillis();
+            // make sure none of our sstables are somehow in the future (clock drift, perhaps)
+            for (ColumnFamilyStore cfs : concatWithIndexes())
+                for (SSTableReader sstable : cfs.data.getSSTables())
+                    now = Math.max(now, sstable.maxDataAge);
+            truncatedAt = now;
         }
 
         Runnable truncateRunnable = new Runnable()
         {
             public void run()
             {
-                logger.trace("Discarding sstable data for truncated CF + indexes");
-
-                final long truncatedAt = System.currentTimeMillis();
+                logger.debug("Discarding sstable data for truncated CF + indexes");
                 data.notifyTruncated(truncatedAt);
 
                 if (takeSnapshot)
                     snapshot(Keyspace.getTimestampedSnapshotName(name));
 
-                ReplayPosition replayAfter = discardSSTables(truncatedAt);
+                discardSSTables(truncatedAt);
 
                 for (SecondaryIndex index : indexManager.getIndexes())
                     index.truncateBlocking(truncatedAt);
@@ -2807,7 +2862,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             public LifecycleTransaction call() throws Exception
             {
                 assert data.getCompacting().isEmpty() : data.getCompacting();
-                Collection<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables()));
+                Iterable<SSTableReader> sstables = getPermittedToCompactSSTables();
+                sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
+                sstables = ImmutableList.copyOf(sstables);
                 LifecycleTransaction modifier = data.tryModify(sstables, operationType);
                 assert modifier != null: "something marked things compacting while compactions are disabled";
                 return modifier;
@@ -3025,10 +3082,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      *
      * @param truncatedAt The timestamp of the truncation
      *                    (all SSTables before that timestamp are going be marked as compacted)
-     *
-     * @return the most recent replay position of the truncated data
      */
-    public ReplayPosition discardSSTables(long truncatedAt)
+    public void discardSSTables(long truncatedAt)
     {
         assert data.getCompacting().isEmpty() : data.getCompacting();
 
@@ -3040,11 +3095,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 truncatedSSTables.add(sstable);
         }
 
-        if (truncatedSSTables.isEmpty())
-            return ReplayPosition.NONE;
-
-        markObsolete(truncatedSSTables, OperationType.UNKNOWN);
-        return ReplayPosition.getReplayPosition(truncatedSSTables);
+        if (!truncatedSSTables.isEmpty())
+            markObsolete(truncatedSSTables, OperationType.UNKNOWN);
     }
 
     public double getDroppableTombstoneRatio()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index c2901a4..2b3662f 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -346,7 +346,7 @@ public class Directories
 
         if (candidates.isEmpty())
             if (tooBig)
-                return null;
+                throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
             else
                 throw new FSWriteError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"), "");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index fb4da72..b4ada09 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -29,21 +29,20 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -62,14 +61,17 @@ public class Memtable implements Comparable<Memtable>
 
     // the write barrier for directing writes to this memtable during a switch
     private volatile OpOrder.Barrier writeBarrier;
-    // the last ReplayPosition owned by this Memtable; all ReplayPositions lower are owned by this or an earlier Memtable
-    private volatile AtomicReference<ReplayPosition> lastReplayPosition;
-    // the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly
-    private final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+    // the precise upper bound of ReplayPosition owned by this memtable
+    private volatile AtomicReference<ReplayPosition> commitLogUpperBound;
+    // the precise lower bound of ReplayPosition owned by this memtable; equal to its predecessor's commitLogUpperBound
+    private AtomicReference<ReplayPosition> commitLogLowerBound;
+    // the approximate lower bound by this memtable; must be <= commitLogLowerBound once our predecessor
+    // has been finalised, and this is enforced in the ColumnFamilyStore.setCommitLogUpperBound
+    private final ReplayPosition approximateCommitLogLowerBound = CommitLog.instance.getContext();
 
     public int compareTo(Memtable that)
     {
-        return this.minReplayPosition.compareTo(that.minReplayPosition);
+        return this.approximateCommitLogLowerBound.compareTo(that.approximateCommitLogLowerBound);
     }
 
     public static final class LastReplayPosition extends ReplayPosition
@@ -84,7 +86,6 @@ public class Memtable implements Comparable<Memtable>
     // actually only store DecoratedKey.
     private final ConcurrentNavigableMap<RowPosition, AtomicBTreeColumns> rows = new ConcurrentSkipListMap<>();
     public final ColumnFamilyStore cfs;
-    private final long creationTime = System.currentTimeMillis();
     private final long creationNano = System.nanoTime();
 
     // The smallest timestamp for all partitions stored in this memtable
@@ -95,9 +96,10 @@ public class Memtable implements Comparable<Memtable>
     // memtable was created with the new or old comparator.
     public final CellNameType initialComparator;
 
-    public Memtable(ColumnFamilyStore cfs)
+    public Memtable(AtomicReference<ReplayPosition> commitLogLowerBound, ColumnFamilyStore cfs)
     {
         this.cfs = cfs;
+        this.commitLogLowerBound = commitLogLowerBound;
         this.allocator = MEMORY_POOL.newAllocator();
         this.initialComparator = cfs.metadata.comparator;
         this.cfs.scheduleFlush();
@@ -131,7 +133,7 @@ public class Memtable implements Comparable<Memtable>
     public void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition)
     {
         assert this.writeBarrier == null;
-        this.lastReplayPosition = lastReplayPosition;
+        this.commitLogUpperBound = lastReplayPosition;
         this.writeBarrier = writeBarrier;
         allocator.setDiscarding();
     }
@@ -161,16 +163,21 @@ public class Memtable implements Comparable<Memtable>
             // its current value and ours; if it HAS been finalised, we simply accept its judgement
             // this permits us to coordinate a safe boundary, as the boundary choice is made
             // atomically wrt our max() maintenance, so an operation cannot sneak into the past
-            ReplayPosition currentLast = lastReplayPosition.get();
+            ReplayPosition currentLast = commitLogUpperBound.get();
             if (currentLast instanceof LastReplayPosition)
                 return currentLast.compareTo(replayPosition) >= 0;
             if (currentLast != null && currentLast.compareTo(replayPosition) >= 0)
                 return true;
-            if (lastReplayPosition.compareAndSet(currentLast, replayPosition))
+            if (commitLogUpperBound.compareAndSet(currentLast, replayPosition))
                 return true;
         }
     }
 
+    public ReplayPosition getCommitLogLowerBound()
+    {
+        return commitLogLowerBound.get();
+    }
+
     public boolean isLive()
     {
         return allocator.isLive();
@@ -181,9 +188,9 @@ public class Memtable implements Comparable<Memtable>
         return rows.isEmpty();
     }
 
-    public boolean isCleanAfter(ReplayPosition position)
+    public boolean mayContainDataBefore(ReplayPosition position)
     {
-        return isClean() || (position != null && minReplayPosition.compareTo(position) >= 0);
+        return approximateCommitLogLowerBound.compareTo(position) < 0;
     }
 
     /**
@@ -252,11 +259,6 @@ public class Memtable implements Comparable<Memtable>
         return rows.size();
     }
 
-    public FlushRunnable flushRunnable()
-    {
-        return new FlushRunnable(lastReplayPosition.get());
-    }
-
     public String toString()
     {
         return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%%/%.0f%% of on/off-heap limit)",
@@ -285,20 +287,21 @@ public class Memtable implements Comparable<Memtable>
 
             public Map.Entry<DecoratedKey, ColumnFamily> next()
             {
-                Map.Entry<? extends RowPosition, ? extends ColumnFamily> entry = iter.next();
+                Map.Entry<? extends RowPosition, ? extends ColumnFamily> entryRowPosition = iter.next();
                 // Actual stored key should be true DecoratedKey
-                assert entry.getKey() instanceof DecoratedKey;
+                assert entryRowPosition.getKey() instanceof DecoratedKey;
+                @SuppressWarnings("unchecked") // Object cast is required since otherwise we can't turn RowPosition into DecoratedKey
+                Map.Entry<DecoratedKey, ColumnFamily> entry = (Map.Entry<DecoratedKey, ColumnFamily>) entryRowPosition;
                 if (MEMORY_POOL.needToCopyOnHeap())
                 {
-                    DecoratedKey key = (DecoratedKey) entry.getKey();
+                    DecoratedKey key = entry.getKey();
                     key = new BufferDecoratedKey(key.getToken(), HeapAllocator.instance.clone(key.getKey()));
                     ColumnFamily cells = ArrayBackedSortedColumns.localCopy(entry.getValue(), HeapAllocator.instance);
                     entry = new AbstractMap.SimpleImmutableEntry<>(key, cells);
                 }
                 // Store the reference to the current entry so that remove() can update the current size.
                 currentEntry = entry;
-                // Object cast is required since otherwise we can't turn RowPosition into DecoratedKey
-                return (Map.Entry<DecoratedKey, ColumnFamily>) entry;
+                return entry;
             }
 
             public void remove()
@@ -315,9 +318,13 @@ public class Memtable implements Comparable<Memtable>
         return rows.get(key);
     }
 
-    public long creationTime()
+    public SSTableReader flush()
     {
-        return creationTime;
+        long estimatedSize = estimatedSize();
+        Directories.DataDirectory dataDirectory = cfs.directories.getWriteableLocation(estimatedSize);
+        File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
+        assert sstableDirectory != null : "Flush task is not bound to any disk";
+        return writeSortedContents(sstableDirectory);
     }
 
     public long getMinTimestamp()
@@ -325,115 +332,88 @@ public class Memtable implements Comparable<Memtable>
         return minTimestamp;
     }
 
-    class FlushRunnable extends DiskAwareRunnable
+    private long estimatedSize()
     {
-        private final ReplayPosition context;
-        private final long estimatedSize;
-
-        FlushRunnable(ReplayPosition context)
-        {
-            this.context = context;
-
-            long keySize = 0;
-            for (RowPosition key : rows.keySet())
-            {
-                //  make sure we don't write non-sensical keys
-                assert key instanceof DecoratedKey;
-                keySize += ((DecoratedKey)key).getKey().remaining();
-            }
-            estimatedSize = (long) ((keySize // index entries
-                                    + keySize // keys in data file
-                                    + liveDataSize.get()) // data
-                                    * 1.2); // bloom filter and row index overhead
-        }
-
-        public long getExpectedWriteSize()
-        {
-            return estimatedSize;
-        }
-
-        protected void runMayThrow() throws Exception
+        long keySize = 0;
+        for (RowPosition key : rows.keySet())
         {
-            long writeSize = getExpectedWriteSize();
-            Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
-            File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
-            assert sstableDirectory != null : "Flush task is not bound to any disk";
-            SSTableReader sstable = writeSortedContents(context, sstableDirectory);
-            cfs.replaceFlushed(Memtable.this, sstable);
+            //  make sure we don't write non-sensical keys
+            assert key instanceof DecoratedKey;
+            keySize += ((DecoratedKey)key).getKey().remaining();
         }
+        return (long) ((keySize // index entries
+                        + keySize // keys in data file
+                        + liveDataSize.get()) // data
+                       * 1.2); // bloom filter and row index overhead
+    }
 
-        protected Directories getDirectories()
-        {
-            return cfs.directories;
-        }
+    private SSTableReader writeSortedContents(File sstableDirectory)
+    {
+        logger.info("Writing {}", Memtable.this.toString());
 
-        private SSTableReader writeSortedContents(ReplayPosition context, File sstableDirectory)
+        SSTableReader ssTable;
+        // errors when creating the writer that may leave empty temp files.
+        try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)))
         {
-            logger.debug("Writing {}", Memtable.this.toString());
-
-            SSTableReader ssTable;
-            // errors when creating the writer that may leave empty temp files.
-            try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)))
+            boolean trackContention = logger.isTraceEnabled();
+            int heavilyContendedRowCount = 0;
+            // (we can't clear out the map as-we-go to free up memory,
+            //  since the memtable is being used for queries in the "pending flush" category)
+            for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
             {
-                boolean trackContention = logger.isTraceEnabled();
-                int heavilyContendedRowCount = 0;
-                // (we can't clear out the map as-we-go to free up memory,
-                //  since the memtable is being used for queries in the "pending flush" category)
-                for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
+                AtomicBTreeColumns cf = entry.getValue();
+
+                if (cf.isMarkedForDelete() && cf.hasColumns())
                 {
-                    AtomicBTreeColumns cf = entry.getValue();
-
-                    if (cf.isMarkedForDelete() && cf.hasColumns())
-                    {
-                        // When every node is up, there's no reason to write batchlog data out to sstables
-                        // (which in turn incurs cost like compaction) since the BL write + delete cancel each other out,
-                        // and BL data is strictly local, so we don't need to preserve tombstones for repair.
-                        // If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it.
-                        // See CASSANDRA-4667.
-                        if (cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME))
-                            continue;
-                    }
-
-                    if (trackContention && cf.usePessimisticLocking())
-                        heavilyContendedRowCount++;
-
-                    if (!cf.isEmpty())
-                        writer.append((DecoratedKey)entry.getKey(), cf);
+                    // When every node is up, there's no reason to write batchlog data out to sstables
+                    // (which in turn incurs cost like compaction) since the BL write + delete cancel each other out,
+                    // and BL data is strictly local, so we don't need to preserve tombstones for repair.
+                    // If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it.
+                    // See CASSANDRA-4667.
+                    if (cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME))
+                        continue;
                 }
 
-                if (writer.getFilePointer() > 0)
-                {
-                    logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                              writer.getFilename(),
-                                              FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()),
-                                              context));
+                if (trackContention && cf.usePessimisticLocking())
+                    heavilyContendedRowCount++;
 
-                    // temp sstables should contain non-repaired data.
-                    ssTable = writer.finish(true);
-                }
-                else
-                {
-                    logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                                writer.getFilename(), context);
-                    writer.abort();
-                    ssTable = null;
-                }
+                if (!cf.isEmpty())
+                    writer.append((DecoratedKey)entry.getKey(), cf);
+            }
 
-                if (heavilyContendedRowCount > 0)
-                    logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, rows.size(), Memtable.this.toString()));
+            if (writer.getFilePointer() > 0)
+            {
+                logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                           writer.getFilename(),
+                                           FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()),
+                                           commitLogUpperBound));
 
-                return ssTable;
+                // temp sstables should contain non-repaired data.
+                ssTable = writer.finish(true);
+            }
+            else
+            {
+                logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
+                             writer.getFilename(), commitLogUpperBound);
+                writer.abort();
+                ssTable = null;
             }
-        }
 
-        public SSTableWriter createFlushWriter(String filename)
-        {
-            MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
+            if (heavilyContendedRowCount > 0)
+                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, rows.size(), Memtable.this.toString()));
 
-            return SSTableWriter.create(Descriptor.fromFilename(filename), (long) rows.size(), ActiveRepairService.UNREPAIRED_SSTABLE, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
+            return ssTable;
         }
     }
 
+    private SSTableWriter createFlushWriter(String filename)
+    {
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
+                                                     .commitLogLowerBound(commitLogLowerBound.get())
+                                                     .commitLogUpperBound(commitLogUpperBound.get());
+        return SSTableWriter.create(Descriptor.fromFilename(filename), (long) rows.size(), ActiveRepairService.UNREPAIRED_SSTABLE, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
+    }
+
     private static int estimateRowOverhead(final int count)
     {
         // calculate row overhead

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 98fb556..a58aeb4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -33,7 +33,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Ordering;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +69,7 @@ public class CommitLogReplayer
     private final List<Future<?>> futures;
     private final Map<UUID, AtomicInteger> invalidMutations;
     private final AtomicInteger replayedCount;
-    private final Map<UUID, ReplayPosition> cfPositions;
+    private final Map<UUID, ReplayPosition.ReplayFilter> cfPersisted;
     private final ReplayPosition globalPosition;
     private final ICRC32 checksum;
     private byte[] buffer;
@@ -79,7 +78,7 @@ public class CommitLogReplayer
     private final ReplayFilter replayFilter;
     private final CommitLogArchiver archiver;
 
-    CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter)
+    CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition.ReplayFilter> cfPersisted, ReplayFilter replayFilter)
     {
         this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
         this.futures = new ArrayList<Future<?>>();
@@ -89,7 +88,7 @@ public class CommitLogReplayer
         // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
         this.replayedCount = new AtomicInteger();
         this.checksum = CRC32Factory.instance.create();
-        this.cfPositions = cfPositions;
+        this.cfPersisted = cfPersisted;
         this.globalPosition = globalPosition;
         this.replayFilter = replayFilter;
         this.archiver = commitLog.archiver;
@@ -98,17 +97,12 @@ public class CommitLogReplayer
     public static CommitLogReplayer construct(CommitLog commitLog)
     {
         // compute per-CF and global replay positions
-        Map<UUID, ReplayPosition> cfPositions = new HashMap<UUID, ReplayPosition>();
-        Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
+        Map<UUID, ReplayPosition.ReplayFilter> cfPersisted = new HashMap<>();
         ReplayFilter replayFilter = ReplayFilter.create();
+        ReplayPosition globalPosition = null;
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
-            // below: gRP will return NONE if there are no flushed sstables, which is important to have in the
-            // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
-            ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
-
-            // but, if we've truncated the cf in question, then we need to need to start replay after the truncation
+            // but, if we've truncted the cf in question, then we need to need to start replay after the truncation
             ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
             if (truncatedAt != null)
             {
@@ -125,19 +119,21 @@ public class CommitLogReplayer
                                     cfs.metadata.ksName,
                                     cfs.metadata.cfName);
                         SystemKeyspace.removeTruncationRecord(cfs.metadata.cfId);
+                        truncatedAt = null;
                     }
                 }
-                else
-                {
-                    rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
-                }
             }
 
-            cfPositions.put(cfs.metadata.cfId, rp);
+            ReplayPosition.ReplayFilter filter = new ReplayPosition.ReplayFilter(cfs.getSSTables(), truncatedAt);
+            if (!filter.isEmpty())
+                cfPersisted.put(cfs.metadata.cfId, filter);
+            else
+                globalPosition = ReplayPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter
         }
-        ReplayPosition globalPosition = replayPositionOrdering.min(cfPositions.values());
-        logger.trace("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPositions));
-        return new CommitLogReplayer(commitLog, globalPosition, cfPositions, replayFilter);
+        if (globalPosition == null)
+            globalPosition = ReplayPosition.firstNotCovered(cfPersisted.values());
+        logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
+        return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
     }
 
     public void recover(File[] clogs) throws IOException
@@ -273,6 +269,18 @@ public class CommitLogReplayer
         }
     }
 
+    /**
+     * consult the known-persisted ranges for our sstables;
+     * if the position is covered by one of them it does not need to be replayed
+     *
+     * @return true iff replay is necessary
+     */
+    private boolean shouldReplay(UUID cfId, ReplayPosition position)
+    {
+        ReplayPosition.ReplayFilter filter = cfPersisted.get(cfId);
+        return filter == null || filter.shouldReplay(position);
+    }
+
     @SuppressWarnings("resource")
     public void recover(File file, boolean tolerateTruncation) throws IOException
     {
@@ -495,7 +503,7 @@ public class CommitLogReplayer
                                   mutationStart, errorContext);
                 continue;
             }
-            replayMutation(buffer, serializedSize, reader.getFilePointer(), desc);
+            replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc);
         }
         return true;
     }
@@ -504,7 +512,7 @@ public class CommitLogReplayer
      * Deserializes and replays a commit log entry.
      */
     void replayMutation(byte[] inputBuffer, int size,
-            final long entryLocation, final CommitLogDescriptor desc) throws IOException
+            final int entryLocation, final CommitLogDescriptor desc) throws IOException
     {
 
         final Mutation mutation;
@@ -577,11 +585,7 @@ public class CommitLogReplayer
                     if (Schema.instance.getCF(columnFamily.id()) == null)
                         continue; // dropped
 
-                    ReplayPosition rp = cfPositions.get(columnFamily.id());
-
-                    // replay if current segment is newer than last flushed one or,
-                    // if it is the last known segment, if we are after the replay position
-                    if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position))
+                    if (shouldReplay(columnFamily.id(), new ReplayPosition(desc.id, entryLocation)))
                     {
                         if (newMutation == null)
                             newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index ca1969f..17802ad 100644
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@ -19,10 +19,10 @@ package org.apache.cassandra.db.commitlog;
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.util.Comparator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 
 import org.apache.cassandra.db.TypeSizes;
@@ -35,46 +35,78 @@ public class ReplayPosition implements Comparable<ReplayPosition>
     public static final ReplayPositionSerializer serializer = new ReplayPositionSerializer();
 
     // NONE is used for SSTables that are streamed from other nodes and thus have no relationship
-    // with our local commitlog. The values satisfy the critera that
+    // with our local commitlog. The values satisfy the criteria that
     //  - no real commitlog segment will have the given id
     //  - it will sort before any real replayposition, so it will be effectively ignored by getReplayPosition
     public static final ReplayPosition NONE = new ReplayPosition(-1, 0);
 
+    public final long segment;
+    public final int position;
+
     /**
-     * Convenience method to compute the replay position for a group of SSTables.
-     * @param sstables
-     * @return the most recent (highest) replay position
+     * A filter of known safe-to-discard commit log replay positions, based on
+     * the range covered by on disk sstables and those prior to the most recent truncation record
      */
-    public static ReplayPosition getReplayPosition(Iterable<? extends SSTableReader> sstables)
+    public static class ReplayFilter
     {
-        if (Iterables.isEmpty(sstables))
-            return NONE;
-
-        Function<SSTableReader, ReplayPosition> f = new Function<SSTableReader, ReplayPosition>()
+        final NavigableMap<ReplayPosition, ReplayPosition> persisted = new TreeMap<>();
+        public ReplayFilter(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt)
         {
-            public ReplayPosition apply(SSTableReader sstable)
+            for (SSTableReader reader : onDisk)
             {
-                return sstable.getReplayPosition();
+                ReplayPosition start = reader.getSSTableMetadata().commitLogLowerBound;
+                ReplayPosition end = reader.getSSTableMetadata().commitLogUpperBound;
+                add(persisted, start, end);
             }
-        };
-        Ordering<ReplayPosition> ordering = Ordering.from(ReplayPosition.comparator);
-        return ordering.max(Iterables.transform(sstables, f));
-    }
+            if (truncatedAt != null)
+                add(persisted, ReplayPosition.NONE, truncatedAt);
+        }
 
+        private static void add(NavigableMap<ReplayPosition, ReplayPosition> ranges, ReplayPosition start, ReplayPosition end)
+        {
+            // extend ourselves to cover any ranges we overlap
+            // record directly preceding our end may extend past us, so take the max of our end and its
+            Map.Entry<ReplayPosition, ReplayPosition> extend = ranges.floorEntry(end);
+            if (extend != null && extend.getValue().compareTo(end) > 0)
+                end = extend.getValue();
+
+            // record directly preceding our start may extend into us; if it does, we take it as our start
+            extend = ranges.lowerEntry(start);
+            if (extend != null && extend.getValue().compareTo(start) >= 0)
+                start = extend.getKey();
+
+            ranges.subMap(start, end).clear();
+            ranges.put(start, end);
+        }
 
-    public final long segment;
-    public final int position;
+        public boolean shouldReplay(ReplayPosition position)
+        {
+            // replay ranges are start exclusive, end inclusive
+            Map.Entry<ReplayPosition, ReplayPosition> range = persisted.lowerEntry(position);
+            return range == null || position.compareTo(range.getValue()) > 0;
+        }
 
-    public static final Comparator<ReplayPosition> comparator = new Comparator<ReplayPosition>()
-    {
-        public int compare(ReplayPosition o1, ReplayPosition o2)
+        public boolean isEmpty()
         {
-            if (o1.segment != o2.segment)
-                return Long.valueOf(o1.segment).compareTo(o2.segment);
+            return persisted.isEmpty();
+        }
+    }
 
-            return Integer.valueOf(o1.position).compareTo(o2.position);
+    public static ReplayPosition firstNotCovered(Iterable<ReplayFilter> ranges)
+    {
+        ReplayPosition min = null;
+        for (ReplayFilter map : ranges)
+        {
+            ReplayPosition first = map.persisted.firstEntry().getValue();
+            if (min == null)
+                min = first;
+            else
+                min = Ordering.natural().min(min, first);
         }
-    };
+        if (min == null)
+            return NONE;
+        return min;
+    }
 
     public ReplayPosition(long segment, int position)
     {
@@ -83,9 +115,12 @@ public class ReplayPosition implements Comparable<ReplayPosition>
         this.position = position;
     }
 
-    public int compareTo(ReplayPosition other)
+    public int compareTo(ReplayPosition that)
     {
-        return comparator.compare(this, other);
+        if (this.segment != that.segment)
+            return Long.compare(this.segment, that.segment);
+
+        return Integer.compare(this.position, that.position);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 02497df..575aa51 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -232,9 +232,6 @@ public abstract class AbstractCompactionStrategy
      */
     public void replaceFlushed(Memtable memtable, SSTableReader sstable)
     {
-        cfs.getTracker().replaceFlushed(memtable, sstable);
-        if (sstable != null)
-            CompactionManager.instance.submitBackground(cfs);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index e9137e2..d1fe702 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -98,8 +98,6 @@ public class Scrubber implements Closeable
 
         // Calculate the expected compacted filesize
         this.destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
-        if (destination == null)
-            throw new IOException("disk full");
 
         // If we run scrub offline, we should never purge tombstone, as we cannot know if other sstable have data that the tombstone deletes.
         this.controller = transaction.isOffline()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index a074216..5d5701f 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -31,11 +31,13 @@ import com.google.common.collect.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
@@ -45,6 +47,8 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static com.google.common.base.Predicates.and;
+import static com.google.common.base.Predicates.in;
+import static com.google.common.base.Predicates.not;
 import static com.google.common.collect.ImmutableSet.copyOf;
 import static com.google.common.collect.Iterables.filter;
 import static java.util.Collections.singleton;
@@ -195,10 +199,12 @@ public class Tracker
     public void reset()
     {
         view.set(new View(
-                         !isDummy() ? ImmutableList.of(new Memtable(cfstore)) : Collections.<Memtable>emptyList(),
+                         !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfstore))
+                                    : ImmutableList.<Memtable>of(),
                          ImmutableList.<Memtable>of(),
                          Collections.<SSTableReader, SSTableReader>emptyMap(),
                          Collections.<SSTableReader>emptySet(),
+                         Collections.<SSTableReader>emptySet(),
                          SSTableIntervalTree.empty()));
     }
 
@@ -294,9 +300,8 @@ public class Tracker
      *
      * @return the previously active memtable
      */
-    public Memtable switchMemtable(boolean truncating)
+    public Memtable switchMemtable(boolean truncating, Memtable newMemtable)
     {
-        Memtable newMemtable = new Memtable(cfstore);
         Pair<View, View> result = apply(View.switchMemtable(newMemtable));
         if (truncating)
             notifyRenewed(newMemtable);
@@ -328,15 +333,35 @@ public class Tracker
 
         Throwable fail;
         fail = updateSizeTracking(emptySet(), singleton(sstable), null);
-        // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
-        fail = notifyAdded(sstable, fail);
-
-        if (!isDummy() && !cfstore.isValid())
-            dropSSTables();
 
         maybeFail(fail);
     }
 
+    /**
+     * permit compaction of the provided sstable; this translates to notifying compaction
+     * strategies of its existence, and potentially submitting a background task
+     */
+    public void permitCompactionOfFlushed(SSTableReader sstable)
+    {
+        if (sstable == null)
+            return;
+
+        apply(View.permitCompactionOfFlushed(sstable));
+
+        if (isDummy())
+            return;
+
+        if (cfstore.isValid())
+        {
+            notifyAdded(sstable);
+            CompactionManager.instance.submitBackground(cfstore);
+        }
+        else
+        {
+            dropSSTables();
+        }
+    }
+
 
 
     // MISCELLANEOUS public utility calls
@@ -346,6 +371,12 @@ public class Tracker
         return view.get().sstables;
     }
 
+    public Iterable<SSTableReader> getPermittedToCompact()
+    {
+        View view = this.view.get();
+        return filter(view.sstables, not(in(view.premature)));
+    }
+
     public Set<SSTableReader> getCompacting()
     {
         return view.get().compacting;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 73ba131..fba1627 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.db.lifecycle;
 
 import java.util.*;
 
+import javax.annotation.Nullable;
+
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.base.Predicate;
@@ -66,6 +68,7 @@ public class View
     public final List<Memtable> flushingMemtables;
     public final Set<SSTableReader> compacting;
     public final Set<SSTableReader> sstables;
+    public final Set<SSTableReader> premature;
     // we use a Map here so that we can easily perform identity checks as well as equality checks.
     // When marking compacting, we now  indicate if we expect the sstables to be present (by default we do),
     // and we then check that not only are they all present in the live set, but that the exact instance present is
@@ -74,7 +77,7 @@ public class View
 
     public final SSTableIntervalTree intervalTree;
 
-    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree)
+    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree)
     {
         assert liveMemtables != null;
         assert flushingMemtables != null;
@@ -88,6 +91,7 @@ public class View
         this.sstablesMap = sstables;
         this.sstables = sstablesMap.keySet();
         this.compacting = compacting;
+        this.premature = premature;
         this.intervalTree = intervalTree;
     }
 
@@ -155,7 +159,7 @@ public class View
                 assert all(mark, Helpers.idIn(view.sstablesMap));
                 return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
                                 replace(view.compacting, unmark, mark),
-                                view.intervalTree);
+                                view.premature, view.intervalTree);
             }
         };
     }
@@ -169,7 +173,7 @@ public class View
             public boolean apply(View view)
             {
                 for (SSTableReader reader : readers)
-                    if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted())
+                    if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted() || view.premature.contains(reader))
                         return false;
                 return true;
             }
@@ -186,7 +190,7 @@ public class View
             public View apply(View view)
             {
                 Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
-                return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting,
+                return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting, view.premature,
                                 SSTableIntervalTree.build(sstableMap.keySet()));
             }
         };
@@ -201,7 +205,7 @@ public class View
             {
                 List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
                 assert newLive.size() == view.liveMemtables.size() + 1;
-                return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.intervalTree);
+                return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
             }
         };
     }
@@ -220,7 +224,7 @@ public class View
                                                            filter(flushing, not(lessThan(toFlush)))));
                 assert newLive.size() == live.size() - 1;
                 assert newFlushing.size() == flushing.size() + 1;
-                return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.intervalTree);
+                return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
             }
         };
     }
@@ -237,15 +241,33 @@ public class View
 
                 if (flushed == null)
                     return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
-                                    view.compacting, view.intervalTree);
+                                    view.compacting, view.premature, view.intervalTree);
 
                 Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
-                return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compacting,
+                Set<SSTableReader> compacting = replace(view.compacting, emptySet(), singleton(flushed));
+                Set<SSTableReader> premature = replace(view.premature, emptySet(), singleton(flushed));
+                return new View(view.liveMemtables, flushingMemtables, sstableMap, compacting, premature,
                                 SSTableIntervalTree.build(sstableMap.keySet()));
             }
         };
     }
 
+    static Function<View, View> permitCompactionOfFlushed(final SSTableReader reader)
+    {
+        return new Function<View, View>()
+        {
+
+            @Nullable
+            public View apply(View view)
+            {
+                Set<SSTableReader> premature = ImmutableSet.copyOf(filter(view.premature, not(equalTo(reader))));
+                Set<SSTableReader> compacting = ImmutableSet.copyOf(filter(view.compacting, not(equalTo(reader))));
+                return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compacting, premature, view.intervalTree);
+            }
+        };
+    }
+
+
     private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
     {
         return new Predicate<T>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index e81e4e9..c303975 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1864,11 +1864,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return sstableMetadata.compressionRatio;
     }
 
-    public ReplayPosition getReplayPosition()
-    {
-        return sstableMetadata.replayPosition;
-    }
-
     public long getMinTimestamp()
     {
         return sstableMetadata.minTimestamp;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index faaa89e..41a83e1 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -52,6 +52,8 @@ public abstract class Version
 
     public abstract boolean hasNewFileName();
 
+    public abstract boolean hasCommitLogLowerBound();
+
     public String getVersion()
     {
         return version;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index a1e32cf..9244bbb 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -125,7 +125,7 @@ public class BigFormat implements SSTableFormat
     // we always incremented the major version.
     static class BigVersion extends Version
     {
-        public static final String current_version = "la";
+        public static final String current_version = "lb";
         public static final String earliest_supported_version = "jb";
 
         // jb (2.0.1): switch from crc32 to adler32 for compression checksums
@@ -135,6 +135,7 @@ public class BigFormat implements SSTableFormat
         //             switch uncompressed checksums to adler32
         //             tracks presense of legacy (local and remote) counter shards
         // la (2.2.0): new file name format
+        // lb (2.2.7): commit log lower bound included
 
         private final boolean isLatestVersion;
         private final boolean hasSamplingLevel;
@@ -143,6 +144,7 @@ public class BigFormat implements SSTableFormat
         private final boolean hasRepairedAt;
         private final boolean tracksLegacyCounterShards;
         private final boolean newFileName;
+        private final boolean hasCommitLogLowerBound;
 
         public BigVersion(String version)
         {
@@ -155,6 +157,7 @@ public class BigFormat implements SSTableFormat
             hasRepairedAt = version.compareTo("ka") >= 0;
             tracksLegacyCounterShards = version.compareTo("ka") >= 0;
             newFileName = version.compareTo("la") >= 0;
+            hasCommitLogLowerBound = version.compareTo("lb") >= 0;
         }
 
         @Override
@@ -199,6 +202,11 @@ public class BigFormat implements SSTableFormat
             return newFileName;
         }
 
+        public boolean hasCommitLogLowerBound()
+        {
+            return hasCommitLogLowerBound;
+        }
+
         @Override
         public boolean isCompatible()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 505bac0..3a01f87 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -432,7 +432,7 @@ public class BigTableWriter extends SSTableWriter
         File file = new File(desc.filenameFor(Component.STATS));
         try (SequentialWriter out = SequentialWriter.open(file);)
         {
-            desc.getMetadataSerializer().serialize(components, out.stream);
+            desc.getMetadataSerializer().serialize(components, desc.version, out.stream);
             out.setDescriptor(desc).finish();
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
index ed1f327..c8e6ee8 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
@@ -72,7 +72,7 @@ public class CompactionMetadata extends MetadataComponent
 
     public static class CompactionMetadataSerializer implements IMetadataComponentSerializer<CompactionMetadata>
     {
-        public int serializedSize(CompactionMetadata component) throws IOException
+        public int serializedSize(CompactionMetadata component, Version version) throws IOException
         {
             int size = 0;
             size += TypeSizes.NATIVE.sizeof(component.ancestors.size());
@@ -83,7 +83,7 @@ public class CompactionMetadata extends MetadataComponent
             return size;
         }
 
-        public void serialize(CompactionMetadata component, DataOutputPlus out) throws IOException
+        public void serialize(CompactionMetadata component, Version version, DataOutputPlus out) throws IOException
         {
             out.writeInt(component.ancestors.size());
             for (int g : component.ancestors)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
index dc8fbdf..e3d867f 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
@@ -35,7 +35,7 @@ public interface IMetadataComponentSerializer<T extends MetadataComponent>
      * @return serialized size of this component
      * @throws IOException
      */
-    int serializedSize(T component) throws IOException;
+    int serializedSize(T component, Version version) throws IOException;
 
     /**
      * Serialize metadata component to given output.
@@ -45,7 +45,7 @@ public interface IMetadataComponentSerializer<T extends MetadataComponent>
      * @param out  serialize destination
      * @throws IOException
      */
-    void serialize(T component, DataOutputPlus out) throws IOException;
+    void serialize(T component, Version version, DataOutputPlus out) throws IOException;
 
     /**
      * Deserialize metadata component from given input.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index df577df..a7d23f4 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -22,6 +22,7 @@ import java.util.EnumSet;
 import java.util.Map;
 
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
@@ -37,7 +38,7 @@ public interface IMetadataSerializer
      * @param out
      * @throws IOException
      */
-    void serialize(Map<MetadataType, MetadataComponent> components, DataOutputPlus out) throws IOException;
+    void serialize(Map<MetadataType, MetadataComponent> components, Version version, DataOutputPlus out) throws IOException;
 
     /**
      * Deserialize specified metadata components from given descriptor.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 4bd060e..bfeb930 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Maps;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -42,7 +43,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
      * Legacy serialization is only used for SSTable level reset.
      */
     @Override
-    public void serialize(Map<MetadataType, MetadataComponent> components, DataOutputPlus out) throws IOException
+    public void serialize(Map<MetadataType, MetadataComponent> components, Version version, DataOutputPlus out) throws IOException
     {
         ValidationMetadata validation = (ValidationMetadata) components.get(MetadataType.VALIDATION);
         StatsMetadata stats = (StatsMetadata) components.get(MetadataType.STATS);
@@ -52,7 +53,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
 
         EstimatedHistogram.serializer.serialize(stats.estimatedRowSize, out);
         EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
-        ReplayPosition.serializer.serialize(stats.replayPosition, out);
+        ReplayPosition.serializer.serialize(stats.commitLogUpperBound, out);
         out.writeLong(stats.minTimestamp);
         out.writeLong(stats.maxTimestamp);
         out.writeInt(stats.maxLocalDeletionTime);
@@ -70,6 +71,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
         out.writeInt(stats.maxColumnNames.size());
         for (ByteBuffer columnName : stats.maxColumnNames)
             ByteBufferUtil.writeWithShortLength(columnName, out);
+        if (version.hasCommitLogLowerBound())
+            ReplayPosition.serializer.serialize(stats.commitLogLowerBound, out);
     }
 
     /**
@@ -91,7 +94,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
             {
                 EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
                 EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
-                ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+                ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
+                ReplayPosition commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
                 long minTimestamp = in.readLong();
                 long maxTimestamp = in.readLong();
                 int maxLocalDeletionTime = in.readInt();
@@ -116,6 +120,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
                 List<ByteBuffer> maxColumnNames = new ArrayList<>(colCount);
                 for (int i = 0; i < colCount; i++)
                     maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                if (descriptor.version.hasCommitLogLowerBound())
+                    commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
 
                 if (types.contains(MetadataType.VALIDATION))
                     components.put(MetadataType.VALIDATION,
@@ -124,7 +130,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
                     components.put(MetadataType.STATS,
                                    new StatsMetadata(rowSizes,
                                                      columnCounts,
-                                                     replayPosition,
+                                                     commitLogLowerBound,
+                                                     commitLogUpperBound,
                                                      minTimestamp,
                                                      maxTimestamp,
                                                      maxLocalDeletionTime,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 5962a46..579ff7a 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
@@ -69,6 +70,7 @@ public class MetadataCollector
         return new StatsMetadata(defaultRowSizeHistogram(),
                                  defaultColumnCountHistogram(),
                                  ReplayPosition.NONE,
+                                 ReplayPosition.NONE,
                                  Long.MIN_VALUE,
                                  Long.MAX_VALUE,
                                  Integer.MAX_VALUE,
@@ -83,7 +85,8 @@ public class MetadataCollector
 
     protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
     protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
-    protected ReplayPosition replayPosition = ReplayPosition.NONE;
+    protected ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
+    protected ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
     protected long minTimestamp = Long.MAX_VALUE;
     protected long maxTimestamp = Long.MIN_VALUE;
     protected int maxLocalDeletionTime = Integer.MIN_VALUE;
@@ -113,7 +116,23 @@ public class MetadataCollector
     {
         this(columnNameComparator);
 
-        replayPosition(ReplayPosition.getReplayPosition(sstables));
+        ReplayPosition min = null, max = null;
+        for (SSTableReader sstable : sstables)
+        {
+            if (min == null)
+            {
+                min = sstable.getSSTableMetadata().commitLogLowerBound;
+                max = sstable.getSSTableMetadata().commitLogUpperBound;
+            }
+            else
+            {
+                min = Ordering.natural().min(min, sstable.getSSTableMetadata().commitLogLowerBound);
+                max = Ordering.natural().max(max, sstable.getSSTableMetadata().commitLogUpperBound);
+            }
+        }
+
+        commitLogLowerBound(min);
+        commitLogUpperBound(max);
         sstableLevel(level);
         // Get the max timestamp of the precompacted sstables
         // and adds generation of live ancestors
@@ -199,9 +218,15 @@ public class MetadataCollector
         return this;
     }
 
-    public MetadataCollector replayPosition(ReplayPosition replayPosition)
+    public MetadataCollector commitLogLowerBound(ReplayPosition commitLogLowerBound)
+    {
+        this.commitLogLowerBound = commitLogLowerBound;
+        return this;
+    }
+
+    public MetadataCollector commitLogUpperBound(ReplayPosition commitLogUpperBound)
     {
-        this.replayPosition = replayPosition;
+        this.commitLogUpperBound = commitLogUpperBound;
         return this;
     }
 
@@ -257,7 +282,8 @@ public class MetadataCollector
         components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
         components.put(MetadataType.STATS, new StatsMetadata(estimatedRowSize,
                                                              estimatedColumnCount,
-                                                             replayPosition,
+                                                             commitLogLowerBound,
+                                                             commitLogUpperBound,
                                                              minTimestamp,
                                                              maxTimestamp,
                                                              maxLocalDeletionTime,


Mime
View raw message