spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-20904][CORE] Don't report task failures to driver during shutdown.
Date Sun, 23 Jul 2017 15:24:11 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 da403b953 -> 62ca13dca


[SPARK-20904][CORE] Don't report task failures to driver during shutdown.

Executors run tasks in a thread pool of daemon threads. Those threads
remain active while the JVM is shutting down, so the tasks they run are
affected by code that runs in shutdown hooks.
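A minimal sketch of the situation described above, assuming a plain `java.util.concurrent` pool rather than Spark's own thread utilities: the hypothetical `newDaemonCachedPool` helper marks every worker thread as a daemon. The JDK documents that daemon threads keep running during the shutdown sequence, so a task on such a thread can still be executing while shutdown hooks close resources it depends on.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPool {
  /** Hypothetical helper: a cached thread pool whose workers are daemon threads.
   *  Daemon threads do not keep the JVM alive, but they are not stopped before
   *  shutdown hooks run, so running tasks can overlap with hook execution.
   */
  def newDaemonCachedPool(prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true) // must be set before the pool starts the thread
        t
      }
    }
    Executors.newCachedThreadPool(factory)
  }
}
```

Any task submitted to this pool sees `Thread.currentThread().isDaemon` as `true`.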

So if a shutdown hook interferes with something the task is using (e.g.
an HDFS connection), the task will fail and report that failure to the
driver. The driver will then mark the task as failed regardless of what
caused the executor to shut down. For example, if YARN preempted that
executor, the driver would count the task as failed when it should
instead ignore the failure.
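The accounting problem can be illustrated with a simplified, hypothetical model of the driver side (the names below are illustrative and are not Spark's scheduler API): each end reason either counts toward the per-task failure limit or not, and the task is given up on once its counted failures reach the limit. In Spark that limit is set by `spark.task.maxFailures`, which defaults to 4.

```scala
object FailureAccounting {
  /** Why a task attempt ended (simplified, hypothetical model). */
  sealed trait EndReason {
    def countsTowardsFailures: Boolean
  }

  /** The task itself threw, e.g. because a shutdown hook closed its HDFS client. */
  final case class TaskException(message: String) extends EndReason {
    def countsTowardsFailures: Boolean = true
  }

  /** The executor went away; this only counts if the application caused the exit. */
  final case class ExecutorLost(causedByApp: Boolean) extends EndReason {
    def countsTowardsFailures: Boolean = causedByApp
  }

  /** True once the counted failures of a single task reach the limit. */
  def shouldAbort(attempts: Seq[EndReason], maxFailures: Int): Boolean =
    attempts.count(_.countsTowardsFailures) >= maxFailures
}
```

Under this model, four preemptions that each surface as a `TaskException` abort the task, while the same four preemptions seen as `ExecutorLost(causedByApp = false)` do not.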

This change avoids reporting failures to the driver while shutdown hooks
are executing. That fixes the YARN preemption accounting and changes little
in other scenarios, except that an executor that shuts down unexpectedly
now surfaces as a more generic error ("Executor lost"), which is arguably
more correct.
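The diff relies on Spark's internal `ShutdownHookManager.inShutdown()`. As an assumption about how such a check can be built from the JDK alone (not presented as Spark's code): `Runtime.addShutdownHook` and `Runtime.removeShutdownHook` throw `IllegalStateException` once the JVM has begun shutting down, so registering and immediately removing a no-op hook works as a probe. The object name `ShutdownCheck` is hypothetical.

```scala
object ShutdownCheck {
  /** Returns true if the JVM has already started its shutdown sequence. */
  def inShutdown(): Boolean = {
    // A hook that does nothing; it is registered only to probe the JVM state.
    val probe = new Thread {
      override def run(): Unit = ()
    }
    try {
      Runtime.getRuntime.addShutdownHook(probe)
      // Remove it right away so the probe never actually runs at exit.
      Runtime.getRuntime.removeShutdownHook(probe)
      false
    } catch {
      // Thrown by add/removeShutdownHook once shutdown is in progress.
      case _: IllegalStateException => true
    }
  }
}
```

In normal operation the probe returns `false`; a failure handler would call it just before deciding whether to report.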

Tested with a hacky app running in spark-shell that tried to cause failures
only while shutdown hooks were running, and verified that preemption did not
cause the app to fail because of task failures exceeding the threshold.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18594 from vanzin/SPARK-20904.

(cherry picked from commit cecd285a2aabad4e7db5a3d18944b87fbc4eee6c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62ca13dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62ca13dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62ca13dc

Branch: refs/heads/branch-2.2
Commit: 62ca13dcaf79b85fca02de5628b607196534c605
Parents: da403b9
Author: Marcelo Vanzin <vanzin@cloudera.com>
Authored: Sun Jul 23 23:23:13 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Sun Jul 23 23:23:59 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    | 47 ++++++++++++--------
 1 file changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/62ca13dc/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d54dd2d..e53d91d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -467,29 +467,38 @@ private[spark] class Executor(
           // the default uncaught exception handler, which will terminate the Executor.
           logError(s"Exception in $taskName (TID $taskId)", t)
 
-          // Collect latest accumulator values to report back to the driver
-          val accums: Seq[AccumulatorV2[_, _]] =
-            if (task != null) {
-              task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
-              task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
-              task.collectAccumulatorUpdates(taskFailed = true)
-            } else {
-              Seq.empty
-            }
+          // SPARK-20904: Do not report failure to driver if if happened during shut down. Because
+          // libraries may set up shutdown hooks that race with running tasks during shutdown,
+          // spurious failures may occur and can result in improper accounting in the driver (e.g.
+          // the task failure would not be ignored if the shutdown happened because of premption,
+          // instead of an app issue).
+          if (!ShutdownHookManager.inShutdown()) {
+            // Collect latest accumulator values to report back to the driver
+            val accums: Seq[AccumulatorV2[_, _]] =
+              if (task != null) {
+                task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+                task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+                task.collectAccumulatorUpdates(taskFailed = true)
+              } else {
+                Seq.empty
+              }
 
-          val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+            val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
 
-          val serializedTaskEndReason = {
-            try {
-              ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
-            } catch {
-              case _: NotSerializableException =>
-                // t is not serializable so just send the stacktrace
-                ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
+            val serializedTaskEndReason = {
+              try {
+                ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
+              } catch {
+                case _: NotSerializableException =>
+                  // t is not serializable so just send the stacktrace
+                  ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
+              }
             }
+            setTaskFinishedAndClearInterruptStatus()
+            execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
+          } else {
+            logInfo("Not reporting error to driver during JVM shutdown.")
           }
-          setTaskFinishedAndClearInterruptStatus()
-          execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
 
           // Don't forcibly exit unless the exception was inherently fatal, to avoid
           // stopping other tasks unnecessarily.
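The control flow this diff introduces can be sketched outside Spark as follows. This is a simplified, hypothetical model: plain Java serialization stands in for Spark's serializer, the `Outcome` type stands in for `execBackend.statusUpdate`, and the shutdown check is passed in as a function so both branches can be exercised. As in the diff, a non-serializable exception falls back to sending only its stack trace, and nothing is reported during shutdown.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.io.{PrintWriter, StringWriter}

object FailureReporting {
  /** Hypothetical result of handling a task failure. */
  sealed trait Outcome
  final case class Reported(taskId: Long, payload: Array[Byte]) extends Outcome
  case object Suppressed extends Outcome

  /** Java-serializes a value; throws NotSerializableException if any part cannot be. */
  private def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  private def stackTraceOf(t: Throwable): String = {
    val sw = new StringWriter()
    t.printStackTrace(new PrintWriter(sw))
    sw.toString
  }

  /** Decides whether, and what, to report for a failed task. */
  def onTaskFailure(taskId: Long, error: Throwable, inShutdown: () => Boolean): Outcome =
    if (!inShutdown()) {
      val payload =
        try serialize(error)
        catch {
          // The exception object is not serializable: send its stack trace as text.
          case _: NotSerializableException => serialize(stackTraceOf(error))
        }
      Reported(taskId, payload)
    } else {
      // During JVM shutdown, report nothing; the diff logs
      // "Not reporting error to driver during JVM shutdown." at this point.
      Suppressed
    }
}
```

Passing the shutdown check as a function is a testing convenience of this sketch; the diff calls `ShutdownHookManager.inShutdown()` directly.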



