cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [1/5] cassandra git commit: Avoid writing range tombstones after END_OF_ROW marker.
Date Wed, 02 Dec 2015 14:10:58 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 04e5f5d96 -> 51464631c


Avoid writing range tombstones after END_OF_ROW marker.

Patch by Branimir Lambov; reviewed by marcuse for CASSANDRA-10791


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b26ca68
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b26ca68
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b26ca68

Branch: refs/heads/trunk
Commit: 0b26ca68747cdecb907d7c238e04b39836efe3d1
Parents: 5175326
Author: Branimir Lambov <branimir.lambov@datastax.com>
Authored: Tue Dec 1 11:38:09 2015 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Wed Dec 2 14:59:11 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  16 ++--
 .../org/apache/cassandra/db/RangeTombstone.java |   5 ++
 .../cassandra/db/compaction/Scrubber.java       |  25 +++++-
 .../io/sstable/SSTableIdentityIterator.java     |  79 +++++++++++++++----
 .../cassandra/io/sstable/SSTableWriter.java     |   2 +
 .../Keyspace1-Standard3-jb-1-Summary.db         | Bin 71 -> 63 bytes
 .../Keyspace1-StandardInteger1-ka-2-CRC.db      | Bin 0 -> 8 bytes
 .../Keyspace1-StandardInteger1-ka-2-Data.db     | Bin 0 -> 12357 bytes
 .../Keyspace1-StandardInteger1-ka-2-Digest.sha1 |   1 +
 .../Keyspace1-StandardInteger1-ka-2-Filter.db   | Bin 0 -> 176 bytes
 .../Keyspace1-StandardInteger1-ka-2-Index.db    | Bin 0 -> 108 bytes
 ...eyspace1-StandardInteger1-ka-2-Statistics.db | Bin 0 -> 4470 bytes
 .../Keyspace1-StandardInteger1-ka-2-Summary.db  | Bin 0 -> 80 bytes
 .../Keyspace1-StandardInteger1-ka-2-TOC.txt     |   8 ++
 .../apache/cassandra/db/RowIndexEntryTest.java  |   1 +
 .../unit/org/apache/cassandra/db/ScrubTest.java |  57 ++++++++++++-
 .../streaming/StreamingTransferTest.java        |  46 ++++++++++-
 18 files changed, 215 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b0f9588..e00abfe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
  * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
  * Add proper error handling to stream receiver (CASSANDRA-10774)
  * Warn or fail when changing cluster topology live (CASSANDRA-10243)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 0ea5c87..f63dfe1 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -147,6 +147,7 @@ public class ColumnIndex
                 add(tombstone);
                 tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
             }
+            finishAddingAtoms();
             ColumnIndex index = build();
 
             maybeWriteEmptyRowHeader();
@@ -167,6 +168,7 @@ public class ColumnIndex
                 OnDiskAtom c =  columns.next();
                 add(c);
             }
+            finishAddingAtoms();
 
             return build();
         }
@@ -218,15 +220,19 @@ public class ColumnIndex
             }
         }
 
-        public ColumnIndex build() throws IOException
+        public void finishAddingAtoms() throws IOException
         {
-            // all columns were GC'd after all
-            if (lastColumn == null)
-                return ColumnIndex.EMPTY;
-
             long size = tombstoneTracker.writeUnwrittenTombstones(output, atomSerializer);
             endPosition += size;
             blockSize += size;
+        }
+
+        public ColumnIndex build()
+        {
+            assert !tombstoneTracker.hasUnwrittenTombstones();  // finishAddingAtoms must be called before building.
+            // all columns were GC'd after all
+            if (lastColumn == null)
+                return ColumnIndex.EMPTY;
 
             // the last column may have fallen on an index boundary already.  if not, index it explicitly.
             if (result.columnsIndex.isEmpty() || lastBlockClosing != lastColumn)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 4d22d48..5e41792 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -325,6 +325,11 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements
             return false;
         }
 
