spark-commits mailing list archives

From iras...@apache.org
Subject spark git commit: [SPARK-23948] Trigger mapstage's job listener in submitMissingTasks
Date Tue, 17 Apr 2018 20:53:49 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 564019b92 -> 6b99d5bc3


[SPARK-23948] Trigger mapstage's job listener in submitMissingTasks

## What changes were proposed in this pull request?

When SparkContext submits a map stage to `DAGScheduler` through `submitMapStage`,
`markMapStageJobAsFinished` is called in only two places (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L933
and https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1314).

But consider the following scenario:
1. stage0 and stage1 are both `ShuffleMapStage`s, and stage1 depends on stage0;
2. We submit stage1 with `submitMapStage`;
3. While stage1 is running, a `FetchFailed` occurs, and stage0 and stage1 are resubmitted as
stage0_1 and stage1_1;
4. While stage0_1 is running, speculative tasks from the old stage1 succeed, but stage1 is
not in `runningStages`. So even though all splits (including the speculated tasks) in stage1
succeeded, the job listener for stage1 is not called;
5. stage0_1 finishes and stage1_1 starts running. In `submitMissingTasks` there are no missing
tasks, but in the current code the job listener is not triggered.

We should call the job listener for the map stage in step 5.
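
The scenario can be replayed against a toy model to see why the listener is missed. The following is only a minimal sketch, not Spark code: `ToyListener`, `ToyMapStage`, `ToyScheduler`, `ListenerSketch` and the `notifyWhenNothingMissing` flag are hypothetical names invented for illustration, and the model ignores stage attempts, stage0, and map output statistics.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a job listener; it only counts notifications.
final class ToyListener {
  var results: Int = 0
  def jobSucceeded(): Unit = results += 1
}

// Hypothetical stand-in for a shuffle map stage with registered map-stage jobs.
final class ToyMapStage(val numPartitions: Int) {
  val availableOutputs = mutable.Set.empty[Int]
  val mapStageJobs = mutable.ListBuffer.empty[ToyListener]
  def isAvailable: Boolean = availableOutputs.size == numPartitions
  def missingPartitions: Seq[Int] =
    (0 until numPartitions).filterNot(availableOutputs.contains)
}

// Toy scheduler. The flag switches between the old and the patched behaviour.
final class ToyScheduler(notifyWhenNothingMissing: Boolean) {
  val runningStages = mutable.Set.empty[ToyMapStage]

  // Same shape as the helper in the patch: notify only when every output exists.
  def markMapStageJobsAsFinished(stage: ToyMapStage): Unit = {
    if (stage.isAvailable && stage.mapStageJobs.nonEmpty) {
      stage.mapStageJobs.foreach(_.jobSucceeded())
      stage.mapStageJobs.clear()
    }
  }

  // A task result is recorded, but the stage is finished only if it is still running.
  def taskSucceeded(stage: ToyMapStage, partition: Int): Unit = {
    stage.availableOutputs += partition
    if (runningStages.contains(stage) && stage.isAvailable) {
      runningStages -= stage
      markMapStageJobsAsFinished(stage)
    }
  }

  // A fetch failure removes the stage from the running set.
  def stageFailed(stage: ToyMapStage): Unit = runningStages -= stage

  def submitMissingTasks(stage: ToyMapStage): Unit = {
    if (stage.missingPartitions.isEmpty) {
      // Nothing left to run; the patched path also notifies the waiting jobs.
      if (notifyWhenNothingMissing) markMapStageJobsAsFinished(stage)
    } else {
      runningStages += stage
    }
  }
}

object ListenerSketch {
  // Replays steps 2 to 5 for stage1 and returns how often its listener fired.
  def replay(fixed: Boolean): Int = {
    val scheduler = new ToyScheduler(notifyWhenNothingMissing = fixed)
    val stage1 = new ToyMapStage(numPartitions = 2)
    val listener = new ToyListener
    stage1.mapStageJobs += listener

    scheduler.submitMissingTasks(stage1) // step 2: stage1 starts running
    scheduler.taskSucceeded(stage1, 0)   // first partition succeeds
    scheduler.stageFailed(stage1)        // step 3: fetch failure
    scheduler.taskSucceeded(stage1, 1)   // step 4: late speculative success
    scheduler.submitMissingTasks(stage1) // step 5: resubmitted, nothing missing
    listener.results
  }
}
```

In this model, `ListenerSketch.replay(fixed = false)` leaves the listener without a result, while `ListenerSketch.replay(fixed = true)` notifies it once, in step 5.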

## How was this patch tested?

Not added yet.

Author: jinxing <jinxing6042@126.com>

(cherry picked from commit 3990daaf3b6ca2c5a9f7790030096262efb12cb2)

Author: jinxing <jinxing6042@126.com>

Closes #21085 from squito/cp.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b99d5bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b99d5bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b99d5bc

Branch: refs/heads/branch-2.3
Commit: 6b99d5bc3f3898a0aff30468a623a3f64bb20b62
Parents: 564019b
Author: jinxing <jinxing6042@126.com>
Authored: Tue Apr 17 15:53:29 2018 -0500
Committer: Imran Rashid <irashid@cloudera.com>
Committed: Tue Apr 17 15:53:29 2018 -0500

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 33 +++++++------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 52 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6b99d5bc/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8c46a84..78b6b34 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1092,17 +1092,16 @@ class DAGScheduler(
       // the stage as completed here in case there are no tasks to run
       markStageAsFinished(stage, None)
 
-      val debugString = stage match {
+      stage match {
         case stage: ShuffleMapStage =>
-          s"Stage ${stage} is actually done; " +
-            s"(available: ${stage.isAvailable}," +
-            s"available outputs: ${stage.numAvailableOutputs}," +
-            s"partitions: ${stage.numPartitions})"
+          logDebug(s"Stage ${stage} is actually done; " +
+              s"(available: ${stage.isAvailable}," +
+              s"available outputs: ${stage.numAvailableOutputs}," +
+              s"partitions: ${stage.numPartitions})")
+          markMapStageJobsAsFinished(stage)
         case stage : ResultStage =>
-          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
+          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
       }
-      logDebug(debugString)
-
       submitWaitingChildStages(stage)
     }
   }
@@ -1307,13 +1306,7 @@ class DAGScheduler(
                   shuffleStage.findMissingPartitions().mkString(", "))
                 submitStage(shuffleStage)
               } else {
-                // Mark any map-stage jobs waiting on this stage as finished
-                if (shuffleStage.mapStageJobs.nonEmpty) {
-                  val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
-                  for (job <- shuffleStage.mapStageJobs) {
-                    markMapStageJobAsFinished(job, stats)
-                  }
-                }
+                markMapStageJobsAsFinished(shuffleStage)
                 submitWaitingChildStages(shuffleStage)
               }
             }
@@ -1433,6 +1426,16 @@ class DAGScheduler(
     }
   }
 
+  private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
+    // Mark any map-stage jobs waiting on this stage as finished
+    if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
+      val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
+      for (job <- shuffleStage.mapStageJobs) {
+        markMapStageJobAsFinished(job, stats)
+      }
+    }
+  }
+
   /**
    * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.

http://git-wip-us.apache.org/repos/asf/spark/blob/6b99d5bc/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d812b5b..8b6ec37 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2146,6 +2146,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("Trigger mapstage's job listener in submitMissingTasks") {
+    val rdd1 = new MyRDD(sc, 2, Nil)
+    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
+    val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
+    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
+
+    val listener1 = new SimpleListener
+    val listener2 = new SimpleListener
+
+    submitMapStage(dep1, listener1)
+    submitMapStage(dep2, listener2)
+
+    // Complete the stage0.
+    assert(taskSets(0).stageId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
+      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+        HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+    assert(listener1.results.size === 1)
+
+    // When attempting stage1, trigger a fetch failure.
+    assert(taskSets(1).stageId === 1)
+    complete(taskSets(1), Seq(
+      (Success, makeMapStatus("hostC", rdd2.partitions.length)),
+      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
+    scheduler.resubmitFailedStages()
+    // Stage1 listener should not have a result yet
+    assert(listener2.results.size === 0)
+
+    // Speculative task succeeded in stage1.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      Success,
+      makeMapStatus("hostD", rdd2.partitions.length)))
+    // stage1 listener still should not have a result, though there's no missing partitions
+    // in it. Because stage1 has been failed and is not inside `runningStages` at this moment.
+    assert(listener2.results.size === 0)
+
+    // Stage0 should now be running as task set 2; make its task succeed
+    assert(taskSets(2).stageId === 0)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostC", rdd2.partitions.length))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+        Set(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+
+    // After stage0 is finished, stage1 will be submitted and found there is no missing
+    // partitions in it. Then listener got triggered.
+    assert(listener2.results.size === 1)
+    assertDataStructuresEmpty()
+  }
+
   /**
    * In this test, we run a map stage where one of the executors fails but we still receive a
    * "zombie" complete message from that executor. We want to make sure the stage is not reported



