cassandra-commits mailing list archives

From tylerho...@apache.org
Subject [3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Date Thu, 30 Apr 2015 18:16:39 GMT
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/49b08989
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/49b08989
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/49b08989

Branch: refs/heads/trunk
Commit: 49b089893b5407ba42dad389804ff21d535a8537
Parents: 3bfe4b6 db127a1
Author: Tyler Hobbs <tylerlhobbs@gmail.com>
Authored: Thu Apr 30 13:15:57 2015 -0500
Committer: Tyler Hobbs <tylerlhobbs@gmail.com>
Committed: Thu Apr 30 13:15:57 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/compaction/Scrubber.java       | 128 ++++++++++++---
 .../compress/CompressedRandomAccessReader.java  |   2 +
 .../unit/org/apache/cassandra/SchemaLoader.java |   7 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java | 157 ++++++++++++++++---
 5 files changed, 242 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/49b08989/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49b08989/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 675b604,1f5c7de..1e50da6
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -121,10 -122,10 +130,10 @@@ public class Scrubber implements Closea
          SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
          try
          {
-             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+             nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
              {
                  // throw away variable so we don't have a side effect in the assert
 -                long firstRowPositionFromIndex = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
 +                long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
                  assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
              }
  
@@@ -150,30 -153,19 +160,19 @@@
                      // check for null key below
                  }
  
-                 ByteBuffer currentIndexKey = nextIndexKey;
-                 long nextRowPositionFromIndex;
-                 try
-                 {
-                     nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
-                     nextRowPositionFromIndex = indexFile.isEOF()
-                                              ? dataFile.length()
-                                              : rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
-                 }
-                 catch (Throwable th)
-                 {
-                     JVMStabilityInspector.inspectThrowable(th);
-                     outputHandler.warn("Error reading index file", th);
-                     nextIndexKey = null;
-                     nextRowPositionFromIndex = dataFile.length();
-                 }
+                 updateIndexKey();
  
                  long dataStart = dataFile.getFilePointer();
-                 long dataStartFromIndex = currentIndexKey == null
-                                         ? -1
-                                         : rowStart + 2 + currentIndexKey.remaining();
-                 long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+ 
+                 long dataStartFromIndex = -1;
+                 long dataSizeFromIndex = -1;
+                 if (currentIndexKey != null)
+                 {
+                     dataStartFromIndex = currentRowPositionFromIndex + 2 + currentIndexKey.remaining();
+                     dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+                 }
  
 -                dataSize = dataSizeFromIndex;
 +                long dataSize = dataSizeFromIndex;
                  // avoid an NPE if key is null
                  String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.getKey());
                  outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
@@@ -187,7 -186,13 +193,14 @@@
                      if (dataSize > dataFile.length())
                          throw new IOError(new IOException("Impossible row size " + dataSize));
  
+                     if (dataStart != dataStartFromIndex)
+                         outputHandler.warn(String.format("Data file row position %d different from index file row position %d", dataStart, dataStartFromIndex));
+ 
+                     if (dataSize != dataSizeFromIndex)
+                         outputHandler.warn(String.format("Data file row size %d different from index file row size %d", dataSize, dataSizeFromIndex));
+ 
 -                    SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
 +                    SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, checkData);
++
                      if (prevKey != null && prevKey.compareTo(key) > 0)
                      {
                          saveOutOfOrderRow(prevKey, key, atoms);
@@@ -216,7 -220,9 +228,9 @@@
                          key = sstable.partitioner.decorateKey(currentIndexKey);
                          try
                          {
+                             dataFile.seek(dataStartFromIndex);
+ 
 -                            SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
 +                            SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, checkData);
                              if (prevKey != null && prevKey.compareTo(key) > 0)
                              {
                                  saveOutOfOrderRow(prevKey, key, atoms);
@@@ -233,21 -240,21 +248,21 @@@
                          catch (Throwable th2)
                          {
                              throwIfFatal(th2);
 -                            throwIfCommutative(key, th2);
 +                            throwIfCannotContinue(key, th2);
  
                             outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
-                             dataFile.seek(nextRowPositionFromIndex);
                              badRows++;
+                             seekToNextRow();
                          }
                      }
                      else
                      {
 -                        throwIfCommutative(key, th);
 +                        throwIfCannotContinue(key, th);
  
                          outputHandler.warn("Row starting at position " + dataStart + " is
unreadable; skipping to next");
-                         if (currentIndexKey != null)
-                             dataFile.seek(nextRowPositionFromIndex);
                          badRows++;
+                         if (currentIndexKey != null)
+                             seekToNextRow();
                      }
                  }
              }
@@@ -297,6 -304,46 +312,46 @@@
          }
      }
  
+     private void updateIndexKey()
+     {
+         currentIndexKey = nextIndexKey;
+         currentRowPositionFromIndex = nextRowPositionFromIndex;
+         try
+         {
+             nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+             nextRowPositionFromIndex = indexFile.isEOF()
+                     ? dataFile.length()
 -                    : sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
++                    : rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
+         }
+         catch (Throwable th)
+         {
+             JVMStabilityInspector.inspectThrowable(th);
+             outputHandler.warn("Error reading index file", th);
+             nextIndexKey = null;
+             nextRowPositionFromIndex = dataFile.length();
+         }
+     }
+ 
+     private void seekToNextRow()
+     {
+         while(nextRowPositionFromIndex < dataFile.length())
+         {
+             try
+             {
+                 dataFile.seek(nextRowPositionFromIndex);
+                 return;
+             }
+             catch (Throwable th)
+             {
+                 throwIfFatal(th);
+                 outputHandler.warn(String.format("Failed to seek to next row position %d", nextRowPositionFromIndex), th);
+                 badRows++;
+             }
+ 
+             updateIndexKey();
+         }
+     }
+ 
      private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
      {
          // TODO bitch if the row is too large?  if it is there's not much we can do ...
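
The new recovery path above reads as a small loop: updateIndexKey() advances a pair of cursors (the current and next row positions recorded in the index file), and seekToNextRow() jumps the data file to the next indexed position, consuming further index entries if a seek itself fails. The stand-alone sketch below is only a toy model of that idea; the names rowStartsFromIndex, corruptRows, dataLength and ScrubSkipSketch are illustrative stand-ins, not Cassandra APIs, and the real scrubber works on SSTable data/index files rather than in-memory lists.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Toy model of the skip-to-next-row behaviour: the "index" knows where each row
    // starts, so an unreadable row is skipped by seeking to the next indexed position
    // instead of aborting the whole scrub.
    public class ScrubSkipSketch
    {
        public static void main(String[] args)
        {
            List<Long> rowStartsFromIndex = Arrays.asList(0L, 100L, 250L, 400L);
            List<Integer> corruptRows = Arrays.asList(2);   // pretend the third row is unreadable
            long dataLength = 500L;                         // plays the role of dataFile.length()

            List<Long> scrubbedRows = new ArrayList<>();
            for (int i = 0; i < rowStartsFromIndex.size(); i++)
            {
                long currentRowPositionFromIndex = rowStartsFromIndex.get(i);
                long nextRowPositionFromIndex = (i + 1 < rowStartsFromIndex.size())
                                              ? rowStartsFromIndex.get(i + 1)
                                              : dataLength;
                if (corruptRows.contains(i))
                {
                    // corresponds to seekToNextRow(): log it, count a bad row, move on
                    System.out.printf("row at %d unreadable; seeking to %d%n",
                                      currentRowPositionFromIndex, nextRowPositionFromIndex);
                }
                else
                {
                    scrubbedRows.add(currentRowPositionFromIndex);
                }
            }
            System.out.println("good rows at: " + scrubbedRows);
        }
    }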

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49b08989/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 1b3cd06,184db9c..edf8c68
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@@ -70,226 -60,87 +70,228 @@@ public class CompressedRandomAccessRead
      private ByteBuffer compressed;
  
      // re-use single crc object
 -    private final Checksum checksum;
 +    private final Adler32 checksum;
  
      // raw checksum bytes
 -    private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
 +    private ByteBuffer checksumBytes;
  
 -    protected CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException
 +    protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException
      {
 -        super(new File(dataFilePath), metadata.chunkLength(), metadata.compressedFileLength, owner);
 +        super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), owner);
          this.metadata = metadata;
 -        checksum = metadata.hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
 -        compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
 -    }
 +        checksum = new Adler32();
  
 -    @Override
 -    protected void reBuffer()
 -    {
 -        try
 +        if (!useMmap)
          {
 -            decompressChunk(metadata.chunkFor(current));
 +            compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
 +            checksumBytes = ByteBuffer.wrap(new byte[4]);
          }
 -        catch (CorruptBlockException e)
 +        else
          {
 -            throw new CorruptSSTableException(e, getPath());
 +            try
 +            {
 +                createMappedSegments();
 +            }
 +            catch (IOException e)
 +            {
 +                throw new IOError(e);
 +            }
          }
 -        catch (IOException e)
 +    }
 +
 +    private void createMappedSegments() throws IOException
 +    {
 +        chunkSegments = new TreeMap<>();
 +        long offset = 0;
 +        long lastSegmentOffset = 0;
 +        long segmentSize = 0;
 +
 +        while (offset < metadata.dataLength)
          {
 -            throw new FSReadError(e, getPath());
 +            CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
 +
 +            //Reached a new mmap boundary
 +            if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
 +            {
 +                chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
 +                lastSegmentOffset += segmentSize;
 +                segmentSize = 0;
 +            }
 +
 +            segmentSize += chunk.length + 4; //checksum
 +            offset += metadata.chunkLength();
          }
 +
 +        if (segmentSize > 0)
 +            chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
      }
  
 -    private void decompressChunk(CompressionMetadata.Chunk chunk) throws IOException
 +    protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect)
      {
 -        if (channel.position() != chunk.offset)
 -            channel.position(chunk.offset);
 +        assert Integer.bitCount(bufferSize) == 1;
 +        return useMmap && useDirect
 +                ? ByteBuffer.allocateDirect(bufferSize)
 +                : ByteBuffer.allocate(bufferSize);
 +    }
  
 -        if (compressed.capacity() < chunk.length)
 -            compressed = ByteBuffer.wrap(new byte[chunk.length]);
 -        else
 -            compressed.clear();
 -        compressed.limit(chunk.length);
 +    @Override
 +    public void deallocate()
 +    {
 +        super.deallocate();
  
 -        if (channel.read(compressed) != chunk.length)
 -            throw new CorruptBlockException(getPath(), chunk);
 +        if (chunkSegments != null)
 +        {
 +            for (Map.Entry<Long, MappedByteBuffer> entry : chunkSegments.entrySet())
 +            {
 +                FileUtils.clean(entry.getValue());
 +            }
 +        }
  
 -        // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
 -        // in the future this will save a lot of hair-pulling
 -        compressed.flip();
 +        chunkSegments = null;
 +    }
 +
 +    private void reBufferStandard()
 +    {
          try
          {
 -            validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);
 +            long position = current();
 +            assert position < metadata.dataLength;
 +
 +            CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 +
 +            if (compressed.capacity() < chunk.length)
 +                compressed = ByteBuffer.wrap(new byte[chunk.length]);
 +            else
 +                compressed.clear();
 +            compressed.limit(chunk.length);
 +
 +            if (channel.read(compressed, chunk.offset) != chunk.length)
 +                throw new CorruptBlockException(getPath(), chunk);
 +
 +            // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
 +            // in the future this will save a lot of hair-pulling
 +            compressed.flip();
 +            buffer.clear();
 +            int decompressedBytes;
 +            try
 +            {
 +                decompressedBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer.array(), 0);
 +                buffer.limit(decompressedBytes);
 +            }
 +            catch (IOException e)
 +            {
++                buffer.limit(0);
 +                throw new CorruptBlockException(getPath(), chunk);
 +            }
 +
 +            if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
 +            {
 +
 +                checksum.update(compressed.array(), 0, chunk.length);
 +
 +                if (checksum(chunk) != (int) checksum.getValue())
 +                    throw new CorruptBlockException(getPath(), chunk);
 +
 +                // reset checksum object back to the original (blank) state
 +                checksum.reset();
 +            }
 +
 +            // buffer offset is always aligned
 +            bufferOffset = position & ~(buffer.capacity() - 1);
 +            buffer.position((int) (position - bufferOffset));
 +            // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
 +            // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
 +            if (bufferOffset + buffer.limit() > length())
 +                buffer.limit((int)(length() - bufferOffset));
 +        }
 +        catch (CorruptBlockException e)
 +        {
 +            throw new CorruptSSTableException(e, getPath());
          }
          catch (IOException e)
          {
 -            throw new CorruptBlockException(getPath(), chunk, e);
 +            throw new FSReadError(e, getPath());
          }
 +    }
  
 -        if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
 +    private void reBufferMmap()
 +    {
 +        try
          {
 +            long position = current();
 +            assert position < metadata.dataLength;
  
 -            if (metadata.hasPostCompressionAdlerChecksums)
 +            CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 +
 +            Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
 +            long segmentOffset = entry.getKey();
 +            int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
 +            MappedByteBuffer compressedChunk = entry.getValue();
 +
 +            compressedChunk.position(chunkOffset);
 +            compressedChunk.limit(chunkOffset + chunk.length);
 +            compressedChunk.mark();
 +
 +            buffer.clear();
 +            int decompressedBytes;
 +            try
              {
 -                checksum.update(compressed.array(), 0, chunk.length);
 +                decompressedBytes = metadata.compressor().uncompress(compressedChunk, buffer);
 +                buffer.limit(decompressedBytes);
              }
 -            else
 +            catch (IOException e)
 +            {
++                buffer.limit(0);
 +                throw new CorruptBlockException(getPath(), chunk);
 +            }
 +            finally
              {
 -                checksum.update(buffer, 0, validBufferBytes);
 +                compressedChunk.limit(compressedChunk.capacity());
              }
  
 -            if (checksum(chunk) != (int) checksum.getValue())
 -                throw new CorruptBlockException(getPath(), chunk);
 +            if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
 +            {
 +                compressedChunk.reset();
 +                compressedChunk.limit(chunkOffset + chunk.length);
 +
 +                FBUtilities.directCheckSum(checksum, compressedChunk);
 +
 +                compressedChunk.limit(compressedChunk.capacity());
  
 -            // reset checksum object back to the original (blank) state
 -            checksum.reset();
 +
 +                if (compressedChunk.getInt() != (int) checksum.getValue())
 +                    throw new CorruptBlockException(getPath(), chunk);
 +
 +                // reset checksum object back to the original (blank) state
 +                checksum.reset();
 +            }
 +
 +            // buffer offset is always aligned
 +            bufferOffset = position & ~(buffer.capacity() - 1);
 +            buffer.position((int) (position - bufferOffset));
 +            // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
 +            // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
 +            if (bufferOffset + buffer.limit() > length())
 +                buffer.limit((int)(length() - bufferOffset));
 +        }
 +        catch (CorruptBlockException e)
 +        {
 +            throw new CorruptSSTableException(e, getPath());
          }
  
 -        // buffer offset is always aligned
 -        bufferOffset = current & ~(buffer.length - 1);
 -        // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
 -        // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
 -        if (bufferOffset + validBufferBytes > length())
 -            validBufferBytes = (int)(length() - bufferOffset);
 +    }
 +
 +    @Override
 +    protected void reBuffer()
 +    {
 +        if (useMmap)
 +        {
 +            reBufferMmap();
 +        }
 +        else
 +        {
 +            reBufferStandard();
 +        }
      }
  
      private int checksum(CompressionMetadata.Chunk chunk) throws IOException
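
The most algorithmic piece of the new mmap support above is createMappedSegments(): compressed chunks, each followed by a 4-byte checksum, are packed into mapped segments that never exceed MAX_SEGMENT_SIZE, keyed by segment start offset so reBufferMmap() can find a chunk's segment with floorEntry(chunk.offset). The stand-alone sketch below mirrors just that grouping arithmetic; maxSegmentSize and chunkLengths are illustrative values, nothing is actually mapped, and the real code walks the file via metadata.chunkFor(offset).

    import java.util.Arrays;
    import java.util.List;
    import java.util.TreeMap;

    // Toy model of createMappedSegments(): group (chunk + 4-byte checksum) runs into
    // segments that never exceed maxSegmentSize, keyed by each segment's start offset.
    public class SegmentGroupingSketch
    {
        public static void main(String[] args)
        {
            long maxSegmentSize = 64;                         // stand-in for MAX_SEGMENT_SIZE
            List<Long> chunkLengths = Arrays.asList(20L, 20L, 20L, 20L, 20L);

            TreeMap<Long, Long> segments = new TreeMap<>();   // segment offset -> segment size
            long lastSegmentOffset = 0;
            long segmentSize = 0;
            for (long chunkLength : chunkLengths)
            {
                long onDisk = chunkLength + 4;                // compressed chunk + checksum
                if (segmentSize + onDisk > maxSegmentSize)    // would cross an mmap boundary
                {
                    segments.put(lastSegmentOffset, segmentSize);
                    lastSegmentOffset += segmentSize;
                    segmentSize = 0;
                }
                segmentSize += onDisk;
            }
            if (segmentSize > 0)
                segments.put(lastSegmentOffset, segmentSize);

            System.out.println(segments);                     // prints {0=48, 48=48, 96=24}
        }
    }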

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49b08989/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/SchemaLoader.java
index 8f9df8f,c6a3855..46f4a9a
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@@ -435,26 -415,16 +435,31 @@@ public class SchemaLoade
      
      private static CFMetaData jdbcCFMD(String ksName, String cfName, AbstractType comp)
      {
 -        return CFMetaData.denseCFMetaData(ksName, cfName, comp).defaultValidator(comp);
 +        return CFMetaData.denseCFMetaData(ksName, cfName, comp).defaultValidator(comp).compressionParameters(getCompressionParameters());
      }
  
 -    private static CFMetaData jdbcSparseCFMD(String ksName, String cfName, AbstractType comp)
 +    public static CFMetaData jdbcSparseCFMD(String ksName, String cfName, AbstractType comp)
      {
 -        return CFMetaData.sparseCFMetaData(ksName, cfName, comp).defaultValidator(comp);
 +        return CFMetaData.sparseCFMetaData(ksName, cfName, comp).defaultValidator(comp).compressionParameters(getCompressionParameters());
      }
  
 -    public static void cleanupAndLeaveDirs()
 +    public static CompressionParameters getCompressionParameters()
      {
++        return getCompressionParameters(null);
++    }
++
++    public static CompressionParameters getCompressionParameters(Integer chunkSize)
++    {
 +        if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")))
-             return new CompressionParameters(SnappyCompressor.instance);
++            return new CompressionParameters(SnappyCompressor.instance, chunkSize, Collections.<String, String>emptyMap());
 +        else
 +            return new CompressionParameters(null);
 +    }
 +
 +    public static void cleanupAndLeaveDirs() throws IOException
 +    {
 +        // We need to stop and unmap all CLS instances prior to cleanup() or we'll get failures on Windows.
 +        CommitLog.instance.stopUnsafe(true);
          mkdirs();
          cleanup();
          mkdirs();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49b08989/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index dce8d14,a19c76d..128d1b0
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -21,18 -21,19 +21,13 @@@ package org.apache.cassandra.db
   */
  
  import java.io.*;
 +import java.lang.reflect.Field;
 +import java.lang.reflect.Modifier;
  import java.nio.ByteBuffer;
- import java.util.Arrays;
 -import java.util.Collections;
--import java.util.HashSet;
--import java.util.Iterator;
--import java.util.List;
--import java.util.Set;
++import java.util.*;
  import java.util.concurrent.ExecutionException;
  
- import org.apache.cassandra.io.compress.CompressionParameters;
- import org.apache.cassandra.io.compress.SnappyCompressor;
 -import org.apache.cassandra.cql3.QueryProcessor;
 -import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.exceptions.RequestExecutionException;
+ import org.apache.cassandra.io.compress.CompressionMetadata;
 -import org.apache.cassandra.utils.UUIDGen;
  import org.apache.commons.lang3.StringUtils;
  import org.junit.BeforeClass;
  import org.junit.Test;
@@@ -54,57 -51,31 +49,57 @@@ import org.apache.cassandra.exceptions.
  import org.apache.cassandra.exceptions.WriteTimeoutException;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.io.sstable.SSTableRewriter;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.locator.SimpleStrategy;
 +import org.apache.cassandra.OrderedJUnit4ClassRunner;
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.Util;
  import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.UUIDGen;
 +
 +import static org.junit.Assert.*;
++import static org.junit.Assume.assumeTrue;
  
  import static org.apache.cassandra.Util.cellname;
  import static org.apache.cassandra.Util.column;
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.fail;
- import static org.junit.Assume.assumeTrue;
  
 -import static junit.framework.Assert.assertNotNull;
 -import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertTrue;
 -import static org.junit.Assert.fail;
 -import static org.junit.Assume.assumeTrue;
 -
  @RunWith(OrderedJUnit4ClassRunner.class)
 -public class ScrubTest extends SchemaLoader
 +public class ScrubTest
  {
 -    public String KEYSPACE = "Keyspace1";
 -    public String CF = "Standard1";
 -    public String CF3 = "Standard2";
 -    public String COUNTER_CF = "Counter1";
 -    private static Integer COMPRESSION_CHUNK_LENGTH = 4096;
 +    public static final String KEYSPACE = "Keyspace1";
 +    public static final String CF = "Standard1";
 +    public static final String CF2 = "Standard2";
 +    public static final String CF3 = "Standard3";
 +    public static final String COUNTER_CF = "Counter1";
 +    public static final String CF_UUID = "UUIDKeys";
 +    public static final String CF_INDEX1 = "Indexed1";
 +    public static final String CF_INDEX2 = "Indexed2";
 +
 +    public static final String COL_KEYS_INDEX = "birthdate";
 +    public static final String COL_COMPOSITES_INDEX = "col1";
 +    public static final String COL_NON_INDEX = "notanindexcol";
 +
++    public static final Integer COMPRESSION_CHUNK_LENGTH = 4096;
+ 
      @BeforeClass
 -    public static void loadSchema() throws ConfigurationException
 +    public static void defineSchema() throws ConfigurationException
      {
 -        loadSchema(COMPRESSION_CHUNK_LENGTH);
 +        SchemaLoader.loadSchema();
 +        SchemaLoader.createKeyspace(KEYSPACE,
 +                                    SimpleStrategy.class,
 +                                    KSMetaData.optsWithRF(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF2),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF3),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, COUNTER_CF)
 +                                                .defaultValidator(CounterColumnType.instance)
-                                                 .compressionParameters(SchemaLoader.getCompressionParameters()),
++                                                .compressionParameters(SchemaLoader.getCompressionParameters(COMPRESSION_CHUNK_LENGTH)),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF_UUID).keyValidator(UUIDType.instance),
 +                                    SchemaLoader.indexCFMD(KEYSPACE, CF_INDEX1, true),
 +                                    SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2, true));
      }
  
      @Test
@@@ -140,28 -112,88 +136,89 @@@
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF);
          cfs.clearUnsafe();
  
-         fillCounterCF(cfs, 2);
+         fillCounterCF(cfs, numPartitions);
  
-         List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
-         assertEquals(2, rows.size());
+         List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), numPartitions*10);
+         assertEquals(numPartitions, rows.size());
 +
-         overrdeWithGarbage(cfs, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1"));
+         assertEquals(1, cfs.getSSTables().size());
  
          SSTableReader sstable = cfs.getSSTables().iterator().next();
  
+         //make sure to override at most 1 chunk when compression is enabled
+         overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1"));
+ 
          // with skipCorrupted == false, the scrub is expected to fail
 -        Scrubber scrubber = new Scrubber(cfs, sstable, false, false);
 -        try
 +        try(Scrubber scrubber = new Scrubber(cfs, sstable, false, false))
          {
              scrubber.scrub();
              fail("Expected a CorruptSSTableException to be thrown");
          }
          catch (IOError err) {}
  
--        // with skipCorrupted == true, the corrupt row will be skipped
++        // with skipCorrupted == true, the corrupt rows will be skipped
+         Scrubber.ScrubResult scrubResult;
 -        scrubber = new Scrubber(cfs, sstable, true, false);
 -        scrubResult = scrubber.scrubWithResult();
 -        scrubber.close();
 +        try(Scrubber scrubber = new Scrubber(cfs, sstable, true, false))
 +        {
++            scrubResult = scrubber.scrubWithResult();
++        }
+ 
+         assertNotNull(scrubResult);
+ 
+         boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));
+         if (compression)
+         {
+             assertEquals(0, scrubResult.emptyRows);
+             assertEquals(numPartitions, scrubResult.badRows + scrubResult.goodRows);
+             //because we only corrupted 1 chunk and we chose enough partitions to cover at least 3 chunks
+             assertTrue(scrubResult.goodRows >= scrubResult.badRows * 2);
+         }
+         else
+         {
+             assertEquals(0, scrubResult.emptyRows);
+             assertEquals(1, scrubResult.badRows);
+             assertEquals(numPartitions-1, scrubResult.goodRows);
+         }
+         assertEquals(1, cfs.getSSTables().size());
+ 
+         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+         assertEquals(scrubResult.goodRows, rows.size());
+     }
+ 
+     @Test
+     public void testScrubCorruptedRowInSmallFile() throws IOException, WriteTimeoutException
+     {
+         // cannot test this with compression
+         assumeTrue(!Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")));
+ 
+         CompactionManager.instance.disableAutoCompaction();
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF);
+         cfs.clearUnsafe();
+ 
+         fillCounterCF(cfs, 2);
+ 
+         List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+         assertEquals(2, rows.size());
+ 
+         SSTableReader sstable = cfs.getSSTables().iterator().next();
+ 
+         // overwrite one row with garbage
+         overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1"));
+ 
+         // with skipCorrupted == false, the scrub is expected to fail
+         Scrubber scrubber = new Scrubber(cfs, sstable, false, false);
+         try
+         {
              scrubber.scrub();
+             fail("Expected a CorruptSSTableException to be thrown");
          }
+         catch (IOError err) {}
+ 
+         // with skipCorrupted == true, the corrupt row will be skipped
+         scrubber = new Scrubber(cfs, sstable, true, false);
+         scrubber.scrub();
+         scrubber.close();
          assertEquals(1, cfs.getSSTables().size());
  
          // verify that we can read all of the rows, and there is now one less row
@@@ -170,21 -202,34 +227,49 @@@
      }
  
      @Test
+     public void testScrubOneRowWithCorruptedKey() throws IOException, ExecutionException, InterruptedException, ConfigurationException
+     {
+         // cannot test this with compression
+         assumeTrue(!Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")));
+ 
+         CompactionManager.instance.disableAutoCompaction();
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.clearUnsafe();
+ 
+         List<Row> rows;
+ 
+         // insert data and verify we get it back w/ range query
+         fillCF(cfs, 4);
+         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+         assertEquals(4, rows.size());
+ 
+         SSTableReader sstable = cfs.getSSTables().iterator().next();
+         overrideWithGarbage(sstable, 0, 2);
+ 
+         CompactionManager.instance.performScrub(cfs, false);
+ 
+         // check data is still there
+         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+         assertEquals(4, rows.size());
+     }
+ 
+     @Test
 +    public void testScrubCorruptedCounterRowNoEarlyOpen() throws IOException, WriteTimeoutException
 +    {
 +        long oldOpenVal = SSTableRewriter.getOpenInterval();
 +        try
 +        {
 +            SSTableRewriter.overrideOpenInterval(Long.MAX_VALUE);
 +            testScrubCorruptedCounterRow();
 +        }
 +        finally
 +        {
 +            SSTableRewriter.overrideOpenInterval(oldOpenVal);
 +        }
 +    }
 +
 +    @Test
      public void testScrubDeletedRow() throws ExecutionException, InterruptedException
      {
          CompactionManager.instance.disableAutoCompaction();
@@@ -432,129 -471,4 +537,129 @@@
          assertEquals("bar", iter.next().getString("c"));
          assertEquals("boo", iter.next().getString("c"));
      }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubKeysIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException
 +    {
 +        //If the partitioner preserves the order then SecondaryIndex uses BytesType comparator,
 +        // otherwise it uses LocalByPartitionerType
 +        setKeyComparator(BytesType.instance);
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubCompositeIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException
 +    {
 +        setKeyComparator(BytesType.instance);
 +        testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubKeysIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner()));
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner()));
 +        testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testFailScrubKeysIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, false);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testFailScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, false);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubTwice() throws IOException, ExecutionException, InterruptedException
 +    {
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true, true);
 +    }
 +
 +    /** The SecondaryIndex class is used for custom indexes so to avoid
 +     * making a public final field into a private field with getters
 +     * and setters, we resort to this hack in order to test it properly
 +     * since it can have two values which influence the scrubbing behavior.
 +     * @param comparator - the key comparator we want to test
 +     */
 +    private void setKeyComparator(AbstractType<?> comparator)
 +    {
 +        try
 +        {
 +            Field keyComparator = SecondaryIndex.class.getDeclaredField("keyComparator");
 +            keyComparator.setAccessible(true);
 +            int modifiers = keyComparator.getModifiers();
 +            Field modifierField = keyComparator.getClass().getDeclaredField("modifiers");
 +            modifiers = modifiers & ~Modifier.FINAL;
 +            modifierField.setAccessible(true);
 +            modifierField.setInt(keyComparator, modifiers);
 +
 +            keyComparator.set(null, comparator);
 +        }
 +        catch (Exception ex)
 +        {
 +            fail("Failed to change key comparator in secondary index : " + ex.getMessage());
 +            ex.printStackTrace();
 +        }
 +    }
 +
 +    private void testScrubIndex(String cfName, String colName, boolean composite, boolean ... scrubs)
 +            throws IOException, ExecutionException, InterruptedException
 +    {
 +        CompactionManager.instance.disableAutoCompaction();
 +        Keyspace keyspace = Keyspace.open(KEYSPACE);
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
 +        cfs.clearUnsafe();
 +
 +        int numRows = 1000;
 +        long[] colValues = new long [numRows * 2]; // each row has two columns
 +        for (int i = 0; i < colValues.length; i+=2)
 +        {
 +            colValues[i] = (i % 4 == 0 ? 1L : 2L); // index column
 +            colValues[i+1] = 3L; //other column
 +        }
 +        fillIndexCF(cfs, composite, colValues);
 +
 +        // check index
 +        IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes(colName), Operator.EQ, ByteBufferUtil.bytes(1L));
 +        List<Row> rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows);
 +        assertNotNull(rows);
 +        assertEquals(numRows / 2, rows.size());
 +
 +        // scrub index
 +        Set<ColumnFamilyStore> indexCfss = cfs.indexManager.getIndexesBackedByCfs();
 +        assertTrue(indexCfss.size() == 1);
 +        for(ColumnFamilyStore indexCfs : indexCfss)
 +        {
 +            for (int i = 0; i < scrubs.length; i++)
 +            {
 +                boolean failure = !scrubs[i];
 +                if (failure)
 +                { //make sure the next scrub fails
-                     overrdeWithGarbage(indexCfs, ByteBufferUtil.bytes(1L), ByteBufferUtil.bytes(2L));
++                    overrideWithGarbage(indexCfs.getSSTables().iterator().next(), ByteBufferUtil.bytes(1L), ByteBufferUtil.bytes(2L));
 +                }
 +                CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true);
 +                assertEquals(failure ?
 +                             CompactionManager.AllSSTableOpStatus.ABORTED :
 +                             CompactionManager.AllSSTableOpStatus.SUCCESSFUL,
 +                                result);
 +            }
 +        }
 +
 +
 +        // check index is still working
 +        rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows);
 +        assertNotNull(rows);
 +        assertEquals(numRows / 2, rows.size());
 +    }
  }

