spark-commits mailing list archives

From joshro...@apache.org
Subject spark git commit: [SPARK-10611] Clone Configuration for each task for NewHadoopRDD
Date Fri, 18 Sep 2015 22:41:04 GMT
Repository: spark
Updated Branches:
  refs/heads/master 348d7c9a9 -> 8074208fa


[SPARK-10611] Clone Configuration for each task for NewHadoopRDD

This patch fixes the Hadoop Configuration thread-safety issue for NewHadoopRDD in the same way
that SPARK-2546 fixed it for HadoopRDD: when spark.hadoop.cloneConf is set to true, getConf
returns a fresh clone of the broadcast Configuration for each task instead of the shared instance.
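
For readers who want to opt in, here is a minimal sketch of a driver program enabling the new
flag. The property name and its false default come from the diff below; the application name,
input path, and use of newAPIHadoopFile are illustrative assumptions.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical driver: turn on per-task cloning of the Hadoop Configuration.
    // spark.hadoop.cloneConf defaults to false, so existing jobs keep the old behavior.
    val conf = new SparkConf()
      .setAppName("clone-conf-example")        // placeholder application name
      .set("spark.hadoop.cloneConf", "true")   // enable the cloning added by this patch
    val sc = new SparkContext(conf)

    // Any RDD built on NewHadoopRDD now receives its own cloned Configuration per task.
    val lines = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("/path/to/input")
    println(lines.count())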

Author: Mingyu Kim <mkim@palantir.com>

Closes #8763 from mingyukim/mkim/SPARK-10611.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8074208f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8074208f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8074208f

Branch: refs/heads/master
Commit: 8074208fa47fa654c1055c48cfa0d923edeeb04f
Parents: 348d7c9
Author: Mingyu Kim <mkim@palantir.com>
Authored: Fri Sep 18 15:40:58 2015 -0700
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Fri Sep 18 15:40:58 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/BinaryFileRDD.scala    |  5 +--
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 37 ++++++++++++++++----
 2 files changed, 34 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8074208f/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 6fec00d..aedced7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -34,12 +34,13 @@ private[spark] class BinaryFileRDD[T](
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)

http://git-wip-us.apache.org/repos/asf/spark/blob/8074208f/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 174979a..2872b93 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
+  private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611).  This
+      // problem occurs somewhat rarely because most jobs treat the configuration as though it's
+      // immutable.  One solution, implemented here, is to clone the Configuration object.
+      // Unfortunately, this clone can be very expensive.  To avoid unexpected performance
+      // regressions for workloads and Hadoop versions that do not suffer from these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
@@ -104,7 +124,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
 
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
@@ -230,12 +250,16 @@ class NewHadoopRDD[K, V](
     super.persist(storageLevel)
   }
 
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
   /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
+  /**
   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
    * the given function rather than the index of the partition.
    */
@@ -268,12 +292,13 @@ private[spark] class WholeTextFileRDD(
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
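
To make the hazard described in the comment above concrete, here is a standalone sketch (not
part of the patch) of two threads sharing one Hadoop Configuration, the situation that the
per-task clone avoids. The property name and iteration counts are made up for illustration.

    import org.apache.hadoop.conf.Configuration

    // One Configuration instance shared across threads, as happens when the broadcast value
    // is handed out directly. Configuration is not thread-safe, so concurrent set()/get()
    // on the same object may lead to the problems referenced in SPARK-2546 and SPARK-10611.
    val shared = new Configuration()

    val writer = new Thread(new Runnable {
      override def run(): Unit =
        // "example.dynamic.key" is a made-up property used only for this sketch.
        (1 to 1000).foreach(i => shared.set("example.dynamic.key", i.toString))
    })
    val reader = new Thread(new Runnable {
      override def run(): Unit =
        (1 to 1000).foreach(_ => shared.get("example.dynamic.key"))
    })

    writer.start(); reader.start()
    writer.join(); reader.join()

    // With spark.hadoop.cloneConf=true, each task instead works on its own copy created via
    // new Configuration(shared) (under CONFIGURATION_INSTANTIATION_LOCK), so tasks never
    // mutate and read the same instance concurrently.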



