cassandra-commits mailing list archives

From tylerho...@apache.org
Subject [2/2] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Date Thu, 30 Apr 2015 18:16:26 GMT
Merge branch 'cassandra-2.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/db127a10
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/db127a10
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/db127a10

Branch: refs/heads/cassandra-2.1
Commit: db127a1073cfec426019c06d8f33e6c44292949f
Parents: 80c5191 593a725
Author: Tyler Hobbs <tylerlhobbs@gmail.com>
Authored: Thu Apr 30 13:09:11 2015 -0500
Committer: Tyler Hobbs <tylerlhobbs@gmail.com>
Committed: Thu Apr 30 13:09:11 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/compaction/Scrubber.java       | 127 +++++++++++----
 .../unit/org/apache/cassandra/SchemaLoader.java |  17 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java | 156 +++++++++++++++++--
 4 files changed, 256 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/db127a10/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index cb235a4,4e7a5d0..c063368
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,98 -1,11 +1,99 @@@
 -2.0.15:
 +2.1.6
 + * Fix PITR commitlog replay (CASSANDRA-9195)
 + * GCInspector logs very different times (CASSANDRA-9124)
 + * Fix deleting from an empty list (CASSANDRA-9198)
 + * Update tuple and collection types that use a user-defined type when that UDT
 +   is modified (CASSANDRA-9148, CASSANDRA-9192)
 +Merged from 2.0:
