cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stefa...@apache.org
Subject [02/10] cassandra git commit: Prevent OOM failures on SSTable corruption, improve tests for corruption detection
Date Thu, 26 May 2016 02:57:19 GMT
Prevent OOM failures on SSTable corruption, improve tests for corruption detection

Patch by Alex Petrov; reviewed by Stefania Alborghetti for CASSANDRA-9530


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/85cc3901
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/85cc3901
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/85cc3901

Branch: refs/heads/cassandra-3.7
Commit: 85cc390189f45be54dec9b146f66eeb7737fb0eb
Parents: e000ebb
Author: Alex Petrov <oleksandr.petrov@gmail.com>
Authored: Tue May 24 09:46:43 2016 +0200
Committer: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Committed: Thu May 26 10:51:06 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  11 +
 conf/cassandra.yaml                             |   8 +-
 .../org/apache/cassandra/config/Config.java     |   6 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 +
 .../apache/cassandra/db/ClusteringPrefix.java   |   4 +-
 .../columniterator/AbstractSSTableIterator.java |   2 +-
 .../cassandra/db/marshal/AbstractType.java      |  46 +++-
 src/java/org/apache/cassandra/db/rows/Rows.java |   2 +-
 .../cassandra/db/rows/UnfilteredSerializer.java |  13 +-
 .../io/sstable/SSTableIdentityIterator.java     |   5 +
 .../io/sstable/format/big/BigTableScanner.java  |   4 +-
 .../io/util/RebufferingInputStream.java         |   2 +-
 .../compaction/BlacklistingCompactionsTest.java |  45 +++-
 .../sstable/SSTableCorruptionDetectionTest.java | 244 +++++++++++++++++++
 .../cassandra/io/sstable/SSTableWriterTest.java |  49 +++-
 .../io/sstable/SSTableWriterTestBase.java       |  10 +-
 17 files changed, 431 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9a449fd..ddfb24f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.7
+ * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
  * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
  * Allow compaction strategies to disable early open (CASSANDRA-11754)
  * Refactor Materialized View code (CASSANDRA-11475)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index d13f94f..ac1ef17 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,17 @@ restore snapshots created with the previous major version using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+3.0.7
+=====
+
+Upgrading
+---------
+   - A maximum size for SSTables values has been introduced, to prevent out of memory
+     exceptions when reading corrupt SSTables. This maximum size can be set via
+     max_value_size_in_mb in cassandra.yaml. The default is 256MB, which matches the default
+     value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt
if
+     they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
+
 3.0.6
 =====
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index a460abd..4b92f64 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -512,7 +512,8 @@ native_transport_port: 9042
 # native_transport_max_threads: 128
 #
 # The maximum size of allowed frame. Frame (requests) larger than this will
-# be rejected as invalid. The default is 256MB.
+# be rejected as invalid. The default is 256MB. If you're changing this parameter,
+# you may want to adjust max_value_size_in_mb accordingly.
 # native_transport_max_frame_size_in_mb: 256
 
 # The maximum number of concurrent client connections.
@@ -944,3 +945,8 @@ enable_scripted_user_defined_functions: false
 # below their system default. The sysinternals 'clockres' tool can confirm your system's
default
 # setting.
 windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value size larger than this threshold will result into marking an SSTable
+# as corrupted.
+# max_value_size_in_mb: 256

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 235641c..b49e14c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -152,6 +152,12 @@ public class Config
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;
+    /**
+     * Max size of values in SSTables, in MegaBytes.
+     * Default is the same as the native protocol frame limit: 256Mb.
+     * See AbstractType for how it is used.
+     */
+    public Integer max_value_size_in_mb = 256;
 
     public Integer thrift_framed_transport_size_in_mb = 15;
     public Boolean snapshot_before_compaction = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 295b827..dcda76f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -712,6 +712,9 @@ public class DatabaseDescriptor
         {
             throw new ConfigurationException("Encryption must be enabled in client_encryption_options
for native_transport_port_ssl", false);
         }
+
+        if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
+            throw new ConfigurationException("max_value_size_in_mb must be positive", false);
     }
 
     private static FileStore guessFileStore(String dir) throws IOException
@@ -815,6 +818,11 @@ public class DatabaseDescriptor
         return conf.thrift_framed_transport_size_in_mb * 1024 * 1024;
     }
 
