cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject cassandra git commit: Fix regression in SSTableRewriter causing some rows to become unreadable during compaction
Date Fri, 19 Dec 2014 14:29:12 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 bedd97f7a -> 871f0039c


Fix regression in SSTableRewriter causing some rows to become unreadable during compaction

patch by marcus and benedict for CASSANDRA-8429


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/871f0039
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/871f0039
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/871f0039

Branch: refs/heads/cassandra-2.1
Commit: 871f0039c5bf89be343039478c64ce835b04b5cf
Parents: bedd97f
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Fri Dec 19 14:04:38 2014 +0000
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Fri Dec 19 14:24:47 2014 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../db/compaction/CompactionManager.java        |   3 -
 .../io/compress/CompressedSequentialWriter.java |  12 +-
 .../io/compress/CompressionMetadata.java        |  31 +--
 .../cassandra/io/sstable/SSTableReader.java     |  26 +--
 .../cassandra/io/sstable/SSTableRewriter.java   | 189 +++++++++----------
 .../cassandra/io/sstable/SSTableWriter.java     | 118 +++++++-----
 .../io/util/BufferedPoolingSegmentedFile.java   |   9 +-
 .../io/util/BufferedSegmentedFile.java          |   9 +-
 .../io/util/CompressedPoolingSegmentedFile.java |  11 +-
 .../io/util/CompressedSegmentedFile.java        |  16 +-
 .../cassandra/io/util/MmappedSegmentedFile.java |   8 +-
 .../apache/cassandra/io/util/SegmentedFile.java |  12 +-
 .../io/sstable/SSTableRewriterTest.java         |  71 ++++++-
 14 files changed, 278 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e5a8f05..ac28d78 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.1.3
+ * Fix regression in SSTableRewriter causing some rows to become unreadable 
+   during compaction (CASSANDRA-8429)
  * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
  * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
    is disabled (CASSANDRA-8288)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9f5951c..872ebed 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1035,9 +1035,6 @@ public class CompactionManager implements CompactionManagerMBean
                         unrepairedKeyCount++;
                     }
                 }
-                // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
-                // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
-                // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
                 anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
                 anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
                 cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index e533b1e..d3c41fa 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.FileMark;
@@ -139,15 +140,10 @@ public class CompressedSequentialWriter extends SequentialWriter
         chunkOffset += compressedLength + 4;
     }
 
-    public CompressionMetadata openEarly()
+    public CompressionMetadata open(SSTableWriter.FinishType finishType)
     {
-        return metadataWriter.openEarly(originalSize, chunkOffset);
-    }
-
-    public CompressionMetadata openAfterClose()
-    {
-        assert current == originalSize;
-        return metadataWriter.openAfterClose(current, chunkOffset);
+        assert finishType != SSTableWriter.FinishType.NORMAL || current == originalSize;
+        return metadataWriter.open(originalSize, chunkOffset, finishType);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index c922963..173722f 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -47,10 +47,10 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -217,7 +217,7 @@ public class CompressionMetadata
             throw new CorruptSSTableException(new EOFException(), indexFilePath);
 
         long chunkOffset = chunkOffsets.getLong(idx);
-        long nextChunkOffset = (idx + 8 == chunkOffsets.size())
+        long nextChunkOffset = (idx + 8 == chunkOffsetsSize)
                                 ? compressedFileLength
                                 : chunkOffsets.getLong(idx + 8);
 
@@ -319,18 +319,25 @@ public class CompressionMetadata
             }
         }
 
-        public CompressionMetadata openEarly(long dataLength, long compressedLength)
+        public CompressionMetadata open(long dataLength, long compressedLength, SSTableWriter.FinishType finishType)
         {
+            RefCountedMemory offsets;
+            switch (finishType)
+            {
+                case EARLY:
+                    offsets = this.offsets;
+                    break;
+                case NORMAL:
+                case FINISH_EARLY:
+                    offsets = this.offsets.copy(count * 8L);
+                    this.offsets.unreference();
+                    break;
+                default:
+                    throw new AssertionError();
+            }
             return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
         }
 
