cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/3] cassandra git commit: Replace all usages of Adler32 with CRC32 which has a fast intrinsic now
Date Mon, 10 Aug 2015 16:16:52 GMT
Replace all usages of Adler32 with CRC32 which has a fast intrinsic now

The switch to Adler32 happened across two versions, depending on whether the data was
compressed or uncompressed

Patch by Ariel Weisberg; reviewed by tjake for CASSANDRA-8684


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5baf28d0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5baf28d0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5baf28d0

Branch: refs/heads/trunk
Commit: 5baf28d0935b7f112c499856b3bc00c722feb460
Parents: 2fcfc7c
Author: Ariel Weisberg <ariel@weisberg.ws>
Authored: Fri Jul 31 11:56:50 2015 -0400
Committer: T Jake Luciani <jake@apache.org>
Committed: Mon Aug 10 12:14:40 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/cache/AutoSavingCache.java |  2 +-
 .../compress/CompressedRandomAccessReader.java  | 10 ++--
 .../io/compress/CompressedSequentialWriter.java |  4 +-
 .../io/compress/CompressionMetadata.java        | 14 +++--
 .../apache/cassandra/io/sstable/Component.java  | 18 ++++--
 .../cassandra/io/sstable/format/Version.java    |  6 +-
 .../io/sstable/format/big/BigFormat.java        | 38 +++++++++---
 .../io/util/ChecksummedRandomAccessReader.java  |  4 +-
 .../io/util/DataIntegrityMetadata.java          | 10 ++--
 .../compress/CompressedInputStream.java         |  6 +-
 .../compress/CompressedStreamReader.java        |  2 +-
 .../apache/cassandra/utils/ChecksumType.java    | 63 ++++++++++++++++++++
 .../org/apache/cassandra/db/VerifyTest.java     |  7 ++-
 .../CompressedRandomAccessReaderTest.java       |  8 +--
 .../CompressedSequentialWriterTest.java         |  4 +-
 .../compression/CompressedInputStreamTest.java  |  4 +-
 17 files changed, 151 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c7d466a..f1ac423 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta1
+ * Replace all usages of Adler32 with CRC32
  * Fix row deletion bug for Materialized Views (CASSANDRA-10014)
  * Support mixed-version clusters with Cassandra 2.1 and 2.2 (CASSANDRA-9704)
  * Fix multiple slices on RowSearchers (CASSANDRA-10002)
@@ -9,6 +10,7 @@
  * Add transparent data encryption core classes (CASSANDRA-9945)
  * Bytecode inspection for Java-UDFs (CASSANDRA-9890)
  * Use byte to serialize MT hash length (CASSANDRA-9792)
+ * Replace usage of Adler32 with CRC32 (CASSANDRA-8684)
 Merged from 2.2:
  * Add checksum to saved cache files (CASSANDRA-9265)
  * Log warning when using an aggregate without partition key (CASSANDRA-9737)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 3c5b6a5..2a838ab 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -62,7 +62,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
     protected final CacheService.CacheType cacheType;
 
     private final CacheSerializer<K, V> cacheLoader;
-    private static final String CURRENT_VERSION = "c";
+    private static final String CURRENT_VERSION = "d";
 
     private static volatile IStreamFactory streamFactory = new IStreamFactory()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 01b4655..c38f4d2 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.zip.Adler32;
-
+import java.util.zip.Checksum;
 
 import com.google.common.primitives.Ints;
 
@@ -58,7 +58,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     private ByteBuffer compressed;
 
     // re-use single crc object
-    private final Adler32 checksum;
+    private final Checksum checksum;
 
     // raw checksum bytes
     private ByteBuffer checksumBytes;
@@ -67,7 +67,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     {
         super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().preferredBufferType());
         this.metadata = metadata;
-        checksum = new Adler32();
+        checksum = metadata.checksumType.newInstance();
 
         chunkSegments = file == null ? null : file.chunkSegments();
         if (chunkSegments == null)