+        public boolean hasUnwrittenTombstones()
+        {
+            return !unwrittenTombstones.isEmpty();
+        }
+
         /**
          * The tracker needs to track expired range tombstone but keep tracks that they are
          * expired, so this is what this class is used for.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 400df08..e02f901 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -155,6 +155,22 @@ public class Scrubber implements Closeable
                 if (scrubInfo.isStopRequested())
                     throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
 
+                updateIndexKey();
+
+                if (prevKey != null && indexFile != null)
+                {
+                    long nextRowStart = currentRowPositionFromIndex == -1 ? dataFile.length() : currentRowPositionFromIndex;
+                    if (dataFile.getFilePointer() < nextRowStart)
+                    {
+                        // Encountered CASSANDRA-10791. Place post-END_OF_ROW data in the out-of-order table.
+                        saveOutOfOrderRow(prevKey,
+                                          SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), validateColumns),
+                                          String.format("Row fragment detected after END_OF_ROW at key %s", prevKey));
+                        if (dataFile.isEOF())
+                            break;
+                    }
+                }
+
                 long rowStart = dataFile.getFilePointer();
                 outputHandler.debug("Reading row at " + rowStart);
 
@@ -170,8 +186,6 @@ public class Scrubber implements Closeable
                     // check for null key below
                 }
 
-                updateIndexKey();
-
                 long dataStart = dataFile.getFilePointer();
 
                 long dataStartFromIndex = -1;
@@ -369,8 +383,13 @@ public class Scrubber implements Closeable
 
     private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
     {
+        saveOutOfOrderRow(key, atoms, String.format("Out of order row detected (%s found after %s)", key, prevKey));
+    }
+
+    void saveOutOfOrderRow(DecoratedKey key, SSTableIdentityIterator atoms, String message)
+    {
         // TODO bitch if the row is too large?  if it is there's not much we can do ...
-        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+        outputHandler.warn(message);
         // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
         // and there's no sense in failing on mis-sorted cells when a TreeMap could safe us
         ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 498ad26..45994d0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -20,9 +20,13 @@ package org.apache.cassandra.io.sstable;
 import java.io.*;
 import java.util.Iterator;
 
+import com.google.common.collect.AbstractIterator;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.io.sstable.Descriptor.Version;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.serializers.MarshalException;
 
@@ -66,6 +70,35 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
     }
 
+    /**
+     * Used only by scrubber to solve problems with data written after the END_OF_ROW marker. Iterates atoms for the given dataSize only and does not accept an END_OF_ROW marker.
+     */
+    public static SSTableIdentityIterator createFragmentIterator(SSTableReader sstable, final RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
+    {
+        final ColumnSerializer.Flag flag = ColumnSerializer.Flag.LOCAL;
+        final CellNameType type = sstable.metadata.comparator;
+        final int expireBefore = (int) (System.currentTimeMillis() / 1000);
+        final Version version = sstable.descriptor.version;
+        final long dataEnd = file.getFilePointer() + dataSize;
+        return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, flag, DeletionTime.LIVE,
+                                           new AbstractIterator<OnDiskAtom>()
+                                                   {
+                                                       protected OnDiskAtom computeNext()
+                                                       {
+                                                           if (file.getFilePointer() >= dataEnd)
+                                                               return endOfData();
+                                                           try
+                                                           {
+                                                               return type.onDiskAtomSerializer().deserializeFromSSTable(file, flag, expireBefore, version);
+                                                           }
+                                                           catch (IOException e)
+                                                           {
+                                                               throw new IOError(e);
+                                                           }
+                                                       }
+                                                   });
+    }
+
     // sstable may be null *if* checkData is false
     // If it is null, we assume the data is in the current file format
     private SSTableIdentityIterator(CFMetaData metadata,
@@ -77,23 +110,15 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
                                     SSTableReader sstable,
                                     ColumnSerializer.Flag flag)
     {
-        assert !checkData || (sstable != null);
-        this.in = in;
-        this.filename = filename;
-        this.key = key;
-        this.dataSize = dataSize;
-        this.flag = flag;
-        this.validateColumns = checkData;
-        this.sstable = sstable;
-
-        Descriptor.Version dataVersion = sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version;
-        int expireBefore = (int) (System.currentTimeMillis() / 1000);
-        columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+        this(metadata, in, filename, key, dataSize, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
+             metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000), sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version));
+    }
 
+    private static DeletionTime readDeletionTime(DataInput in, SSTableReader sstable, String filename)
+    {
         try
         {
-            columnFamily.delete(DeletionTime.serializer.deserialize(in));
-            atomIterator = columnFamily.metadata().getOnDiskIterator(in, flag, expireBefore, dataVersion);
+            return DeletionTime.serializer.deserialize(in);
         }
         catch (IOException e)
         {
@@ -103,6 +128,32 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         }
     }
 
+    // sstable may be null *if* checkData is false
+    // If it is null, we assume the data is in the current file format
+    private SSTableIdentityIterator(CFMetaData metadata,
+                                    DataInput in,
+                                    String filename,
+                                    DecoratedKey key,
+                                    long dataSize,
+                                    boolean checkData,
+                                    SSTableReader sstable,
+                                    ColumnSerializer.Flag flag,
+                                    DeletionTime deletion,
+                                    Iterator<OnDiskAtom> atomIterator)
+    {
+        assert !checkData || (sstable != null);
+        this.in = in;
+        this.filename = filename;
+        this.key = key;
+        this.dataSize = dataSize;
+        this.flag = flag;
+        this.validateColumns = checkData;
+        this.sstable = sstable;
+        columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+        columnFamily.delete(deletion);
+        this.atomIterator = atomIterator;
+    }
+
     public DecoratedKey getKey()
     {
         return key;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 8e0b5f7..8620f30 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -319,6 +320,7 @@ public class SSTableWriter extends SSTable
 
                 columnIndexer.add(atom); // This write the atom on disk too
             }
+            columnIndexer.finishAddingAtoms();
 
             columnIndexer.maybeWriteEmptyRowHeader();
             dataFile.stream.writeShort(END_OF_ROW);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