+    public static int getMaxValueSize()
+    {
+        return conf.max_value_size_in_mb * 1024 * 1024;
+    }
+
     /**
      * Creates all storage-related directories.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 17befca..8d28637 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -418,7 +418,9 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
 
         public void prepare(int flags, int extendedFlags) throws IOException
         {
-            assert !UnfilteredSerializer.isStatic(extendedFlags) : "Flags = " + flags;
+            if (UnfilteredSerializer.isStatic(extendedFlags))
+                throw new IOException("Corrupt flags value for clustering prefix (isStatic
flag set): " + flags);
+
             this.nextIsRow = UnfilteredSerializer.kind(flags) == Unfiltered.Kind.ROW;
             this.nextKind = nextIsRow ? Kind.CLUSTERING : ClusteringPrefix.Kind.values()[in.readByte()];
             this.nextSize = nextIsRow ? comparator.size() : in.readUnsignedShort();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 0e2012e..d57e6bc 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -339,7 +339,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             {
                 return hasNextInternal();
             }
-            catch (IOException e)
+            catch (IOException | IndexOutOfBoundsException e)
             {
                 try
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 331f1a4..9d1cc8a 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -27,9 +27,11 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.db.TypeSizes;
@@ -41,11 +43,9 @@ import org.apache.cassandra.utils.FastByteOperations;
 import org.github.jamm.Unmetered;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.db.marshal.AbstractType.ComparisonType.CUSTOM;
-import static org.apache.cassandra.db.marshal.AbstractType.ComparisonType.NOT_COMPARABLE;
 
 /**
  * Specifies a Comparator for a specific type of ByteBuffer.
@@ -62,7 +62,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
 
     public final Comparator<ByteBuffer> reverseComparator;
 
-    public static enum ComparisonType
+    public enum ComparisonType
     {
         /**
          * This type should never be compared
@@ -82,10 +82,23 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
 
     public final ComparisonType comparisonType;
     public final boolean isByteOrderComparable;
+
+    /**
+     * The maximum size of values for this type, used when some values are not of fixed length,
+     * that is valueLengthIfFixed() returns -1.
+     */
+    public int maxValueSize;
+
     protected AbstractType(ComparisonType comparisonType)
     {
+        this(comparisonType, DatabaseDescriptor.getMaxValueSize());
+    }
+
+    protected AbstractType(ComparisonType comparisonType, int maxValueSize)
+    {
         this.comparisonType = comparisonType;
         this.isByteOrderComparable = comparisonType == ComparisonType.BYTE_ORDER;
+        this.maxValueSize = maxValueSize;
         reverseComparator = (o1, o2) -> AbstractType.this.compare(o2, o1);
         try
         {
@@ -101,6 +114,17 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
         }
     }
 
