cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jji...@apache.org
Subject [4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Date Thu, 07 Dec 2017 05:58:52 GMT
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e74f014
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e74f014
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e74f014

Branch: refs/heads/trunk
Commit: 4e74f01488e03d85516b68514388c32d3c78965c
Parents: c169d49 b885e9c
Author: Jeff Jirsa <jjirsa@apple.com>
Authored: Wed Dec 6 21:56:22 2017 -0800
Committer: Jeff Jirsa <jjirsa@apple.com>
Committed: Wed Dec 6 21:56:47 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                                    | 1 +
 .../org/apache/cassandra/io/util/CompressedChunkReader.java    | 6 ++++--
 .../cassandra/streaming/compress/CompressedInputStream.java    | 3 ++-
 3 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e74f014/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3c6565c,b275397..1a1a2cf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,15 -1,7 +1,16 @@@
 +3.11.2
 + * Remove OpenJDK log warning (CASSANDRA-13916)
 + * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
 + * Cache disk boundaries (CASSANDRA-13215)
 + * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
 + * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
 + * Update jackson JSON jars (CASSANDRA-13949)
 + * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930)
 +Merged from 3.0:
  3.0.16
+  * Optimize CRC check chance probability calculations (CASSANDRA-14094)
   * Fix cleanup on keyspace with no replicas (CASSANDRA-13526)
 - * Fix updating base table rows with TTL not removing materialized view entries (CASSANDRA-14071)
 + * Fix updating base table rows with TTL not removing view entries (CASSANDRA-14071)
   * Reduce garbage created by DynamicSnitch (CASSANDRA-14091)
   * More frequent commitlog chained markers (CASSANDRA-13987)
   * Fix serialized size of DataLimits (CASSANDRA-14057)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e74f014/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
