From: yukim@apache.org
To: commits@cassandra.apache.org
Date: Mon, 23 Nov 2015 20:00:39 -0000
Message-Id: <0f9111065ec1438f9ba2460c940ea625@git.apache.org>
Subject: [01/15] cassandra git commit: Fix CompressedInputStream for proper cleanup

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 4a94f75b0 -> 8b9a9161c
  refs/heads/cassandra-2.2 2aa834265 -> 056055feb
  refs/heads/cassandra-3.0 9fe790d75 -> 0b3cfae4e
  refs/heads/cassandra-3.1 e0c945228 -> e8737fda3
  refs/heads/trunk 440366edd -> fa4c17383


Fix CompressedInputStream for proper cleanup

patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161

Branch: refs/heads/cassandra-2.1
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita
Committed: Mon Nov 23 12:33:09 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                  |  1 +
 .../compress/CompressedInputStream.java      | 46 +++++++--
 .../compress/CompressedStreamReader.java     |  4 +
 .../compress/CompressedInputStreamTest.java  | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
  * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
  * Try next replica if not possible to connect to primary replica on ColumnFamilyRecordReader
    (CASSANDRA-2388)
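
For orientation before the main diff: CompressedInputStream decompresses chunks that a background reader thread pushes through a bounded BlockingQueue, and the reader signals end-of-stream or failure by enqueueing a sentinel POISON_PILL that the consumer recognizes by identity. Below is a minimal, self-contained sketch of that poison-pill handoff; the class and variable names are illustrative only, not Cassandra's.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Sketch of a producer/consumer handoff with a poison-pill sentinel.
    public class PoisonPillDemo
    {
        // A unique object; the consumer detects it by reference equality.
        private static final byte[] POISON_PILL = new byte[0];

        public static void main(String[] args) throws Exception
        {
            final BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(16);

            Thread producer = new Thread(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        queue.put(new byte[]{1, 2, 3}); // a normal chunk
                        queue.put(POISON_PILL);         // source ended or failed: tell the consumer
                    }
                    catch (InterruptedException e)
                    {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            producer.start();

            while (true)
            {
                byte[] chunk = queue.take();
                if (chunk == POISON_PILL) // identity check, not equals()
                {
                    System.out.println("producer signalled stop; consumer raises EOF here");
                    break;
                }
                System.out.println("consumed chunk of " + chunk.length + " bytes");
            }
            producer.join();
        }
    }

The patch below extends this signalling to the IOException path and, more importantly, makes the reader thread itself stoppable, so a consumer that aborts can clean it up.
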
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private byte[] buffer;
+    private final byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
     private long totalCompressedBytesRead;
     private final boolean hasPostCompressionAdlerChecksums;
 
+    private Thread readerThread;
+
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
         this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 
-        new Thread(new Reader(source, info, dataBuffer)).start();
+        readerThread = new Thread(new Reader(source, info, dataBuffer));
+        readerThread.start();
     }
 
     public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    static class Reader extends WrappedRunnable
+    class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (chunks.hasNext())
+            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
-                    if (r < 0)
+                    int r;
+                    try
+                    {
+                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        if (r < 0)
+                        {
+                            dataBuffer.put(POISON_PILL);
+                            return; // throw exception where we consume dataBuffer
+                        }
+                    }
+                    catch (IOException e)
                     {
                         dataBuffer.put(POISON_PILL);
-                        return; // throw exception where we consume dataBuffer
+                        throw e;
                     }
                     bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
+            synchronized(CompressedInputStream.this)
+            {
+                readerThread = null;
+            }
         }
     }
+
+    @Override
+    public void close() throws IOException
+    {
+        synchronized(this)
+        {
+            if (readerThread != null)
+            {
+                readerThread.interrupt();
+                readerThread = null;
+            }
+        }
+    }
+
 }
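
The heart of the fix is the close()/interrupt handshake: the stream keeps a reference to the thread it spawned, the reader checks Thread.currentThread().isInterrupted() between chunks, and close() interrupts the thread and clears the reference under the monitor; the reader clears the same reference when it finishes on its own, so a later close() has nothing to do. A compact, runnable sketch of that lifecycle, with illustrative names only, not Cassandra's API:

    import java.io.Closeable;

    // Sketch of interrupt-based cleanup of an owned background thread.
    public class InterruptibleReaderDemo implements Closeable
    {
        private Thread readerThread;

        public InterruptibleReaderDemo()
        {
            readerThread = new Thread(new Runnable()
            {
                public void run()
                {
                    // Poll the interrupt flag so close() can stop the loop.
                    while (!Thread.currentThread().isInterrupted())
                    {
                        // Stand-in for "read one chunk and enqueue it".
                        try { Thread.sleep(10); }
                        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                    }
                    // On a natural exit, forget ourselves so close() won't
                    // interrupt a thread that has already finished.
                    synchronized (InterruptibleReaderDemo.this)
                    {
                        readerThread = null;
                    }
                }
            });
            readerThread.start();
        }

        @Override
        public synchronized void close()
        {
            if (readerThread != null)
            {
                readerThread.interrupt();
                readerThread = null;
            }
        }

        public static void main(String[] args) throws Exception
        {
            InterruptibleReaderDemo demo = new InterruptibleReaderDemo();
            Thread.sleep(50);
            demo.close(); // reader observes the interrupt and exits
        }
    }

In the actual Reader above, the blocking dataBuffer.put() calls also end promptly once the thread is interrupted, by throwing InterruptedException, which WrappedRunnable surfaces as the task's failure.
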
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
+        finally
+        {
+            cis.close();
+        }
     }
 
     @Override
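
Note the shape at the call site: cis is created before the try, so the new finally block, rather than try-with-resources, is what guarantees cis.close() runs whether the read loop completes, throws, or is translated by the retry logic above. The same shape in isolation, as a hedged sketch with illustrative names:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Sketch of finally-based cleanup when the resource outlives the try block.
    public class FinallyCloseDemo
    {
        static long drain(InputStream cis) throws IOException
        {
            long total = 0;
            try
            {
                while (cis.read() != -1)
                    total++;
                return total;
            }
            finally
            {
                // Runs on the normal return and on every exception path,
                // so the stream's background reader is always told to stop.
                cis.close();
            }
        }

        public static void main(String[] args) throws IOException
        {
            System.out.println(drain(new ByteArrayInputStream(new byte[]{1, 2, 3}))); // prints 3
        }
    }
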
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
  */
 package org.apache.cassandra.streaming.compress;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
     {
         testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
     }
+
+    /**
+     * Test that CompressedInputStream does not hang when closed while reading
+     * @throws Exception
+     */
+    @Test(expected = EOFException.class)
+    public void testClose() throws Exception
+    {
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+        InputStream blockingInput = new InputStream()
+        {
+            @Override
+            public int read() throws IOException
+            {
+                try
+                {
+                    // 10 second cut-off so a regression here cannot stall the rest of the suite
+                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException("Interrupted as expected", e);
+                }
+            }
+        };
+
+        CompressionInfo info = new CompressionInfo(chunks, param);
+        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+        {
+            new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        cis.close();
+                    }
+                    catch (Exception ignore) {}
+                }
+            }).start();
+            // block here
+            cis.read();
+        }
+    }
 
     /**
      * @param valuesToCheck array of longs of range(0-999)
@@ -70,18 +115,20 @@
         File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
         MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-        Map<Long, Long> index = new HashMap<Long, Long>();
-        for (long l = 0L; l < 1000; l++)
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        Map<Long, Long> index = new HashMap<>();
+        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
         {
-            index.put(l, writer.getFilePointer());
-            writer.stream.writeLong(l);
+            for (long l = 0L; l < 1000; l++)
+            {
+                index.put(l, writer.getFilePointer());
+                writer.stream.writeLong(l);
+            }
+            writer.close();
         }
-        writer.close();
 
         CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+        List<Pair<Long, Long>> sections = new ArrayList<>();
         for (long l : valuesToCheck)
         {
             long position = index.get(l);
@@ -100,14 +147,15 @@
             size += (c.length + 4); // 4bytes CRC
         byte[] toRead = new byte[size];
 
-        RandomAccessFile f = new RandomAccessFile(tmp, "r");
-        int pos = 0;
-        for (CompressionMetadata.Chunk c : chunks)
+        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
         {
-            f.seek(c.offset);
-            pos += f.read(toRead, pos, c.length + 4);
+            int pos = 0;
+            for (CompressionMetadata.Chunk c : chunks)
+            {
+                f.seek(c.offset);
+                pos += f.read(toRead, pos, c.length + 4);
+            }
         }
-        f.close();
 
         if (testTruncate)
         {
@@ -119,13 +167,15 @@
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
         CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
-        DataInputStream in = new DataInputStream(input);
 
-        for (int i = 0; i < sections.size(); i++)
+        try (DataInputStream in = new DataInputStream(input))
         {
-            input.position(sections.get(i).left);
-            long exp = in.readLong();
-            assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+            for (int i = 0; i < sections.size(); i++)
+            {
+                input.position(sections.get(i).left);
+                long readValue = in.readLong();
+                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+            }
         }
     }
 }
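
The new testClose test hinges on a source that can block indefinitely: reads park on a SynchronousQueue that nothing ever feeds, so the only way cis.read() can return is for close() to interrupt the reader thread, which surfaces as the expected EOFException. Here is the same trick in isolation, a hedged sketch with illustrative names (the 10-second poll in the real test is just a safety net so a regression cannot hang the suite):

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.TimeUnit;

    // Sketch of a deliberately blocking InputStream for cancellation tests.
    public class BlockingSourceDemo
    {
        public static void main(String[] args) throws Exception
        {
            final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();

            final InputStream blockingInput = new InputStream()
            {
                @Override
                public int read() throws IOException
                {
                    try
                    {
                        // Nothing ever offers to the queue; the timeout only
                        // keeps a broken test from hanging forever.
                        Integer b = blocker.poll(2, TimeUnit.SECONDS);
                        if (b == null)
                            throw new IOException("timed out waiting for data");
                        return b;
                    }
                    catch (InterruptedException e)
                    {
                        // Interruption is the signal cleanup sends; report it as I/O failure.
                        throw new IOException("interrupted while blocked", e);
                    }
                }
            };

            Thread reader = new Thread(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        blockingInput.read(); // parks here
                    }
                    catch (IOException e)
                    {
                        System.out.println("reader unblocked: " + e.getMessage());
                    }
                }
            });
            reader.start();

            Thread.sleep(100);
            reader.interrupt(); // poll() throws InterruptedException -> IOException
            reader.join();
        }
    }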