+  * Allow scrub to handle corrupted compressed chunks (CASSANDRA-9140)
   * Fix assertion error when resetlocalschema is run during repair (CASSANDRA-9249)
   * Disable single sstable tombstone compactions for DTCS by default (CASSANDRA-9234)
 - * Do more agressive ttl expiration checks to be able to
 -   drop more sstables (CASSANDRA-8243)
   * IncomingTcpConnection thread is not named (CASSANDRA-9262)
   * Close incoming connections when MessagingService is stopped (CASSANDRA-9238)
 +
 +2.1.5
 + * Re-add deprecated cold_reads_to_omit param for backwards compat (CASSANDRA-9203)
 + * Make anticompaction visible in compactionstats (CASSANDRA-9098)
 + * Improve nodetool getendpoints documentation about the partition
 +   key parameter (CASSANDRA-6458)
 + * Don't check other keyspaces for schema changes when an user-defined
 +   type is altered (CASSANDRA-9187)
 + * Allow takeColumnFamilySnapshot to take a list of tables (CASSANDRA-8348)
 + * Limit major sstable operations to their canonical representation (CASSANDRA-8669)
 + * cqlsh: Add tests for INSERT and UPDATE tab completion (CASSANDRA-9125)
 + * cqlsh: quote column names when needed in COPY FROM inserts (CASSANDRA-9080)
 + * Add generate-idea-files target to build.xml (CASSANDRA-9123)
 + * Do not load read meter for offline operations (CASSANDRA-9082)
 + * cqlsh: Make CompositeType data readable (CASSANDRA-8919)
 + * cqlsh: Fix display of triggers (CASSANDRA-9081)
 + * Fix NullPointerException when deleting or setting an element by index on
 +   a null list collection (CASSANDRA-9077)
 + * Buffer bloom filter serialization (CASSANDRA-9066)
 + * Fix anti-compaction target bloom filter size (CASSANDRA-9060)
 + * Make FROZEN and TUPLE unreserved keywords in CQL (CASSANDRA-9047)
 + * Prevent AssertionError from SizeEstimatesRecorder (CASSANDRA-9034)
 + * Avoid overwriting index summaries for sstables with an older format that
 +   does not support downsampling; rebuild summaries on startup when this
 +   is detected (CASSANDRA-8993)
 + * Fix potential data loss in CompressedSequentialWriter (CASSANDRA-8949)
 + * Make PasswordAuthenticator number of hashing rounds configurable (CASSANDRA-8085)
 + * Fix AssertionError when binding nested collections in DELETE (CASSANDRA-8900)
 + * Check for overlap with non-early sstables in LCS (CASSANDRA-8739)
 + * Only calculate max purgable timestamp if we have to (CASSANDRA-8914)
 + * (cqlsh) Greatly improve performance of COPY FROM (CASSANDRA-8225)
 + * IndexSummary effectiveIndexInterval is now a guideline, not a rule (CASSANDRA-8993)
 + * Use correct bounds for page cache eviction of compressed files (CASSANDRA-8746)
 + * SSTableScanner enforces its bounds (CASSANDRA-8946)
 + * Cleanup cell equality (CASSANDRA-8947)
 + * Introduce intra-cluster message coalescing (CASSANDRA-8692)
 + * DatabaseDescriptor throws NPE when rpc_interface is used (CASSANDRA-8839)
 + * Don't check if an sstable is live for offline compactions (CASSANDRA-8841)
 + * Don't set clientMode in SSTableLoader (CASSANDRA-8238)
 + * Fix SSTableRewriter with disabled early open (CASSANDRA-8535)
 + * Allow invalidating permissions and cache time (CASSANDRA-8722)
 + * Log warning when queries that will require ALLOW FILTERING in Cassandra 3.0
 +   are executed (CASSANDRA-8418)
 + * Fix cassandra-stress so it respects the CL passed in user mode (CASSANDRA-8948)
 + * Fix rare NPE in ColumnDefinition#hasIndexOption() (CASSANDRA-8786)
 + * cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769)
 + * Use long for key count in cfstats (CASSANDRA-8913)
 + * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
 + * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
 + * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
 + * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)
 + * Fix rare NPE in KeyCacheSerializer (CASSANDRA-8067)
 + * Pick sstables for validation as late as possible inc repairs (CASSANDRA-8366)
 + * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856)
 + * Fix parallelism adjustment in range and secondary index queries
 +   when the first fetch does not satisfy the limit (CASSANDRA-8856)
 + * Check if the filtered sstables is non-empty in STCS (CASSANDRA-8843)
 + * Upgrade java-driver used for cassandra-stress (CASSANDRA-8842)
 + * Fix CommitLog.forceRecycleAllSegments() memory access error (CASSANDRA-8812)
 + * Improve assertions in Memory (CASSANDRA-8792)
 + * Fix SSTableRewriter cleanup (CASSANDRA-8802)
 + * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758)
 + * 'nodetool info' prints exception against older node (CASSANDRA-8796)
 + * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
 + * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
 + * Enforce SSTableReader.first/last (CASSANDRA-8744)
 + * Cleanup SegmentedFile API (CASSANDRA-8749)
 + * Avoid overlap with early compaction replacement (CASSANDRA-8683)
 + * Safer Resource Management++ (CASSANDRA-8707)
 + * Write partition size estimates into a system table (CASSANDRA-7688)
 + * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
 +   (CASSANDRA-8154)
 + * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
 + * IndexSummaryBuilder utilises offheap memory, and shares data between
 +   each IndexSummary opened from it (CASSANDRA-8757)
 + * markCompacting only succeeds if the exact SSTableReader instances being 
 +   marked are in the live set (CASSANDRA-8689)
 + * cassandra-stress support for varint (CASSANDRA-8882)
 + * Fix Adler32 digest for compressed sstables (CASSANDRA-8778)
 + * Add nodetool statushandoff/statusbackup (CASSANDRA-8912)
 + * Use stdout for progress and stats in sstableloader (CASSANDRA-8982)
 + * Correctly identify 2i datadir from older versions (CASSANDRA-9116)
 +Merged from 2.0:
 + * Ignore gossip SYNs after shutdown (CASSANDRA-9238)
   * Avoid overflow when calculating max sstable size in LCS (CASSANDRA-9235)
   * Make sstable blacklisting work with compression (CASSANDRA-9138)
   * Do not attempt to rebuild indexes if no index accepts any column (CASSANDRA-9196)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db127a10/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 2f53ab9,1752b21..1f5c7de
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -21,8 -21,8 +21,9 @@@ import java.nio.ByteBuffer
  import java.io.*;
  import java.util.*;
  
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Throwables;
 +import com.google.common.collect.Sets;
  
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.io.sstable.*;
@@@ -109,14 -113,12 +118,14 @@@ public class Scrubber implements Closea
      public void scrub()
      {
          outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
 +        Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
 +        SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
          try
          {
-             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+             nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
              {
                  // throw away variable so we don't have a side effect in the assert
 -                long firstRowPositionFromIndex = RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position;
 +                long firstRowPositionFromIndex = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
                  assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
              }
  
@@@ -143,33 -152,32 +153,22 @@@
                      // check for null key below
                  }
  
-                 ByteBuffer currentIndexKey = nextIndexKey;
-                 long nextRowPositionFromIndex;
-                 try
-                 {
-                     nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
-                     nextRowPositionFromIndex = indexFile.isEOF()
-                                              ? dataFile.length()
-                                              : sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
-                 }
-                 catch (Throwable th)
-                 {
-                     JVMStabilityInspector.inspectThrowable(th);
-                     outputHandler.warn("Error reading index file", th);
-                     nextIndexKey = null;
-                     nextRowPositionFromIndex = dataFile.length();
-                 }
+                 updateIndexKey();
  
                  long dataStart = dataFile.getFilePointer();
-                 long dataStartFromIndex = currentIndexKey == null
-                                         ? -1
-                                         : rowStart + 2 + currentIndexKey.remaining();
-                 long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
++
+                 long dataStartFromIndex = -1;
+                 long dataSizeFromIndex = -1;
+                 if (currentIndexKey != null)
+                 {
+                     dataStartFromIndex = currentRowPositionFromIndex + 2 + currentIndexKey.remaining();
 -                    if (sstable.descriptor.version.hasRowSizeAndColumnCount)
 -                        dataStartFromIndex += 8;
 -
+                     dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+                 }
  
 -                if (!sstable.descriptor.version.hasRowSizeAndColumnCount)
 -                {
 -                    dataSize = dataSizeFromIndex;
 -                    // avoid an NPE if key is null
 -                    String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.key);
 -                    outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
 -                }
 -                else
 -                {
 -                    if (currentIndexKey != null)
 -                        outputHandler.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
 -                }
 +                dataSize = dataSizeFromIndex;
 +                // avoid an NPE if key is null
 +                String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.getKey());
 +                outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
  
                  assert currentIndexKey != null || indexFile.isEOF();
  
