Return-Path: 
X-Original-To: apmail-spark-commits-archive@minotaur.apache.org
Delivered-To: apmail-spark-commits-archive@minotaur.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ACFE1113DD for ; Wed, 13 Aug 2014 06:19:49 +0000 (UTC)
Received: (qmail 37179 invoked by uid 500); 13 Aug 2014 06:19:49 -0000
Delivered-To: apmail-spark-commits-archive@spark.apache.org
Received: (qmail 37142 invoked by uid 500); 13 Aug 2014 06:19:49 -0000
Mailing-List: contact commits-help@spark.apache.org; run by ezmlm
Precedence: bulk
List-Help: 
List-Unsubscribe: 
List-Post: 
List-Id: 
Delivered-To: mailing list commits@spark.apache.org
Received: (qmail 37133 invoked by uid 99); 13 Aug 2014 06:19:49 -0000
Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Aug 2014 06:19:49 +0000
Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4359D9A42C4; Wed, 13 Aug 2014 06:19:49 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: rxin@apache.org
To: commits@spark.apache.org
Message-Id: <3bea5fdae55c48d0bc5ae2ad7dd89672@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: git commit: Use transferTo when copy merge files in ExternalSorter
Date: Wed, 13 Aug 2014 06:19:49 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-1.1 837bf60fd -> be674b34b


Use transferTo when copy merge files in ExternalSorter

Since this is a file to file copy, using transferTo should be faster.

Author: Raymond Liu 

Closes #1884 from colorant/externalSorter and squashes the following commits:

6e42f3c [Raymond Liu] More code into copyStream
bfb496b [Raymond Liu] Use transferTo when copy merge files in ExternalSorter

(cherry picked from commit 246cb3f158686348a698d1c0da3001c314727129)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be674b34
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be674b34
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be674b34

Branch: refs/heads/branch-1.1
Commit: be674b34bed93eafeb621cbac5d5bb5f3a60e8f4
Parents: 837bf60
Author: Raymond Liu 
Authored: Tue Aug 12 23:19:35 2014 -0700
Committer: Reynold Xin 
Committed: Tue Aug 12 23:19:45 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     | 29 +++++++++++++++-----
 .../spark/util/collection/ExternalSorter.scala  |  7 ++---
 2 files changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/be674b34/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c60be4f..8cac5da 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -284,17 +284,32 @@ private[spark] object Utils extends Logging {
   /** Copy all data from an InputStream to an OutputStream */
   def copyStream(in: InputStream,
                  out: OutputStream,
-                 closeStreams: Boolean = false)
+                 closeStreams: Boolean = false): Long =
   {
+    var count = 0L
     try {
-      val buf = new Array[Byte](8192)
-      var n = 0
-      while (n != -1) {
-        n = in.read(buf)
-        if (n != -1) {
-          out.write(buf, 0, n)
+      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
+        // When both streams are File stream, use transferTo to improve copy performance.
+        val inChannel = in.asInstanceOf[FileInputStream].getChannel()
+        val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
+        val size = inChannel.size()
+
+        // In case transferTo method transferred less data than we have required.
+        while (count < size) {
+          count += inChannel.transferTo(count, size - count, outChannel)
+        }
+      } else {
+        val buf = new Array[Byte](8192)
+        var n = 0
+        while (n != -1) {
+          n = in.read(buf)
+          if (n != -1) {
+            out.write(buf, 0, n)
+            count += n
+          }
         }
       }
+      count
     } finally {
       if (closeStreams) {
         try {


http://git-wip-us.apache.org/repos/asf/spark/blob/be674b34/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b73d5e0..5d8a648 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -745,12 +745,11 @@ private[spark] class ExternalSorter[K, V, C](
     try {
       out = new FileOutputStream(outputFile)
       for (i <- 0 until numPartitions) {
-        val file = partitionWriters(i).fileSegment().file
-        in = new FileInputStream(file)
-        org.apache.spark.util.Utils.copyStream(in, out)
+        in = new FileInputStream(partitionWriters(i).fileSegment().file)
+        val size = org.apache.spark.util.Utils.copyStream(in, out, false)
         in.close()
         in = null
-        lengths(i) = file.length()
+        lengths(i) = size
         offsets(i + 1) = offsets(i) + lengths(i)
       }
     } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org
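
----------------------------------------------------------------------
Editor's note: below is a minimal, self-contained Scala sketch of the
transferTo pattern used in this patch, for readers who want to try it
outside Spark. The object name, the copyFile helper, and the file paths
are illustrative assumptions and are not part of the commit; the retry
loop mirrors the patched Utils.copyStream, because FileChannel.transferTo
may move fewer bytes than requested in a single call.

// Illustrative sketch, not part of the commit.
import java.io.{File, FileInputStream, FileOutputStream}

object TransferToCopyExample {
  // Copies src to dst through FileChannel.transferTo and returns the
  // number of bytes copied.
  def copyFile(src: File, dst: File): Long = {
    val in = new FileInputStream(src)
    val out = new FileOutputStream(dst)
    try {
      val inChannel = in.getChannel
      val outChannel = out.getChannel
      val size = inChannel.size()
      var count = 0L
      // transferTo may transfer less than requested, so keep calling it
      // until the whole file has been copied.
      while (count < size) {
        count += inChannel.transferTo(count, size - count, outChannel)
      }
      count
    } finally {
      in.close()
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical paths, purely for demonstration.
    val copied = copyFile(new File("/tmp/src.bin"), new File("/tmp/dst.bin"))
    println(s"Copied $copied bytes")
  }
}

Returning the byte count is what lets the ExternalSorter change above
record each partition's length directly from copyStream instead of
calling file.length() afterwards.
----------------------------------------------------------------------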