-        public CompressionMetadata openAfterClose(long dataLength, long compressedLength)
-        {
-            RefCountedMemory newOffsets = offsets.copy(count * 8L);
-            offsets.unreference();
-            return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
-        }
-
         /**
          * Get a chunk offset by it's index.
          *
@@ -360,8 +367,8 @@ public class CompressionMetadata
             	out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
 	            assert chunks == count;
 	            writeHeader(out, dataLength, chunks);
-	            for (int i = 0 ; i < count ; i++)
-	                out.writeLong(offsets.getLong(i * 8));
+                for (int i = 0 ; i < count ; i++)
+                    out.writeLong(offsets.getLong(i * 8));
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index bd20226..217a109 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -201,7 +201,6 @@ public class SSTableReader extends SSTable
     private Object replaceLock = new Object();
     private SSTableReader replacedBy;
     private SSTableReader replaces;
-    private SSTableReader sharesBfWith;
     private SSTableDeletingTask deletingTask;
     private Runnable runOnClose;
 
@@ -575,7 +574,7 @@ public class SSTableReader extends SSTable
 
         synchronized (replaceLock)
         {
-            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
+            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
 
             if (replacedBy != null)
             {
@@ -594,19 +593,11 @@ public class SSTableReader extends SSTable
                 deleteFiles &= !dfile.path.equals(replaces.dfile.path);
             }
 
-            if (sharesBfWith != null)
-            {
-                closeBf &= sharesBfWith.bf != bf;
-                closeSummary &= sharesBfWith.indexSummary != indexSummary;
-                closeFiles &= sharesBfWith.dfile != dfile;
-                deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path);
-            }
-
             boolean deleteAll = false;
             if (release && isCompacted.get())
             {
                 assert replacedBy == null;
-                if (replaces != null)
+                if (replaces != null && !deleteFiles)
                 {
                     replaces.replacedBy = null;
                     replaces.deletingTask = deletingTask;
@@ -936,19 +927,6 @@ public class SSTableReader extends SSTable
         }
     }
 
-    /**
-     * this is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter
-     *
-     * note that the reason we don't use replacedBy is that we are not yet actually replaced
-     *
-     * @param newReader
-     */
-    public void sharesBfWith(SSTableReader newReader)
-    {
-        assert openReason.equals(OpenReason.EARLY);
-        this.sharesBfWith = newReader;
-    }
-
     public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
     {
         synchronized (replaceLock)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index cd9435d..43ac4b6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -17,18 +17,11 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
-import com.google.common.collect.Lists;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -38,7 +31,6 @@ import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
@@ -84,8 +76,10 @@ public class SSTableRewriter
     private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
     private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
 
-    private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
-    private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
+    private final Queue<Finished> finishedEarly = new ArrayDeque<>();
+    // as writers are closed from finishedEarly, their last readers are moved
+    // into discard, so that abort can cleanup after us safely
+    private final List<SSTableReader> discard = new ArrayList<>();
     private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
 
     private SSTableWriter writer;
@@ -183,42 +177,30 @@ public class SSTableRewriter
     public void abort()
     {
         switchWriter(null);
-
         moveStarts(null, Functions.forMap(originalStarts), true);
 
-        List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly);
-
-        for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
-        {
-            // we should close the bloom filter if we have not opened an sstable reader from this
-            // writer (it will get closed when we release the sstable reference below):
-            w.left.abort(w.right == null);
-            if (isOffline && w.right != null)
-            {
-                // the pairs get removed from finishedWriters when they are closedAndOpened in finish(), the ones left need to be removed here:
-                w.right.markObsolete();
-                w.right.releaseReference();
-            }
-        }
-
-        // also remove already completed SSTables
-        for (SSTableReader sstable : close)
-            sstable.markObsolete();
-
+        // remove already completed SSTables
         for (SSTableReader sstable : finished)
         {
             sstable.markObsolete();
             sstable.releaseReference();
         }
 
-        // releases reference in replaceReaders
-        if (!isOffline)
+        // abort the writers
+        for (Finished finished : finishedEarly)
         {
-            dataTracker.replaceEarlyOpenedFiles(close, Collections.<SSTableReader>emptyList());
-            dataTracker.unmarkCompacting(close);
+            boolean opened = finished.reader != null;
+            finished.writer.abort(!opened);
+            if (opened)
+            {
+                // if we've already been opened, add ourselves to the discard pile
+                discard.add(finished.reader);
+                finished.reader.markObsolete();
+            }
         }
-    }
 
