cassandra-commits mailing list archives

From alek...@apache.org
Subject [2/3] git commit: Merge branch 'cassandra-1.1' into cassandra-1.2.0
Date Wed, 19 Dec 2012 13:09:37 GMT
Merge branch 'cassandra-1.1' into cassandra-1.2.0

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/io/compress/CompressionParameters.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/620a4b7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/620a4b7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/620a4b7a

Branch: refs/heads/cassandra-1.2
Commit: 620a4b7a4e86e1f31f9823e12a8d5473980c7f03
Parents: 08cc800 1a66ee9
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Wed Dec 19 16:07:39 2012 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Wed Dec 19 16:07:39 2012 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   14 ++++
 .../cassandra/db/ColumnFamilyStoreMBean.java       |    5 ++
 .../io/compress/CompressedRandomAccessReader.java  |    2 +-
 .../io/compress/CompressionParameters.java         |   54 ++++++++++++--
 .../streaming/compress/CompressedInputStream.java  |    2 +-
 6 files changed, 69 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7a17236,38d9d47..6edf7bb
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,142 -1,29 +1,143 @@@
 -1.1.9
 +1.2.0-rc2
 + * fix nodetool ownership display with vnodes (CASSANDRA-5065)
 + * cqlsh: add DESCRIBE KEYSPACES command (CASSANDRA-5060)
 + * Fix potential infinite loop when reloading CFS (CASSANDRA-5064)
 + * Fix SimpleAuthorizer example (CASSANDRA-5072)
 + * cqlsh: force CL.ONE for tracing and system.schema* queries (CASSANDRA-5070)
 + * Include cassandra-shuffle in the debian package (CASSANDRA-5058)
 +Merged from 1.1:
   * fix multithreaded compaction deadlock (CASSANDRA-4492)
 + * fix temporarily missing schema after upgrade from pre-1.1.5 (CASSANDRA-5061)
 + * Fix ALTER TABLE overriding compression options with defaults
-    (CASSANDRA-4996, CASSANDRA-5066)
++   (CASSANDRA-4996, 5066)
+  * fix specifying and altering crc_check_chance (CASSANDRA-5053)
  
  
 -1.1.8
 - * reset getRangeSlice filter after finishing a row for get_paged_slice
 -   (CASSANDRA-4919)
 - * fix temporarily missing schema after upgrade from pre-1.1.5 (CASSANDRA-5061)
 +1.2-rc1
 + * rename rpc_timeout settings to request_timeout (CASSANDRA-5027)
 + * add BF with 0.1 FP to LCS by default (CASSANDRA-5029)
 + * Fix preparing insert queries (CASSANDRA-5016)
 + * Fix preparing queries with counter increment (CASSANDRA-5022)
 + * Fix preparing updates with collections (CASSANDRA-5017)
 + * Don't generate UUID based on other node address (CASSANDRA-5002)
 + * Fix message when trying to alter a clustering key type (CASSANDRA-5012)
 + * Update IAuthenticator to match the new IAuthorizer (CASSANDRA-5003)
 + * Fix inserting only a key in CQL3 (CASSANDRA-5040)
 + * Fix CQL3 token() function when used with strings (CASSANDRA-5050)
 +Merged from 1.1:
   * reduce log spam from invalid counter shards (CASSANDRA-5026)
   * Improve schema propagation performance (CASSANDRA-5025)
 - * Fall back to old describe_splits if d_s_ex is not available (CASSANDRA-4803)
 - * Improve error reporting when streaming ranges fail (CASSANDRA-5009)
 + * Fix for IndexHelper.IndexFor throws OOB Exception (CASSANDRA-5030)
 + * cqlsh: make it possible to describe thrift CFs (CASSANDRA-4827)
   * cqlsh: fix timestamp formatting on some platforms (CASSANDRA-5046)
 - * Fix ALTER TABLE overriding compression options with defaults (CASSANDRA-4996, 5066)
 - * Avoid error opening data file on startup (CASSANDRA-4984)
 - * Fix wrong index_options in cli 'show schema' (CASSANDRA-5008)
 - * Allow overriding number of available processor (CASSANDRA-4790)
  
  
 -1.1.7
 - * cqlsh: improve COPY FROM performance (CASSANDRA-4921)
 +1.2-beta3
 + * make consistency level configurable in cqlsh (CASSANDRA-4829)
 + * fix cqlsh rendering of blob fields (CASSANDRA-4970)
 + * fix cqlsh DESCRIBE command (CASSANDRA-4913)
 + * save truncation position in system table (CASSANDRA-4906)
 + * Move CompressionMetadata off-heap (CASSANDRA-4937)
 + * allow CLI to GET cql3 columnfamily data (CASSANDRA-4924)
 + * Fix rare race condition in getExpireTimeForEndpoint (CASSANDRA-4402)
 + * acquire references to overlapping sstables during compaction so bloom filter
 +   doesn't get freed prematurely (CASSANDRA-4934)
 + * Don't share slice query filter in CQL3 SelectStatement (CASSANDRA-4928)
 + * Separate tracing from Log4J (CASSANDRA-4861)
 + * Exclude gcable tombstones from merkle-tree computation (CASSANDRA-4905)
 + * Better printing of AbstractBounds for tracing (CASSANDRA-4931)
 + * Optimize mostRecentTombstone check in CC.collectAllData (CASSANDRA-4883)
 + * Change stream session ID to UUID to avoid collision from same node (CASSANDRA-4813)
 + * Use Stats.db when bulk loading if present (CASSANDRA-4957)
 + * Skip repair on system_trace and keyspaces with RF=1 (CASSANDRA-4956)
 + * (cql3) Remove arbitrary SELECT limit (CASSANDRA-4918)
 + * Correctly handle prepared operation on collections (CASSANDRA-4945)
 + * Fix CQL3 LIMIT (CASSANDRA-4877)
 + * Fix Stress for CQL3 (CASSANDRA-4979)
 + * Remove cassandra specific exceptions from JMX interface (CASSANDRA-4893)
 + * (CQL3) Force using ALLOW FILTERING on potentially inefficient queries (CASSANDRA-4915)
 + * (cql3) Fix adding column when the table has collections (CASSANDRA-4982)
 + * (cql3) Fix allowing collections with compact storage (CASSANDRA-4990)
 + * (cql3) Refuse ttl/writetime function on collections (CASSANDRA-4992)
 + * Replace IAuthority with new IAuthorizer (CASSANDRA-4874)
 + * cqlsh: fix KEY pseudocolumn escaping when describing Thrift tables
 +   in CQL3 mode (CASSANDRA-4955)
   * add basic authentication support for Pig CassandraStorage (CASSANDRA-3042)
   * fix CQL2 ALTER TABLE compaction_strategy_class altering (CASSANDRA-4965)
 +Merged from 1.1:
 + * Fall back to old describe_splits if d_s_ex is not available (CASSANDRA-4803)
 + * Improve error reporting when streaming ranges fail (CASSANDRA-5009)
 + * Fix cqlsh timestamp formatting of timezone info (CASSANDRA-4746)
 + * Fix assertion failure with leveled compaction (CASSANDRA-4799)
 + * Check for null end_token in get_range_slice (CASSANDRA-4804)
 + * Remove all remnants of removed nodes (CASSANDRA-4840)
 + * Add auto-reloading of the log4j file in the debian package (CASSANDRA-4855)
 + * Fix estimated row cache entry size (CASSANDRA-4860)
 + * reset getRangeSlice filter after finishing a row for get_paged_slice
 +   (CASSANDRA-4919)
   * expunge row cache post-truncate (CASSANDRA-4940)
 - * remove IAuthority2 (CASSANDRA-4875)
 + * Allow static CF definition with compact storage (CASSANDRA-4910)
 + * Fix endless loop/compaction of schema_* CFs due to broken timestamps (CASSANDRA-4880)
 + * Fix 'wrong class type' assertion in CounterColumn (CASSANDRA-4976)
 +
 +
 +1.2-beta2
 + * fp rate of 1.0 disables BF entirely; LCS defaults to 1.0 (CASSANDRA-4876)
 + * off-heap bloom filters for row keys (CASSANDRA-4865)
 + * add extension point for sstable components (CASSANDRA-4049)
 + * improve tracing output (CASSANDRA-4852, 4862)
 + * make TRACE verb droppable (CASSANDRA-4672)
 + * fix BulkLoader recognition of CQL3 columnfamilies (CASSANDRA-4755)
 + * Sort commitlog segments for replay by id instead of mtime (CASSANDRA-4793)
 + * Make hint delivery asynchronous (CASSANDRA-4761)
 + * Pluggable Thrift transport factories for CLI and cqlsh (CASSANDRA-4609, 4610)
 + * cassandra-cli: allow Double value type to be inserted to a column (CASSANDRA-4661)
 + * Add ability to use custom TServerFactory implementations (CASSANDRA-4608)
 + * optimize batchlog flushing to skip successful batches (CASSANDRA-4667)
 + * include metadata for system keyspace itself in schema tables (CASSANDRA-4416)
 + * add check to PropertyFileSnitch to verify presence of location for
 +   local node (CASSANDRA-4728)
 + * add PBSPredictor consistency modeler (CASSANDRA-4261)
 + * remove vestiges of Thrift unframed mode (CASSANDRA-4729)
 + * optimize single-row PK lookups (CASSANDRA-4710)
 + * adjust blockFor calculation to account for pending ranges due to node 
 +   movement (CASSANDRA-833)
 + * Change CQL version to 3.0.0 and stop accepting 3.0.0-beta1 (CASSANDRA-4649)
 + * (CQL3) Make prepared statement global instead of per connection 
 +   (CASSANDRA-4449)
 + * Fix scrubbing of CQL3 created tables (CASSANDRA-4685)
 + * (CQL3) Fix validation when using counter and regular columns in the same 
 +   table (CASSANDRA-4706)
 + * Fix bug starting Cassandra with simple authentication (CASSANDRA-4648)
 + * Add support for batchlog in CQL3 (CASSANDRA-4545, 4738)
 + * Add support for multiple column family outputs in CFOF (CASSANDRA-4208)
 + * Support repairing only the local DC nodes (CASSANDRA-4747)
 + * Use rpc_address for binary protocol and change default port (CASSANDRA-4751)
 + * Fix use of collections in prepared statements (CASSANDRA-4739)
 + * Store more information into peers table (CASSANDRA-4351, 4814)
 + * Configurable bucket size for size tiered compaction (CASSANDRA-4704)
 + * Run leveled compaction in parallel (CASSANDRA-4310)
 + * Fix potential NPE during CFS reload (CASSANDRA-4786)
 + * Composite indexes may miss results (CASSANDRA-4796)
 + * Move consistency level to the protocol level (CASSANDRA-4734, 4824)
 + * Fix Subcolumn slice ends not respected (CASSANDRA-4826)
 + * Fix Assertion error in cql3 select (CASSANDRA-4783)
 + * Fix list prepend logic (CQL3) (CASSANDRA-4835)
 + * Add booleans as literals in CQL3 (CASSANDRA-4776)
 + * Allow renaming PK columns in CQL3 (CASSANDRA-4822)
 + * Fix binary protocol NEW_NODE event (CASSANDRA-4679)
 + * Fix potential infinite loop in tombstone compaction (CASSANDRA-4781)
 + * Remove system tables accounting from schema (CASSANDRA-4850)
 + * (cql3) Force provided columns in clustering key order in 
 +   'CLUSTERING ORDER BY' (CASSANDRA-4881)
 + * Fix composite index bug (CASSANDRA-4884)
 + * Fix short read protection for CQL3 (CASSANDRA-4882)
 + * Add tracing support to the binary protocol (CASSANDRA-4699)
 + * (cql3) Don't allow prepared marker inside collections (CASSANDRA-4890)
 + * Re-allow order by on non-selected columns (CASSANDRA-4645)
 + * Bug when composite index is created in a table having collections (CASSANDRA-4909)
 + * log index scan subject in CompositesSearcher (CASSANDRA-4904)
 +Merged from 1.1:
   * add get[Row|Key]CacheEntries to CacheServiceMBean (CASSANDRA-4859)
   * fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)
   * fix indexing empty column values (CASSANDRA-4832)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 364565f,8284d38..ef3e14a
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -204,18 -196,18 +204,32 @@@ public class ColumnFamilyStore implemen
          return metadata.compressionParameters().asThriftOptions();
      }
  
 -    public void setCompressionParameters(Map<String,String> opts) throws ConfigurationException
 +    public void setCompressionParameters(Map<String,String> opts)
      {
 -        metadata.compressionParameters = CompressionParameters.create(opts);
 +        try
 +        {
 +            metadata.compressionParameters = CompressionParameters.create(opts);
 +        }
 +        catch (ConfigurationException e)
 +        {
 +            throw new IllegalArgumentException(e.getMessage());
 +        }
      }
  
 -    public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException
++    public void setCrcCheckChance(double crcCheckChance)
+     {
 -        for (SSTableReader sstable : table.getAllSSTables())
 -            if (sstable.compression)
 -                sstable.getCompressionMetadata().parameters.setCrcCheckChance(crcCheckChance);
++        try
++        {
++            for (SSTableReader sstable : table.getAllSSTables())
++                if (sstable.compression)
++                    sstable.getCompressionMetadata().parameters.setCrcCheckChance(crcCheckChance);
++        }
++        catch (ConfigurationException e)
++        {
++            throw new IllegalArgumentException(e.getMessage());
++        }
+     }
+ 
      private ColumnFamilyStore(Table table,
                                String columnFamilyName,
                                IPartitioner partitioner,

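The hunk above converts the checked, Cassandra-specific ConfigurationException into a plain IllegalArgumentException before it crosses the MBean boundary. The motivation (per CASSANDRA-4893 in the changelog above) is that a remote JMX client without Cassandra's classes on its classpath cannot deserialize a Cassandra-specific exception. A minimal sketch of the pattern; the class and method bodies here are illustrative stand-ins, not the real Cassandra types:

    // Hypothetical stand-in for org.apache.cassandra.exceptions.ConfigurationException.
    class ConfigurationException extends Exception
    {
        ConfigurationException(String msg) { super(msg); }
    }

    public class JmxSafeSetter
    {
        // Exposed over JMX: validate server-side, but only let JDK exception
        // types escape, so any client can deserialize the failure.
        public void setCrcCheckChance(double chance)
        {
            try
            {
                validate(chance);
            }
            catch (ConfigurationException e)
            {
                throw new IllegalArgumentException(e.getMessage());
            }
        }

        private static void validate(double chance) throws ConfigurationException
        {
            if (chance < 0.0d || chance > 1.0d)
                throw new ConfigurationException("crc_check_chance should be between 0.0 and 1.0");
        }

        public static void main(String[] args)
        {
            try
            {
                new JmxSafeSetter().setCrcCheckChance(1.5); // out of range
            }
            catch (IllegalArgumentException e)
            {
                System.out.println("Rejected: " + e.getMessage());
            }
        }
    }
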
http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 3388b9b,26da8be..0a6b077
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@@ -274,9 -212,14 +274,14 @@@ public interface ColumnFamilyStoreMBea
       * Set the compression parameters
       * @param opts map of string names to values
       */
 -    public void setCompressionParameters(Map<String,String> opts) throws ConfigurationException;
 +    public void setCompressionParameters(Map<String,String> opts);
  
      /**
+      * Set new crc check chance
+      */
 -    public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException;
++    public void setCrcCheckChance(double crcCheckChance);
+ 
+     /**
       * Disable automatic compaction.
       */
      public void disableAutoCompaction();

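With the throws clause gone, the interface is also friendlier to standard JMX proxies. A hedged sketch of a client invoking the new method; the service URL, port, and ObjectName pattern below are assumptions for illustration, not taken from this commit:

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.db.ColumnFamilyStoreMBean;

    public class CrcCheckChanceClient
    {
        public static void main(String[] args) throws Exception
        {
            // Assumed host/port; 7199 is Cassandra's conventional JMX port.
            JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection conn = connector.getMBeanServerConnection();
                // Assumed ObjectName pattern for a column family's MBean.
                ObjectName name = new ObjectName(
                    "org.apache.cassandra.db:type=ColumnFamilies,keyspace=ks1,columnfamily=cf1");
                ColumnFamilyStoreMBean cfs =
                    JMX.newMBeanProxy(conn, name, ColumnFamilyStoreMBean.class);
                // A bad value now surfaces as IllegalArgumentException.
                cfs.setCrcCheckChance(0.5);
            }
        }
    }
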
http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index da35e92,3d3b95b..92812a6
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@@ -112,18 -86,11 +112,18 @@@ public class CompressedRandomAccessRead
              compressed = new byte[chunk.length];
  
          if (source.read(compressed, 0, chunk.length) != chunk.length)
 -            throw new IOException(String.format("(%s) failed to read %d bytes from offset %d.", getPath(), chunk.length, chunk.offset));
 +            throw new CorruptBlockException(getPath(), chunk);
  
 -        validBufferBytes = metadata.compressor().uncompress(compressed, 0, chunk.length, buffer, 0);
 +        try
 +        {
 +            validBufferBytes = metadata.compressor().uncompress(compressed, 0, chunk.length, buffer, 0);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new CorruptBlockException(getPath(), chunk);
 +        }
  
-         if (metadata.parameters.crcChance > FBUtilities.threadLocalRandom().nextDouble())
+         if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
          {
              checksum.update(buffer, 0, validBufferBytes);
  

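The rename from the raw crcChance field to getCrcCheckChance() keeps the sampling logic intact: comparing the configured probability against a uniform [0,1) draw verifies the checksum on roughly that fraction of chunk reads. A self-contained sketch of the idea, using ThreadLocalRandom in place of Cassandra's FBUtilities helper:

    import java.util.concurrent.ThreadLocalRandom;
    import java.util.zip.CRC32;

    public class SampledCrcCheck
    {
        // Returns false only when this read was sampled AND the CRC disagrees;
        // chance = 1.0 checks every chunk, 0.0 checks none.
        static boolean verify(byte[] chunk, int storedCrc, double chance)
        {
            if (chance > ThreadLocalRandom.current().nextDouble())
            {
                CRC32 checksum = new CRC32();
                checksum.update(chunk, 0, chunk.length);
                return (int) checksum.getValue() == storedCrc;
            }
            return true; // not sampled this time; corruption may be caught on a later read
        }

        public static void main(String[] args)
        {
            byte[] chunk = { 1, 2, 3 };
            CRC32 crc = new CRC32();
            crc.update(chunk, 0, chunk.length);
            System.out.println(verify(chunk, (int) crc.getValue(), 1.0)); // prints true
        }
    }
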
http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressionParameters.java
index 37cdcc4,05cc707..6448d84
--- a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
@@@ -23,16 -21,19 +23,19 @@@ import java.io.IOException
  import java.lang.reflect.InvocationTargetException;
  import java.lang.reflect.Method;
  import java.util.Collections;
++import java.util.AbstractSet;
  import java.util.HashMap;
  import java.util.Map;
 -import java.util.AbstractSet;
  import java.util.Set;
  
+ import com.google.common.collect.ImmutableSet;
+ import com.google.common.collect.Sets;
  import org.apache.commons.lang.builder.EqualsBuilder;
  import org.apache.commons.lang.builder.HashCodeBuilder;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.config.ConfigurationException;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.db.TypeSizes;
 +import org.apache.cassandra.io.IVersionedSerializer;
  
  public class CompressionParameters
  {
@@@ -76,9 -84,41 +85,39 @@@
          this.sstableCompressor = sstableCompressor;
          this.chunkLength = chunkLength;
          this.otherOptions = otherOptions;
 -        this.crcCheckChance = otherOptions.get(CRC_CHECK_CHANCE) == null
 -                              ? DEFAULT_CRC_CHECK_CHANCE
 -                              : parseCrcCheckChance(otherOptions.get(CRC_CHECK_CHANCE));
 +        String chance = otherOptions.get(CRC_CHECK_CHANCE);
-         otherOptions.remove(CRC_CHECK_CHANCE);
-         this.crcChance = (chance == null) ? DEFAULT_CRC_CHECK_CHANCE : Double.parseDouble(chance);
++        this.crcCheckChance = (chance == null) ? DEFAULT_CRC_CHECK_CHANCE : parseCrcCheckChance(chance);
+     }
+ 
+     public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException
+     {
+         validateCrcCheckChance(crcCheckChance);
 -        logger.debug("Setting crcCheckChance to {}", crcCheckChance);
+         this.crcCheckChance = crcCheckChance;
+     }
+ 
+     public double getCrcCheckChance()
+     {
+         return this.crcCheckChance;
+     }
+ 
+     private static double parseCrcCheckChance(String crcCheckChance) throws ConfigurationException
+     {
+         try
+         {
+             double chance = Double.parseDouble(crcCheckChance);
+             validateCrcCheckChance(chance);
+             return chance;
+         }
+         catch (NumberFormatException e)
+         {
+             throw new ConfigurationException("crc_check_chance should be a double");
+         }
+     }
+ 
+     private static void validateCrcCheckChance(double crcCheckChance) throws ConfigurationException
+     {
+         if (crcCheckChance < 0.0d || crcCheckChance > 1.0d)
 +             throw new ConfigurationException("crc_check_chance should be between 0.0 and 1.0");
      }
  
      public int chunkLength()
@@@ -116,7 -156,7 +155,7 @@@
              Method method = compressorClass.getMethod("create", Map.class);
              ICompressor compressor = (ICompressor)method.invoke(null, compressionOptions);
              // Check for unknown options
-             Set<String> supportedOpts = compressor.supportedOptions();
 -            AbstractSet<String> supportedOpts =  Sets.union(compressor.supportedOptions(), GLOBAL_OPTIONS);
++            AbstractSet<String> supportedOpts = Sets.union(compressor.supportedOptions(), GLOBAL_OPTIONS);
              for (String provided : compressionOptions.keySet())
                  if (!supportedOpts.contains(provided))
                      throw new ConfigurationException("Unknown compression options " + provided);

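Two things happen in this file: crc_check_chance gains real validation (parse, range-check, reject non-doubles), and option checking switches to Sets.union so global options are accepted alongside each compressor's own keys. A small sketch of that union-based validation; the compressor option name below is hypothetical:

    import java.util.Map;
    import java.util.Set;

    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.ImmutableSet;
    import com.google.common.collect.Sets;

    public class CompressionOptionCheck
    {
        // Mirrors the GLOBAL_OPTIONS constant referenced in the hunk above.
        static final Set<String> GLOBAL_OPTIONS = ImmutableSet.of("crc_check_chance");

        static void rejectUnknown(Map<String, String> provided, Set<String> compressorOpts)
        {
            // Sets.union returns a live view; contains() consults both sets.
            Set<String> supported = Sets.union(compressorOpts, GLOBAL_OPTIONS);
            for (String key : provided.keySet())
                if (!supported.contains(key))
                    throw new IllegalArgumentException("Unknown compression options " + key);
        }

        public static void main(String[] args)
        {
            // "chunk_length_kb" stands in for a compressor-specific option.
            rejectUnknown(ImmutableMap.of("crc_check_chance", "0.5"),
                          ImmutableSet.of("chunk_length_kb"));
            System.out.println("ok"); // both options accepted
        }
    }
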
http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 8eadee8,0000000..c226cb6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -1,156 -1,0 +1,156 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compress;
 +
 +import java.io.*;
 +import java.util.Iterator;
 +import java.util.concurrent.*;
 +import java.util.zip.CRC32;
 +import java.util.zip.Checksum;
 +
 +import com.google.common.collect.Iterators;
 +import com.google.common.primitives.Ints;
 +
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.WrappedRunnable;
 +
 +/**
 + * InputStream which reads data from an underlying source with the given {@link CompressionInfo}.
 + */
 +public class CompressedInputStream extends InputStream
 +{
 +    private final CompressionInfo info;
 +    // chunk buffer
 +    private final BlockingQueue<byte[]> dataBuffer;
 +
 +    // uncompressed bytes
 +    private byte[] buffer;
 +
 +    // offset from the beginning of the buffer
 +    protected long bufferOffset = 0;
 +    // current position in stream
 +    private long current = 0;
 +    // number of bytes in the buffer that are actually valid
 +    protected int validBufferBytes = -1;
 +
 +    private final Checksum checksum = new CRC32();
 +
 +    // raw checksum bytes
 +    private final byte[] checksumBytes = new byte[4];
 +
 +    private long uncompressedBytes;
 +
 +    /**
 +     * @param source Input source to read compressed data from
 +     * @param info Compression info
 +     */
 +    public CompressedInputStream(InputStream source, CompressionInfo info)
 +    {
 +        this.info = info;
 +        this.buffer = new byte[info.parameters.chunkLength()];
 +        // buffer is limited to store up to 1024 chunks
 +        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
 +
 +        new Thread(new Reader(source, info, dataBuffer)).start();
 +    }
 +
 +    public int read() throws IOException
 +    {
 +        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
 +        {
 +            try
 +            {
 +                decompress(dataBuffer.take());
 +            }
 +            catch (InterruptedException e)
 +            {
 +                throw new EOFException("No chunk available");
 +            }
 +        }
 +
 +        assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
 +
 +        return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff;
 +    }
 +
 +    public void position(long position)
 +    {
 +        assert position >= current : "stream can only read forward.";
 +        current = position;
 +    }
 +
 +    private void decompress(byte[] compressed) throws IOException
 +    {
 +        // uncompress
 +        validBufferBytes = info.parameters.sstableCompressor.uncompress(compressed, 0, compressed.length - checksumBytes.length, buffer, 0);
 +        uncompressedBytes += validBufferBytes;
 +
 +        // validate crc randomly
-         if (info.parameters.crcChance > FBUtilities.threadLocalRandom().nextDouble())
++        if (info.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
 +        {
 +            checksum.update(buffer, 0, validBufferBytes);
 +
 +            System.arraycopy(compressed, compressed.length - checksumBytes.length, checksumBytes, 0, checksumBytes.length);
 +            if (Ints.fromByteArray(checksumBytes) != (int) checksum.getValue())
 +                throw new IOException("CRC unmatched");
 +
 +            // reset checksum object back to the original (blank) state
 +            checksum.reset();
 +        }
 +
 +        // buffer offset is always aligned
 +        bufferOffset = current & ~(buffer.length - 1);
 +    }
 +
 +    public long uncompressedBytes()
 +    {
 +        return uncompressedBytes;
 +    }
 +
 +    static class Reader extends WrappedRunnable
 +    {
 +        private final InputStream source;
 +        private final Iterator<CompressionMetadata.Chunk> chunks;
 +        private final BlockingQueue<byte[]> dataBuffer;
 +
 +        Reader(InputStream source, CompressionInfo info, BlockingQueue<byte[]> dataBuffer)
 +        {
 +            this.source = source;
 +            this.chunks = Iterators.forArray(info.chunks);
 +            this.dataBuffer = dataBuffer;
 +        }
 +
 +        protected void runMayThrow() throws Exception
 +        {
 +            byte[] compressedWithCRC;
 +            while (chunks.hasNext())
 +            {
 +                CompressionMetadata.Chunk chunk = chunks.next();
 +
 +                int readLength = chunk.length + 4; // read with CRC
 +                compressedWithCRC = new byte[readLength];
 +
 +                int bufferRead = 0;
 +                while (bufferRead < readLength)
 +                    bufferRead += source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
 +                dataBuffer.put(compressedWithCRC);
 +            }
 +        }
 +    }
 +}

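One subtlety in the new stream: the line bufferOffset = current & ~(buffer.length - 1) rounds the stream position down to a chunk boundary, which is only correct because compression chunk lengths are powers of two (so length - 1 is an all-ones bit mask). A tiny worked example of the mask arithmetic:

    public class ChunkAlignment
    {
        public static void main(String[] args)
        {
            int chunkLength = 65536;    // 64 KiB; must be a power of two
            long current = 200000;      // arbitrary stream position

            // ~(65536 - 1) = ~0xFFFF clears the low 16 bits, rounding down
            // to a multiple of the chunk length: 3 * 65536 = 196608.
            long bufferOffset = current & ~(chunkLength - 1);
            System.out.println(bufferOffset); // prints 196608
        }
    }
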