@@@ -177,9 -186,22 +176,22 @@@
                  {
                      if (key == null)
                          throw new IOError(new IOException("Unable to read row key from data file"));
+ 
 -                    if (!key.key.equals(currentIndexKey))
++                    if (!key.getKey().equals(currentIndexKey))
+                     {
+                         throw new IOError(new IOException(String.format("Key from data file (%s) does not match key from index file (%s)",
 -                                ByteBufferUtil.bytesToHex(key.key), ByteBufferUtil.bytesToHex(currentIndexKey))));
++                                ByteBufferUtil.bytesToHex(key.getKey()), ByteBufferUtil.bytesToHex(currentIndexKey))));
+                     }
+ 
                      if (dataSize > dataFile.length())
 -                        throw new IOError(new IOException("Impossible row size (greater than file length): " + dataSize));
 +                        throw new IOError(new IOException("Impossible row size " + dataSize));
  
+                     if (dataStart != dataStartFromIndex)
 -                        outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataSizeFromIndex));
++                        outputHandler.warn(String.format("Data file row position %d different from index file row position %d", dataStart, dataSizeFromIndex));
+ 
+                     if (dataSize != dataSizeFromIndex)
 -                        outputHandler.warn(String.format("Data file row size %d differs from index file row size %d", dataSize, dataSizeFromIndex));
++                        outputHandler.warn(String.format("Data file row size %d different from index file row size %d", dataSize, dataSizeFromIndex));
+ 
                      SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
                      if (prevKey != null && prevKey.compareTo(key) > 0)
                      {
@@@ -229,8 -254,9 +243,8 @@@
                              throwIfCommutative(key, th2);
  
                              outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
-                             dataFile.seek(nextRowPositionFromIndex);
 -                            writer.resetAndTruncate();
                              badRows++;
+                             seekToNextRow();
                          }
                      }
                      else
