From: yukim@apache.org
To: commits@cassandra.apache.org
Date: Wed, 13 Jan 2016 19:16:56 -0000
Subject: [04/10] cassandra git commit: Fix error streaming section more than 2GB

Fix error streaming section more than 2GB

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10961

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/582bdba4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/582bdba4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/582bdba4

Branch: refs/heads/trunk
Commit: 582bdba4b201e6ab8e2a9a05cff3566f1bab9dce
Parents: c7f0032
Author: Paulo Motta
Authored: Wed Jan 13 12:51:17 2016 -0600
Committer: Yuki Morishita
Committed: Wed Jan 13 13:00:11 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/composites/AbstractCType.java  |  3 +-
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 26 ++++++++---
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 45 +++++++------------
 .../compress/CompressedStreamReader.java        | 32 ++++++++++----
 .../compress/CompressedStreamWriter.java        | 16 ++++++-
 .../compress/CompressedInputStreamTest.java     | 46 --------------------
 9 files changed, 86 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1554cf5..11f2529 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * Fix error streaming section more than 2GB (CASSANDRA-10961)
  * (cqlsh) Also apply --connect-timeout to control connection timeout (CASSANDRA-10959)
  * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)
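
The root cause this patch addresses is visible in the CompressedStreamReader hunk further down, where "int sectionLength = (int) (section.right - section.left)" becomes a long: for a streamed section larger than 2GB the narrowing cast overflows, which is what broke streaming of such sections (CASSANDRA-10961). A minimal illustrative snippet, not part of the patch, with made-up values:

    // Why a section longer than 2GiB cannot be held in an int: the narrowing cast wraps around.
    public class SectionLengthOverflowDemo
    {
        public static void main(String[] args)
        {
            long sectionStart = 0L;
            long sectionEnd = 3L * 1024 * 1024 * 1024;         // a hypothetical 3 GiB section

            int truncated = (int) (sectionEnd - sectionStart); // pre-patch style: wraps to a negative value
            long exact = sectionEnd - sectionStart;            // post-patch style: keeps the real length

            System.out.println("as int:  " + truncated);       // prints -1073741824
            System.out.println("as long: " + exact);           // prints 3221225472
        }
    }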

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/db/composites/AbstractCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
index a982280..2190c69 100644
--- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java
+++ b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
@@ -359,7 +359,8 @@ public abstract class AbstractCType implements CType
     protected static void checkRemaining(ByteBuffer bb, int offs, int length)
     {
         if (offs + length > bb.limit())
-            throw new IllegalArgumentException("Not enough bytes");
+            throw new IllegalArgumentException(String.format("Not enough bytes. Offset: %d. Length: %d. Buffer size: %d",
+                                                             offs, length, bb.limit()));
     }
 
     private static class Serializer implements CType.Serializer

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 681f61e..1ec7e1c 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -259,11 +259,11 @@ public class ConnectionHandler
                 {
                     // receive message
                     StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session);
+                    logger.debug("[Stream #{}] Received {}", session.planId(), message);
                     // Might be null if there is an error during streaming (see FileMessage.deserialize). It's ok
                     // to ignore here since we'll have asked for a retry.
                     if (message != null)
                     {
-                        logger.debug("[Stream #{}] Received {}", session.planId(), message);
                         session.messageReceived(message);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index fe3b13d..8789720 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -60,6 +60,7 @@ public class StreamReader
     protected final long repairedAt;
     protected final SSTableFormat.Type format;
     protected final int sstableLevel;
+    protected final int fileSeqNum;
 
     protected Descriptor desc;
 
@@ -73,6 +74,7 @@ public class StreamReader
         this.repairedAt = header.repairedAt;
         this.format = header.format;
         this.sstableLevel = header.sstableLevel;
+        this.fileSeqNum = header.sequenceNumber;
     }
 
     /**
@@ -83,33 +85,46 @@ public class StreamReader
     @SuppressWarnings("resource")
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
         SSTableWriter writer = null;
+        DecoratedKey key = null;
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt, format);
             while (in.getBytesRead() < totalSize)
             {
-                writeRow(writer, in, cfs);
+                key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                writeRow(key, writer, in, cfs);
 
                 // TODO move this to BytesReadTracker
                 session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
             }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                         session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
             return writer;
         }
         catch (Throwable e)
         {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
             if (writer != null)
             {
                 try
@@ -167,9 +182,8 @@ public class StreamReader
         return size;
     }
 
-    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
+    protected void writeRow(DecoratedKey key, SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
     {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
         writer.appendFromStream(key, cfs.metadata, in, inputVersion);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 106677c..721ae1e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.ning.compress.lzf.LZFOutputStream;
 
 import org.apache.cassandra.io.sstable.Component;
@@ -41,6 +44,8 @@ public class StreamWriter
 {
     private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamWriter.class);
+
     protected final SSTableReader sstable;
     protected final Collection<Pair<Long, Long>> sections;
     protected final StreamRateLimiter limiter;
@@ -70,7 +75,8 @@ public class StreamWriter
     public void write(DataOutputStreamPlus output) throws IOException
     {
         long totalSize = totalSize();
-
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
         try(RandomAccessReader file = sstable.openDataReader();
             ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
@@ -109,6 +115,8 @@ public class StreamWriter
                 // make sure that current section is sent
                 compressedOutput.flush();
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index daa339a..489fed9 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -30,6 +30,9 @@ import java.util.zip.Checksum;
 
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -38,12 +41,15 @@ import org.apache.cassandra.utils.WrappedRunnable;
  */
 public class CompressedInputStream extends InputStream
 {
+
+    private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
+
     private final CompressionInfo info;
     // chunk buffer
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private final byte[] buffer;
+    private byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -61,8 +67,6 @@ public class CompressedInputStream extends InputStream
 
     private long totalCompressedBytesRead;
 
-    private Thread readerThread;
-
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -73,10 +77,9 @@ public class CompressedInputStream extends InputStream
         this.checksum = new Adler32();
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue(Math.min(info.chunks.length, 1024));
 
-        readerThread = new Thread(new Reader(source, info, dataBuffer));
-        readerThread.start();
+        new Thread(new Reader(source, info, dataBuffer)).start();
     }
 
     public int read() throws IOException
@@ -135,7 +138,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    class Reader extends WrappedRunnable
+    static class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -151,7 +154,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
+            while (chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -161,43 +164,25 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r;
                     try
                     {
-                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
                         if (r < 0)
                         {
                             dataBuffer.put(POISON_PILL);
                             return; // throw exception where we consume dataBuffer
                         }
+                        bufferRead += r;
                     }
                     catch (IOException e)
                     {
+                        logger.warn("Error while reading compressed input stream.", e);
                        dataBuffer.put(POISON_PILL);
-                        throw e;
+                        return; // throw exception where we consume dataBuffer
                     }
-                    bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
-            synchronized(CompressedInputStream.this)
-            {
-                readerThread = null;
-            }
-        }
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        synchronized(this)
-        {
-            if (readerThread != null)
-            {
-                readerThread.interrupt();
-                readerThread = null;
-            }
         }
     }
-}
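
A note on the CompressedInputStream change just above: the background Reader hands compressed chunks to the consuming thread through a bounded blocking queue and, on any read failure, enqueues the POISON_PILL sentinel instead of rethrowing on its own thread, so the exception surfaces where the data is consumed. A minimal sketch of that handoff pattern, with hypothetical names rather than the actual Cassandra classes:

    import java.io.EOFException;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Bounded-queue producer/consumer with a poison-pill sentinel, the pattern used by the Reader above.
    public class PoisonPillHandoffDemo
    {
        private static final byte[] POISON_PILL = new byte[0]; // sentinel: "producer hit EOF or an error"
        private static final BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1024);

        public static void main(String[] args) throws Exception
        {
            Thread producer = new Thread(() -> {
                try
                {
                    for (int i = 0; i < 3; i++)
                        queue.put(new byte[] { (byte) i }); // normal chunks
                    queue.put(POISON_PILL);                 // simulated failure/EOF: tell the consumer to stop
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            while (true)
            {
                byte[] chunk = queue.take();
                if (chunk == POISON_PILL)                   // consumer re-raises the failure on its own thread
                    throw new EOFException("producer signalled failure");
                System.out.println("got chunk of length " + chunk.length);
            }
        }
    }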

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 2277943..c684e4f 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,6 +25,7 @@ import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 
 import org.slf4j.Logger;
@@ -34,10 +35,12 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
@@ -64,44 +67,59 @@ public class CompressedStreamReader extends StreamReader
     @SuppressWarnings("resource")
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         SSTableWriter writer = null;
+        DecoratedKey key = null;
        try
        {
            writer = createWriter(cfs, totalSize, repairedAt, format);
+            int sectionIdx = 0;
            for (Pair<Long, Long> section : sections)
            {
                assert cis.getTotalCompressedBytesRead() <= totalSize;
-                int sectionLength = (int) (section.right - section.left);
+                long sectionLength = section.right - section.left;
+                logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
                // skip to beginning of section inside chunk
                cis.position(section.left);
                in.reset(0);
 
                while (in.getBytesRead() < sectionLength)
                {
-                    writeRow(writer, in, cfs);
+                    key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                    writeRow(key, writer, in, cfs);
 
                    // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
                    session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                }
            }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                         session.peer, cis.getTotalCompressedBytesRead(), totalSize);
            return writer;
        }
        catch (Throwable e)
        {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
            if (writer != null)
            {
                try
@@ -120,10 +138,6 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
-        finally
-        {
-            cis.close();
-        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 144980c..99e9bd6 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -25,11 +25,13 @@ import java.util.List;
 
 import com.google.common.base.Function;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamSession;
@@ -43,6 +45,8 @@ public class CompressedStreamWriter
 {
     public static final int CHUNK_SIZE = 10 * 1024 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(CompressedStreamWriter.class);
+
     private final CompressionInfo compressionInfo;
 
     public CompressedStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session)
@@ -55,16 +59,24 @@ public class CompressedStreamWriter extends StreamWriter
     public void write(DataOutputStreamPlus out) throws IOException
     {
         long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
         try (RandomAccessReader file = sstable.openDataReader();
              final ChannelProxy fc = file.getChannel())
         {
             long progress = 0L;
             // calculate chunks to transfer. we want to send continuous chunks altogether.
             List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+
+            int sectionIdx = 0;
+
             // stream each of the required sections of the file
             for (final Pair<Long, Long> section : sections)
             {
                 // length of the section to stream
                 long length = section.right - section.left;
+
+                logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
+
                 // tracks write progress
                 long bytesTransferred = 0;
                 while (bytesTransferred < length)
@@ -84,6 +96,8 @@ public class CompressedStreamWriter extends StreamWriter
                     session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
                 }
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index e692441..0becd18 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -58,52 +58,6 @@ public class CompressedInputStreamTest
     }
 
     /**
-     * Test CompressedInputStream not hang when closed while reading
-     * @throws IOException
-     */
-    @Test(expected = EOFException.class)
-    public void testClose() throws IOException
-    {
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.emptyMap());
-        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
-        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
-        InputStream blockingInput = new InputStream()
-        {
-            @Override
-            public int read() throws IOException
-            {
-                try
-                {
-                    // 10 second cut off not to stop other test in case
-                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException("Interrupted as expected", e);
-                }
-            }
-        };
-        CompressionInfo info = new CompressionInfo(chunks, param);
-        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
-        {
-            new Thread(new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        cis.close();
-                    }
-                    catch (Exception ignore) {}
-                }
-            }).start();
-            // block here
-            cis.read();
-        }
-    }
-
-    /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
      */
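
A closing note on the writer side (CompressedStreamWriter above): each section is sent in CHUNK_SIZE pieces while the section length and running progress stay in long variables, so only the bounded per-write size is ever narrowed to an int. A minimal sketch of that pattern with hypothetical names (the transfer call is a stand-in, not the Cassandra API):

    // Streaming a possibly >2GiB section in bounded chunks; cumulative arithmetic stays in long.
    public class ChunkedSectionTransferDemo
    {
        private static final int CHUNK_SIZE = 10 * 1024 * 1024; // 10 MiB per write, as in CompressedStreamWriter

        // Stand-in for a real channel transfer: pretends to push up to toTransfer bytes and reports how many were written.
        private static int transfer(long position, int toTransfer)
        {
            return toTransfer;
        }

        public static void main(String[] args)
        {
            long sectionStart = 0L;
            long sectionEnd = 5L * 1024 * 1024 * 1024;   // a hypothetical 5 GiB section
            long length = sectionEnd - sectionStart;     // kept as a long, never cast to int

            long bytesTransferred = 0;
            while (bytesTransferred < length)
            {
                // only the per-write size is narrowed to int, and it is bounded by CHUNK_SIZE
                int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
                bytesTransferred += transfer(sectionStart + bytesTransferred, toTransfer);
            }
            System.out.println("transferred " + bytesTransferred + " bytes");
        }
    }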