cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject git commit: Let scrub optionally skip broken counter partitions
Date Mon, 03 Feb 2014 20:02:28 GMT
Updated Branches:
  refs/heads/cassandra-2.0 b71372146 -> 728c4fa9b


Let scrub optionally skip broken counter partitions

patch by Tyler Hobbs; reviewed by Aleksey Yeschenko for CASSANDRA-5930


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/728c4fa9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/728c4fa9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/728c4fa9

Branch: refs/heads/cassandra-2.0
Commit: 728c4fa9bf2b2c11dbc61c8e5536b1542abc1ccb
Parents: b713721
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Mon Feb 3 23:01:31 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Mon Feb 3 23:01:31 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +
 NEWS.txt                                        | 12 ++-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  4 +-
 .../db/compaction/CompactionManager.java        | 12 +--
 .../cassandra/db/compaction/Scrubber.java       | 37 ++++++---
 .../cassandra/service/StorageService.java       |  4 +-
 .../cassandra/service/StorageServiceMBean.java  |  2 +-
 .../org/apache/cassandra/tools/NodeCmd.java     |  6 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |  4 +-
 .../cassandra/tools/StandaloneScrubber.java     |  6 +-
 .../apache/cassandra/tools/NodeToolHelp.yaml    |  6 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java | 81 ++++++++++++++++++--
 12 files changed, 140 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 13b4c5b..a1a58a3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+2.0.6
+ * Let scrub optionally skip broken counter partitions (CASSANDRA-5930)
+
+
 2.0.5
  * Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
  * Add ks.cf names to tombstone logging (CASSANDRA-6597)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 92446c8..b21fbaa 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -14,11 +14,21 @@ restore snapshots created with the previous major version using the
 using the provided 'sstableupgrade' tool.
 
 
+2.0.6
+=====
+
+New features
+------------
+    - Scrub can now optionally skip corrupt counter partitions. Please note
+      that this will lead to the loss of all the counter updates in the skipped
+      partition. See the --skip-corrupted option.
+
+
 2.0.5
 =====
 
 New features
---------
+------------
     - Batchlog replay can be, and is throttled by default now.
       See batchlog_replay_throttle_in_kb setting in cassandra.yaml.
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 8750026..38d87db 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1115,12 +1115,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         CompactionManager.instance.performCleanup(ColumnFamilyStore.this, renewer);
     }
 
-    public void scrub(boolean disableSnapshot) throws ExecutionException, InterruptedException
+    public void scrub(boolean disableSnapshot, boolean skipCorrupted) throws ExecutionException,
InterruptedException
     {
         // skip snapshot creation during scrub, SEE JIRA 5891
         if(!disableSnapshot)
             snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
-        CompactionManager.instance.performScrub(ColumnFamilyStore.this);
+        CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted);
     }
 
     public void sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException,
InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 168ee02..48900c8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -227,13 +227,13 @@ public class CompactionManager implements CompactionManagerMBean
         executor.submit(runnable).get();
     }
 
-    public void performScrub(ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
+    public void performScrub(ColumnFamilyStore cfStore, final boolean skipCorrupted) throws
InterruptedException, ExecutionException
     {
         performAllSSTableOperation(cfStore, new AllSSTablesOperation()
         {
             public void perform(ColumnFamilyStore store, Iterable<SSTableReader> sstables)
throws IOException
             {
-                doScrub(store, sstables);
+                doScrub(store, sstables, skipCorrupted);
             }
         });
     }
@@ -425,16 +425,16 @@ public class CompactionManager implements CompactionManagerMBean
      *
      * @throws IOException
      */
-    private void doScrub(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables) throws
IOException
+    private void doScrub(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, boolean
skipCorrupted) throws IOException
     {
         assert !cfs.isIndex();
         for (final SSTableReader sstable : sstables)
-            scrubOne(cfs, sstable);
+            scrubOne(cfs, sstable, skipCorrupted);
     }
 
-    private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
+    private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted)
throws IOException
     {
-        Scrubber scrubber = new Scrubber(cfs, sstable);
+        Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted);
 
         CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
         metrics.beginCompaction(scrubInfo);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 708e929..820761c 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -35,6 +35,7 @@ public class Scrubber implements Closeable
     public final ColumnFamilyStore cfs;
     public final SSTableReader sstable;
     public final File destination;
+    public final boolean skipCorrupted;
 
     private final CompactionController controller;
     private final boolean isCommutative;
@@ -63,16 +64,17 @@ public class Scrubber implements Closeable
     };
     private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
 
-    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
+    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted)
throws IOException
     {
-        this(cfs, sstable, new OutputHandler.LogOutput(), false);
+        this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), false);
     }
 
