cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [01/15] cassandra git commit: Revert CASSANDRA-10012 and add more loggings
Date Fri, 15 Jan 2016 18:22:55 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 abe0c6779 -> 58a0079c3
  refs/heads/cassandra-2.2 e2050c971 -> 1b5935736
  refs/heads/cassandra-3.0 a02997696 -> 6cd3bef61
  refs/heads/cassandra-3.3 dba2e16c9 -> b7356dd8e
  refs/heads/trunk 66d3428e3 -> b1d063dc2


Revert CASSANDRA-10012 and add more loggings

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10961


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58a0079c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58a0079c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58a0079c

Branch: refs/heads/cassandra-2.1
Commit: 58a0079c391d12dab97e036f05be070dfaddcc95
Parents: abe0c67
Author: Paulo Motta <pauloricardomg@gmail.com>
Authored: Fri Jan 15 12:04:32 2016 -0600
Committer: Yuki Morishita <yukim@apache.org>
Committed: Fri Jan 15 12:09:56 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/composites/AbstractCType.java  |  3 +-
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 26 ++++++++---
 .../cassandra/streaming/StreamWriter.java       |  9 ++++
 .../compress/CompressedInputStream.java         | 45 +++++++------------
 .../compress/CompressedStreamReader.java        | 31 +++++++++----
 .../compress/CompressedStreamWriter.java        | 12 +++++
 .../compress/CompressedInputStreamTest.java     | 46 --------------------
 9 files changed, 83 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4b87ed0..3d84a30 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.13
+ * Revert CASSANDRA-10012 and add more logging (CASSANDRA-10961)
  * Allow simultaneous bootstrapping with strict consistency when no vnodes are used (CASSANDRA-11005)
  * Log a message when major compaction does not result in a single file (CASSANDRA-10847)
  * (cqlsh) fix cqlsh_copy_tests when vnodes are disabled (CASSANDRA-10997)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/db/composites/AbstractCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
index 5af7458..fecc847 100644
--- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java
+++ b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
@@ -375,7 +375,8 @@ public abstract class AbstractCType implements CType
     protected static void checkRemaining(ByteBuffer bb, int offs, int length)
     {
         if (offs + length > bb.limit())
-            throw new IllegalArgumentException("Not enough bytes");
+            throw new IllegalArgumentException(String.format("Not enough bytes. Offset: %d.
Length: %d. Buffer size: %d",
+                                                             offs, length, bb.limit()));
     }
 
     private static class Serializer implements CType.Serializer

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index aa3504a..ac267f9 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -248,11 +248,11 @@ public class ConnectionHandler
                 {
                     // receive message
                     StreamMessage message = StreamMessage.deserialize(in, protocolVersion,
session);
+                    logger.debug("[Stream #{}] Received {}", session.planId(), message);
                     // Might be null if there is an error during streaming (see FileMessage.deserialize).
It's ok
                     // to ignore here since we'll have asked for a retry.
                     if (message != null)
                     {
-                        logger.debug("[Stream #{}] Received {}", session.planId(), message);
                         session.messageReceived(message);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 18013fe..1e3ba7f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -58,6 +58,7 @@ public class StreamReader
     protected final StreamSession session;
     protected final Descriptor.Version inputVersion;
     protected final long repairedAt;
+    protected final int fileSeqNum;
 
     protected Descriptor desc;
 
@@ -69,6 +70,7 @@ public class StreamReader
         this.sections = header.sections;
         this.inputVersion = new Descriptor.Version(header.version);
         this.repairedAt = header.repairedAt;
+        this.fileSeqNum = header.sequenceNumber;
     }
 
     /**
@@ -78,33 +80,46 @@ public class StreamReader
      */
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size
= {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
 
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
         SSTableWriter writer = null;
+        DecoratedKey key = null;
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt);
             while (in.getBytesRead() < totalSize)
             {
-                writeRow(writer, in, cfs);
+                key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                writeRow(key, writer, in, cfs);
                 // TODO move this to BytesReadTracker
                 session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
             }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {},
totalSize = {}",
+                         session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
             return writer;
         }
         catch (Throwable e)
         {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on
ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
             if (writer != null)
             {
                 try
@@ -162,9 +177,8 @@ public class StreamReader
         return size;
     }
 
-    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws
IOException
+    protected void writeRow(DecoratedKey key, SSTableWriter writer, DataInput in, ColumnFamilyStore
cfs) throws IOException
     {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
         writer.appendFromStream(key, cfs.metadata, in, inputVersion);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 43bc26a..2579414 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -24,6 +24,9 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.ning.compress.lzf.LZFOutputStream;
 
 import org.apache.cassandra.io.sstable.Component;
@@ -42,6 +45,8 @@ public class StreamWriter
 {
     private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamWriter.class);
+
     protected final SSTableReader sstable;
     protected final Collection<Pair<Long, Long>> sections;
     protected final StreamRateLimiter limiter;
@@ -71,6 +76,8 @@ public class StreamWriter
     public void write(WritableByteChannel channel) throws IOException
     {
         long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize
= {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt,
totalSize);
         RandomAccessReader file = sstable.openDataReader();
         ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
                                     ? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
@@ -109,6 +116,8 @@ public class StreamWriter
                 // make sure that current section is send
                 compressedOutput.flush();
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred
= {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress,
totalSize);
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index b4a3065..6280ccd 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -31,6 +31,9 @@ import java.util.zip.Checksum;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -40,12 +43,15 @@ import org.apache.cassandra.utils.WrappedRunnable;
  */
 public class CompressedInputStream extends InputStream
 {
+
+    private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
+
     private final CompressionInfo info;
     // chunk buffer
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private final byte[] buffer;
+    private byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -64,8 +70,6 @@ public class CompressedInputStream extends InputStream
     private long totalCompressedBytesRead;
     private final boolean hasPostCompressionAdlerChecksums;
 
-    private Thread readerThread;
-
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -77,10 +81,9 @@ public class CompressedInputStream extends InputStream
         this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length,
1024));
 
-        readerThread = new Thread(new Reader(source, info, dataBuffer));
-        readerThread.start();
+        new Thread(new Reader(source, info, dataBuffer)).start();
     }
 
     public int read() throws IOException
@@ -146,7 +149,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    class Reader extends WrappedRunnable
+    static class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -162,7 +165,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
+            while (chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -172,43 +175,25 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r;
                     try
                     {
-                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
                         if (r < 0)
                         {
                             dataBuffer.put(POISON_PILL);
                             return; // throw exception where we consume dataBuffer
                         }
+                        bufferRead += r;
                     }
                     catch (IOException e)
                     {
+                        logger.warn("Error while reading compressed input stream.", e);
                         dataBuffer.put(POISON_PILL);
-                        throw e;
+                        return; // throw exception where we consume dataBuffer
                     }
-                    bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
-            synchronized(CompressedInputStream.this)
-            {
-                readerThread = null;
-            }
-        }
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        synchronized(this)
-        {
-            if (readerThread != null)
-            {
-                readerThread.interrupt();
-                readerThread = null;
-            }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 4f60773..fd0d9c8 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -24,6 +24,7 @@ import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,10 +33,12 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
@@ -61,40 +64,56 @@ public class CompressedStreamReader extends StreamReader
     @Override
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size
= {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel),
compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         SSTableWriter writer = null;
+        DecoratedKey key = null;
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt);
+            int sectionIdx = 0;
             for (Pair<Long, Long> section : sections)
             {
                 long length = section.right - section.left;
                 // skip to beginning of section inside chunk
                 cis.position(section.left);
                 in.reset(0);
+                logger.trace("[Stream #{}] Reading section {} with length {} from stream.",
session.planId(), sectionIdx++, length);
                 while (in.getBytesRead() < length)
                 {
-                    writeRow(writer, in, cfs);
+                    key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                    writeRow(key, writer, in, cfs);
+
                     // when compressed, report total bytes of compressed chunks read since
remoteFile.size is the sum of chunks transferred
                     session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(),
totalSize);
                 }
             }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {},