@@@ -290,6 -309,45 +304,46 @@@
          }
      }
  
+     private void updateIndexKey()
+     {
+         currentIndexKey = nextIndexKey;
+         currentRowPositionFromIndex = nextRowPositionFromIndex;
+         try
+         {
+             nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+             nextRowPositionFromIndex = indexFile.isEOF()
+                     ? dataFile.length()
 -                    : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position;
++                    : sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
+         }
+         catch (Throwable th)
+         {
++            JVMStabilityInspector.inspectThrowable(th);
+             outputHandler.warn("Error reading index file", th);
+             nextIndexKey = null;
+             nextRowPositionFromIndex = dataFile.length();
+         }
+     }
+ 
+     private void seekToNextRow()
+     {
+         while(nextRowPositionFromIndex < dataFile.length())
+         {
+             try
+             {
+                 dataFile.seek(nextRowPositionFromIndex);
+                 return;
+             }
+             catch (Throwable th)
+             {
+                 throwIfFatal(th);
+                 outputHandler.warn(String.format("Failed to seek to next row position %d", nextRowPositionFromIndex), th);
+                 badRows++;
+             }
+ 
+             updateIndexKey();
+         }
+     }
+ 
      private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
      {
          // TODO bitch if the row is too large?  if it is there's not much we can do ...
@@@ -380,9 -438,30 +434,30 @@@
          }
  
          @Override
 -        public boolean shouldPurge(DecoratedKey key, long delTimestamp)
 +        public long maxPurgeableTimestamp(DecoratedKey key)
          {
 -            return false;
 +            return Long.MIN_VALUE;
          }
      }