-    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler,
boolean isOffline) throws IOException
+    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted,
OutputHandler outputHandler, boolean isOffline) throws IOException
     {
         this.cfs = cfs;
         this.sstable = sstable;
         this.outputHandler = outputHandler;
+        this.skipCorrupted = skipCorrupted;
 
         // Calculate the expected compacted filesize
         this.destination = cfs.directories.getDirectoryForNewSSTables();
@@ -166,7 +168,9 @@ public class Scrubber implements Closeable
                 if (!sstable.descriptor.version.hasRowSizeAndColumnCount)
                 {
                     dataSize = dataSizeFromIndex;
-                    outputHandler.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key),
dataSize));
+                    // avoid an NPE if key is null
+                    String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.key);
+                    outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
                 }
                 else
                 {
@@ -203,7 +207,7 @@ public class Scrubber implements Closeable
                 catch (Throwable th)
                 {
                     throwIfFatal(th);
-                    outputHandler.warn("Non-fatal error reading row (stacktrace follows)",
th);
+                    outputHandler.warn("Error reading row (stacktrace follows):", th);
                     writer.resetAndTruncate();
 
                     if (currentIndexKey != null
@@ -231,9 +235,7 @@ public class Scrubber implements Closeable
                         catch (Throwable th2)
                         {
                             throwIfFatal(th2);
-                            // Skipping rows is dangerous for counters (see CASSANDRA-2759)
-                            if (isCommutative)
-                                throw new IOError(th2);
+                            throwIfCommutative(key, th2);
 
                             outputHandler.warn("Retry failed too. Skipping to next row (retry's
stacktrace follows)", th2);
                             writer.resetAndTruncate();
@@ -243,11 +245,9 @@ public class Scrubber implements Closeable
                     }
                     else
                     {
-                        // Skipping rows is dangerous for counters (see CASSANDRA-2759)
-                        if (isCommutative)
-                            throw new IOError(th);
+                        throwIfCommutative(key, th);
 
-                        outputHandler.warn("Row at " + dataStart + " is unreadable; skipping
to next");
+                        outputHandler.warn("Row starting at position " + dataStart + " is
unreadable; skipping to next");
                         if (currentIndexKey != null)
                             dataFile.seek(nextRowPositionFromIndex);
                         badRows++;
@@ -324,6 +324,19 @@ public class Scrubber implements Closeable
             throw (Error) th;
     }
 
+    private void throwIfCommutative(DecoratedKey key, Throwable th)
+    {
+        if (isCommutative && !skipCorrupted)
+        {
+            outputHandler.warn(String.format("An error occurred while scrubbing the row with
key '%s'.  Skipping corrupt " +
+                                             "rows in counter tables will result in undercounts
for the affected " +
+                                             "counters (see CASSANDRA-2759 for more details),
so by default the scrub will " +
+                                             "stop at this point.  If you would like to skip
the row anyway and continue " +
+                                             "scrubbing, re-run the scrub with the --skip-corrupted
option.", key));
+            throw new IOError(th);
+        }
+    }
+
     public void close()
     {
         FileUtils.closeQuietly(dataFile);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 700966f..f46ae66 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2155,10 +2155,10 @@ public class StorageService extends NotificationBroadcasterSupport
implements IE
         }
     }
 
-    public void scrub(boolean disableSnapshot, String keyspaceName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException
+    public void scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName,
String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName,
columnFamilies))
-            cfStore.scrub(disableSnapshot);
+            cfStore.scrub(disableSnapshot, skipCorrupted);
     }
 
     public void upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String...
columnFamilies) throws IOException, ExecutionException, InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index df85901..d31e8b9 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -231,7 +231,7 @@ public interface StorageServiceMBean extends NotificationEmitter
      *
      * Scrubbed CFs will be snapshotted first, if disableSnapshot is false
      */
-    public void scrub(boolean disableSnapshot, String keyspaceName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException;
+    public void scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName,
String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
 
     /**
      * Rewrite all sstables to the latest version.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 0cc7320..ab05d16 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -74,6 +74,8 @@ public class NodeCmd
     private static final Pair<String, String> NO_SNAPSHOT = Pair.create("ns", "no-snapshot");
     private static final Pair<String, String> CFSTATS_IGNORE_OPT = Pair.create("i",
"ignore");
     private static final Pair<String, String> RESOLVE_IP = Pair.create("r", "resolve-ip");
+    private static final Pair<String, String> SCRUB_SKIP_CORRUPTED_OPT = Pair.create("s",
"skip-corrupted");
+
 
     private static final String DEFAULT_HOST = "127.0.0.1";
     private static final int DEFAULT_PORT = 7199;
@@ -101,6 +103,7 @@ public class NodeCmd
         options.addOption(NO_SNAPSHOT, false, "disables snapshot creation for scrub");
         options.addOption(CFSTATS_IGNORE_OPT, false, "ignore the supplied list of keyspace.columnfamiles
in statistics");
         options.addOption(RESOLVE_IP, false, "show node domain names instead of IPs");
+        options.addOption(SCRUB_SKIP_CORRUPTED_OPT, false, "when scrubbing counter tables,
skip corrupted rows");
     }
 
     public NodeCmd(NodeProbe probe)
@@ -1562,7 +1565,8 @@ public class NodeCmd
                     break;
                 case SCRUB :
                     boolean disableSnapshot = cmd.hasOption(NO_SNAPSHOT.left);
-                    try { probe.scrub(disableSnapshot, keyspace, columnFamilies); }
+                    boolean skipCorrupted = cmd.hasOption(SCRUB_SKIP_CORRUPTED_OPT.left);
+                    try { probe.scrub(disableSnapshot, skipCorrupted, keyspace, columnFamilies);
}
                     catch (ExecutionException ee) { err(ee, "Error occurred while scrubbing
keyspace " + keyspace); }
                     break;
                 case UPGRADESSTABLES :

http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 1bb9d4e..0fbb12a 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -190,9 +190,9 @@ public class NodeProbe
         ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies);
     }
 
-    public void scrub(boolean disableSnapshot, String keyspaceName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException
+    public void scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName,
String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
-        ssProxy.scrub(disableSnapshot, keyspaceName, columnFamilies);
+        ssProxy.scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies);
     }
 
     public void upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String...
columnFamilies) throws IOException, ExecutionException, InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 00e0a5a..6556c3a 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -49,6 +49,7 @@ public class StandaloneScrubber
     private static final String DEBUG_OPTION  = "debug";
     private static final String HELP_OPTION  = "help";
     private static final String MANIFEST_CHECK_OPTION  = "manifest-check";
+    private static final String SKIP_CORRUPTED_OPTION = "skip-corrupted";
 
     public static void main(String args[])
     {
@@ -119,7 +120,7 @@ public class StandaloneScrubber
                 {
                     try
                     {
-                        Scrubber scrubber = new Scrubber(cfs, sstable, handler, true);
+                        Scrubber scrubber = new Scrubber(cfs, sstable, options.skipCorrupted,
handler, true);
                         try
                         {
                             scrubber.scrub();
@@ -184,6 +185,7 @@ public class StandaloneScrubber
         public boolean debug;
         public boolean verbose;
         public boolean manifestCheckOnly;
+        public boolean skipCorrupted;
 
         private Options(String keyspaceName, String cfName)
         {
@@ -222,6 +224,7 @@ public class StandaloneScrubber
                 opts.debug = cmd.hasOption(DEBUG_OPTION);
                 opts.verbose = cmd.hasOption(VERBOSE_OPTION);
                 opts.manifestCheckOnly = cmd.hasOption(MANIFEST_CHECK_OPTION);
+                opts.skipCorrupted = cmd.hasOption(SKIP_CORRUPTED_OPTION);
 
                 return opts;
             }
@@ -246,6 +249,7 @@ public class StandaloneScrubber
             options.addOption("v",  VERBOSE_OPTION,        "verbose output");
             options.addOption("h",  HELP_OPTION,           "display this help message");
             options.addOption("m",  MANIFEST_CHECK_OPTION, "only check and repair the leveled
manifest, without actually scrubbing the sstables");
+            options.addOption("s",  SKIP_CORRUPTED_OPTION, "skip corrupt rows in counter
tables");
             return options;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
index 42fda0d..b28e300 100644
--- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
+++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
@@ -163,9 +163,11 @@ commands:
   - name: compact [keyspace] [cfnames]
     help: |
       Force a (major) compaction on one or more column families
-  - name: scrub [keyspace] [cfnames]
+  - name: scrub [keyspace] [cfnames] [-s|--skip-corrupted]
     help: |
-      Scrub (rebuild sstables for) one or more column families
+      Scrub (rebuild sstables for) one or more column families.
+         Use -s/--skip-corrupted to skip corrupted rows even when scrubbing
+         tables that use counters.
   - name: upgradesstables [-a|--include-all-sstables] [keyspace] [cfnames]
     help: |
       Rewrite sstables (for the requested column families) that are not on the current version
(thus upgrading them to said current version).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/728c4fa9/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index a83d3c6..08dd435 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -20,13 +20,15 @@ package org.apache.cassandra.db;
  *
  */
 
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -52,6 +54,7 @@ public class ScrubTest extends SchemaLoader
     public String KEYSPACE = "Keyspace1";
     public String CF = "Standard1";
     public String CF3 = "Standard2";
+    public String COUNTER_CF = "Counter1";
 
     @Test
     public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException,
ConfigurationException
@@ -68,7 +71,7 @@ public class ScrubTest extends SchemaLoader
         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
         assertEquals(1, rows.size());
 
-        CompactionManager.instance.performScrub(cfs);
+        CompactionManager.instance.performScrub(cfs, false);
 
         // check data is still there
         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
@@ -76,6 +79,53 @@ public class ScrubTest extends SchemaLoader
     }
 
     @Test
+    public void testScrubCorruptedCounterRow() throws IOException, InterruptedException,
ExecutionException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF);
+        cfs.clearUnsafe();
+
+        fillCounterCF(cfs, 2);
+
+        List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(),
1000);
+        assertEquals(2, rows.size());
+
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+
+        // overwrite one row with garbage
+        long row0Start = sstable.getPosition(RowPosition.forKey(ByteBufferUtil.bytes("0"),
sstable.partitioner), SSTableReader.Operator.EQ).position;
+        long row1Start = sstable.getPosition(RowPosition.forKey(ByteBufferUtil.bytes("1"),
sstable.partitioner), SSTableReader.Operator.EQ).position;
+        long startPosition = row0Start < row1Start ? row0Start : row1Start;
+        long endPosition = row0Start < row1Start ? row1Start : row0Start;
+
+        RandomAccessFile file = new RandomAccessFile(sstable.getFilename(), "rw");
+        file.seek(startPosition);
+        file.writeBytes(StringUtils.repeat('z', (int) (endPosition - startPosition)));
+        file.close();
+
+        // with skipCorrupted == false, the scrub is expected to fail
+        Scrubber scrubber = new Scrubber(cfs, sstable, false);
+        try
+        {
+            scrubber.scrub();
+            fail("Expected a CorruptSSTableException to be thrown");
+        }
+        catch (IOError err) {}
+
+        // with skipCorrupted == true, the corrupt row will be skipped
+        scrubber = new Scrubber(cfs, sstable, true);
+        scrubber.scrub();
+        scrubber.close();
+        cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()),
OperationType.SCRUB);
+        assertEquals(1, cfs.getSSTables().size());
+
+        // verify that we can read all of the rows, and there is now one less row
+        rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+        assertEquals(1, rows.size());
+    }
+
+    @Test
     public void testScrubDeletedRow() throws IOException, ExecutionException, InterruptedException,
