From: yukim@apache.org
To: commits@cassandra.apache.org
Reply-To: dev@cassandra.apache.org
Date: Mon, 23 Nov 2015 20:00:44 -0000
Message-Id: <841f705d0abd4e66a6ca9eb6be5c3574@git.apache.org>
In-Reply-To: <0f9111065ec1438f9ba2460c940ea625@git.apache.org>
Subject: [06/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/056055fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/056055fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/056055fe

Branch: refs/heads/trunk
Commit: 056055febd55e1c19a6216627b8568e60141b9fa
Parents: 2aa8342 8b9a916
Author: Yuki Morishita <yukim@apache.org>
Authored: Mon Nov 23 13:17:39 2015 -0600
Committer: Yuki Morishita <yukim@apache.org>
Committed: Mon Nov 23 13:17:39 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                 |  1 +
 .../compress/CompressedInputStream.java     | 46 +++++++--
 .../compress/CompressedStreamReader.java    |  4 +
 .../compress/CompressedInputStreamTest.java | 98 +++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 146a0ce,c4dd54e..d11be26
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
 -2.1.12
 +2.2.4
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
  * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
  * Try next replica if not possible to connect to primary replica on
    ColumnFamilyRecordReader (CASSANDRA-2388)
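The CompressedInputStream change below comes down to one pattern: keep a handle on the background reader thread so that close() can interrupt it, and have the reader enqueue a poison pill on its way out so a consumer blocked on the chunk queue is released instead of hanging forever. The following is a minimal, self-contained sketch of that pattern only; QueueBackedStream and all its members are illustrative names invented here, not Cassandra's classes, and the real code decompresses and checksums each chunk rather than copying raw bytes.

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Illustrative only: a queue-backed stream whose close() interrupts the
 * background reader so a consumer blocked in read() is woken by a poison
 * pill instead of hanging.
 */
class QueueBackedStream extends InputStream
{
    private static final byte[] POISON_PILL = new byte[0]; // end/abort marker

    private final BlockingQueue<byte[]> chunks = new ArrayBlockingQueue<>(1024);
    private final Thread readerThread; // kept as a field so close() can interrupt it

    QueueBackedStream(final InputStream source)
    {
        readerThread = new Thread(() ->
        {
            try
            {
                byte[] buf = new byte[4096];
                int n;
                while ((n = source.read(buf)) != -1)
                    chunks.put(Arrays.copyOf(buf, n)); // blocks when the queue is full
            }
            catch (Exception e)
            {
                // interrupted by close(), or the source failed; signal the consumer below
            }
            finally
            {
                chunks.clear();            // guarantee room for the marker
                chunks.offer(POISON_PILL); // wake any consumer blocked in take()
            }
        });
        readerThread.start();
    }

    private byte[] current = new byte[0];
    private int pos;

    @Override
    public int read() throws IOException
    {
        try
        {
            while (pos == current.length)
            {
                current = chunks.take(); // blocks until data or poison pill arrives
                pos = 0;
                if (current == POISON_PILL)
                    throw new EOFException("stream closed or source exhausted");
            }
            return current[pos++] & 0xff;
        }
        catch (InterruptedException e)
        {
            throw new IOException(e);
        }
    }

    @Override
    public void close()
    {
        readerThread.interrupt(); // unblocks the reader whether it waits in read() or put()
    }
}

Note the interrupt only helps when the reader's blocking call responds to it (a queue poll, as in the test below, or an interruptible channel); a plain socket read may need to be closed as well.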

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 872afcd,b4a3065..daa339a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -60,20 -62,25 +60,23 @@@ public class CompressedInputStream exte
  
      private static final byte[] POISON_PILL = new byte[0];
  
      private long totalCompressedBytesRead;
 -    private final boolean hasPostCompressionAdlerChecksums;
  
+     private Thread readerThread;
+ 
      /**
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
 +    public CompressedInputStream(InputStream source, CompressionInfo info)
      {
          this.info = info;
 -        this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
 -        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
 +        this.checksum = new Adler32();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
 -        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
 +        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
-         new Thread(new Reader(source, info, dataBuffer)).start();
+         readerThread = new Thread(new Reader(source, info, dataBuffer));
+         readerThread.start();
      }
  
      public int read() throws IOException


http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0214c76,87e0003..e692441
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@@ -58,6 -56,53 +56,53 @@@ public class CompressedInputStreamTes
      {
          testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
      }
+ 
+     /**
+      * Test that CompressedInputStream does not hang when closed while reading
 -     * @throws Exception
++     * @throws IOException
+      */
+     @Test(expected = EOFException.class)
 -    public void testClose() throws Exception
++    public void testClose() throws IOException
+     {
+         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+         CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+         final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+         InputStream blockingInput = new InputStream()
+         {
+             @Override
+             public int read() throws IOException
+             {
+                 try
+                 {
+                     // 10-second cutoff so a wedged read does not stall other tests
+                     return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                 }
+                 catch (InterruptedException e)
+                 {
+                     throw new IOException("Interrupted as expected", e);
+                 }
+             }
+         };
+         CompressionInfo info = new CompressionInfo(chunks, param);
 -        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
++        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
+         {
+             new Thread(new Runnable()
+             {
+                 @Override
+                 public void run()
+                 {
+                     try
+                     {
+                         cis.close();
+                     }
+                     catch (Exception ignore) {}
+                 }
+             }).start();
+             // block here
+             cis.read();
+         }
+     }
+ 
      /**
       * @param valuesToCheck array of longs of range(0-999)
       * @throws Exception
@@@ -70,18 -115,20 +115,20 @@@
          File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
          Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
          MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
 -        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
 -        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
 -        Map<Long, Long> index = new HashMap<Long, Long>();
 -        for (long l = 0L; l < 1000; l++)
 +        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
 +        Map<Long, Long> index = new HashMap<>();
 +        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
          {
 -            index.put(l, writer.getFilePointer());
 -            writer.stream.writeLong(l);
 +            for (long l = 0L; l < 1000; l++)
 +            {
 +                index.put(l, writer.getFilePointer());
 +                writer.stream.writeLong(l);
 +            }
-             writer.close();
++            writer.finish();
          }
-         writer.finish();
  
          CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
 -        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
 +        List<Pair<Long, Long>> sections = new ArrayList<>();
          for (long l : valuesToCheck)
          {
              long position = index.get(l);
@@@ -118,14 -166,16 +166,16 @@@
          // read buffer using CompressedInputStream
          CompressionInfo info = new CompressionInfo(chunks, param);
 -        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
  
 -        DataInputStream in = new DataInputStream(input);
 -        for (int i = 0; i < sections.size(); i++)
 +        try (DataInputStream in = new DataInputStream(input))
          {
 -            input.position(sections.get(i).left);
 -            long readValue = in.readLong();
 -            assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
 +            for (int i = 0; i < sections.size(); i++)
 +            {
 +                input.position(sections.get(i).left);
 +                long readValue = in.readLong();
 +                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
 +            }
          }
      }
  }
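To see why the interrupt-on-close wiring matters, the sketch below drives QueueBackedStream (the illustrative class from the earlier sketch, not a Cassandra API) the same way testClose drives CompressedInputStream: the main thread blocks in read() on a source that never yields data, a second thread closes the stream, and the blocked read() is released with an EOFException instead of hanging. CloseUnblocksDemo and the "stuck" source are invented names for this example.

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class CloseUnblocksDemo
{
    public static void main(String[] args) throws Exception
    {
        // A source that never produces data: read() blocks until interrupted.
        InputStream stuck = new InputStream()
        {
            @Override
            public int read() throws IOException
            {
                try
                {
                    Thread.sleep(Long.MAX_VALUE);
                    return -1;
                }
                catch (InterruptedException e)
                {
                    throw new IOException("interrupted", e); // triggered by close()
                }
            }
        };

        final QueueBackedStream qbs = new QueueBackedStream(stuck);

        // Close from another thread shortly after the main thread blocks in read().
        new Thread(() ->
        {
            try { Thread.sleep(500); } catch (InterruptedException ignored) {}
            qbs.close();
        }).start();

        try
        {
            qbs.read(); // would hang forever without the interrupt-on-close fix
        }
        catch (EOFException expected)
        {
            System.out.println("read() released as expected: " + expected.getMessage());
        }
    }
}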