+ 
+     @VisibleForTesting
+     public ScrubResult scrubWithResult()
+     {
+         scrub();
+         return new ScrubResult(this);
+     }
+ 
+     public static final class ScrubResult
+     {
+         public final int goodRows;
+         public final int badRows;
+         public final int emptyRows;
+ 
+         public ScrubResult(Scrubber scrubber)
+         {
+             this.goodRows = scrubber.goodRows;
+             this.badRows = scrubber.badRows;
+             this.emptyRows = scrubber.emptyRows;
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db127a10/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/SchemaLoader.java
index db1758f,6b34b9a..c6a3855
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@@ -51,30 -53,17 +51,35 @@@ public class SchemaLoade
      private static Logger logger = LoggerFactory.getLogger(SchemaLoader.class);
  
      @BeforeClass
 -    public static void loadSchema() throws IOException, ConfigurationException
 +    public static void loadSchema() throws ConfigurationException
      {
 -        loadSchema(false);
++        loadSchema(null);
++    }
++
++    public static void loadSchema(Integer compressionChunkLength) throws ConfigurationException
++    {
 +        prepareServer();
 +
 +        // Migrations aren't happy if gossiper is not started.  Even if we don't use migrations though,
 +        // some tests now expect us to start gossip for them.
 +        startGossiper();
 +
 +        // if you're messing with low-level sstable stuff, it can be useful to inject the schema directly
 +        // Schema.instance.load(schemaDefinition());
-         for (KSMetaData ksm : schemaDefinition())
++        for (KSMetaData ksm : schemaDefinition(compressionChunkLength))
 +            MigrationManager.announceNewKeyspace(ksm);
      }
  
 -    public static void loadSchema(boolean withOldCfIds) throws IOException, ConfigurationException
 +    @After
 +    public void leakDetect() throws InterruptedException
      {
 -        loadSchema(withOldCfIds, null);
 +        System.gc();
 +        System.gc();
 +        System.gc();
 +        Thread.sleep(10);
      }
  
 -    public static void loadSchema(boolean withOldCfIds, Integer compressionChunkLength) throws IOException, ConfigurationException
 +    public static void prepareServer()
      {
          // Cleanup first
          cleanupAndLeaveDirs();
@@@ -103,7 -98,7 +108,7 @@@
          Gossiper.instance.stop();
      }
  
-     public static Collection<KSMetaData> schemaDefinition() throws ConfigurationException
 -    public static Collection<KSMetaData> schemaDefinition(boolean withOldCfIds, Integer compressionChunkLength) throws ConfigurationException
++    public static Collection<KSMetaData> schemaDefinition(Integer compressionChunkLength) throws ConfigurationException
      {
          List<KSMetaData> schema = new ArrayList<KSMetaData>();
  
@@@ -355,15 -359,21 +360,15 @@@
          final Map<String, String> indexOptions = Collections.singletonMap(
                                                        SecondaryIndex.CUSTOM_INDEX_OPTION_NAME,
                                                        PerRowSecondaryIndexTest.TestIndex.class.getName());
 -        return standardCFMD(ksName, cfName)
 -                .keyValidator(AsciiType.instance)
 -                .columnMetadata(new HashMap<ByteBuffer, ColumnDefinition>()
 -                {{
 -                        ByteBuffer cName = ByteBuffer.wrap("indexed".getBytes(StandardCharsets.UTF_8));
 -                        put(cName, new ColumnDefinition(cName,
 -                                AsciiType.instance,
 -                                IndexType.CUSTOM,
 -                                indexOptions,
 -                                ByteBufferUtil.bytesToHex(cName),
 -                                null, ColumnDefinition.Type.REGULAR));
 -                }});
 +
 +        CFMetaData cfm =  CFMetaData.sparseCFMetaData(ksName, cfName, AsciiType.instance).keyValidator(AsciiType.instance);
 +
 +        ByteBuffer cName = ByteBufferUtil.bytes("indexed");
 +        return cfm.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(cfm, cName, AsciiType.instance, null)
 +                                                                .setIndex("indexe1", IndexType.CUSTOM, indexOptions));
      }
  
-     private static void useCompression(List<KSMetaData> schema)
+     private static void useCompression(List<KSMetaData> schema, Integer chunkLength) throws ConfigurationException
      {
          for (KSMetaData ksm : schema)
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db127a10/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index 383b1d0,08237a4..a19c76d
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -31,9 -30,12 +30,12 @@@ import java.util.Set
  import java.util.concurrent.ExecutionException;
  
  import org.apache.cassandra.cql3.QueryProcessor;
 -import org.apache.cassandra.db.compaction.OperationType;
++import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.exceptions.RequestExecutionException;
+ import org.apache.cassandra.io.compress.CompressionMetadata;
  import org.apache.cassandra.utils.UUIDGen;
  import org.apache.commons.lang3.StringUtils;
+ import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  
@@@ -43,18 -45,17 +45,21 @@@ import org.apache.cassandra.SchemaLoade
  import org.apache.cassandra.Util;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.db.compaction.Scrubber;
 -import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
  import org.apache.cassandra.db.compaction.CompactionManager;
 -import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.db.compaction.Scrubber;
 +import org.apache.cassandra.exceptions.WriteTimeoutException;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTableReader;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
 -import static junit.framework.Assert.assertNotNull;
 +import static org.apache.cassandra.Util.cellname;
  import static org.apache.cassandra.Util.column;
++
++import static junit.framework.Assert.assertNotNull;
  import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
  import static org.junit.Assert.fail;
  import static org.junit.Assume.assumeTrue;
  
@@@ -65,9 -66,16 +70,16 @@@ public class ScrubTest extends SchemaLo
      public String CF = "Standard1";
      public String CF3 = "Standard2";
      public String COUNTER_CF = "Counter1";
+     private static Integer COMPRESSION_CHUNK_LENGTH = 4096;
+ 
+     @BeforeClass
 -    public static void loadSchema() throws IOException, ConfigurationException
++    public static void loadSchema() throws ConfigurationException
+     {
 -        loadSchema(false, COMPRESSION_CHUNK_LENGTH);
++        loadSchema(COMPRESSION_CHUNK_LENGTH);
+     }
  
      @Test
 -    public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
 +    public void testScrubOneRow() throws ExecutionException, InterruptedException
      {
          CompactionManager.instance.disableAutoCompaction();
          Keyspace keyspace = Keyspace.open(KEYSPACE);
@@@ -89,9 -97,71 +101,69 @@@
      }
  
      @Test
 -    public void testScrubCorruptedCounterRow() throws IOException, InterruptedException, ExecutionException
 +    public void testScrubCorruptedCounterRow() throws IOException, WriteTimeoutException
      {
-         // skip the test when compression is enabled until CASSANDRA-9140 is complete
+         // When compression is enabled, for testing corrupted chunks we need enough partitions to cover
+         // at least 3 chunks of size COMPRESSION_CHUNK_LENGTH
+         int numPartitions = 1000;
+ 
+         CompactionManager.instance.disableAutoCompaction();
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF);
+         cfs.clearUnsafe();
+ 
+         fillCounterCF(cfs, numPartitions);
+ 
+         List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), numPartitions*10);
+         assertEquals(numPartitions, rows.size());
+         assertEquals(1, cfs.getSSTables().size());
+ 
+         SSTableReader sstable = cfs.getSSTables().iterator().next();
+ 
+         //make sure to override at most 1 chunk when compression is enabled
+         overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1"));
+ 
+         // with skipCorrupted == false, the scrub is expected to fail
 -        Scrubber scrubber = new Scrubber(cfs, sstable, false);
