spark-commits mailing list archives

From van...@apache.org
Subject spark git commit: [SPARK-22681] Accumulator should only be updated once for each task in result stage
Date Tue, 05 Dec 2017 17:15:28 GMT
Repository: spark
Updated Branches:
  refs/heads/master 326f1d672 -> 03fdc92e4


[SPARK-22681] Accumulator should only be updated once for each task in result stage

## What changes were proposed in this pull request?
As the docs say, "For accumulator updates performed inside actions only, Spark guarantees that
each task's update to the accumulator will only be applied once, i.e. restarted tasks will
not update the value."
But the current code does not guarantee this: a successful result task that is resubmitted
(e.g. a speculative or re-run task) can apply its accumulator update a second time.
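For illustration, a minimal user-level sketch of the documented guarantee (illustrative only, not part of this patch):

```scala
// Illustrative only; not part of this patch.
val acc = sc.longAccumulator("records")
sc.parallelize(1 to 100, numSlices = 4).foreach { _ =>
  acc.add(1) // accumulator update performed inside an action
}
// The driver must see exactly 100, even if some tasks were
// resubmitted or ran speculatively.
assert(acc.value == 100)
```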

## How was this patch tested?
Newly added tests in `DAGSchedulerSuite`.

Author: Carson Wang <carson.wang@intel.com>

Closes #19877 from carsonwang/fixAccum.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03fdc92e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03fdc92e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03fdc92e

Branch: refs/heads/master
Commit: 03fdc92e42d260a2b7c0090115f162ba5c091aae
Parents: 326f1d6
Author: Carson Wang <carson.wang@intel.com>
Authored: Tue Dec 5 09:15:22 2017 -0800
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Tue Dec 5 09:15:22 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 14 +++++++++++---
 .../spark/scheduler/DAGSchedulerSuite.scala     | 20 ++++++++++++++++++++
 2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03fdc92e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9153751..c2498d4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1187,9 +1187,17 @@ class DAGScheduler(
     // only updated in certain cases.
     event.reason match {
       case Success =>
-        stage match {
-          case rs: ResultStage if rs.activeJob.isEmpty =>
-            // Ignore update if task's job has finished.
+        task match {
+          case rt: ResultTask[_, _] =>
+            val resultStage = stage.asInstanceOf[ResultStage]
+            resultStage.activeJob match {
+              case Some(job) =>
+                // Only update the accumulator once for each result task.
+                if (!job.finished(rt.outputId)) {
+                  updateAccumulators(event)
+                }
+              case None => // Ignore update if task's job has finished.
+            }
           case _ =>
             updateAccumulators(event)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/03fdc92e/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index feefb6a..d812b5b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1832,6 +1832,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("accumulator not calculated for resubmitted task in result stage") {
+    val accum = AccumulatorSuite.createLongAccum("a")
+    val finalRdd = new MyRDD(sc, 2, Nil)
+    submit(finalRdd, Array(0, 1))
+    // finish the first task
+    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+    // verify stage exists
+    assert(scheduler.stageIdToStage.contains(0))
+
+    // finish the first task again (simulate a speculative task or a resubmitted task)
+    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+
+    // The accumulator should only be updated once.
+    assert(accum.value === 1)
+
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success, 42))
+    assertDataStructuresEmpty()
+  }
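Note that the expected accumulator value is 1, not 2: in this suite, `completeWithAccumulator` attaches a single +1 accumulator update to each completion event it generates, so the assertion verifies that the scheduler applied the update from only one of the two completions for partition 0.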
+
   test("accumulators are updated on exception failures") {
     val acc1 = AccumulatorSuite.createLongAccum("ingenieur")
     val acc2 = AccumulatorSuite.createLongAccum("boulanger")


