From: yukim@apache.org
To: commits@cassandra.apache.org
Date: Fri, 15 Jan 2016 18:22:58 -0000
Subject: [04/15] cassandra git commit: Revert CASSANDRA-10012 and add more loggings

Revert CASSANDRA-10012 and add more loggings

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10961

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58a0079c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58a0079c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58a0079c

Branch: refs/heads/cassandra-3.0
Commit: 58a0079c391d12dab97e036f05be070dfaddcc95
Parents: abe0c67
Author: Paulo Motta
Authored: Fri Jan 15 12:04:32 2016 -0600
Committer: Yuki Morishita
Committed: Fri Jan 15 12:09:56 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/composites/AbstractCType.java  |  3 +-
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 26 ++++++++---
 .../cassandra/streaming/StreamWriter.java       |  9 ++++
 .../compress/CompressedInputStream.java         | 45 +++++++------------
 .../compress/CompressedStreamReader.java        | 31 +++++++++----
 .../compress/CompressedStreamWriter.java        | 12 +++++
 .../compress/CompressedInputStreamTest.java     | 46 --------------------
 9 files changed, 83 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4b87ed0..3d84a30 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.13
+ * Revert CASSANDRA-10012 and add more logging (CASSANDRA-10961)
 * Allow simultaneous bootstrapping with strict consistency when no vnodes are used (CASSANDRA-11005)
 * Log a message when major compaction does not result in a single file (CASSANDRA-10847)
 * (cqlsh) fix cqlsh_copy_tests when vnodes are disabled (CASSANDRA-10997)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/db/composites/AbstractCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
index 5af7458..fecc847 100644
--- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java
+++ b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
@@ -375,7 +375,8 @@ public abstract class AbstractCType implements CType
     protected static void checkRemaining(ByteBuffer bb, int offs, int length)
     {
         if (offs + length > bb.limit())
-            throw new IllegalArgumentException("Not enough bytes");
+            throw new IllegalArgumentException(String.format("Not enough bytes. Offset: %d. Length: %d. Buffer size: %d",
+                                                             offs, length, bb.limit()));
     }
 
     private static class Serializer implements CType.Serializer
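For context, the hunk above changes only the failure message: the offending offset, length, and buffer limit are included so a truncated or corrupt composite can be diagnosed from the log alone. A minimal, self-contained sketch of the same pattern (class and method names here are illustrative, not the Cassandra API):

    import java.nio.ByteBuffer;

    final class BoundsCheck
    {
        // Fails with the values involved, instead of a bare "Not enough bytes".
        static void checkRemaining(ByteBuffer bb, int offs, int length)
        {
            if (offs + length > bb.limit())
                throw new IllegalArgumentException(String.format(
                    "Not enough bytes. Offset: %d. Length: %d. Buffer size: %d", offs, length, bb.limit()));
        }

        public static void main(String[] args)
        {
            ByteBuffer bb = ByteBuffer.allocate(8);
            checkRemaining(bb, 0, 8);  // ok
            checkRemaining(bb, 6, 4);  // throws: Offset: 6. Length: 4. Buffer size: 8
        }
    }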
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index aa3504a..ac267f9 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -248,11 +248,11 @@ public class ConnectionHandler
                 {
                     // receive message
                     StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session);
+                    logger.debug("[Stream #{}] Received {}", session.planId(), message);
                     // Might be null if there is an error during streaming (see FileMessage.deserialize). It's ok
                     // to ignore here since we'll have asked for a retry.
                     if (message != null)
                     {
-                        logger.debug("[Stream #{}] Received {}", session.planId(), message);
                         session.messageReceived(message);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 18013fe..1e3ba7f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -58,6 +58,7 @@ public class StreamReader
     protected final StreamSession session;
     protected final Descriptor.Version inputVersion;
     protected final long repairedAt;
+    protected final int fileSeqNum;
 
     protected Descriptor desc;
 
@@ -69,6 +70,7 @@ public class StreamReader
         this.sections = header.sections;
         this.inputVersion = new Descriptor.Version(header.version);
         this.repairedAt = header.repairedAt;
+        this.fileSeqNum = header.sequenceNumber;
     }
 
     /**
@@ -78,33 +80,46 @@ public class StreamReader
      */
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
 
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
         SSTableWriter writer = null;
+        DecoratedKey key = null;
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt);
             while (in.getBytesRead() < totalSize)
             {
-                writeRow(writer, in, cfs);
+                key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                writeRow(key, writer, in, cfs);
                 // TODO move this to BytesReadTracker
                 session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
             }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                         session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
             return writer;
         }
         catch (Throwable e)
         {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
             if (writer != null)
             {
                 try
@@ -162,9 +177,8 @@ public class StreamReader
         return size;
     }
 
-    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
+    protected void writeRow(DecoratedKey key, SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
     {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
         writer.appendFromStream(key, cfs.metadata, in, inputVersion);
     }
 }
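The StreamReader change above follows one pattern: the partition key is now decoded in the read loop rather than inside writeRow, so when deserialization fails the catch block can name the partition and table that were being received. A rough, self-contained sketch of that structure, where the stream format, names, and error reporting are simplified stand-ins and not Cassandra's actual wire format:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    final class KeyedStreamReader
    {
        // Reads 'count' length-prefixed (key, value) records; if anything fails, the last key read
        // is reported so the operator can tell which partition was in flight.
        static void readAll(InputStream source) throws IOException
        {
            DataInputStream in = new DataInputStream(source);
            String key = null;                      // last key seen, kept only for diagnostics
            try
            {
                int count = in.readInt();
                for (int i = 0; i < count; i++)
                {
                    key = in.readUTF();             // decode the "partition key" first...
                    byte[] value = new byte[in.readInt()];
                    in.readFully(value);            // ...then the payload, which is what may fail mid-stream
                }
            }
            catch (IOException e)
            {
                if (key != null)
                    System.err.println("Error while reading partition " + key + " from stream");
                throw e;                            // rethrow after adding context, as the patch's logger.warn path does
            }
        }
    }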
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 43bc26a..2579414 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -24,6 +24,9 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.ning.compress.lzf.LZFOutputStream;
 
 import org.apache.cassandra.io.sstable.Component;
@@ -42,6 +45,8 @@ public class StreamWriter
 {
     private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamWriter.class);
+
     protected final SSTableReader sstable;
     protected final Collection<Pair<Long, Long>> sections;
     protected final StreamRateLimiter limiter;
@@ -71,6 +76,8 @@ public class StreamWriter
     public void write(WritableByteChannel channel) throws IOException
     {
         long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
         RandomAccessReader file = sstable.openDataReader();
         ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
                                     ? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
@@ -109,6 +116,8 @@ public class StreamWriter
                 // make sure that current section is send
                 compressedOutput.flush();
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index b4a3065..6280ccd 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -31,6 +31,9 @@ import java.util.zip.Checksum;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -40,12 +43,15 @@ import org.apache.cassandra.utils.WrappedRunnable;
  */
 public class CompressedInputStream extends InputStream
 {
+
+    private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
+
     private final CompressionInfo info;
     // chunk buffer
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private final byte[] buffer;
+    private byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -64,8 +70,6 @@ public class CompressedInputStream extends InputStream
     private long totalCompressedBytesRead;
     private final boolean hasPostCompressionAdlerChecksums;
 
-    private Thread readerThread;
-
     /**
      * @param source Input source to read compressed data from
     * @param info Compression info
@@ -77,10 +81,9 @@ public class CompressedInputStream extends InputStream
         this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue(Math.min(info.chunks.length, 1024));
 
-        readerThread = new Thread(new Reader(source, info, dataBuffer));
-        readerThread.start();
+        new Thread(new Reader(source, info, dataBuffer)).start();
     }
 
     public int read() throws IOException
@@ -146,7 +149,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    class Reader extends WrappedRunnable
+    static class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -162,7 +165,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
+            while (chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -172,43 +175,25 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r;
                     try
                     {
-                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
                         if (r < 0)
                         {
                             dataBuffer.put(POISON_PILL);
                             return; // throw exception where we consume dataBuffer
                         }
+                        bufferRead += r;
                     }
                     catch (IOException e)
                     {
+                        logger.warn("Error while reading compressed input stream.", e);
                         dataBuffer.put(POISON_PILL);
-                        throw e;
+                        return; // throw exception where we consume dataBuffer
                     }
-                    bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
-            synchronized(CompressedInputStream.this)
-            {
-                readerThread = null;
-            }
-        }
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        synchronized(this)
-        {
-            if (readerThread != null)
-            {
-                readerThread.interrupt();
-                readerThread = null;
-            }
         }
     }
-
 }
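The CompressedInputStream hunk above reverts the interruptible reader thread and returns to a fire-and-forget producer: a background thread fills a bounded queue with compressed chunks and, on any read failure, logs, enqueues a sentinel (POISON_PILL) and exits, leaving the consumer side to raise an error (the removed test expected EOFException) when it dequeues the sentinel. A condensed sketch of that producer/consumer arrangement, with made-up chunk sizes and without checksums or decompression:

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    final class ChunkQueueReader
    {
        private static final byte[] POISON_PILL = new byte[0];       // sentinel meaning "source failed"

        private final BlockingQueue<byte[]> dataBuffer = new ArrayBlockingQueue<byte[]>(1024);

        ChunkQueueReader(final InputStream source, final int chunkSize, final int chunkCount)
        {
            // Fire-and-forget producer: no interrupt handling, errors are reported through the queue.
            new Thread(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        for (int i = 0; i < chunkCount; i++)
                        {
                            byte[] chunk = new byte[chunkSize];
                            int read = 0;
                            while (read < chunkSize)
                            {
                                int r = source.read(chunk, read, chunkSize - read);
                                if (r < 0) { dataBuffer.put(POISON_PILL); return; }
                                read += r;
                            }
                            dataBuffer.put(chunk);
                        }
                    }
                    catch (Exception e)
                    {
                        try { dataBuffer.put(POISON_PILL); } catch (InterruptedException ignored) { }
                    }
                }
            }).start();
        }

        // Consumer side: surface the producer's failure as an EOFException.
        byte[] nextChunk() throws IOException
        {
            try
            {
                byte[] chunk = dataBuffer.take();
                if (chunk == POISON_PILL)
                    throw new EOFException("No chunk available");
                return chunk;
            }
            catch (InterruptedException e)
            {
                throw new IOException(e);
            }
        }
    }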
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 4f60773..fd0d9c8 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -24,6 +24,7 @@ import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,10 +33,12 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
@@ -61,40 +64,56 @@ public class CompressedStreamReader extends StreamReader
     @Override
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         SSTableWriter writer = null;
+        DecoratedKey key = null;
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt);
+            int sectionIdx = 0;
             for (Pair<Long, Long> section : sections)
             {
                 long length = section.right - section.left;
                 // skip to beginning of section inside chunk
                 cis.position(section.left);
                 in.reset(0);
+                logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, length);
 
                 while (in.getBytesRead() < length)
                 {
-                    writeRow(writer, in, cfs);
+                    key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                    writeRow(key, writer, in, cfs);
+
                     // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
                     session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                 }
             }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                         session.peer, cis.getTotalCompressedBytesRead(), totalSize);
             return writer;
         }
         catch (Throwable e)
         {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
             if (writer != null)
             {
                 try
@@ -113,10 +132,6 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
-        finally
-        {
-            cis.close();
-        }
     }
 
     @Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 001c927..6fe08e6 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -24,6 +24,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
@@ -40,6 +43,8 @@ public class CompressedStreamWriter extends StreamWriter
 {
     public static final int CHUNK_SIZE = 10 * 1024 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(CompressedStreamWriter.class);
+
     private final CompressionInfo compressionInfo;
 
     public CompressedStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session)
@@ -52,12 +57,15 @@ public class CompressedStreamWriter extends StreamWriter
     public void write(WritableByteChannel channel) throws IOException
     {
         long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
         RandomAccessReader file = sstable.openDataReader();
         FileChannel fc = file.getChannel();
 
         long progress = 0L;
         // calculate chunks to transfer. we want to send continuous chunks altogether.
         List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+        int sectionIdx = 0;
         try
         {
             // stream each of the required sections of the file
@@ -65,6 +73,8 @@ public class CompressedStreamWriter extends StreamWriter
             {
                 // length of the section to stream
                 long length = section.right - section.left;
+                logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
+
                 // tracks write progress
                 long bytesTransferred = 0;
                 while (bytesTransferred < length)
@@ -77,6 +87,8 @@ public class CompressedStreamWriter extends StreamWriter
                     session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
                 }
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
         }
         finally
         {
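For orientation, the writer-side loop that the new logging hooks into streams each (start, end) section of the data file in bounded slices and reports progress after every slice. A stripped-down sketch of that loop, using plain NIO and console output in place of Cassandra's StreamSession, rate limiter, and logger (all names here are illustrative):

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.channels.WritableByteChannel;
    import java.util.List;

    final class SectionStreamer
    {
        private static final int CHUNK_SIZE = 10 * 1024 * 1024;   // cap per transfer, mirroring CompressedStreamWriter.CHUNK_SIZE

        // A [start, end) byte range of the source file.
        static final class Section
        {
            final long start;
            final long end;
            Section(long start, long end) { this.start = start; this.end = end; }
        }

        static long stream(FileChannel file, List<Section> sections, WritableByteChannel out) throws IOException
        {
            long progress = 0;
            int sectionIdx = 0;
            for (Section section : sections)
            {
                long length = section.end - section.start;
                System.err.printf("Writing section %d with length %d to stream.%n", sectionIdx++, length);
                long bytesTransferred = 0;
                while (bytesTransferred < length)
                {
                    int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
                    long n = file.transferTo(section.start + bytesTransferred, toTransfer, out);  // may send fewer bytes
                    bytesTransferred += n;
                    progress += n;
                    // the real code reports progress to the session here, per slice, not per section
                }
            }
            return progress;
        }
    }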
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58a0079c/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 87e0003..c70b932 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -58,52 +58,6 @@ public class CompressedInputStreamTest
     }
 
     /**
-     * Test CompressedInputStream not hang when closed while reading
-     * @throws Exception
-     */
-    @Test(expected = EOFException.class)
-    public void testClose() throws Exception
-    {
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.emptyMap());
-        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
-        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
-        InputStream blockingInput = new InputStream()
-        {
-            @Override
-            public int read() throws IOException
-            {
-                try
-                {
-                    // 10 second cut off not to stop other test in case
-                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException("Interrupted as expected", e);
-                }
-            }
-        };
-        CompressionInfo info = new CompressionInfo(chunks, param);
-        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
-        {
-            new Thread(new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        cis.close();
-                    }
-                    catch (Exception ignore) {}
-                }
-            }).start();
-            // block here
-            cis.read();
-        }
-    }
-
-    /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
      */