+    /**
+     * Change the maximum value size, this should only be called for testing.
+     * Unfortunately, ensuring we use a type created with a different maxValueSize
+     * is too hard at the moment, due to the pervasive use of the type's singleton instances.
+     */
+    @VisibleForTesting
+    public void setMaxValueSize(int maxValueSize)
+    {
+        this.maxValueSize = maxValueSize;
+    }
+
     public static List<String> asCQLTypeStringList(List<AbstractType<?>>
abstractTypes)
     {
         List<String> r = new ArrayList<>(abstractTypes.size());
@@ -357,7 +381,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
     }
 
     /**
-    * The length of values for this type if all values are of fixed length, -1 otherwise.
+     * The length of values for this type if all values are of fixed length, -1 otherwise.
      */
     protected int valueLengthIfFixed()
     {
@@ -385,10 +409,22 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
     public ByteBuffer readValue(DataInputPlus in) throws IOException
     {
         int length = valueLengthIfFixed();
+
         if (length >= 0)
             return ByteBufferUtil.read(in, length);
         else
-            return ByteBufferUtil.readWithVIntLength(in);
+        {
+            int l = (int)in.readUnsignedVInt();
+            if (l < 0)
+                throw new IOException("Corrupt (negative) value length encountered");
+
+            if (l > maxValueSize)
+                throw new IOException(String.format("Corrupt value length %d encountered,
as it exceeds the maximum of %d, " +
+                                                    "which is set via max_value_size_in_mb
in cassandra.yaml",
+                                                    l, maxValueSize));
+
+            return ByteBufferUtil.read(in, l);
+        }
     }
 
     public void skipValue(DataInputPlus in) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index ea2ca06..e325091 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -59,7 +59,7 @@ public abstract class Rows
     }
 
     /**
-     * Collect statistics ont a given row.
+     * Collect statistics on a given row.
      *
      * @param row the row for which to collect stats.
      * @param collector the stats collector.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 4efc5eb..e4202c9 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -355,9 +355,18 @@ public class UnfilteredSerializer
         }
         else
         {
-            assert !isStatic(extendedFlags); // deserializeStaticRow should be used for that.
+            // deserializeStaticRow should be used for that.
+            if (isStatic(extendedFlags))
+                throw new IOException("Corrupt flags value for unfiltered partition (isStatic
flag set): " + flags);
+
             builder.newRow(Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes()));
-            return deserializeRowBody(in, header, helper, flags, extendedFlags, builder);
+            Row row = deserializeRowBody(in, header, helper, flags, extendedFlags, builder);
+            // we do not write empty rows because Rows.collectStats(), called by BTW.applyToRow(),
asserts that rows are not empty
+            // if we don't throw here, then later the very same assertion in Rows.collectStats()
will fail compactions
+            // see BlackListingCompactionsTest and CASSANDRA-9530 for details
+            if (row.isEmpty())
+                throw new IOException("Corrupt empty row found in unfiltered partition");
+            return row;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index acdf6bb..6fbc690 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -99,6 +99,11 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered>
implem
         {
             return iterator.hasNext() ? iterator.next() : endOfData();
         }
+        catch (IndexOutOfBoundsException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, filename);
+        }
         catch (IOError e)
         {
             if (e.getCause() instanceof IOException)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 717cfdc..a3bd442 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -329,8 +329,8 @@ public class BigTableScanner implements ISSTableScanner
                         {
                             if (dataRange == null)
                             {
-                                dfile.seek(currentEntry.position + currentEntry.headerOffset());
-                                ByteBufferUtil.readWithShortLength(dfile); // key
+                                dfile.seek(currentEntry.position);
+                                ByteBufferUtil.skipShortLength(dfile); // key
                                 return new SSTableIdentityIterator(sstable, dfile, partitionKey());
                             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
index 3068746..15d0975 100644
--- a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
@@ -65,7 +65,7 @@ public abstract class RebufferingInputStream extends InputStream implements
Data
     {
         int read = read(b, off, len);
         if (read < len)
-            throw new EOFException();
+            throw new EOFException("EOF after " + read + " bytes out of " + len);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
index 19d2347..21ce450 100644
--- a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
@@ -28,25 +28,32 @@ import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.*;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class BlacklistingCompactionsTest
 {
+    private static final Logger logger = LoggerFactory.getLogger(BlacklistingCompactionsTest.class);
+
+    private static Random random;
+
     private static final String KEYSPACE1 = "BlacklistingCompactionsTest";
-    private static final String CF_STANDARD1 = "Standard1";
+    private static final String STANDARD_STCS = "Standard_STCS";
+    private static final String STANDARD_LCS = "Standard_LCS";
 
     @After
     public void leakDetect() throws InterruptedException
@@ -60,10 +67,26 @@ public class BlacklistingCompactionsTest
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
+        long seed = System.nanoTime();
+        //long seed = 754271160974509L; // CASSANDRA-9530: use this seed to reproduce compaction
failures if reading empty rows
+        logger.info("Seed {}", seed);
+        random = new Random(seed);
+
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD_STCS).compaction(CompactionParams.DEFAULT),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD_LCS).compaction(CompactionParams.lcs(Collections.emptyMap())));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        for (String tableName : new String[] {STANDARD_STCS, STANDARD_LCS})
+        {
+            final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(tableName);
+
+            for (ColumnDefinition cd : cfs.metadata.allColumns())
+                cd.type.setMaxValueSize(1024 * 1024); // set max value size to 1MB
+        }
+
         closeStdErr();
     }
 
@@ -80,20 +103,20 @@ public class BlacklistingCompactionsTest
     @Test
     public void testBlacklistingWithSizeTieredCompactionStrategy() throws Exception
     {
-        testBlacklisting(SizeTieredCompactionStrategy.class.getCanonicalName());
+        testBlacklisting(STANDARD_STCS);
     }
 
     @Test
     public void testBlacklistingWithLeveledCompactionStrategy() throws Exception
     {
-        testBlacklisting(LeveledCompactionStrategy.class.getCanonicalName());
+        testBlacklisting(STANDARD_LCS);
     }
 
-    public void testBlacklisting(String compactionStrategy) throws Exception
+    private void testBlacklisting(String tableName) throws Exception
     {
         // this test does enough rows to force multiple block indexes to be used
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(tableName);
 
         final int ROWS_PER_SSTABLE = 10;
         final int SSTABLES = cfs.metadata.params.minIndexInterval * 2 / ROWS_PER_SSTABLE;
@@ -142,11 +165,11 @@ public class BlacklistingCompactionsTest
                 raf = new RandomAccessFile(sstable.getFilename(), "rw");
                 assertNotNull(raf);
                 assertTrue(raf.length() > corruptionSize);
-                raf.seek(new Random().nextInt((int)(raf.length() - corruptionSize)));
+                raf.seek(random.nextInt((int)(raf.length() - corruptionSize)));
                 // We want to write something large enough that the corruption cannot get
undetected
                 // (even without compression)
                 byte[] corruption = new byte[corruptionSize];
-                Arrays.fill(corruption, (byte)0xFF);
+                random.nextBytes(corruption);
                 raf.write(corruption);
 
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
new file mode 100644
index 0000000..0c27fc8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.*;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.*;
+import org.apache.cassandra.cache.*;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.schema.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableCorruptionDetectionTest extends SSTableWriterTestBase
+{
+    private static final Logger logger = LoggerFactory.getLogger(SSTableCorruptionDetectionTest.class);
+
+    private static final int numberOfPks = 1000;
+    private static final int numberOfRuns = 100;
+    private static final int valueSize = 512 * 1024;
+    // Set corruption size larger or in comparable size to value size, otherwise
+    // chance for corruption to land in the middle of value is quite high.
+    private static final int maxCorruptionSize = 2 * 1024 * 1024;
+
+    private static final String keyspace = "SSTableCorruptionDetectionTest";
+    private static final String table = "corrupted_table";
+
+    private static Random random;
+    private static SSTableWriter writer;
+    private static LifecycleTransaction txn;
+    private static ColumnFamilyStore cfs;
+    private static SSTableReader ssTableReader;
+
+    @BeforeClass
+    public static void setUp()
+    {
+        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
+                                           .addPartitionKey("pk", AsciiType.instance)
+                                           .addClusteringColumn("ck1", AsciiType.instance)
+                                           .addClusteringColumn("ck2", AsciiType.instance)
+                                           .addRegularColumn("reg1", BytesType.instance)
+                                           .addRegularColumn("reg2", BytesType.instance)
+                                           .build();
+
+        cfm.compression(CompressionParams.noCompression());
+        SchemaLoader.createKeyspace(keyspace,
+                                    KeyspaceParams.simple(1),
+                                    cfm);
+
+        cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+        cfs.disableAutoCompaction();
+
+        for (ColumnDefinition cd : cfs.metadata.allColumns())
+            cd.type.setMaxValueSize(1024 * 1024);
+
+        long seed = System.nanoTime();
+        logger.info("Seed {}", seed);
+        random = new Random(seed);
+
+        truncate(cfs);
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        txn = LifecycleTransaction.offline(OperationType.WRITE);
+
+        // Setting up/writing large values is an expensive operation, we only want to do
it once per run
+        writer = getWriter(cfs, dir, txn);
+        for (int i = 0; i < numberOfPks; i++)
+        {
+            UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, String.format("pkvalue_%07d",
i)).withTimestamp(1);
+            byte[] reg1 = new byte[valueSize];
+            random.nextBytes(reg1);
+            byte[] reg2 = new byte[valueSize];
+            random.nextBytes(reg2);
+            builder.newRow("clustering_" + i, "clustering_" + (i + 1))
+                   .add("reg1", ByteBuffer.wrap(reg1))
+                   .add("reg2", ByteBuffer.wrap(reg2));
+            writer.append(builder.build().unfilteredIterator());
+        }
+        cfs.forceBlockingFlush();
+
+        ssTableReader = writer.finish(true);
+        txn.update(ssTableReader, false);
+        LifecycleTransaction.waitForDeletions();
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        txn.abort();
+        writer.close();
+    }
+
+    @Test
+    public void testSinglePartitionIterator() throws Throwable
+    {
+        bruteForceCorruptionTest(ssTableReader, partitionIterator());
+    }
+
+    @Test
+    public void testSSTableScanner() throws Throwable
+    {
+        bruteForceCorruptionTest(ssTableReader, sstableScanner());
+    }
+
+    private void bruteForceCorruptionTest(SSTableReader ssTableReader, Consumer<SSTableReader>
walker) throws Throwable
+    {
+        RandomAccessFile raf = new RandomAccessFile(ssTableReader.getFilename(), "rw");
+
+        int corruptedCounter = 0;
+
+        int fileLength = (int)raf.length(); // in current test, it does fit into int
+        for (int i = 0; i < numberOfRuns; i++)
+        {
+            final int corruptionPosition = random.nextInt(fileLength - 1); //ensure at least
one byte will be corrupted
+            // corrupt max from position to end of file
+            final int corruptionSize = Math.min(maxCorruptionSize, random.nextInt(fileLength
- corruptionPosition));
+
+            byte[] backup = corruptSstable(raf, corruptionPosition, corruptionSize);
+
+            try
+            {
+                walker.accept(ssTableReader);
+            }
+            catch (CorruptSSTableException t)
+            {
+                corruptedCounter++;
+            }
+            finally
+            {
+                restore(raf, corruptionPosition, backup);
+            }
+        }
+
+        assertTrue(corruptedCounter > 0);
+        FileUtils.closeQuietly(raf);
+    }
+
+    private Consumer<SSTableReader> sstableScanner()
+    {
+        return (SSTableReader sstable) -> {
+            try (ISSTableScanner scanner = sstable.getScanner())
+            {
+                while (scanner.hasNext())
+                {
+                    try (UnfilteredRowIterator rowIter = scanner.next())
+                    {
+                        if (rowIter.hasNext())
+                        {
+                            Unfiltered unfiltered = rowIter.next();
+                            if (unfiltered.isRow())
+                            {
+                                Row row = (Row) unfiltered;
+                                assertEquals(2, row.clustering().size());
+                                // no-op read
+                            }
+                        }
+                    }
+
+                }
+            }
+        };
+    }
+
+    private Consumer<SSTableReader> partitionIterator()
+    {
+        return (SSTableReader sstable) -> {
+            for (int i = 0; i < numberOfPks; i++)
+            {
+                DecoratedKey dk = Util.dk(String.format("pkvalue_%07d", i));
+                try (UnfilteredRowIterator rowIter = sstable.iterator(dk, ColumnFilter.all(cfs.metadata),
false, false))
+                {
+                    while (rowIter.hasNext())
+                    {
+                        Unfiltered unfiltered = rowIter.next();
+                        if (unfiltered.isRow())
+                        {
+                            Row row = (Row) unfiltered;
+                            assertEquals(2, row.clustering().size());
+                            // no-op read
+                        }
+                    }
+                    rowIter.close();
+                }
+            }
+        };
+    }
+
+    private byte[] corruptSstable(RandomAccessFile raf, int position, int corruptionSize)
throws IOException
+    {
+        byte[] backup = new byte[corruptionSize];
+        raf.seek(position);
+        raf.read(backup);
+
+        raf.seek(position);
+        byte[] corruption = new byte[corruptionSize];
+        random.nextBytes(corruption);
+        raf.write(corruption);
+
+        return backup;
+    }
+
+    private void restore(RandomAccessFile raf, int position, byte[] backup) throws IOException
+    {
+        raf.seek(position);
+        raf.write(backup);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
index a73a164..6f18461 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
@@ -23,15 +23,17 @@ import java.nio.ByteBuffer;
 
 import org.junit.Test;
 
-import org.apache.cassandra.UpdateBuilder;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.*;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static junit.framework.Assert.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -197,4 +199,45 @@ public class SSTableWriterTest extends SSTableWriterTestBase
             writer2.close();
         }
     }
+
+    @Test
+    public void testValueTooBigCorruption() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+
+        try (SSTableWriter writer1 = getWriter(cfs, dir, txn))
+        {
+            UpdateBuilder largeValue = UpdateBuilder.create(cfs.metadata, "large_value").withTimestamp(1);
+            largeValue.newRow("clustering").add("val", ByteBuffer.allocate(2 * 1024 * 1024));
+            writer1.append(largeValue.build().unfilteredIterator());
+
+            SSTableReader sstable = writer1.finish(true);
+
+            txn.update(sstable, false);
+
+            try
+            {
+                DecoratedKey dk = Util.dk("large_value");
+                UnfilteredRowIterator rowIter = sstable.iterator(dk, ColumnFilter.all(cfs.metadata),
false, false);
+                while (rowIter.hasNext())
+                {
+                    rowIter.next();
+                    // no-op read, as values may not appear expected
+                }
+                fail("Expected a CorruptSSTableException to be thrown");
+            }
+            catch (CorruptSSTableException e)
+            {
+            }
+
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
index 2db92f7..f2c97c0 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@ -31,6 +31,7 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -54,6 +55,7 @@ public class SSTableWriterTestBase extends SchemaLoader
 
     protected static final String KEYSPACE = "SSTableRewriterTest";
     protected static final String CF = "Standard1";
+    protected static final String CF_SMALL_MAX_VALUE = "Standard_SmallMaxValue";
 
     private static Config.DiskAccessMode standardMode;
     private static Config.DiskAccessMode indexMode;
@@ -73,7 +75,13 @@ public class SSTableWriterTestBase extends SchemaLoader
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE,
                                     KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE, CF));
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_SMALL_MAX_VALUE));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE);
+        for (ColumnDefinition cd : cfs.metadata.allColumns())
+            cd.type.setMaxValueSize(1024 * 1024); // set max value size to 1MB
     }
 
     @AfterClass


Mime
View raw message