Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1DDF7171F5 for ; Fri, 26 Sep 2014 18:33:16 +0000 (UTC) Received: (qmail 28350 invoked by uid 500); 26 Sep 2014 18:33:15 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 28249 invoked by uid 500); 26 Sep 2014 18:33:15 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 28000 invoked by uid 99); 26 Sep 2014 18:33:15 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Sep 2014 18:33:15 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4B1499B37A8; Fri, 26 Sep 2014 18:33:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yukim@apache.org To: commits@cassandra.apache.org Date: Fri, 26 Sep 2014 18:33:17 -0000 Message-Id: <15e479d472a243dda271a7b2036c06a7@git.apache.org> In-Reply-To: <9c667de37a144d1eace2bcc81494dc79@git.apache.org> References: <9c667de37a144d1eace2bcc81494dc79@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/6] git commit: Fix wrong progress when streaming uncompressed Fix wrong progress when streaming uncompressed patch by yukim; reviewed by Josh McKenzie for CASSANDRA-7878 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3db38d7e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3db38d7e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3db38d7e Branch: refs/heads/trunk Commit: 3db38d7ed12960657cd5a79374c4ef28d6b9a966 Parents: b1166c0 Author: Yuki Morishita Authored: Thu Sep 4 11:27:39 2014 -0500 Committer: Yuki Morishita Committed: Fri Sep 26 12:22:44 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/streaming/StreamWriter.java | 22 ++++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3db38d7e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 00603f3..40e2f2c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -23,6 +23,7 @@ * Make disruptor_thrift_server invocation pool configurable (CASSANDRA-7594) * Make repair no-op when RF=1 (CASSANDRA-7864) * Fix NPE when table dropped during streaming (CASSANDRA-7946) + * Fix wrong progress when streaming uncompressed (CASSANDRA-7878) Merged from 1.2: * Don't index tombstones (CASSANDRA-7828) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3db38d7e/src/java/org/apache/cassandra/streaming/StreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java index 5a5163f..43bc26a 100644 --- a/src/java/org/apache/cassandra/streaming/StreamWriter.java +++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java @@ -87,7 +87,7 @@ public class StreamWriter for (Pair section : sections) { long start = validator == null ? section.left : validator.chunkStart(section.left); - int skipBytes = (int) (section.left - start); + int readOffset = (int) (section.left - start); // seek to the beginning of the section file.seek(start); if (validator != null) @@ -96,14 +96,14 @@ public class StreamWriter // length of the section to read long length = section.right - start; // tracks write progress - long bytesTransferred = 0; - while (bytesTransferred < length) + long bytesRead = 0; + while (bytesRead < length) { - long lastWrite = write(file, validator, skipBytes, length, bytesTransferred); - bytesTransferred += lastWrite; - progress += lastWrite; + long lastBytesRead = write(file, validator, readOffset, length, bytesRead); + bytesRead += lastBytesRead; + progress += (lastBytesRead - readOffset); session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize); - skipBytes = 0; + readOffset = 0; } // make sure that current section is send @@ -132,10 +132,10 @@ public class StreamWriter * @param reader The file reader to read from * @param validator validator to verify data integrity * @param start number of bytes to skip transfer, but include for validation. - * @param length The full length that should be transferred - * @param bytesTransferred Number of bytes remaining to transfer + * @param length The full length that should be read from {@code reader} + * @param bytesTransferred Number of bytes already read out of {@code length} * - * @return Number of bytes transferred + * @return Number of bytes read * * @throws java.io.IOException on any I/O error */ @@ -148,7 +148,7 @@ public class StreamWriter if (validator != null) validator.validate(transferBuffer, 0, minReadable); - limiter.acquire(toTransfer); + limiter.acquire(toTransfer - start); compressedOutput.write(transferBuffer, start, (toTransfer - start)); return toTransfer;