spark-commits mailing list archives

From ma...@apache.org
Subject [2/9] git commit: Add a cache for HadoopRDD metadata needed during computation.
Date Sun, 06 Oct 2013 02:29:08 GMT
Add a cache for HadoopRDD metadata needed during computation.

Currently, the cache is in SparkHadoopUtil, since it's conveniently a member of SparkEnv.
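
For orientation, the caching this commit introduces boils down to a keyed get-or-compute lookup against a shared, process-wide metadata map (the commit's map lives in SparkHadoopUtil and is reachable via SparkEnv.get.hadoop). Below is a minimal, self-contained Scala sketch of that access pattern; it uses a plain ConcurrentHashMap as a stand-in for the soft-valued Guava map and illustrative names rather than the commit's exact helpers.

import java.util.concurrent.ConcurrentHashMap

object MetadataCacheSketch {
  // Stand-in for the shared metadata map; the commit uses a Guava MapMaker map
  // with soft values so entries can be reclaimed under memory pressure.
  private val metadata = new ConcurrentHashMap[String, Any]()

  // Keyed get-or-compute: build the value once per JVM, then reuse it.
  def getOrCompute[T](key: String)(create: => T): T = {
    val cached = metadata.get(key)
    if (cached != null) {
      cached.asInstanceOf[T]
    } else {
      val value = create
      metadata.put(key, value)
      value
    }
  }

  def main(args: Array[String]): Unit = {
    // Per-RDD keys (e.g. "rdd_<id>_job_conf") keep entries for different RDDs apart.
    val key = "rdd_%d_job_conf".format(42)
    var computations = 0
    def build(): String = { computations += 1; "pretend this is a JobConf" }
    val first = getOrCompute(key)(build())
    val second = getOrCompute(key)(build())
    assert(computations == 1 && (first eq second))
  }
}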

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a6eeb5ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a6eeb5ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a6eeb5ff

Branch: refs/heads/master
Commit: a6eeb5ffd54956667ec4e793149fdab90041ad6c
Parents: be0fc72
Author: Harvey <h.feng@berkeley.edu>
Authored: Sun Sep 22 03:05:02 2013 -0700
Committer: Harvey <h.feng@berkeley.edu>
Committed: Sun Sep 22 03:09:17 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   |  4 +-
 .../scala/org/apache/spark/SparkContext.scala   | 29 +++++---
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  6 ++
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 77 +++++++++++++-------
 4 files changed, 79 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a6eeb5ff/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 68b99ca..3d36761 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -26,7 +26,9 @@ import org.apache.spark.rdd.RDD
     sure a node doesn't load two copies of an RDD at once.
   */
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
-  private val loading = new HashSet[String]
+
+  /** Keys of RDD splits that are being computed/loaded. */
+  private val loading = new HashSet[String]()
 
   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a6eeb5ff/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 894cc67..47fe743 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.LocalSparkCluster
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -342,16 +343,26 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
-      ) : RDD[(K, V)] = {
-    val broadcastHadoopConfiguration = broadcast(new SerializableWritable(hadoopConfiguration))
+      ): RDD[(K, V)] = {
+    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+    hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+  }
+
+  /**
+   * Get an RDD for a Hadoop file with an arbitrary InputFormat. Accept a Hadoop Configuration
+   * that has already been broadcast, assuming that it's safe to use it to construct a
+   * HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be reused).
+   */
+  def hadoopFile[K, V](
+      path: String,
+      confBroadcast: Broadcast[SerializableWritable[Configuration]],
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minSplits: Int
+      ): RDD[(K, V)] = {
     new HadoopFileRDD(
-      this,
-      path,
-      broadcastHadoopConfiguration,
-      inputFormatClass,
-      keyClass,
-      valueClass,
-      minSplits)
+      this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /**
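
As a usage illustration of the new overload that takes an already-broadcast Configuration, here is a hypothetical sketch that is not part of the commit: it assumes a local SparkContext, made-up input paths, and that SerializableWritable and hadoopConfiguration are accessible from user code.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SerializableWritable, SparkContext}

object BroadcastConfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "broadcast-conf-example")

    // Broadcast the (wrapped) Hadoop Configuration once...
    val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

    // ...then reuse it across reads that differ only in 'path', so the RDDs share one
    // broadcast Configuration instead of broadcasting a fresh ~10 KB copy per call.
    val logsA = sc.hadoopFile("/data/logs/a", confBroadcast, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], 2)
    val logsB = sc.hadoopFile("/data/logs/b", confBroadcast, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], 2)

    println(logsA.partitions.length + logsB.partitions.length + " total input splits")
    sc.stop()
  }
}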

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a6eeb5ff/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 0a5f4c3..f416b95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -16,6 +16,9 @@
  */
 
 package org.apache.spark.deploy
+
+import com.google.common.collect.MapMaker
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
@@ -24,6 +27,9 @@ import org.apache.hadoop.mapred.JobConf
  * Contains util methods to interact with Hadoop from spark.
  */
 class SparkHadoopUtil {
+  // A general map for metadata needed during HadoopRDD split computation (e.g., HadoopFileRDD uses
+  // this to cache JobConfs).
+  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
   def newConfiguration(): Configuration = new Configuration()
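
The map above is built with Guava's MapMaker and soft values; roughly, that gives a thread-safe cache whose entries the JVM may discard under memory pressure. A small sketch of that behavior follows, assuming a Guava version contemporary with this commit (one where MapMaker still exposes softValues()).

import java.util.concurrent.ConcurrentMap
import com.google.common.collect.MapMaker

object SoftValueCacheSketch {
  def main(args: Array[String]): Unit = {
    // Same construction as in SparkHadoopUtil above: values are held via soft
    // references, so the cache can shrink instead of growing without bound.
    val cache: ConcurrentMap[String, Any] = new MapMaker().softValues().makeMap[String, Any]()

    cache.put("rdd_1_job_conf", new Object)      // e.g. a cached JobConf
    cache.put("rdd_1_input_format", new Object)  // e.g. a cached InputFormat

    // Reads behave like a normal ConcurrentMap; a null result just means the entry
    // was never cached (or has been collected) and must be recomputed.
    println(cache.get("rdd_1_job_conf") != null)
  }
}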

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a6eeb5ff/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e259ef5..1ae8e41 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.conf.{Configuration, Configurable}
  * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
  * system, or S3).
  * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
- * across multiple reads; the 'path' is the only variable that is different acrodd new JobConfs
+ * across multiple reads; the 'path' is the only variable that is different across new JobConfs
  * created from the Configuration.
  */
 class HadoopFileRDD[K, V](
@@ -50,13 +50,18 @@ class HadoopFileRDD[K, V](
     minSplits: Int)
   extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
 
-  private val localJobConf: JobConf = {
-    val jobConf = new JobConf(hadoopConfBroadcast.value.value)
-    FileInputFormat.setInputPaths(jobConf, path)
-    jobConf
-  }
+  private val jobConfCacheKey = "rdd_%d_job_conf".format(id)
 
-  override def getJobConf: JobConf = localJobConf
+  override def getJobConf(): JobConf = {
+    if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+    } else {
+      val newJobConf = new JobConf(hadoopConfBroadcast.value.value)
+      FileInputFormat.setInputPaths(newJobConf, path)
+      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+      return newJobConf
+    }
+  }
 }
 
 /**
@@ -71,10 +76,13 @@ class HadoopDatasetRDD[K, V](
     minSplits: Int)
   extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
 
+  // Add necessary security credentials to the JobConf before broadcasting it.
+  SparkEnv.get.hadoop.addCredentials(conf)
+
   // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
 
-  override def getJobConf: JobConf = confBroadcast.value.value
+  override def getJobConf(): JobConf = confBroadcast.value.value
 }
 
 /**
@@ -101,20 +109,31 @@ abstract class HadoopRDD[K, V](
     minSplits: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
-  // The JobConf used to obtain input splits for Hadoop reads. The subclass is responsible for
-  // determining how the JobConf is initialized.
-  protected def getJobConf: JobConf
+  private val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+
+  // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+  protected def getJobConf(): JobConf
 
-  def getConf: Configuration = getJobConf
+  def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+    if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
+      return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+    }
+    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+      .asInstanceOf[InputFormat[K, V]]
+    if (newInputFormat.isInstanceOf[Configurable]) {
+      newInputFormat.asInstanceOf[Configurable].setConf(conf)
+    }
+    HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
+    return newInputFormat
+  }
 
   override def getPartitions: Array[Partition] = {
-    val env = SparkEnv.get
-    env.hadoop.addCredentials(getJobConf)
-    val inputFormat = createInputFormat(getJobConf)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
     if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(getJobConf)
+      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
     }
-    val inputSplits = inputFormat.getSplits(getJobConf, minSplits)
+    val inputSplits = inputFormat.getSplits(jobConf, minSplits)
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -122,21 +141,14 @@ abstract class HadoopRDD[K, V](
     array
   }
 
-  def createInputFormat(conf: JobConf): InputFormat[K, V] = {
-    ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
-      .asInstanceOf[InputFormat[K, V]]
-  }
-
   override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
     val split = theSplit.asInstanceOf[HadoopPartition]
     logInfo("Input split: " + split.inputSplit)
     var reader: RecordReader[K, V] = null
 
-    val fmt = createInputFormat(getJobConf)
-    if (fmt.isInstanceOf[Configurable]) {
-      fmt.asInstanceOf[Configurable].setConf(getJobConf)
-    }
-    reader = fmt.getRecordReader(split.inputSplit.value, getJobConf, Reporter.NULL)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
+    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
     context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -172,4 +184,15 @@ abstract class HadoopRDD[K, V](
   override def checkpoint() {
     // Do nothing. Hadoop RDD should not be checkpointed.
   }
+
+  def getConf: Configuration = getJobConf()
+}
+
+object HadoopRDD {
+  def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+
+  def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+
+  def putCachedMetadata(key: String, value: Any) =
+    SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
 }