++        Scrubber scrubber = new Scrubber(cfs, sstable, false, false);
+         try
+         {
+             scrubber.scrub();
+             fail("Expected a CorruptSSTableException to be thrown");
+         }
+         catch (IOError err) {}
+ 
+         // with skipCorrupted == true, the corrupt row will be skipped
+         Scrubber.ScrubResult scrubResult;
 -        scrubber = new Scrubber(cfs, sstable, true);
++        scrubber = new Scrubber(cfs, sstable, true, false);
+         scrubResult = scrubber.scrubWithResult();
+         scrubber.close();
 -        cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
+ 
+         assertNotNull(scrubResult);
+ 
+         boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));
+         if (compression)
+         {
+             assertEquals(0, scrubResult.emptyRows);
+             assertEquals(numPartitions, scrubResult.badRows + scrubResult.goodRows);
+             //because we only corrupted 1 chunk and we chose enough partitions to cover at least 3 chunks
+             assertTrue(scrubResult.goodRows >= scrubResult.badRows * 2);
+         }
+         else
+         {
+             assertEquals(0, scrubResult.emptyRows);
+             assertEquals(1, scrubResult.badRows);
+             assertEquals(numPartitions-1, scrubResult.goodRows);
+         }
+         assertEquals(1, cfs.getSSTables().size());
+ 
+         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+         assertEquals(scrubResult.goodRows, rows.size());
+     }
+ 
 -
+     @Test
 -    public void testScrubCorruptedRowInSmallFile() throws IOException, InterruptedException, ExecutionException