ConfigurationException
     {
         CompactionManager.instance.disableAutoCompaction();
@@ -89,7 +139,7 @@ public class ScrubTest extends SchemaLoader
         rm.applyUnsafe();
         cfs.forceBlockingFlush();
 
-        CompactionManager.instance.performScrub(cfs);
+        CompactionManager.instance.performScrub(cfs, false);
         assert cfs.getSSTables().isEmpty();
     }
 
@@ -108,7 +158,7 @@ public class ScrubTest extends SchemaLoader
         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
         assertEquals(10, rows.size());
 
-        CompactionManager.instance.performScrub(cfs);
+        CompactionManager.instance.performScrub(cfs, false);
 
         // check data is still there
         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
@@ -145,7 +195,6 @@ public class ScrubTest extends SchemaLoader
         writer.closeAndOpenReader();
         */
 
-
         String root = System.getProperty("corrupt-sstable-root");
         assert root != null;
         File rootDir = new File(root);
@@ -171,7 +220,7 @@ public class ScrubTest extends SchemaLoader
         components.add(Component.TOC);
         SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
 
-        Scrubber scrubber = new Scrubber(cfs, sstable);
+        Scrubber scrubber = new Scrubber(cfs, sstable, false);
         scrubber.scrub();
 
         cfs.loadNewSSTables();
@@ -207,4 +256,20 @@ public class ScrubTest extends SchemaLoader
 
         cfs.forceBlockingFlush();
     }
-}
+
+    protected void fillCounterCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException,
InterruptedException, IOException
+    {
+        for (int i = 0; i < rowsPerSSTable; i++)
+        {
+            String key = String.valueOf(i);
+            ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(KEYSPACE, COUNTER_CF);
+            RowMutation rm = new RowMutation(KEYSPACE, ByteBufferUtil.bytes(key), cf);
+            rm.addCounter(COUNTER_CF, ByteBufferUtil.bytes("Column1"), 100);
+            CounterMutation cm = new CounterMutation(rm, ConsistencyLevel.ONE);
+            cm.apply();
+        }
+
+        cfs.forceBlockingFlush();
+    }
+
+}
\ No newline at end of file


Mime
View raw message