index 376ca9d..7621f07 100644
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db and b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db
new file mode 100644
index 0000000..fc23cfe
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db
new file mode 100644
index 0000000..a4157d3
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1 b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
new file mode 100644
index 0000000..fb42fa9
--- /dev/null
+++ b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
@@ -0,0 +1 @@
+3265926428
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db
new file mode 100644
index 0000000..eb0ae30
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db
new file mode 100644
index 0000000..69a2fce
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db
new file mode 100644
index 0000000..1cba196
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
new file mode 100644
index 0000000..22cfa6a
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
new file mode 100644
index 0000000..503f64d
--- /dev/null
+++ b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
@@ -0,0 +1,8 @@
+Digest.sha1
+Summary.db
+Filter.db
+Index.db
+Statistics.db
+Data.db
+CRC.db
+TOC.txt

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 237573e..ce58e11 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -58,6 +58,7 @@ public class RowIndexEntryTest extends SchemaLoader
                 add(column);
             }
             while (size < DatabaseDescriptor.getColumnIndexSize() * 3);
+            finishAddingAtoms();
 
         }}.build();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index f8acd22..167671b 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -38,7 +38,9 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.utils.UUIDGen;
+
 import org.apache.commons.lang3.StringUtils;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,6 +52,7 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.Scrubber;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -60,7 +63,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.Util.cellname;
 import static org.apache.cassandra.Util.column;
-
 import static junit.framework.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -362,6 +364,59 @@ public class ScrubTest extends SchemaLoader
         assert rows.size() == 6 : "Got " + rows.size();
     }
 
+    @Test
+    public void testScrub10791() throws Exception
+    {
+        // Table is created by StreamingTransferTest.testTransferRangeTombstones with CASSANDRA-10791 fix disabled.
+        CompactionManager.instance.disableAutoCompaction();
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        String columnFamily = "StandardInteger1";
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily);
+        cfs.clearUnsafe();
+
+        String root = System.getProperty("corrupt-sstable-root");
+        assert root != null;
+        File rootDir = new File(root);
+        assert rootDir.isDirectory();
+        Descriptor desc = new Descriptor(new Descriptor.Version("ka"), rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL);
+        CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
+
+        // open without validation for scrubbing
+        Set<Component> components = new HashSet<>();
+        components.add(Component.DATA);
+        components.add(Component.PRIMARY_INDEX);
+        components.add(Component.FILTER);
+        components.add(Component.STATS);
+        components.add(Component.SUMMARY);
+        components.add(Component.TOC);
+        SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
+
+        Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true);
+        scrubber.scrub();
+
+        cfs.loadNewSSTables();
+        assertEquals(7, countCells(cfs));
+    }
+
+    private int countCells(ColumnFamilyStore cfs)
+    {
+        int cellCount = 0;
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+            while (it.hasNext())
+            {
+                Iterator<OnDiskAtom> itr = it.next();
+                while (itr.hasNext())
+                {
+                    ++cellCount;
+                    itr.next();
+                }
+            }
+        }
+        return cellCount;
+    }
+
     private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException
     {
         boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 06ebdd3..31dc492 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -26,9 +26,11 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +41,9 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.compaction.Scrubber;
+import org.apache.cassandra.db.compaction.Scrubber.ScrubResult;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -51,8 +56,9 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.FBUtilities;
-
+import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.concurrent.Refs;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.apache.cassandra.Util.cellname;
@@ -265,7 +271,7 @@ public class StreamingTransferTest extends SchemaLoader
         Keyspace keyspace = Keyspace.open(ks);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
 
-        String key = "key1";
+        String key = "key0";
         Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key));
         // add columns of size slightly less than column_index_size to force insert column index
         rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
@@ -274,9 +280,21 @@ public class StreamingTransferTest extends SchemaLoader
         // add RangeTombstones
         cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
         cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+        cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+        rm.apply();
+
+        key = "key1";
+        rm = new Mutation(ks, ByteBufferUtil.bytes(key));
+        // add columns of size slightly less than column_index_size to force insert column index
+        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
+        cf = rm.addOrGet(cfname);
+        // add RangeTombstones
+        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
         rm.apply();
+
         cfs.forceBlockingFlush();
 
+        int cellCount = countCells(cfs);
         SSTableReader sstable = cfs.getSSTables().iterator().next();
         cfs.clearUnsafe();
         transferSSTables(sstable);
@@ -284,8 +302,30 @@ public class StreamingTransferTest extends SchemaLoader
         // confirm that a single SSTable was transferred and registered
         assertEquals(1, cfs.getSSTables().size());
 
+        // Verify table
+        assertEquals(cellCount, countCells(cfs));
+
         List<Row> rows = Util.getRangeSlice(cfs);
-        assertEquals(1, rows.size());
+        assertEquals(2, rows.size());
+    }
+
+    private int countCells(ColumnFamilyStore cfs)
+    {
+        int cellCount = 0;
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+            while (it.hasNext())
+            {
+                Iterator<OnDiskAtom> itr = it.next();
+                while (itr.hasNext())
+                {
+                    ++cellCount;
+                    itr.next();
+                }
+            }
+        }
+        return cellCount;
     }
 
     @Test


Mime
View raw message