spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: Use transferTo when copying merge files in ExternalSorter
Date Wed, 13 Aug 2014 06:19:49 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 837bf60fd -> be674b34b


Use transferTo when copying merge files in ExternalSorter

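Since this is a file-to-file copy, using FileChannel.transferTo should be faster: it lets the operating system move the bytes directly between the two files instead of copying them through a user-space buffer.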

Author: Raymond Liu <raymond.liu@intel.com>

Closes #1884 from colorant/externalSorter and squashes the following commits:

6e42f3c [Raymond Liu] Move more code into copyStream
bfb496b [Raymond Liu] Use transferTo when copying merge files in ExternalSorter

(cherry picked from commit 246cb3f158686348a698d1c0da3001c314727129)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be674b34
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be674b34
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be674b34

Branch: refs/heads/branch-1.1
Commit: be674b34bed93eafeb621cbac5d5bb5f3a60e8f4
Parents: 837bf60
Author: Raymond Liu <raymond.liu@intel.com>
Authored: Tue Aug 12 23:19:35 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Tue Aug 12 23:19:45 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     | 29 +++++++++++++++-----
 .../spark/util/collection/ExternalSorter.scala  |  7 ++---
 2 files changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/be674b34/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c60be4f..8cac5da 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -284,17 +284,32 @@ private[spark] object Utils extends Logging {
   /** Copy all data from an InputStream to an OutputStream */
   def copyStream(in: InputStream,
                  out: OutputStream,
-                 closeStreams: Boolean = false)
+                 closeStreams: Boolean = false): Long =
   {
+    var count = 0L
     try {
-      val buf = new Array[Byte](8192)
-      var n = 0
-      while (n != -1) {
-        n = in.read(buf)
-        if (n != -1) {
-          out.write(buf, 0, n)
+      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream])
{
+        // When both streams are File stream, use transferTo to improve copy performance.
+        val inChannel = in.asInstanceOf[FileInputStream].getChannel()
+        val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
+        val size = inChannel.size()
+
+        // In case transferTo method transferred less data than we have required.
+        while (count < size) {
+          count += inChannel.transferTo(count, size - count, outChannel)
+        }
+      } else {
+        val buf = new Array[Byte](8192)
+        var n = 0
+        while (n != -1) {
+          n = in.read(buf)
+          if (n != -1) {
+            out.write(buf, 0, n)
+            count += n
+          }
         }
       }
+      count
     } finally {
       if (closeStreams) {
         try {

http://git-wip-us.apache.org/repos/asf/spark/blob/be674b34/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b73d5e0..5d8a648 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -745,12 +745,11 @@ private[spark] class ExternalSorter[K, V, C](
       try {
         out = new FileOutputStream(outputFile)
         for (i <- 0 until numPartitions) {
-          val file = partitionWriters(i).fileSegment().file
-          in = new FileInputStream(file)
-          org.apache.spark.util.Utils.copyStream(in, out)
+          in = new FileInputStream(partitionWriters(i).fileSegment().file)
+          val size = org.apache.spark.util.Utils.copyStream(in, out, false)
           in.close()
           in = null
-          lengths(i) = file.length()
+          lengths(i) = size
           offsets(i + 1) = offsets(i) + lengths(i)
         }
       } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message