spark-commits mailing list archives

From van...@apache.org
Subject spark git commit: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration throws NPE
Date Tue, 03 Oct 2017 23:53:48 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 5e3f2544a -> 81232ce03


[SPARK-20466][CORE] HadoopRDD#addLocalConfiguration throws NPE

## What changes were proposed in this pull request?

Fix for SPARK-20466; the full description of the issue is in the JIRA. To summarize, `HadoopRDD`
uses a metadata cache to cache `JobConf` objects. The cache uses soft references, which means
the JVM can clear entries from the cache whenever there is GC pressure. `HadoopRDD#getJobConf`
had a bug: it first checked whether the cache contained the `JobConf` and, if it did, fetched
the `JobConf` from the cache and returned it. This doesn't work with soft references, because the
JVM can clear the entry between the existence check and the get call. The get then returns `null`,
which later surfaces as the NPE in `HadoopRDD#addLocalConfiguration`.
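To make the race concrete, here is a minimal Scala sketch of a soft-valued cache. `SoftValueCache`, `simulateGc`, `PrePatchLookup`, and the `gcWindow` hook are hypothetical names for illustration only; this is not Spark's `hadoopJobMetadata` implementation. It uses `java.lang.ref.Reference#clear()` to clear a referent deterministically, standing in for the GC doing so under memory pressure.

```scala
import java.lang.ref.SoftReference
import java.util.concurrent.ConcurrentHashMap

// Hypothetical soft-valued cache, for illustration only (not Spark's hadoopJobMetadata).
final class SoftValueCache[K, V <: AnyRef] {
  private val map = new ConcurrentHashMap[K, SoftReference[V]]()

  def put(key: K, value: V): Unit = map.put(key, new SoftReference[V](value))

  // True while the map entry exists, even if its referent has already been cleared.
  def containsKey(key: K): Boolean = map.containsKey(key)

  // Returns null if the key is absent or the referent has been cleared.
  def get(key: K): V = {
    val ref = map.get(key)
    if (ref == null) null.asInstanceOf[V] else ref.get()
  }

  // Test hook: clear the referent, as the GC may do under memory pressure.
  def simulateGc(key: K): Unit = {
    val ref = map.get(key)
    if (ref != null) ref.clear()
  }
}

object PrePatchLookup {
  // Same shape as the removed containsCachedMetadata + getCachedMetadata pair.
  // `gcWindow` stands in for a GC cycle that runs between the two calls.
  def lookup(cache: SoftValueCache[String, AnyRef], key: String)(gcWindow: () => Unit): AnyRef =
    if (cache.containsKey(key)) {
      gcWindow()
      cache.get(key) // may be null: the earlier check no longer holds
    } else {
      "created"
    }
}
```

With a no-op `gcWindow` the lookup returns the cached value; when `gcWindow` clears the referent, the existence check has already succeeded, so the caller receives `null`.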

## How was this patch tested?

I haven't thought of a good way to test this yet, given that the issue is intermittent and only
happens under high GC pressure. I was thinking of using mocks to verify that `#getJobConf` does
the right thing. I deleted the method `HadoopRDD#containsCachedMetadata` so that we don't
hit this issue again.
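A mocking library is one option. A lighter-weight alternative, sketched below, assumes the cache accessors can be injected as functions; in the real `HadoopRDD` they cannot, since `getCachedMetadata` reads `SparkEnv.get.hadoopJobMetadata` directly. `FakeJobConf` and `ConfProvider` are hypothetical stand-ins that mirror the patched single-lookup logic without a Hadoop dependency.

```scala
// Hypothetical stand-in for Hadoop's JobConf, so the sketch has no Hadoop dependency.
final case class FakeJobConf(id: Int)

// Hypothetical, dependency-free version of the patched getJobConf logic, with the
// cache accessors and the constructor injected so a test can stub them.
final class ConfProvider(
    cacheGet: String => AnyRef,
    cachePut: (String, AnyRef) => Unit,
    create: () => FakeJobConf) {

  // Plays the role of HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.
  private val lock = new Object

  def getJobConf(key: String): FakeJobConf =
    Option(cacheGet(key)) // a single lookup; null (absent or cleared) becomes None
      .map(_.asInstanceOf[FakeJobConf])
      .getOrElse {
        // Only construction is serialized; there is no re-check of the cache here.
        lock.synchronized {
          val conf = create()
          cachePut(key, conf)
          conf
        }
      }
}
```

A stub `cacheGet` that always returns `null` exercises the eviction path deterministically, without depending on GC timing. As in the patch, two threads that miss at the same time may each build a conf and the last `put` wins; that is harmless for a cache, and the lock is there to guard construction rather than to deduplicate it.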

Author: Sahil Takiar <stakiar@cloudera.com>

Closes #19413 from sahilTakiar/master.

(cherry picked from commit e36ec38d89472df0dfe12222b6af54cd6eea8e98)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81232ce0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81232ce0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81232ce0

Branch: refs/heads/branch-2.2
Commit: 81232ce0306556fc1472da30d706abd1fb4bf687
Parents: 5e3f254
Author: Sahil Takiar <stakiar@cloudera.com>
Authored: Tue Oct 3 16:53:32 2017 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Tue Oct 3 16:53:43 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 33 +++++++++++---------
 1 file changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/81232ce0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 76ea8b8..23b3442 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -157,20 +157,25 @@ class HadoopRDD[K, V](
       if (conf.isInstanceOf[JobConf]) {
         logDebug("Re-using user-broadcasted JobConf")
         conf.asInstanceOf[JobConf]
-      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-        logDebug("Re-using cached JobConf")
-        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
       } else {
-        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-        // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
-        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-          logDebug("Creating new JobConf and caching it for later re-use")
-          val newJobConf = new JobConf(conf)
-          initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
-          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-          newJobConf
+        Option(HadoopRDD.getCachedMetadata(jobConfCacheKey))
+          .map { conf =>
+            logDebug("Re-using cached JobConf")
+            conf.asInstanceOf[JobConf]
+          }
+          .getOrElse {
+            // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in
+            // the local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+            // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary
+            // objects. Synchronize to prevent ConcurrentModificationException (SPARK-1097,
+            // HADOOP-10456).
+            HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+              logDebug("Creating new JobConf and caching it for later re-use")
+              val newJobConf = new JobConf(conf)
+              initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
+              HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+              newJobConf
+          }
         }
       }
     }
@@ -360,8 +365,6 @@ private[spark] object HadoopRDD extends Logging {
    */
   def getCachedMetadata(key: String): Any = SparkEnv.get.hadoopJobMetadata.get(key)
 
-  def containsCachedMetadata(key: String): Boolean = SparkEnv.get.hadoopJobMetadata.containsKey(key)
-
   private def putCachedMetadata(key: String, value: Any): Unit =
     SparkEnv.get.hadoopJobMetadata.put(key, value)
 