+        replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
+    }
 
     /**
      * Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
@@ -274,8 +256,6 @@ public class SSTableRewriter
         rewriting.addAll(replaceWith);
     }
 
-
-
     private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
     {
         if (isOffline)
@@ -301,15 +281,19 @@ public class SSTableRewriter
             writer = newWriter;
             return;
         }
-        // we leave it as a tmp file, but we open it early and add it to the dataTracker
-        SSTableReader reader = writer.openEarly(maxAge);
-        if (reader != null)
+
+        // we leave it as a tmp file, but we open it and add it to the dataTracker
+        if (writer.getFilePointer() != 0)
         {
-            finishedOpenedEarly.add(reader);
+            SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
             replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
             moveStarts(reader, Functions.constant(reader.last), false);
+            finishedEarly.add(new Finished(writer, reader));
+        }
+        else
+        {
+            writer.abort();
         }
-        finishedWriters.add(Pair.create(writer, reader));
         currentlyOpenedEarly = null;
         currentlyOpenedEarlyAt = 0;
         writer = newWriter;
@@ -337,85 +321,82 @@ public class SSTableRewriter
      */
     public List<SSTableReader> finish(long repairedAt)
     {
-        List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
+        return finishAndMaybeThrow(repairedAt, false, false);
+    }
+
+    @VisibleForTesting
+    void finishAndThrow(boolean throwEarly)
+    {
+        finishAndMaybeThrow(-1, throwEarly, !throwEarly);
+    }
+
+    private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate)
+    {
+        List<SSTableReader> newReaders = new ArrayList<>();
         switchWriter(null);
-        // make real sstables of the written ones:
-        Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
-        while(it.hasNext())
+
+        if (throwEarly)
+            throw new RuntimeException("exception thrown early in finish, for testing");
+
+        while (!finishedEarly.isEmpty())
         {
-            Pair<SSTableWriter, SSTableReader> w = it.next();
-            if (w.left.getFilePointer() > 0)
+            Finished f = finishedEarly.poll();
+            if (f.writer.getFilePointer() > 0)
             {
-                SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
-                finished.add(newReader);
+                if (f.reader != null)
+                    discard.add(f.reader);
 
-                if (w.right != null)
-                {
-                    w.right.sharesBfWith(newReader);
-                    if (isOffline)
-                    {
-                        // remove the tmplink files if we are offline - no one is using them
-                        w.right.markObsolete();
-                        w.right.releaseReference();
-                    }
-                }
-                // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
-                toReplace.add(Pair.create(w.right, newReader));
+                SSTableReader newReader = f.writer.finish(SSTableWriter.FinishType.FINISH_EARLY, maxAge, repairedAt);
+
+                if (f.reader != null)
+                    f.reader.setReplacedBy(newReader);
+
+                finished.add(newReader);
+                newReaders.add(newReader);
             }
             else
             {
-                assert w.right == null;
-                w.left.abort(true);
+                f.writer.abort(true);
+                assert f.reader == null;
             }
-            it.remove();
         }
 
-        if (!isOffline)
-        {
-            for (Pair<SSTableReader, SSTableReader> replace : toReplace)
-                replaceEarlyOpenedFile(replace.left, replace.right);
-            dataTracker.unmarkCompacting(finished);
-        }
+        if (throwLate)
+            throw new RuntimeException("exception thrown after all sstables finished, for testing");
+
+        replaceWithFinishedReaders(newReaders);
         return finished;
     }
 