++    public void testScrubCorruptedRowInSmallFile() throws IOException, WriteTimeoutException
+     {
+         // cannot test this with compression
          assumeTrue(!Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")));
  
          CompactionManager.instance.disableAutoCompaction();
@@@ -105,20 -175,10 +177,12 @@@
          assertEquals(2, rows.size());
  
          SSTableReader sstable = cfs.getSSTables().iterator().next();
 +
 +        // overwrite one row with garbage
-         long row0Start = sstable.getPosition(RowPosition.ForKey.get(ByteBufferUtil.bytes("0"), sstable.partitioner), SSTableReader.Operator.EQ).position;
-         long row1Start = sstable.getPosition(RowPosition.ForKey.get(ByteBufferUtil.bytes("1"), sstable.partitioner), SSTableReader.Operator.EQ).position;
-         long startPosition = row0Start < row1Start ? row0Start : row1Start;
-         long endPosition = row0Start < row1Start ? row1Start : row0Start;
- 
-         RandomAccessFile file = new RandomAccessFile(sstable.getFilename(), "rw");
-         file.seek(startPosition);
-         file.writeBytes(StringUtils.repeat('z', (int) (endPosition - startPosition)));
-         file.close();
+         overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1"));
  
          // with skipCorrupted == false, the scrub is expected to fail
 -        Scrubber scrubber = new Scrubber(cfs, sstable, false);
 +        Scrubber scrubber = new Scrubber(cfs, sstable, false, false);
          try
          {
              scrubber.scrub();
@@@ -138,7 -199,35 +202,35 @@@
      }
  
      @Test
+     public void testScrubOneRowWithCorruptedKey() throws IOException, ExecutionException, InterruptedException, ConfigurationException
+     {
+         // cannot test this with compression
+         assumeTrue(!Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")));
+ 
+         CompactionManager.instance.disableAutoCompaction();
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.clearUnsafe();
+ 
+         List<Row> rows;
+ 
+         // insert data and verify we get it back w/ range query
+         fillCF(cfs, 4);
+         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+         assertEquals(4, rows.size());
+ 
+         SSTableReader sstable = cfs.getSSTables().iterator().next();
+         overrideWithGarbage(sstable, 0, 2);
+ 
+         CompactionManager.instance.performScrub(cfs, false);
+ 
+         // check data is still there
+         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+         assertEquals(4, rows.size());
+     }
+ 
+     @Test
 -    public void testScrubDeletedRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
 +    public void testScrubDeletedRow() throws ExecutionException, InterruptedException
      {
          CompactionManager.instance.disableAutoCompaction();
          Keyspace keyspace = Keyspace.open(KEYSPACE);
@@@ -241,6 -330,42 +333,42 @@@
          assert rows.size() == 6 : "Got " + rows.size();
      }
  
+     private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException
+     {
+         boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));
+         long startPosition, endPosition;
+ 
+         if (compression)
+         { // overwrite with garbage the compression chunks from key1 to key2
+             CompressionMetadata compData = CompressionMetadata.create(sstable.getFilename());
+ 
+             CompressionMetadata.Chunk chunk1 = compData.chunkFor(
 -                    sstable.getPosition(RowPosition.forKey(key1, sstable.partitioner), SSTableReader.Operator.EQ).position);
++                    sstable.getPosition(RowPosition.ForKey.get(key1, sstable.partitioner), SSTableReader.Operator.EQ).position);
+             CompressionMetadata.Chunk chunk2 = compData.chunkFor(
 -                    sstable.getPosition(RowPosition.forKey(key2, sstable.partitioner), SSTableReader.Operator.EQ).position);
++                    sstable.getPosition(RowPosition.ForKey.get(key2, sstable.partitioner), SSTableReader.Operator.EQ).position);
+ 
+             startPosition = Math.min(chunk1.offset, chunk2.offset);
+             endPosition = Math.max(chunk1.offset + chunk1.length, chunk2.offset + chunk2.length);
+         }
+         else
+         { // overwrite with garbage from key1 to key2
 -            long row0Start = sstable.getPosition(RowPosition.forKey(key1, sstable.partitioner), SSTableReader.Operator.EQ).position;
 -            long row1Start = sstable.getPosition(RowPosition.forKey(key2, sstable.partitioner), SSTableReader.Operator.EQ).position;
++            long row0Start = sstable.getPosition(RowPosition.ForKey.get(key1, sstable.partitioner), SSTableReader.Operator.EQ).position;
++            long row1Start = sstable.getPosition(RowPosition.ForKey.get(key2, sstable.partitioner), SSTableReader.Operator.EQ).position;
+             startPosition = Math.min(row0Start, row1Start);
+             endPosition = Math.max(row0Start, row1Start);
+         }
+ 
+         overrideWithGarbage(sstable, startPosition, endPosition);
+     }
+ 
 -    private void overrideWithGarbage(SSTable sstable, long startPosition, long endPosition) throws IOException
++    private void overrideWithGarbage(SSTableReader sstable, long startPosition, long endPosition) throws IOException
+     {
+         RandomAccessFile file = new RandomAccessFile(sstable.getFilename(), "rw");
+         file.seek(startPosition);
+         file.writeBytes(StringUtils.repeat('z', (int) (endPosition - startPosition)));
+         file.close();
+     }
+ 
      private static boolean isRowOrdered(List<Row> rows)
      {
          DecoratedKey prev = null;

