spark-commits mailing list archives

From r...@apache.org
Subject git commit: Merge pull request #124 from tgravescs/sparkHadoopUtilFix
Date Mon, 04 Nov 2013 07:45:32 GMT
Updated Branches:
  refs/heads/branch-0.8 ba0e8584a -> ec0e4f057


Merge pull request #124 from tgravescs/sparkHadoopUtilFix

Pull SparkHadoopUtil out of SparkEnv (jira SPARK-886)

Having the logic that initializes the correct SparkHadoopUtil live in SparkEnv prevents it from
being used until after the SparkContext is initialized. This causes issues like
https://spark-project.atlassian.net/browse/SPARK-886. It also makes it hard to use in singleton
objects; for instance, I want to use it in the security code.
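
To make the change concrete, here is a minimal, illustrative sketch of how call sites look once
SparkHadoopUtil is a standalone singleton. Only methods that appear in the diff below are used;
the wrapper object and everything else in the snippet are assumptions for illustration, not part
of this patch:

    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.deploy.SparkHadoopUtil

    // Hypothetical example object, for illustration only.
    object HadoopUtilExample {
      def main(args: Array[String]): Unit = {
        // Before this patch: SparkEnv.get.hadoop.newConfiguration() required an initialized
        // SparkEnv, and therefore a SparkContext. Now the singleton can be used anywhere,
        // including before SparkContext creation and from other singleton objects.
        val hadoopConf = SparkHadoopUtil.get.newConfiguration()

        val jobConf = new JobConf(hadoopConf)
        // No-op in the base class; the YARN subclass adds security credentials.
        SparkHadoopUtil.get.addCredentials(jobConf)

        if (SparkHadoopUtil.get.isYarnMode()) {
          println("YARN mode: YarnSparkHadoopUtil was loaded reflectively by SparkHadoopUtil.get")
        }
      }
    }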

(cherry picked from commit 33de11c51dd2dbcbbf1801c54d9ce5ffaa324657)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ec0e4f05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ec0e4f05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ec0e4f05

Branch: refs/heads/branch-0.8
Commit: ec0e4f0573aabf751d2a3046d058b3fd509aac54
Parents: ba0e858
Author: Matei Zaharia <matei@eecs.berkeley.edu>
Authored: Wed Oct 30 16:58:27 2013 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Sun Nov 3 23:45:23 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 11 ++++-----
 .../main/scala/org/apache/spark/SparkEnv.scala  | 17 +++----------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 26 +++++++++++++++-----
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  7 +++---
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  7 +++---
 .../spark/scheduler/InputFormatInfo.scala       |  7 +++---
 .../scala/org/apache/spark/util/Utils.scala     |  3 +--
 .../org/apache/spark/examples/SparkHdfsLR.scala |  3 ++-
 8 files changed, 43 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec0e4f05/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bfe1c13..b67d02f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
-import org.apache.spark.deploy.LocalSparkCluster
+import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -248,7 +248,7 @@ class SparkContext(
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = {
     val env = SparkEnv.get
-    val conf = env.hadoop.newConfiguration()
+    val conf = SparkHadoopUtil.get.newConfiguration()
     // Explicitly check for S3 environment variables
     if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
         System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
@@ -382,7 +382,7 @@ class SparkContext(
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
-    SparkEnv.get.hadoop.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(conf)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
@@ -700,7 +700,7 @@ class SparkContext(
         val uri = new URI(path)
         key = uri.getScheme match {
           case null | "file" =>
-            if (env.hadoop.isYarnMode()) {
+            if (SparkHadoopUtil.get.isYarnMode()) {
              // In order for this to work on yarn the user must specify the --addjars option to
              // the client to upload the file into the distributed cache to make it show up in the
              // current working directory.
@@ -935,9 +935,8 @@ class SparkContext(
    * prevent accidental overriding of checkpoint files in the existing directory.
    */
   def setCheckpointDir(dir: String, useExisting: Boolean = false) {
-    val env = SparkEnv.get
     val path = new Path(dir)
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     if (!useExisting) {
       if (fs.exists(path)) {
         throw new Exception("Checkpoint directory '" + path + "' already exists.")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec0e4f05/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 29968c2..062852a 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -25,13 +25,13 @@ import akka.remote.RemoteActorRefProvider
 
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster}
 import org.apache.spark.network.ConnectionManager
 import org.apache.spark.serializer.{Serializer, SerializerManager}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.api.python.PythonWorkerFactory
 
+import com.google.common.collect.MapMaker
 
 /**
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -58,18 +58,9 @@ class SparkEnv (
 
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
-  val hadoop = {
-    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
-    if(yarnMode) {
-      try {
-        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
-      } catch {
-        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
-      }
-    } else {
-      new SparkHadoopUtil
-    }
-  }
+  // A general, soft-reference map for metadata needed during HadoopRDD split computation
+  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
+  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
   def stop() {
     pythonWorkers.foreach { case(key, worker) => worker.stop() }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec0e4f05/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 83cd3df..6bc846a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -20,17 +20,13 @@ package org.apache.spark.deploy
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
-import com.google.common.collect.MapMaker
-
+import org.apache.spark.SparkException
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
 private[spark]
 class SparkHadoopUtil {
-  // A general, soft-reference map for metadata needed during HadoopRDD split computation
-  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
-  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
   /**
   * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -45,5 +41,23 @@ class SparkHadoopUtil {
   def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }
-
+}
+  
+object SparkHadoopUtil {
+  private val hadoop = { 
+    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+    if (yarnMode) {
+      try {
+        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+      } catch {
+       case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+      }
+    } else {
+      new SparkHadoopUtil
+    }
+  }
+  
+  def get: SparkHadoopUtil = {
+    hadoop
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec0e4f05/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index ccaaecb..d3033ea 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.rdd
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{NullWritable, BytesWritable}
@@ -83,7 +84,7 @@ private[spark] object CheckpointRDD extends Logging {
   def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
     val env = SparkEnv.get
     val outputDir = new Path(path)
-    val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
+    val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
 
     val finalOutputName = splitIdToFile(ctx.partitionId)
     val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -122,7 +123,7 @@ private[spark] object CheckpointRDD extends Logging {
 
   def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
     val env = SparkEnv.get
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
@@ -145,7 +146,7 @@ private[spark] object CheckpointRDD extends Logging {
     val sc = new SparkContext(cluster, "CheckpointRDD Test")
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec0e4f05/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index fad042c..32901a5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
 import org.apache.hadoop.conf.{Configuration, Configurable}
 
@@ -198,10 +199,10 @@ private[spark] object HadoopRDD {
    * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
    * the local process.
    */
-  def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+  def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key)
 
-  def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+  def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key)
 
   def putCachedMetadata(key: String, value: Any) =
-    SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
+    SparkEnv.get.hadoopJobMetadata.put(key, value)
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec0e4f05/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 370ccd1..1791ee6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
 import scala.collection.immutable.Set
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.hadoop.security.UserGroupInformation
@@ -87,9 +88,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
 
   // This method does not expect failures, since validate has already passed ...
   private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
-    val env = SparkEnv.get
     val conf = new JobConf(configuration)
-    env.hadoop.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(conf)
     FileInputFormat.setInputPaths(conf, path)
 
     val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -108,9 +108,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
 
   // This method does not expect failures, since validate has already passed ...
   private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
-    val env = SparkEnv.get
     val jobConf = new JobConf(configuration)
-    env.hadoop.addCredentials(jobConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
     FileInputFormat.setInputPaths(jobConf, path)
 
     val instance: org.apache.hadoop.mapred.InputFormat[_, _] =

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec0e4f05/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a3b3968..fd2811e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -280,9 +280,8 @@ private[spark] object Utils extends Logging {
         }
       case _ =>
        // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
-        val env = SparkEnv.get
         val uri = new URI(url)
-        val conf = env.hadoop.newConfiguration()
+        val conf = SparkHadoopUtil.get.newConfiguration()
         val fs = FileSystem.get(uri, conf)
         val in = fs.open(new Path(uri))
         val out = new FileOutputStream(tempFile)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec0e4f05/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 6466828..86dd9ca 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -21,6 +21,7 @@ import java.util.Random
 import scala.math.exp
 import org.apache.spark.util.Vector
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.InputFormatInfo
 
 /**
@@ -51,7 +52,7 @@ object SparkHdfsLR {
       System.exit(1)
     }
     val inputPath = args(1)
-    val conf = SparkEnv.get.hadoop.newConfiguration()
+    val conf = SparkHadoopUtil.get.newConfiguration()
     val sc = new SparkContext(args(0), "SparkHdfsLR",
       System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(), 
       InputFormatInfo.computePreferredLocations(