totalSize = {}", session.planId(), fileSeqNum,
+                         session.peer, cis.getTotalCompressedBytesRead(), totalSize);
             return writer;
         }
         catch (Throwable e)
         {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on
ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
             if (writer != null)
             {
                 try
@@ -113,10 +132,6 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
-        finally
-        {
-            cis.close();
-        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 001c927..6fe08e6 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -24,6 +24,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
@@ -40,6 +43,8 @@ public class CompressedStreamWriter extends StreamWriter
 {
     public static final int CHUNK_SIZE = 10 * 1024 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(CompressedStreamWriter.class);
+
     private final CompressionInfo compressionInfo;
 
     public CompressedStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>>
sections, CompressionInfo compressionInfo, StreamSession session)
@@ -52,12 +57,15 @@ public class CompressedStreamWriter extends StreamWriter
     public void write(WritableByteChannel channel) throws IOException
     {
         long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize
= {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt,
totalSize);
         RandomAccessReader file = sstable.openDataReader();
         FileChannel fc = file.getChannel();
 
         long progress = 0L;
         // calculate chunks to transfer. we want to send continuous chunks altogether.
         List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+        int sectionIdx = 0;
         try
         {
             // stream each of the required sections of the file
@@ -65,6 +73,8 @@ public class CompressedStreamWriter extends StreamWriter
             {
                 // length of the section to stream
                 long length = section.right - section.left;
+                logger.trace("[Stream #{}] Writing section {} with length {} to stream.",
session.planId(), sectionIdx++, length);
+
                 // tracks write progress
                 long bytesTransferred = 0;
                 while (bytesTransferred < length)
@@ -77,6 +87,8 @@ public class CompressedStreamWriter extends StreamWriter
                     session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress,
totalSize);
                 }
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred
= {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress,
totalSize);
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 87e0003..c70b932 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -58,52 +58,6 @@ public class CompressedInputStreamTest
     }
 
     /**
-     * Test CompressedInputStream not hang when closed while reading
-     * @throws Exception
-     */
-    @Test(expected = EOFException.class)
-    public void testClose() throws Exception
-    {
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance,
32, Collections.<String, String>emptyMap());
-        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
-        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
-        InputStream blockingInput = new InputStream()
-        {
-            @Override
-            public int read() throws IOException
-            {
-                try
-                {
-                    // 10 second cut off not to stop other test in case
-                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException("Interrupted as expected", e);
-                }
-            }
-        };
-        CompressionInfo info = new CompressionInfo(chunks, param);
-        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
-        {
-            new Thread(new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        cis.close();
-                    }
-                    catch (Exception ignore) {}
-                }
-            }).start();
-            // block here
-            cis.read();
-        }
-    }
-
-    /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
      */


Mime
View raw message