-    @VisibleForTesting
-    void finishAndThrow(boolean early)
+    // cleanup all our temporary readers and swap in our new ones
+    private void replaceWithFinishedReaders(List<SSTableReader> finished)
     {
-        List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
-        switchWriter(null);
-        if (early)
-            throw new RuntimeException("exception thrown early in finish");
-        // make real sstables of the written ones:
-        Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
-        while(it.hasNext())
+        if (isOffline)
         {
-            Pair<SSTableWriter, SSTableReader> w = it.next();
-            if (w.left.getFilePointer() > 0)
-            {
-                SSTableReader newReader = w.left.closeAndOpenReader(maxAge);
-                finished.add(newReader);
-
-                if (w.right != null)
-                {
-                    w.right.sharesBfWith(newReader);
-                    if (isOffline)
-                    {
-                        w.right.markObsolete();
-                        w.right.releaseReference();
-                    }
-                }
-                // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
-                toReplace.add(Pair.create(w.right, newReader));
-            }
-            else
+            for (SSTableReader reader : discard)
             {
-                assert w.right == null;
-                w.left.abort(true);
+                if (reader.getCurrentReplacement() == null)
+                    reader.markObsolete();
+                reader.releaseReference();
             }
-            it.remove();
         }
+        else
+        {
+            dataTracker.replaceEarlyOpenedFiles(discard, finished);
+            dataTracker.unmarkCompacting(discard);
+        }
+        discard.clear();
+    }
 
-        throw new RuntimeException("exception thrown after all sstables finished");
+    private static final class Finished
+    {
+        final SSTableWriter writer;
+        final SSTableReader reader;
+
+        private Finished(SSTableWriter writer, SSTableReader reader)
+        {
+            this.writer = writer;
+            this.reader = reader;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ec64561..595012d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -380,6 +380,18 @@ public class SSTableWriter extends SSTable
         last = lastWrittenKey = getMinimalKey(last);
     }
 
+    private Descriptor makeTmpLinks()
+    {
+        // create temp links if they don't already exist
+        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+        {
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+        }
+        return link;
+    }
+
     public SSTableReader openEarly(long maxDataAge)
     {
         StatsMetadata sstableMetadata = (StatsMetadata) sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
@@ -391,17 +403,10 @@ public class SSTableWriter extends SSTable
         if (exclusiveUpperBoundOfReadableIndex == null)
             return null;
 
-        // create temp links if they don't already exist
-        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
-        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
-        {
-            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
-            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
-        }
-
+        Descriptor link = makeTmpLinks();
         // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
-        SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
-        SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
+        SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
+        SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
         SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
                                                            components, metadata,
                                                            partitioner, ifile,
@@ -435,6 +440,19 @@ public class SSTableWriter extends SSTable
         return sstable;
     }
 
+    public static enum FinishType
+    {
+        NORMAL(SSTableReader.OpenReason.NORMAL),
+        EARLY(SSTableReader.OpenReason.EARLY), // no renaming
+        FINISH_EARLY(SSTableReader.OpenReason.NORMAL); // tidy up an EARLY finish
+        final SSTableReader.OpenReason openReason;
+
+        FinishType(SSTableReader.OpenReason openReason)
+        {
+            this.openReason = openReason;
+        }
+    }
+
     public SSTableReader closeAndOpenReader()
     {
         return closeAndOpenReader(System.currentTimeMillis());
@@ -442,68 +460,84 @@ public class SSTableWriter extends SSTable
 
     public SSTableReader closeAndOpenReader(long maxDataAge)
     {
-        return closeAndOpenReader(maxDataAge, this.repairedAt);
+        return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
     }
 
-    public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt)
+    public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
     {
-        Pair<Descriptor, StatsMetadata> p = close(repairedAt);
-        Descriptor newdesc = p.left;
-        StatsMetadata sstableMetadata = p.right;
+        Pair<Descriptor, StatsMetadata> p;
+
+        p = close(finishType, repairedAt);
+        Descriptor desc = p.left;
+        StatsMetadata metadata = p.right;
+
+        if (finishType == FinishType.EARLY)
+            desc = makeTmpLinks();
 
         // finalize in-memory state for the reader
-        SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
-        SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA));
-        SSTableReader sstable = SSTableReader.internalOpen(newdesc,
+        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
+        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
+        SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
                                                            components,
-                                                           metadata,
+                                                           this.metadata,
                                                            partitioner,
                                                            ifile,
                                                            dfile,
                                                            iwriter.summary.build(partitioner),
                                                            iwriter.bf,
                                                            maxDataAge,
-                                                           sstableMetadata,
-                                                           SSTableReader.OpenReason.NORMAL);
+                                                           metadata,
+                                                           finishType.openReason);
         sstable.first = getMinimalKey(first);
         sstable.last = getMinimalKey(last);