@@ -131,7 +131,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
             if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
             {
                 compressed.rewind();
-                checksum.update(compressed);
+                metadata.checksumType.update( checksum, (compressed));
 
                 if (checksum(chunk) != (int) checksum.getValue())
                     throw new CorruptBlockException(getPath(), chunk);
@@ -193,7 +193,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
             {
                 compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
 
-                checksum.update(compressedChunk);
+                metadata.checksumType.update( checksum, compressedChunk);
 
                 compressedChunk.limit(compressedChunk.capacity());
                 if (compressedChunk.getInt() != (int) checksum.getValue())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index bc1e6f6..a4afa3f 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -23,7 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
-import java.util.zip.Adler32;
+import java.util.zip.CRC32;
 
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
@@ -204,7 +204,7 @@ public class CompressedSequentialWriter extends SequentialWriter
                 throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
             }
 
-            Adler32 checksum = new Adler32();
+            CRC32 checksum = new CRC32();
             compressed.rewind();
             checksum.update(compressed);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index bd6da2c..f5d8f7e 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.SafeMemory;
 import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Transactional;
 import org.apache.cassandra.utils.concurrent.Ref;
@@ -71,6 +72,7 @@ public class CompressionMetadata
     private final long chunkOffsetsSize;
     public final String indexFilePath;
     public final CompressionParams parameters;
+    public final ChecksumType checksumType;
 
     /**
      * Create metadata about given compressed file including uncompressed data length, chunk
size
@@ -86,13 +88,14 @@ public class CompressionMetadata
     public static CompressionMetadata create(String dataFilePath)
     {
         Descriptor desc = Descriptor.fromFilename(dataFilePath);
-        return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new
File(dataFilePath).length());
+        return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new
File(dataFilePath).length(), desc.version.compressedChecksumType());
     }
 
     @VisibleForTesting
-    CompressionMetadata(String indexFilePath, long compressedLength)
+    CompressionMetadata(String indexFilePath, long compressedLength, ChecksumType checksumType)
     {
         this.indexFilePath = indexFilePath;
+        this.checksumType = checksumType;
 
         try (DataInputStream stream = new DataInputStream(new FileInputStream(indexFilePath)))
         {
@@ -131,7 +134,7 @@ public class CompressionMetadata
         this.chunkOffsetsSize = chunkOffsets.size();
     }
 
-    private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory
offsets, long offsetsSize, long dataLength, long compressedLength)
+    private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory
offsets, long offsetsSize, long dataLength, long compressedLength, ChecksumType checksumType)
     {
         this.indexFilePath = filePath;
         this.parameters = parameters;
@@ -139,6 +142,7 @@ public class CompressionMetadata
         this.compressedFileLength = compressedLength;
         this.chunkOffsets = offsets;
         this.chunkOffsetsSize = offsetsSize;
+        this.checksumType = checksumType;
     }
 
     public ICompressor compressor()
@@ -380,7 +384,7 @@ public class CompressionMetadata
             if (count < this.count)
                 compressedLength = offsets.getLong(count * 8L);
 
-            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength,
compressedLength);
+            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength,
compressedLength, ChecksumType.CRC32);
         }
 
         /**
@@ -398,7 +402,7 @@ public class CompressionMetadata
         /**
          * Reset the writer so that the next chunk offset written will be the
          * one of {@code chunkIndex}.
-         * 
+         *
          * @param chunkIndex the next index to write
          */
         public void resetAndTruncate(int chunkIndex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index a431f29..54dd35b 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -48,7 +48,7 @@ public class Component
         // statistical metadata about the content of the sstable
         STATS("Statistics.db"),
         // holds adler32 checksum of the data file
-        DIGEST("Digest.adler32"),
+        DIGEST(new String[] { "Digest.crc32", "Digest.adler32" }),
         // holds the CRC32 for chunks in an a uncompressed file.
         CRC("CRC.db"),
         // holds SSTable Index Summary (sampling of Index component)
@@ -56,19 +56,25 @@ public class Component
         // table of contents, stores the list of all components for the sstable
         TOC("TOC.txt"),
         // custom component, used by e.g. custom compaction strategy
-        CUSTOM(null);
+        CUSTOM(new String[] { null });
 
-        final String repr;
+        final String[] repr;
         Type(String repr)
         {
+            this(new String[] { repr });
+        }
+
+        Type(String[] repr)
+        {
             this.repr = repr;
         }
 
         static Type fromRepresentation(String repr)
         {
             for (Type type : TYPES)
-                if (repr.equals(type.repr))
-                    return type;
+                for (String representation : type.repr)
+                    if (repr.equals(representation))
+                        return type;
             return CUSTOM;
         }
     }
@@ -90,7 +96,7 @@ public class Component
 
     public Component(Type type)
     {
-        this(type, type.repr);
+        this(type, type.repr[0]);
         assert type != Type.CUSTOM;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index 10ceb24..9ef0b43 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.sstable.format;
 
 import java.util.regex.Pattern;
 
+import org.apache.cassandra.utils.ChecksumType;
+
 /**
  * A set of feature flags associated with a SSTable format
  *
@@ -48,7 +50,9 @@ public abstract class Version
 
     public abstract boolean hasNewStatsFile();
 
-    public abstract boolean hasAllAdlerChecksums();
+    public abstract ChecksumType compressedChecksumType();
+
+    public abstract ChecksumType uncompressedChecksumType();
 
     public abstract boolean hasRepairedAt();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 860cd9f..6df4b1e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ChecksumType;
 
 /**
  * Legacy bigtable format
@@ -81,11 +82,11 @@ public class BigFormat implements SSTableFormat
     static class WriterFactory extends SSTableWriter.Factory
     {
         @Override
-        public SSTableWriter open(Descriptor descriptor, 
-                                  long keyCount, 
-                                  long repairedAt, 
-                                  CFMetaData metadata, 
-                                  MetadataCollector metadataCollector, 
+        public SSTableWriter open(Descriptor descriptor,
+                                  long keyCount,
+                                  long repairedAt,
+                                  CFMetaData metadata,
+                                  MetadataCollector metadataCollector,
                                   SerializationHeader header,
                                   LifecycleTransaction txn)
         {
@@ -126,7 +127,8 @@ public class BigFormat implements SSTableFormat
         private final boolean isLatestVersion;
         private final boolean hasSamplingLevel;
         private final boolean newStatsFile;
-        private final boolean hasAllAdlerChecksums;
+        private final ChecksumType compressedChecksumType;
+        private final ChecksumType uncompressedChecksumType;
         private final boolean hasRepairedAt;
         private final boolean tracksLegacyCounterShards;
         private final boolean newFileName;
@@ -145,7 +147,19 @@ public class BigFormat implements SSTableFormat
             isLatestVersion = version.compareTo(current_version) == 0;
             hasSamplingLevel = version.compareTo("ka") >= 0;
             newStatsFile = version.compareTo("ka") >= 0;
-            hasAllAdlerChecksums = version.compareTo("ka") >= 0;
+
+            //For a while Adler32 was in use, now the CRC32 instrinsic is very good especially
after Haswell
+            //PureJavaCRC32 was always faster than Adler32. See CASSANDRA-8684
+            ChecksumType checksumType = ChecksumType.CRC32;
+            if (version.compareTo("ka") >= 0 && version.compareTo("ma") < 0)
+                checksumType = ChecksumType.Adler32;
+            this.uncompressedChecksumType = checksumType;
+
+            checksumType = ChecksumType.CRC32;
+            if (version.compareTo("jb") >= 0 && version.compareTo("ma") < 0)
+                checksumType = ChecksumType.Adler32;
+            this.compressedChecksumType = checksumType;
+
             hasRepairedAt = version.compareTo("ka") >= 0;
             tracksLegacyCounterShards = version.compareTo("ka") >= 0;
 
@@ -177,9 +191,15 @@ public class BigFormat implements SSTableFormat
         }
 
         @Override
-        public boolean hasAllAdlerChecksums()
+        public ChecksumType compressedChecksumType()
+        {
+            return compressedChecksumType;
+        }
+
+        @Override
+        public ChecksumType uncompressedChecksumType()
         {
-            return hasAllAdlerChecksums;
+            return uncompressedChecksumType;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
index 976ff23..3fc247b 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.io.util;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.zip.Adler32;
+import java.util.zip.CRC32;
 
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -52,7 +52,7 @@ public class ChecksummedRandomAccessReader extends RandomAccessReader
     {
         ChannelProxy channel = new ChannelProxy(file);
         RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
-        DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(new
Adler32(),
+        DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(new
CRC32(),
                                                                                         
               crcReader,
                                                                                         
               file.getPath());
         return new ChecksummedRandomAccessReader(file, channel, validator);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index ac2ab47..70cd860 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -25,7 +25,6 @@ import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
-import java.util.zip.Adler32;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
@@ -35,7 +34,6 @@ import com.google.common.base.Charsets;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.utils.FBUtilities;
 
 public class DataIntegrityMetadata
 {
@@ -53,7 +51,7 @@ public class DataIntegrityMetadata
 
         public ChecksumValidator(Descriptor descriptor) throws IOException
         {
-            this(descriptor.version.hasAllAdlerChecksums() ? new Adler32() : new CRC32(),
+            this(descriptor.version.uncompressedChecksumType().newInstance(),
                  RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))),
                  descriptor.filenameFor(Component.DATA));
         }
@@ -110,7 +108,7 @@ public class DataIntegrityMetadata
         public FileDigestValidator(Descriptor descriptor) throws IOException
         {
             this.descriptor = descriptor;
-            checksum = descriptor.version.hasAllAdlerChecksums() ? new Adler32() : new CRC32();
+            checksum = descriptor.version.uncompressedChecksumType().newInstance();
             digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DIGEST)));
             dataReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DATA)));
             try
@@ -154,9 +152,9 @@ public class DataIntegrityMetadata
 
     public static class ChecksumWriter
     {
-        private final Adler32 incrementalChecksum = new Adler32();
+        private final CRC32 incrementalChecksum = new CRC32();
         private final DataOutput incrementalOut;
-        private final Adler32 fullChecksum = new Adler32();
+        private final CRC32 fullChecksum = new CRC32();
 
         public ChecksumWriter(DataOutput incrementalOut)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 099fd14..0a118b2 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -24,13 +24,13 @@ import java.util.Iterator;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 /**
@@ -65,10 +65,10 @@ public class CompressedInputStream extends InputStream
      * @param source Input source to read compressed data from
      * @param info Compression info
      */
-    public CompressedInputStream(InputStream source, CompressionInfo info)
+    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType)
     {
         this.info = info;
-        this.checksum =  new Adler32();
+        this.checksum =  checksumType.newInstance();
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length,
1024));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 47832f0..205291b 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -77,7 +77,7 @@ public class CompressedStreamReader extends StreamReader
 
         SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
 
