Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 087B7200B80 for ; Wed, 10 Aug 2016 00:04:27 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 07331160AB0; Tue, 9 Aug 2016 22:04:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A2545160AB7 for ; Wed, 10 Aug 2016 00:04:25 +0200 (CEST) Received: (qmail 52872 invoked by uid 500); 9 Aug 2016 22:04:24 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 52448 invoked by uid 99); 9 Aug 2016 22:04:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Aug 2016 22:04:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EBFC1EE695; Tue, 9 Aug 2016 22:04:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yukim@apache.org To: commits@cassandra.apache.org Date: Tue, 09 Aug 2016 22:04:27 -0000 Message-Id: <408a6b493bb7475c98521077ad628504@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0 archived-at: Tue, 09 Aug 2016 22:04:27 -0000 Merge branch 'cassandra-2.2' into cassandra-3.0 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/62ef8617 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/62ef8617 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/62ef8617 Branch: refs/heads/cassandra-3.9 Commit: 62ef8617cdaa07fa37b1b2121ad5923da64e74a3 Parents: 676b6a8 76e3100 Author: Yuki Morishita Authored: Tue Aug 9 16:45:52 2016 -0500 Committer: Yuki Morishita Committed: Tue Aug 9 16:45:52 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/Config.java | 4 ++ .../cassandra/config/DatabaseDescriptor.java | 5 -- .../cassandra/streaming/StreamReader.java | 26 +--------- .../cassandra/streaming/StreamSession.java | 36 +------------- .../compress/CompressedInputStream.java | 21 +++++++- .../compress/CompressedStreamReader.java | 11 ++--- .../streaming/messages/IncomingFileMessage.java | 22 ++------- .../streaming/messages/RetryMessage.java | 4 ++ .../org/apache/cassandra/utils/Throwables.java | 14 ++++++ .../compression/CompressedInputStreamTest.java | 52 +++++++++++++++++--- 11 files changed, 100 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 78bd32d,232203e..f613c5f --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,34 -1,6 +1,35 @@@ -2.2.8 +3.0.9 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828) + * NullPointerException during compaction on table with static columns (CASSANDRA-12336) + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823) + * Fix upgrade of super columns on thrift (CASSANDRA-12335) + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359) + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277) + * Exception when computing read-repair for range tombstones (CASSANDRA-12263) + * Lost counter writes in compact table and static columns (CASSANDRA-12219) + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247) + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980) + * Add option to override compaction space check (CASSANDRA-12180) + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114) + * Respond with v1/v2 protocol header when responding to driver that attempts + to connect with too low of a protocol version (CASSANDRA-11464) + * NullPointerExpception when reading/compacting table (CASSANDRA-11988) + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144) + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107) + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393) + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147) + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315) + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733) + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098) + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996) + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944) + * Fix column ordering of results with static columns for Thrift requests in + a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of + those static columns in query results (CASSANDRA-12123) + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090) + * Fix EOF exception when altering column type (CASSANDRA-11820) +Merged from 2.2: + * Fix hanging stream session (CASSANDRA-10992) - * Add byteman support for testing (CASSANDRA-12377) * Fix INSERT JSON, fromJson() support of smallint, tinyint types (CASSANDRA-12371) * Restore JVM metric export for metric reporters (CASSANDRA-12312) * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345) http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/Config.java index e6c56cb,60daee6..86f1016 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -171,8 -170,11 +171,12 @@@ public class Confi public Integer concurrent_compactors; public volatile Integer compaction_throughput_mb_per_sec = 16; public volatile Integer compaction_large_partition_warning_threshold_mb = 100; + public Integer min_free_space_per_drive_in_mb = 50; + /** + * @deprecated retry support removed on CASSANDRA-10992 + */ + @Deprecated public Integer max_streaming_retries = 3; public volatile Integer stream_throughput_outbound_megabits_per_sec = 200; http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java index f8db26b,c96ea22..4ca7937 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@@ -31,24 -33,19 +31,25 @@@ import org.slf4j.LoggerFactory import com.ning.compress.lzf.LZFInputStream; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.SSTableSimpleIterator; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.RewindableDataInputStreamPlus; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.messages.FileMessageHeader; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.BytesReadTracker; +import org.apache.cassandra.io.util.TrackedInputStream; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; + import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; /** * StreamReader reads from stream and writes to SSTable. @@@ -124,63 -121,37 +125,40 @@@ public class StreamReade logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize); return writer; - } catch (Throwable e) + } + catch (Throwable e) { - if (key != null) + if (deserializer != null) logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", - session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName()); + session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName()); if (writer != null) { - try - { - writer.abort(); - } - catch (Throwable e2) - { - // add abort error to original and continue so we can drain unread stream - e.addSuppressed(e2); - } + writer.abort(e); } - drain(in, in.getBytesRead()); - if (e instanceof IOException) - throw (IOException) e; - else - throw Throwables.propagate(e); + throw Throwables.propagate(e); } + finally + { + if (deserializer != null) + deserializer.cleanup(); + } + } + + protected SerializationHeader getHeader(CFMetaData metadata) + { + return header != null? header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader } - protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException + protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException { - Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize); + Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); if (localDir == null) throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); - desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format)); + desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format)); - return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel); + return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId)); } - protected void drain(InputStream dis, long bytesRead) throws IOException - { - long toSkip = totalSize() - bytesRead; - - // InputStream.skip can return -1 if dis is inaccessible. - long skipped = dis.skip(toSkip); - if (skipped == -1) - return; - - toSkip = toSkip - skipped; - while (toSkip > 0) - { - skipped = dis.skip(toSkip); - if (skipped == -1) - break; - toSkip = toSkip - skipped; - } - } - protected long totalSize() { long size = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 9719587,fa1022d..bc87c8f --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@@ -17,7 -17,8 +17,6 @@@ */ package org.apache.cassandra.streaming.compress; --import java.io.DataInputStream; - import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; @@@ -37,9 -40,12 +36,11 @@@ import org.apache.cassandra.streaming.P import org.apache.cassandra.streaming.StreamReader; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.messages.FileMessageHeader; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.BytesReadTracker; +import org.apache.cassandra.io.util.TrackedInputStream; import org.apache.cassandra.utils.Pair; + import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; + /** * StreamReader that reads from streamed compressed SSTable */ @@@ -114,24 -119,25 +115,22 @@@ public class CompressedStreamReader ext } catch (Throwable e) { - if (key != null) + if (deserializer != null) logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", - session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName()); + session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName()); if (writer != null) { - try - { - writer.abort(); - } - catch (Throwable e2) - { - // add abort error to original and continue so we can drain unread stream - e.addSuppressed(e2); - } + writer.abort(e); } - drain(in, in.getBytesRead()); - if (e instanceof IOException) - throw (IOException) e; - else - throw Throwables.propagate(e); + if (extractIOExceptionCause(e).isPresent()) + throw e; + throw Throwables.propagate(e); } + finally + { + if (deserializer != null) + deserializer.cleanup(); + } } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java index d881d43,2870c03..438cb0b --- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java @@@ -20,10 -20,11 +20,12 @@@ package org.apache.cassandra.streaming. import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; ++import java.util.Optional; -import com.google.common.base.Optional; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; + -import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.streaming.StreamReader; import org.apache.cassandra.streaming.StreamSession; http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/src/java/org/apache/cassandra/utils/Throwables.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/Throwables.java index 8ef6a63,877f388..5ad9686 --- a/src/java/org/apache/cassandra/utils/Throwables.java +++ b/src/java/org/apache/cassandra/utils/Throwables.java @@@ -18,25 -18,14 +18,26 @@@ */ package org.apache.cassandra.utils; +import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; ++import java.util.Optional; +import java.util.stream.Stream; -import com.google.common.base.Optional; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.FSWriteError; -public class Throwables +public final class Throwables { + public enum FileOpType { READ, WRITE } - public static Throwable merge(Throwable existingFail, Throwable newFail) + public interface DiscreteAction + { + void perform() throws E; + } + + public static T merge(T existingFail, T newFail) { if (existingFail == null) return newFail; @@@ -167,4 -54,17 +168,17 @@@ } return accumulate; } + + public static Optional extractIOExceptionCause(Throwable t) + { + if (t instanceof IOException) + return Optional.of((IOException) t); + Throwable cause = t; + while ((cause = cause.getCause()) != null) + { + if (cause instanceof IOException) + return Optional.of((IOException) cause); + } - return Optional.absent(); ++ return Optional.empty(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ef8617/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java index a3300ac,0000000..8512d8f mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java @@@ -1,137 -1,0 +1,173 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming.compression; + +import java.io.*; +import java.util.*; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.apache.cassandra.db.ClusteringComparator; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.streaming.compress.CompressedInputStream; +import org.apache.cassandra.streaming.compress.CompressionInfo; +import org.apache.cassandra.utils.ChecksumType; +import org.apache.cassandra.utils.Pair; + +import static org.junit.Assert.assertEquals; ++import static org.junit.Assert.fail; + +/** + */ +public class CompressedInputStreamTest +{ + @Test + public void testCompressedRead() throws Exception + { - testCompressedReadWith(new long[]{0L}, false); - testCompressedReadWith(new long[]{1L}, false); - testCompressedReadWith(new long[]{100L}, false); ++ testCompressedReadWith(new long[]{0L}, false, false); ++ testCompressedReadWith(new long[]{1L}, false, false); ++ testCompressedReadWith(new long[]{100L}, false, false); + - testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false); ++ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false, false); + } + + @Test(expected = EOFException.class) + public void testTruncatedRead() throws Exception + { - testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true); ++ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true, false); ++ } ++ ++ /** ++ * Test that CompressedInputStream does not block if there's an exception while reading stream ++ */ ++ @Test(timeout = 30000) ++ public void testException() throws Exception ++ { ++ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false, true); + } + + /** + * @param valuesToCheck array of longs of range(0-999) + * @throws Exception + */ - private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception ++ private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate, boolean testException) throws Exception + { + assert valuesToCheck != null && valuesToCheck.length > 0; + + // write compressed data file of longs + File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db"); + Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath()); + MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance)); + CompressionParams param = CompressionParams.snappy(32); + Map index = new HashMap(); + try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector)) + { + for (long l = 0L; l < 1000; l++) + { + index.put(l, writer.position()); + writer.writeLong(l); + } + writer.finish(); + } + + CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath()); + List> sections = new ArrayList<>(); + for (long l : valuesToCheck) + { + long position = index.get(l); + sections.add(Pair.create(position, position + 8)); + } + CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections); + long totalSize = comp.getTotalSizeForSections(sections); + long expectedSize = 0; + for (CompressionMetadata.Chunk c : chunks) + expectedSize += c.length + 4; + assertEquals(expectedSize, totalSize); + + // buffer up only relevant parts of file + int size = 0; + for (CompressionMetadata.Chunk c : chunks) + size += (c.length + 4); // 4bytes CRC + byte[] toRead = new byte[size]; + + try (RandomAccessFile f = new RandomAccessFile(tmp, "r")) + { + int pos = 0; + for (CompressionMetadata.Chunk c : chunks) + { + f.seek(c.offset); + pos += f.read(toRead, pos, c.length + 4); + } + } + + if (testTruncate) + { + byte [] actuallyRead = new byte[50]; + System.arraycopy(toRead, 0, actuallyRead, 0, 50); + toRead = actuallyRead; + } + + // read buffer using CompressedInputStream + CompressionInfo info = new CompressionInfo(chunks, param); - CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, - ChecksumType.CRC32, () -> 1.0); ++ ++ if (testException) ++ { ++ testException(sections, info); ++ return; ++ } ++ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, ChecksumType.CRC32, () -> 1.0); + + try (DataInputStream in = new DataInputStream(input)) + { + for (int i = 0; i < sections.size(); i++) + { + input.position(sections.get(i).left); + long readValue = in.readLong(); + assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue); + } + } + } ++ ++ private static void testException(List> sections, CompressionInfo info) throws IOException ++ { ++ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(new byte[0]), info, ChecksumType.CRC32, () -> 1.0); ++ ++ try (DataInputStream in = new DataInputStream(input)) ++ { ++ for (int i = 0; i < sections.size(); i++) ++ { ++ input.position(sections.get(i).left); ++ try { ++ in.readLong(); ++ fail("Should have thrown IOException"); ++ } ++ catch (IOException e) ++ { ++ continue; ++ } ++ } ++ } ++ } +}