-        // try to save the summaries to disk
-        sstable.saveSummary(iwriter.builder, dbuilder);
-        iwriter = null;
-        dbuilder = null;
+
+        switch (finishType)
+        {
+            case NORMAL: case FINISH_EARLY:
+            // try to save the summaries to disk
+            sstable.saveSummary(iwriter.builder, dbuilder);
+            iwriter = null;
+            dbuilder = null;
+        }
         return sstable;
     }
 
     // Close the writer and return the descriptor to the new sstable and it's metadata
     public Pair<Descriptor, StatsMetadata> close()
     {
-        return close(this.repairedAt);
+        return close(FinishType.NORMAL, this.repairedAt);
     }
 
-    private Pair<Descriptor, StatsMetadata> close(long repairedAt)
+    private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
     {
+        switch (type)
+        {
+            case EARLY: case NORMAL:
+            iwriter.close();
+            dataFile.close();
+        }
 
-        // index and filter
-        iwriter.close();
-        // main data, close will truncate if necessary
-        dataFile.close();
-        dataFile.writeFullChecksum(descriptor);
         // write sstable statistics
-        Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata(
-                                                                                    partitioner.getClass().getCanonicalName(),
-                                                                                    metadata.getBloomFilterFpChance(),
-                                                                                    repairedAt);
-        writeMetadata(descriptor, metadataComponents);
-
-        // save the table of components
-        SSTable.appendTOC(descriptor, components);
+        Map<MetadataType, MetadataComponent> metadataComponents ;
+        metadataComponents = sstableMetadataCollector
+                             .finalizeMetadata(partitioner.getClass().getCanonicalName(),
+                                               metadata.getBloomFilterFpChance(),repairedAt);
 
         // remove the 'tmp' marker from all components
-        return Pair.create(rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS));
+        Descriptor descriptor = this.descriptor;
+        switch (type)
+        {
+            case NORMAL: case FINISH_EARLY:
+            dataFile.writeFullChecksum(descriptor);
+            writeMetadata(descriptor, metadataComponents);
+            // save the table of components
+            SSTable.appendTOC(descriptor, components);
+            descriptor = rename(descriptor, components);
+        }
 
+        return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
     }
 