-        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel),
compressionInfo);
+        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel),
compressionInfo, inputVersion.compressedChecksumType());
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion,
header.toHeader(cfs.metadata));
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/utils/ChecksumType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ChecksumType.java b/src/java/org/apache/cassandra/utils/ChecksumType.java
new file mode 100644
index 0000000..c9a1eb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/ChecksumType.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.nio.ByteBuffer;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
+import java.util.zip.Adler32;
+
+public enum ChecksumType
+{
+    Adler32()
+    {
+
+        @Override
+        public Checksum newInstance()
+        {
+            return new Adler32();
+        }
+
+        @Override
+        public void update(Checksum checksum, ByteBuffer buf)
+        {
+            ((Adler32)checksum).update(buf);
+        }
+
+    },
+    CRC32()
+    {
+
+        @Override
+        public Checksum newInstance()
+        {
+            return new CRC32();
+        }
+
+        @Override
+        public void update(Checksum checksum, ByteBuffer buf)
+        {
+            ((CRC32)checksum).update(buf);
+        }
+
+    };
+
+    public abstract Checksum newInstance();
+
+    public abstract void update(Checksum checksum, ByteBuffer buf);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index 3bd4a47..13ce0c1 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db;
 
 import com.google.common.base.Charsets;
+
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
@@ -43,7 +44,7 @@ import org.junit.runner.RunWith;
 
 import java.io.*;
 import java.nio.file.Files;
-import java.util.zip.Adler32;
+import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 
 import static org.junit.Assert.fail;
@@ -371,8 +372,8 @@ public class VerifyTest
     protected long simpleFullChecksum(String filename) throws IOException
     {
         FileInputStream inputStream = new FileInputStream(filename);
-        Adler32 adlerChecksum = new Adler32();
-        CheckedInputStream cinStream = new CheckedInputStream(inputStream, adlerChecksum);
+        CRC32 checksum = new CRC32();
+        CheckedInputStream cinStream = new CheckedInputStream(inputStream, checksum);
         byte[] b = new byte[128];
         while (cinStream.read(b) >= 0) {
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index cc76a9e..8f94cf2 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -24,7 +24,6 @@ import java.io.RandomAccessFile;
 import java.util.Random;
 
 import org.junit.Test;
-
 import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -35,6 +34,7 @@ import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.SyncUtil;
 
 import static org.junit.Assert.assertEquals;
@@ -84,7 +84,7 @@ public class CompressedRandomAccessReaderTest
                 writer.write("x".getBytes());
             writer.finish();
 
-            CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(channel,
new CompressionMetadata(filename + ".metadata", f.length()));
+            CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(channel,
new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32));
             String res = reader.readLine();
             assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
             assertEquals(40, res.length());
@@ -129,7 +129,7 @@ public class CompressedRandomAccessReaderTest
 
             assert f.exists();
             RandomAccessReader reader = compressed
-                                      ? CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename
+ ".metadata", f.length()))
+                                      ? CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename
+ ".metadata", f.length(), ChecksumType.CRC32))
                                       : RandomAccessReader.open(f);
             String expected = "The quick brown fox jumps over the lazy dog";
             assertEquals(expected.length(), reader.length());
