spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From squito <...@git.apache.org>
Subject [GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Date Tue, 24 Apr 2018 15:35:33 GMT
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183779968
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    --- End diff --
    
    good point, fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message