spark-commits mailing list archives

From sro...@apache.org
Subject spark git commit: [SPARK-14542][CORE] PipeRDD should allow configurable buffer size for…
Date Tue, 10 May 2016 14:29:15 GMT
Repository: spark
Updated Branches:
  refs/heads/master 570647267 -> a019e6efb


[SPARK-14542][CORE] PipeRDD should allow configurable buffer size for…

## What changes were proposed in this pull request?

Currently, PipedRDD internally uses a PrintWriter to write data to the stdin of the piped process; by default this wraps a BufferedWriter with a buffer size of 8k. In our experiments we have seen that an 8k buffer is too small, and the job spends a significant amount of CPU time in system calls copying the data. We should have a way to configure the buffer size for the writer.
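
As a usage illustration (a minimal sketch, not part of the patch; it assumes an existing SparkContext `sc` and uses the `bufferSize` parameter added in the diff below):

```scala
// Pipe each partition through `cat`, raising the stdin writer's
// buffer from the default 8k to 64k; all other parameters keep
// their defaults.
val piped = sc.parallelize(1 to 1000000)
  .pipe(Seq("cat"), bufferSize = 65536)
piped.count()
```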

## How was this patch tested?
Ran PipedRDDSuite tests.

Author: Sital Kedia <skedia@fb.com>

Closes #12309 from sitalkedia/bufferedPipedRDD.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a019e6ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a019e6ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a019e6ef

Branch: refs/heads/master
Commit: a019e6efb71e4dce51ca91e41c3d293cf3a6ccb8
Parents: 5706472
Author: Sital Kedia <skedia@fb.com>
Authored: Tue May 10 15:28:35 2016 +0100
Committer: Sean Owen <sowen@cloudera.com>
Committed: Tue May 10 15:28:35 2016 +0100

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 58 +++++++++++++-------
 .../scala/org/apache/spark/rdd/PipedRDD.scala   | 10 +++-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  7 ++-
 .../org/apache/spark/rdd/PipedRDDSuite.scala    |  2 +-
 project/MimaExcludes.scala                      |  4 ++
 5 files changed, 54 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a019e6ef/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 6f3b8fa..c17ca12 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java
 
 import java.{lang => jl}
 import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -80,7 +80,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * This should ''not'' be called by users directly, but is available for implementors of custom
    * subclasses of RDD.
    */
-  def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] =
+  def iterator(split: Partition, taskContext: TaskContext): JIterator[T] =
     rdd.iterator(split, taskContext).asJava
 
   // Transformations (return a new RDD)
@@ -96,7 +96,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * of the original partition.
    */
   def mapPartitionsWithIndex[R](
-      f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
+      f: JFunction2[jl.Integer, JIterator[T], JIterator[R]],
       preservesPartitioning: Boolean = false): JavaRDD[R] =
     new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, b.asJava).asScala,
         preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -147,7 +147,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
+  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U]): JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
@@ -157,7 +157,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
+  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U],
       preservesPartitioning: Boolean): JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -169,7 +169,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]]): JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
@@ -179,7 +179,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2]):
   JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -190,7 +190,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]],
       preservesPartitioning: Boolean): JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -202,7 +202,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2],
       preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Applies a function f to each partition of this RDD.
    */
-  def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
+  def foreachPartition(f: VoidFunction[JIterator[T]]): Unit = {
     rdd.foreachPartition(x => f.call(x.asJava))
   }
 
@@ -256,19 +256,33 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: String): JavaRDD[String] = rdd.pipe(command)
+  def pipe(command: String): JavaRDD[String] = {
+    rdd.pipe(command)
+  }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: JList[String]): JavaRDD[String] =
+  def pipe(command: JList[String]): JavaRDD[String] = {
     rdd.pipe(command.asScala)
+  }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
+  def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
     rdd.pipe(command.asScala, env.asScala)
+  }
+
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
+  def pipe(command: JList[String],
+           env: JMap[String, String],
+           separateWorkingDir: Boolean,
+           bufferSize: Int): JavaRDD[String] = {
+    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
+  }
 
   /**
    * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
@@ -288,7 +302,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def zipPartitions[U, V](
       other: JavaRDDLike[U, _],
-      f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
+      f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
     def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
       (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
     }
@@ -446,8 +460,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
    * combine step happens locally on the master, equivalent to running a single reduce task.
    */
-  def countByValue(): java.util.Map[T, jl.Long] =
-    mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[java.util.Map[T, jl.Long]]
+  def countByValue(): JMap[T, jl.Long] =
+    mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[JMap[T, jl.Long]]
 
   /**
    * (Experimental) Approximate version of countByValue().
@@ -455,13 +469,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def countByValueApprox(
     timeout: Long,
     confidence: Double
-    ): PartialResult[java.util.Map[T, BoundedDouble]] =
+    ): PartialResult[JMap[T, BoundedDouble]] =
     rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap)
 
   /**
    * (Experimental) Approximate version of countByValue().
    */
-  def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] =
+  def countByValueApprox(timeout: Long): PartialResult[JMap[T, BoundedDouble]] =
     rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap)
 
   /**
@@ -596,9 +610,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Returns the maximum element from this RDD as defined by the specified
    * Comparator[T].
+   *
    * @param comp the comparator that defines ordering
    * @return the maximum of the RDD
-   * */
+   */
   def max(comp: Comparator[T]): T = {
     rdd.max()(Ordering.comparatorToOrdering(comp))
   }
@@ -606,9 +621,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Returns the minimum element from this RDD as defined by the specified
    * Comparator[T].
+   *
    * @param comp the comparator that defines ordering
    * @return the minimum of the RDD
-   * */
+   */
   def min(comp: Comparator[T]): T = {
     rdd.min()(Ordering.comparatorToOrdering(comp))
   }
@@ -684,7 +700,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * The asynchronous version of the `foreachPartition` action, which
    * applies a function f to each partition of this RDD.
    */
-  def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
+  def foreachPartitionAsync(f: VoidFunction[JIterator[T]]): JavaFutureAction[Void] = {
     new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x.asJava)),
       { x => null.asInstanceOf[Void] })
   }
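
For the Java API, the change above adds a four-argument `pipe` overload. As a sketch of how it could be called (shown in Scala against the Java types; `javaRDD` is an assumed existing `JavaRDD[String]`):

```scala
import java.util.{Arrays => JArrays, Collections}

// Hypothetical call to the new overload: empty environment, no
// separate working directory, 64k stdin buffer.
val piped = javaRDD.pipe(
  JArrays.asList("cat"),                   // command
  Collections.emptyMap[String, String](),  // env
  false,                                   // separateWorkingDir
  65536)                                   // bufferSize
```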

http://git-wip-us.apache.org/repos/asf/spark/blob/a019e6ef/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index dd8e46b..4561685 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.rdd
 
+import java.io.BufferedWriter
 import java.io.File
 import java.io.FilenameFilter
 import java.io.IOException
+import java.io.OutputStreamWriter
 import java.io.PrintWriter
 import java.util.StringTokenizer
 import java.util.concurrent.atomic.AtomicReference
@@ -45,7 +47,8 @@ private[spark] class PipedRDD[T: ClassTag](
     envVars: Map[String, String],
     printPipeContext: (String => Unit) => Unit,
     printRDDElement: (T, String => Unit) => Unit,
-    separateWorkingDir: Boolean)
+    separateWorkingDir: Boolean,
+    bufferSize: Int)
   extends RDD[String](prev) {
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -58,7 +61,7 @@ private[spark] class PipedRDD[T: ClassTag](
       printRDDElement: (T, String => Unit) => Unit = null,
       separateWorkingDir: Boolean = false) =
     this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
-      separateWorkingDir)
+      separateWorkingDir, 8192)
 
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
@@ -144,7 +147,8 @@ private[spark] class PipedRDD[T: ClassTag](
     new Thread(s"stdin writer for $command") {
       override def run(): Unit = {
         TaskContext.setTaskContext(context)
-        val out = new PrintWriter(proc.getOutputStream)
+        val out = new PrintWriter(new BufferedWriter(
+          new OutputStreamWriter(proc.getOutputStream), bufferSize))
         try {
           // scalastyle:off println
           // input the pipe context firstly
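
To make the layering above concrete: `new PrintWriter(stream)` on its own buffers through a `BufferedWriter` of a fixed default size (8192 characters), whereas the patched code interposes an explicitly sized `BufferedWriter`, so data reaches the pipe in fewer, larger writes. A self-contained sketch of the same writer stack (a `ByteArrayOutputStream` stands in for `proc.getOutputStream`):

```scala
import java.io.{BufferedWriter, ByteArrayOutputStream, OutputStreamWriter, PrintWriter}

val sink = new ByteArrayOutputStream()   // stand-in for the process's stdin
val bufferSize = 65536                   // would come from the new parameter
val out = new PrintWriter(new BufferedWriter(
  new OutputStreamWriter(sink), bufferSize))
(1 to 10).foreach(i => out.println(i))   // writes accumulate in the 64k buffer
out.flush()                              // nothing reaches `sink` until a flush or the buffer fills
println(sink.size())                     // all bytes visible after the flush
```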

http://git-wip-us.apache.org/repos/asf/spark/blob/a019e6ef/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 499a8b9..d85d0ff 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -724,6 +724,7 @@ abstract class RDD[T: ClassTag](
    *                        def printRDDElement(record:(String, Seq[String]), f:String=&gt;Unit) =
    *                          for (e &lt;- record._2) {f(e)}
    * @param separateWorkingDir Use separate working directories for each task.
+   * @param bufferSize Buffer size for the stdin writer for the piped process.
    * @return the result RDD
    */
   def pipe(
@@ -731,11 +732,13 @@ abstract class RDD[T: ClassTag](
       env: Map[String, String] = Map(),
       printPipeContext: (String => Unit) => Unit = null,
       printRDDElement: (T, String => Unit) => Unit = null,
-      separateWorkingDir: Boolean = false): RDD[String] = withScope {
+      separateWorkingDir: Boolean = false,
+      bufferSize: Int = 8192): RDD[String] = withScope {
     new PipedRDD(this, command, env,
       if (printPipeContext ne null) sc.clean(printPipeContext) else null,
       if (printRDDElement ne null) sc.clean(printRDDElement) else null,
-      separateWorkingDir)
+      separateWorkingDir,
+      bufferSize)
   }
 
   /**
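
A usage sketch of the extended `pipe` signature above (parameter names from the diff; the script name and environment variable are made up for illustration):

```scala
// Hypothetical: inject a header line via printPipeContext and use a
// 32k stdin buffer; unspecified parameters keep their defaults.
val result = sc.parallelize(Seq("a", "b", "c"))
  .pipe(
    Seq("./transform.sh"),  // hypothetical script
    env = Map("MODE" -> "fast"),
    printPipeContext = (print: String => Unit) => print("HEADER"),
    bufferSize = 32768)
```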

http://git-wip-us.apache.org/repos/asf/spark/blob/a019e6ef/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index e9cc819..fe2058d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -171,7 +171,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
       val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
       val collectPwd = pipedPwd.collect()
       assert(collectPwd(0).contains("tasks/"))
-      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
+      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
       // make sure symlinks were created
       assert(pipedLs.length > 0)
       // clean up top level tasks directory

http://git-wip-us.apache.org/repos/asf/spark/blob/a019e6ef/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a5d57e1..b0d862d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -686,6 +686,10 @@ object MimaExcludes {
         ProblemFilters.exclude[IncompatibleMethTypeProblem](
           "org.apache.spark.sql.DataFrameReader.this")
       ) ++ Seq(
+        // SPARK-14542 configurable buffer size for pipe RDD
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe"),
+        ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe")
+      ) ++ Seq(
        // [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory
         ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")
       ) ++ Seq(