@@ -171,7 +171,7 @@ public class CompressedRandomAccessReaderTest
         ChannelProxy channel = new ChannelProxy(file);
 
         // open compression metadata and get chunk information
-        CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length());
+        CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32);
         CompressionMetadata.Chunk chunk = meta.chunkFor(0);
 
         RandomAccessReader reader = CompressedRandomAccessReader.open(channel, meta);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index db99317..28af0ae 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -31,6 +31,7 @@ import org.junit.After;
 import org.junit.Test;
 
 import junit.framework.Assert;
+
 import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
@@ -41,6 +42,7 @@ import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SequentialWriterTest;
 import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.utils.ChecksumType;
 
 public class CompressedSequentialWriterTest extends SequentialWriterTest
 {
@@ -115,7 +117,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
             }
 
             assert f.exists();
-            RandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length()));
+            RandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32));
             assertEquals(dataPre.length + rawPost.length, reader.length());
             byte[] result = new byte[(int)reader.length()];
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 37aea91..e3014c3 100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -21,7 +21,6 @@ import java.io.*;
 import java.util.*;
 
 import org.junit.Test;
-
 import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
@@ -32,6 +31,7 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.streaming.compress.CompressedInputStream;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -108,7 +108,7 @@ public class CompressedInputStreamTest
 
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
-        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
+        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, ChecksumType.CRC32);
         DataInputStream in = new DataInputStream(input);
 
         for (int i = 0; i < sections.size(); i++)


Mime
View raw message