spark-issues mailing list archives

From "Sahil Takiar (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-20466) HadoopRDD#addLocalConfiguration throws NPE
Date Mon, 25 Sep 2017 18:05:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-20466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170759#comment-16170759 ]

Sahil Takiar edited comment on SPARK-20466 at 9/25/17 6:04 PM:
---------------------------------------------------------------

I just hit this issue in Hive-on-Spark while running some TPC-DS queries. It seems to be intermittent;
retries of the task succeed. I have a very similar stack trace:

{code}
java.lang.NullPointerException
        at org.apache.spark.rdd.HadoopRDD$.addLocalConfiguration(HadoopRDD.scala:364)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:238)
        at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:211)
        at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

The {{JobConf}} object can be {{null}} if {{HadoopRDD#getJobConf}} returns {{null}}. It looks
like there is a race condition in {{#getJobConf}} [here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L160].
The method {{HadoopRDD.containsCachedMetadata}} looks into an internal metadata cache, {{SparkEnv#hadoopJobMetadata}}.
This cache uses soft references, so the JVM may reclaim entries from the map whenever there
is GC pressure; when that happens, any get request for the key returns {{null}}. The race
condition is that {{#getJobConf}} first checks whether the cache contains the key and then
retrieves it. Between the {{containsKey}} and the {{get}}, it's possible for the entry to be
reclaimed by the GC, which would cause {{#getJobConf}} to return {{null}}.
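
To illustrate, here is a minimal sketch of the racy pattern (the method name and cache type
below are simplified for the example, not the exact Spark source):

{code}
import java.util.{Map => JMap}

import org.apache.hadoop.mapred.JobConf

// Sketch of the check-then-act race. The cache holds its entries through
// soft references, so the GC may clear one at any time, including in the
// window between the containsKey and the get.
def getCachedJobConf(cache: JMap[String, AnyRef], key: String): JobConf = {
  if (cache.containsKey(key)) {            // entry is present here...
    // ...but GC pressure can reclaim the soft reference right here...
    cache.get(key).asInstanceOf[JobConf]   // ...so this get can return null
  } else {
    null
  }
}
{code}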

The fix should be pretty simple: don't use the {{containsKey(key)}} method on the cache; just
run a {{get(key)}} and check whether it returns {{null}}.
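
Under the same simplified names, a sketch of what the fixed lookup could look like
(hypothetical, for illustration):

{code}
// A single get leaves no window between a containsKey and a get for the GC
// to clear the soft reference, so a non-null result is safe to use.
def getCachedJobConfSafe(cache: JMap[String, AnyRef], key: String): JobConf = {
  val cached = cache.get(key)
  if (cached != null) {
    cached.asInstanceOf[JobConf]  // hit: reuse the cached JobConf
  } else {
    null                          // miss: caller builds a fresh JobConf instead
  }
}
{code}

Once {{get}} returns a non-null value, the caller holds a strong reference to it, so the GC
can no longer reclaim it out from under us.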

Happy to create a PR if others agree with my analysis.


> HadoopRDD#addLocalConfiguration throws NPE
> ------------------------------------------
>
>                 Key: SPARK-20466
>                 URL: https://issues.apache.org/jira/browse/SPARK-20466
>             Project: Spark
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 2.0.2
>            Reporter: liyunzhang_intel
>            Priority: Minor
>         Attachments: NPE_log
>
>
> In Spark 2.0.2, it throws an NPE:
> {code}
> 17/04/23 08:19:55 ERROR executor.Executor: Exception in task 439.0 in stage 16.0 (TID 986)
> java.lang.NullPointerException
>         at org.apache.spark.rdd.HadoopRDD$.addLocalConfiguration(HadoopRDD.scala:373)
>         at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:243)
>         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:208)
>         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:86)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> Suggestion: add a null check to avoid the NPE:
> {code}
>   /** Add Hadoop configuration specific to a single partition and attempt. */
>   def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, attemptId: Int,
>                             conf: JobConf) {
>     val jobID = new JobID(jobTrackerId, jobId)
>     val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId)
>     if (conf != null) {
>       conf.set("mapred.tip.id", taId.getTaskID.toString)
>       conf.set("mapred.task.id", taId.toString)
>       conf.setBoolean("mapred.task.is.map", true)
>       conf.setInt("mapred.task.partition", splitId)
>       conf.set("mapred.job.id", jobID.toString)
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