index 8f00ce7,0000000..0919c29
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
@@@ -1,227 -1,0 +1,229 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.io.util;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.concurrent.ThreadLocalRandom;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.primitives.Ints;
 +
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.compress.CorruptBlockException;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +
 +public abstract class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader
 +{
 +    final CompressionMetadata metadata;
 +
 +    protected CompressedChunkReader(ChannelProxy channel, CompressionMetadata metadata)
 +    {
 +        super(channel, metadata.dataLength);
 +        this.metadata = metadata;
 +        assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a power of two
 +    }
 +
 +    @VisibleForTesting
 +    public double getCrcCheckChance()
 +    {
 +        return metadata.parameters.getCrcCheckChance();
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("CompressedChunkReader.%s(%s - %s, chunk length %d, data length %d)",
 +                             getClass().getSimpleName(),
 +                             channel.filePath(),
 +                             metadata.compressor().getClass().getSimpleName(),
 +                             metadata.chunkLength(),
 +                             metadata.dataLength);
 +    }
 +
 +    @Override
 +    public int chunkSize()
 +    {
 +        return metadata.chunkLength();
 +    }
 +
 +    @Override
 +    public BufferType preferredBufferType()
 +    {
 +        return metadata.compressor().preferredBufferType();
 +    }
 +
 +    @Override
 +    public Rebufferer instantiateRebufferer()
 +    {
 +        return new BufferManagingRebufferer.Aligned(this);
 +    }
 +
 +    public static class Standard extends CompressedChunkReader
 +    {
 +        // we read the raw compressed bytes into this buffer, then uncompressed them into the provided one.
 +        private final ThreadLocal<ByteBuffer> compressedHolder;
 +
 +        public Standard(ChannelProxy channel, CompressionMetadata metadata)
 +        {
 +            super(channel, metadata);
 +            compressedHolder = ThreadLocal.withInitial(this::allocateBuffer);
 +        }
 +
 +        public ByteBuffer allocateBuffer()
 +        {
 +            return allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()));
 +        }
 +
 +        public ByteBuffer allocateBuffer(int size)
 +        {
 +            return metadata.compressor().preferredBufferType().allocate(size);
 +        }
 +
 +        @Override
 +        public void readChunk(long position, ByteBuffer uncompressed)
 +        {
 +            try
 +            {
 +                // accesses must always be aligned
 +                assert (position & -uncompressed.capacity()) == position;
 +                assert position <= fileLength;
 +
 +                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 +                ByteBuffer compressed = compressedHolder.get();
 +
 +                if (compressed.capacity() < chunk.length)
 +                {
 +                    compressed = allocateBuffer(chunk.length);
 +                    compressedHolder.set(compressed);
 +                }
 +                else
 +                {
 +                    compressed.clear();
 +                }
 +
 +                compressed.limit(chunk.length);
 +                if (channel.read(compressed, chunk.offset) != chunk.length)
 +                    throw new CorruptBlockException(channel.filePath(), chunk);
 +
 +                compressed.flip();
 +                uncompressed.clear();
 +
 +                try
 +                {
 +                    metadata.compressor().uncompress(compressed, uncompressed);
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new CorruptBlockException(channel.filePath(), chunk, e);
 +                }
 +                finally
 +                {
 +                    uncompressed.flip();
 +                }
 +
-                 if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
++                if (getCrcCheckChance() >= 1d ||
++                    getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
 +                {
 +                    compressed.rewind();
 +                    int checksum = (int) metadata.checksumType.of(compressed);
 +
 +                    compressed.clear().limit(Integer.BYTES);
 +                    if (channel.read(compressed, chunk.offset + chunk.length) != Integer.BYTES
 +                                || compressed.getInt(0) != checksum)
 +                        throw new CorruptBlockException(channel.filePath(), chunk);
 +                }
 +            }
 +            catch (CorruptBlockException e)
 +            {
 +                throw new CorruptSSTableException(e, channel.filePath());
 +            }
 +        }
 +    }
 +
 +    public static class Mmap extends CompressedChunkReader
 +    {
 +        protected final MmappedRegions regions;
 +
 +        public Mmap(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
 +        {
 +            super(channel, metadata);
 +            this.regions = regions;
 +        }
 +
 +        @Override
 +        public void readChunk(long position, ByteBuffer uncompressed)
 +        {
 +            try
 +            {
 +                // accesses must always be aligned
 +                assert (position & -uncompressed.capacity()) == position;
 +                assert position <= fileLength;
 +
 +                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 +
 +                MmappedRegions.Region region = regions.floor(chunk.offset);
 +                long segmentOffset = region.offset();
 +                int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
 +                ByteBuffer compressedChunk = region.buffer();
 +
 +                compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
 +
 +                uncompressed.clear();
 +
 +                try
 +                {
 +                    metadata.compressor().uncompress(compressedChunk, uncompressed);
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new CorruptBlockException(channel.filePath(), chunk, e);
 +                }
 +                finally
 +                {
 +                    uncompressed.flip();
 +                }
 +
-                 if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
++                if (getCrcCheckChance() >= 1d ||
++                    getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
 +                {
 +                    compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
 +
 +                    int checksum = (int) metadata.checksumType.of(compressedChunk);
 +
 +                    compressedChunk.limit(compressedChunk.capacity());
 +                    if (compressedChunk.getInt() != checksum)
 +                        throw new CorruptBlockException(channel.filePath(), chunk);
 +                }
 +            }
 +            catch (CorruptBlockException e)
 +            {
 +                throw new CorruptSSTableException(e, channel.filePath());
 +            }
 +
 +        }
 +
 +        public void close()
 +        {
 +            regions.closeQuietly();
 +            super.close();
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e74f014/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 81abefa,e3d698e..8a32d7a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -164,13 -136,17 +164,14 @@@ public class CompressedInputStream exte
          totalCompressedBytesRead += compressed.length;
  
          // validate crc randomly
-         if (this.crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble())
+         if (this.crcCheckChanceSupplier.get() >= 1d ||
+             this.crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble())
          {
 -            checksum.update(compressed, 0, compressed.length - checksumBytes.length);
 +            int checksum = (int) checksumType.of(compressed, 0, compressed.length - checksumBytes.length);
  
             System.arraycopy(compressed, compressed.length - checksumBytes.length, checksumBytes, 0, checksumBytes.length);
 -            if (Ints.fromByteArray(checksumBytes) != (int) checksum.getValue())
 +            if (Ints.fromByteArray(checksumBytes) != checksum)
                  throw new IOException("CRC unmatched");
 -
 -            // reset checksum object back to the original (blank) state
 -            checksum.reset();
          }
  
          // buffer offset is always aligned


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message