spark-commits mailing list archives

From ma...@apache.org
Subject git commit: [SPARK-1415] Hadoop min split for wholeTextFiles()
Date Sun, 13 Apr 2014 20:18:59 GMT
Repository: spark
Updated Branches:
  refs/heads/master 4bc07eebb -> 037fe4d2b


[SPARK-1415] Hadoop min split for wholeTextFiles()

JIRA issue [here](https://issues.apache.org/jira/browse/SPARK-1415).

The new Hadoop API's `InputFormat` does not take a `minSplits` parameter, which makes the
`HadoopRDD` and `NewHadoopRDD` APIs incompatible. This PR makes the two APIs compatible.

Though `minSplits` is deprecated in the new Hadoop API, we think it is better to keep the APIs
compatible here.

**Note** that `minSplits` in `wholeTextFiles` can only be treated as a *suggestion*: since
`isSplitable()` returns `false`, the actual number of splits may be smaller than `minSplits`.
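
A minimal usage sketch of the new parameter is below (the local master, app name, and input
directory are illustrative and not part of this commit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wholeTextFiles-minSplits").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Each file under the directory becomes one (path, content) record.
    // minSplits is only a suggestion; fewer splits may be produced.
    val files = sc.wholeTextFiles("/tmp/some-input-dir", minSplits = 4)
    files.map(_._1).collect().foreach(println)
    sc.stop()
  }
}
```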

Author: Xusen Yin <yinxusen@gmail.com>

Closes #376 from yinxusen/hadoop-min-split and squashes the following commits:

76417f6 [Xusen Yin] refine comments
c10af60 [Xusen Yin] refine comments and rewrite new class for wholeTextFile
766d05b [Xusen Yin] refine Java API and comments
4875755 [Xusen Yin] add minSplits for WholeTextFiles


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/037fe4d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/037fe4d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/037fe4d2

Branch: refs/heads/master
Commit: 037fe4d2ba01be5610baa3dd9c5c9d3a5e5e1064
Parents: 4bc07ee
Author: Xusen Yin <yinxusen@gmail.com>
Authored: Sun Apr 13 13:18:52 2014 -0700
Committer: Matei Zaharia <matei@databricks.com>
Committed: Sun Apr 13 13:18:52 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 17 ++++--
 .../spark/api/java/JavaSparkContext.scala       | 14 ++++-
 .../spark/input/WholeTextFileInputFormat.scala  | 14 +++++
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 60 ++++++++++++++++----
 .../java/org/apache/spark/JavaAPISuite.java     |  2 +-
 .../input/WholeTextFileRecordReaderSuite.scala  |  2 +-
 6 files changed, 90 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/037fe4d2/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5a36e6f..456070f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -454,14 +454,21 @@ class SparkContext(config: SparkConf) extends Logging {
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred; large files are also allowed, but may cause bad performance.
+   *
+   * @param minSplits A suggested minimum number of splits for the input data.
    */
-  def wholeTextFiles(path: String): RDD[(String, String)] = {
-    newAPIHadoopFile(
-      path,
+  def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new WholeTextFileRDD(
+      this,
       classOf[WholeTextFileInputFormat],
       classOf[String],
-      classOf[String])
+      classOf[String],
+      updateConf,
+      minSplits)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/037fe4d2/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 1e8242a..7fbefe1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -177,7 +177,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred; large files are also allowed, but may cause bad performance.
+   *
+   * @param minSplits A suggested minimum number of splits for the input data.
+   */
+  def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
+    new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
+
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   *
+   * @see `wholeTextFiles(path: String, minSplits: Int)`.
    */
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))

http://git-wip-us.apache.org/repos/asf/spark/blob/037fe4d2/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 4887fb6..80d055a 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.input
 
+import scala.collection.JavaConversions._
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
@@ -44,4 +46,16 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str
       context,
       classOf[WholeTextFileRecordReader])
   }
+
+  /**
+   * Allow minSplits to be set by the end user, in order to keep compatibility with the old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minSplits: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+    val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
 }
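
The `setMaxSplitSize` addition above caps each combined split at roughly `totalLen / minSplits`
bytes. A standalone sketch of that arithmetic follows (the object name and sample file sizes are
hypothetical):

```scala
object SplitSizeSketch {
  // Mirror of the computation in WholeTextFileInputFormat.setMaxSplitSize:
  // divide the total input length by the suggested split count, rounding up.
  def maxSplitSize(fileLengths: Seq[Long], minSplits: Int): Long = {
    val totalLen = fileLengths.sum
    math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
  }

  def main(args: Array[String]): Unit = {
    // Three 10 MB files with minSplits = 3 -> at most 10 MB of input per combined split.
    println(maxSplitSize(Seq(10L << 20, 10L << 20, 10L << 20), 3)) // prints 10485760
  }
}
```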

http://git-wip-us.apache.org/repos/asf/spark/blob/037fe4d2/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2d8dfa5..8684b64 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -24,11 +24,18 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
-import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
-
-private[spark]
-class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
+import org.apache.spark.input.WholeTextFileInputFormat
+import org.apache.spark.InterruptibleIterator
+import org.apache.spark.Logging
+import org.apache.spark.Partition
+import org.apache.spark.SerializableWritable
+import org.apache.spark.{SparkContext, TaskContext}
+
+private[spark] class NewHadoopPartition(
+    rddId: Int,
+    val index: Int,
+    @transient rawSplit: InputSplit with Writable)
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -65,17 +72,19 @@ class NewHadoopRDD[K, V](
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
   // private val serializableConf = new SerializableWritable(conf)
 
-  private val jobtrackerId: String = {
+  private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
 
-  @transient private val jobId = new JobID(jobtrackerId, id)
+  @transient protected val jobId = new JobID(jobTrackerId, id)
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
-    if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(conf)
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
     }
     val jobContext = newJobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
@@ -91,11 +100,13 @@ class NewHadoopRDD[K, V](
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = confBroadcast.value.value
-      val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0)
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
       val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       val format = inputFormatClass.newInstance
-      if (format.isInstanceOf[Configurable]) {
-        format.asInstanceOf[Configurable].setConf(conf)
+      format match {
+        case configurable: Configurable =>
+          configurable.setConf(conf)
+        case _ =>
       }
       val reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
@@ -141,3 +152,30 @@ class NewHadoopRDD[K, V](
   def getConf: Configuration = confBroadcast.value.value
 }
 
+private[spark] class WholeTextFileRDD(
+    sc: SparkContext,
+    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
+    keyClass: Class[String],
+    valueClass: Class[String],
+    @transient conf: Configuration,
+    minSplits: Int)
+  extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMaxSplitSize(jobContext, minSplits)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/037fe4d2/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index ab2fdac..8d2e9f1 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -626,7 +626,7 @@ public class JavaAPISuite implements Serializable {
     container.put(tempDirName+"/part-00000", new Text(content1).toString());
     container.put(tempDirName+"/part-00001", new Text(content2).toString());
 
-    JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName);
+    JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3);
     List<Tuple2<String, String>> result = readRDD.collect();
 
     for (Tuple2<String, String> res : result) {

http://git-wip-us.apache.org/repos/asf/spark/blob/037fe4d2/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index e89b296..33d6de9 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -73,7 +73,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
       createNativeFile(dir, filename, contents)
     }
 
-    val res = sc.wholeTextFiles(dir.toString).collect()
+    val res = sc.wholeTextFiles(dir.toString, 3).collect()
 
     assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
       "Number of files read out does not fit with the actual value.")

