spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: [SPARK-25568][CORE] Continue to update the remaining accumulators when failing to update one accumulator
Date Sun, 30 Sep 2018 01:11:03 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 eb78380c0 -> 73408f085


[SPARK-25568][CORE] Continue to update the remaining accumulators when failing to update one accumulator

## What changes were proposed in this pull request?

Since we don't fail a job when `AccumulatorV2.merge` fails, we should try to update the remaining
accumulators so that they can still report correct values.
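
For context, here is a minimal sketch of the scenario this change addresses, essentially what the new unit test below exercises (it assumes a running `SparkContext` named `sc` and is not part of the commit itself):

```scala
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

// Deliberately broken: merging task-side results into the driver copy always fails.
val bad = new LongAccumulator {
  override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit =
    throw new RuntimeException("broken merge")
}
sc.register(bad, "bad")

// A well-behaved accumulator registered alongside it.
val good = sc.longAccumulator("good")

sc.parallelize(1 to 10, 2).foreach { _ =>
  bad.add(1)
  good.add(1)
}

// The job still succeeds because merge failures are only logged.
// With this patch, `good` reports 10 even though `bad` was never merged.
assert(good.value == 10L)
```

Before this change, the first failing `merge` aborted the whole driver-side update loop, so accumulators updated after the faulty one in the same task silently lost their values.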

## How was this patch tested?

The new unit test.

Closes #22586 from zsxwing/SPARK-25568.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
(cherry picked from commit b6b8a6632e2b6e5482aaf4bfa093700752a9df80)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73408f08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73408f08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73408f08

Branch: refs/heads/branch-2.3
Commit: 73408f08589201b1035636bf3e0cdf4aae5e87dd
Parents: eb78380
Author: Shixiong Zhu <zsxwing@gmail.com>
Authored: Sat Sep 29 18:10:04 2018 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Sat Sep 29 18:10:58 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 20 ++++++++++++++------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 20 ++++++++++++++++++++
 docs/rdd-programming-guide.md                   |  4 ++++
 3 files changed, 38 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/73408f08/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3a99d13..402ec3d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1119,9 +1119,10 @@ class DAGScheduler(
   private def updateAccumulators(event: CompletionEvent): Unit = {
     val task = event.task
     val stage = stageIdToStage(task.stageId)
-    try {
-      event.accumUpdates.foreach { updates =>
-        val id = updates.id
+
+    event.accumUpdates.foreach { updates =>
+      val id = updates.id
+      try {
         // Find the corresponding accumulator on the driver and update it
         val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
           case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]]
@@ -1135,10 +1136,17 @@ class DAGScheduler(
           event.taskInfo.setAccumulables(
             acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
         }
+      } catch {
+        case NonFatal(e) =>
+          // Log the class name to make it easy to find the bad implementation
+          val accumClassName = AccumulatorContext.get(id) match {
+            case Some(accum) => accum.getClass.getName
+            case None => "Unknown class"
+          }
+          logError(
+            s"Failed to update accumulator $id ($accumClassName) for task ${task.partitionId}",
+            e)
       }
-    } catch {
-      case NonFatal(e) =>
-        logError(s"Failed to update accumulators for task ${task.partitionId}", e)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/73408f08/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index efdd11e..4bfe618 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1785,6 +1785,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(sc.parallelize(1 to 10, 2).count() === 10)
   }
 
+  test("misbehaved accumulator should not impact other accumulators") {
+    val bad = new LongAccumulator {
+      override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit = {
+        throw new DAGSchedulerSuiteDummyException
+      }
+    }
+    sc.register(bad, "bad")
+    val good = sc.longAccumulator("good")
+
+    sc.parallelize(1 to 10, 2).foreach { item =>
+      bad.add(1)
+      good.add(1)
+    }
+
+    // This is to ensure the `bad` accumulator did fail to update its value
+    assert(bad.value == 0L)
+    // Should be able to update the "good" accumulator
+    assert(good.value == 10L)
+  }
+
   /**
    * The job will be failed on first task throwing a DAGSchedulerSuiteDummyException.
    *  Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException.

http://git-wip-us.apache.org/repos/asf/spark/blob/73408f08/docs/rdd-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/rdd-programming-guide.md b/docs/rdd-programming-guide.md
index 29af159..a2db1b2 100644
--- a/docs/rdd-programming-guide.md
+++ b/docs/rdd-programming-guide.md
@@ -1465,6 +1465,10 @@ jsc.sc().register(myVectorAcc, "MyVectorAcc1");
 
 Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different than that of the elements added.
 
+*Warning*: When a Spark task finishes, Spark will try to merge the accumulated updates in this task to an accumulator.
+If it fails, Spark will ignore the failure and still mark the task successful and continue to run other tasks. Hence,
+a buggy accumulator will not impact a Spark job, but it may not get updated correctly although a Spark job is successful.
+
 </div>
 
 <div data-lang="python"  markdown="1">

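As an aside on the documentation change above: since a failed `merge` is only logged and the task is still marked successful, a custom `AccumulatorV2` benefits from a `merge` that does not throw on unexpected input. The `SetAccumulator` class below is purely illustrative and is not part of this commit:

```scala
import java.util.Collections
import org.apache.spark.util.AccumulatorV2

// Illustrative only: a set-valued accumulator whose merge tolerates
// unexpected implementations instead of throwing.
class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
  private val _set = Collections.synchronizedSet(new java.util.HashSet[T]())

  override def isZero: Boolean = _set.isEmpty
  override def copy(): SetAccumulator[T] = {
    val newAcc = new SetAccumulator[T]
    newAcc._set.addAll(_set)
    newAcc
  }
  override def reset(): Unit = _set.clear()
  override def add(v: T): Unit = _set.add(v)
  override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = other match {
    case o: SetAccumulator[T] => _set.addAll(o._set)
    case _ => // silently skip unknown implementations rather than throwing
  }
  override def value: java.util.Set[T] = _set
}
```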

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