-
     private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
     {
         SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index b284f61..57f465f 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.util;
 
 import java.io.File;
 
+import org.apache.cassandra.io.sstable.SSTableWriter;
+
 public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
 {
     public BufferedPoolingSegmentedFile(String path, long length)
@@ -33,16 +35,11 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(String path)
+        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
         {
             long length = new File(path).length();
             return new BufferedPoolingSegmentedFile(path, length);
         }
-
-        public SegmentedFile openEarly(String path)
-        {
-            return complete(path);
-        }
     }
 
     protected RandomAccessReader createReader(String path)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index aa031e3..2f715da 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.util;
 
 import java.io.File;
 
+import org.apache.cassandra.io.sstable.SSTableWriter;
+
 public class BufferedSegmentedFile extends SegmentedFile
 {
     public BufferedSegmentedFile(String path, long length)
@@ -33,16 +35,11 @@ public class BufferedSegmentedFile extends SegmentedFile
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(String path)
+        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
         {
             long length = new File(path).length();
             return new BufferedSegmentedFile(path, length);
         }
-
-        public SegmentedFile openEarly(String path)
-        {
-            return complete(path);
-        }
     }
 
     public FileDataInput getSegment(long position)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 1803e69..11d091a 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.util;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 
 public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
 {
@@ -43,17 +44,11 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(String path)
+        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
         {
-            return new CompressedPoolingSegmentedFile(path, metadata(path, false));
-        }
-
-        public SegmentedFile openEarly(String path)
-        {
-            return new CompressedPoolingSegmentedFile(path, metadata(path, true));
+            return new CompressedPoolingSegmentedFile(path, metadata(path, finishType));
         }
     }
-
     protected RandomAccessReader createReader(String path)
     {
         return CompressedRandomAccessReader.open(path, metadata, this);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 4afe0a0..b788715 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.util;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 
 public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
 {
@@ -44,24 +45,17 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
             // only one segment in a standard-io file
         }
 
-        protected CompressionMetadata metadata(String path, boolean early)
+        protected CompressionMetadata metadata(String path, SSTableWriter.FinishType finishType)
         {
             if (writer == null)
                 return CompressionMetadata.create(path);
-            else if (early)
-                return writer.openEarly();
-            else
-                return writer.openAfterClose();
-        }
 
-        public SegmentedFile complete(String path)
-        {
-            return new CompressedSegmentedFile(path, metadata(path, false));
+            return writer.open(finishType);
         }
 
-        public SegmentedFile openEarly(String path)
+        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
         {
-            return new CompressedSegmentedFile(path, metadata(path, true));
+            return new CompressedSegmentedFile(path, metadata(path, finishType));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index ccc03fc..3b2cc98 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 public class MmappedSegmentedFile extends SegmentedFile
@@ -160,18 +161,13 @@ public class MmappedSegmentedFile extends SegmentedFile
             }
         }
 
-        public SegmentedFile complete(String path)
+        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
         {
             long length = new File(path).length();
             // create the segments
             return new MmappedSegmentedFile(path, length, createSegments(path));
         }
 
-        public SegmentedFile openEarly(String path)
-        {
-            return complete(path);
-        }
-
         private Segment[] createSegments(String path)
         {
             RandomAccessFile raf;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index be549a6..badae56 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -115,13 +116,12 @@ public abstract class SegmentedFile
          * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
          * @param path The file on disk.
          */
-        public abstract SegmentedFile complete(String path);
+        public abstract SegmentedFile complete(String path, SSTableWriter.FinishType openType);
 
-        /**
-         * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
-         * @param path The file on disk.
-         */
-        public abstract SegmentedFile openEarly(String path);
+        public SegmentedFile complete(String path)
+        {
+            return complete(path, SSTableWriter.FinishType.NORMAL);
+        }
 
         public void serializeBounds(DataOutput out) throws IOException
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 6f9acea..392936d 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -483,7 +483,6 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.disableAutoCompaction();
 
         SSTableReader s = writeFile(cfs, 400);
-        DecoratedKey origFirst = s.first;
         cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(1000000);
@@ -499,8 +498,7 @@ public class SSTableRewriterTest extends SchemaLoader
                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
                 if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
                 {
-                    assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
-                    assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+                    assertEquals(files, cfs.getSSTables().size()); // all files are now opened early
                     rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
                     files++;
                 }
@@ -616,6 +614,73 @@ public class SSTableRewriterTest extends SchemaLoader
 
     }
 
+    @Test
+    public void testAllKeysReadable() throws Exception
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        for (int i = 0; i < 100; i++)
+        {
+            DecoratedKey key = Util.dk(Integer.toString(i));
+            Mutation rm = new Mutation(KEYSPACE, key.getKey());
+            for (int j = 0; j < 10; j++)
+                rm.add(CF, Util.cellname(Integer.toString(j)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100);
+            rm.apply();
+        }
+        cfs.forceBlockingFlush();
+        cfs.forceMajorCompaction();
+        validateKeys(keyspace);
+
+        assertEquals(1, cfs.getSSTables().size());
+        SSTableReader s = cfs.getSSTables().iterator().next();
+        Set<SSTableReader> compacting = new HashSet<>();
+        compacting.add(s);
+        cfs.getDataTracker().markCompacting(compacting);
+
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+        SSTableRewriter.overrideOpenInterval(1);
+        SSTableWriter w = getWriter(cfs, s.descriptor.directory);
+        rewriter.switchWriter(w);
+        int keyCount = 0;
+        try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        {
+            while (scanner.hasNext())
+            {
+                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                if (keyCount % 10 == 0)
+                {
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                }
+                keyCount++;
+                validateKeys(keyspace);
+            }
+            try
+            {
+                cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finish(), OperationType.COMPACTION);
+                cfs.getDataTracker().unmarkCompacting(compacting);
+            }
+            catch (Throwable t)
+            {
+                rewriter.abort();
+            }
+        }
+        validateKeys(keyspace);
+        Thread.sleep(1000);
+        validateCFS(cfs);
+    }
+
+    private void validateKeys(Keyspace ks)
+    {
+        for (int i = 0; i < 100; i++)
+        {
+            DecoratedKey key = Util.dk(Integer.toString(i));
+            ColumnFamily cf = Util.getColumnFamily(ks, key, CF);
+            assertTrue(cf != null);
+        }
+    }
+
     private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
     {
         ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);


Mime
View raw message