spark-reviews mailing list archives

From squito <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-8103][core] DAGScheduler should not sub...
Date Fri, 17 Jul 2015 19:56:47 GMT
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6750#discussion_r34925317
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1128,38 +1128,47 @@ class DAGScheduler(
             val failedStage = stageIdToStage(task.stageId)
             val mapStage = shuffleToMapStage(shuffleId)
     
    -        // It is likely that we receive multiple FetchFailed for a single stage (because we have
    -        // multiple tasks running concurrently on different executors). In that case, it is possible
    -        // the fetch failure has already been handled by the scheduler.
    -        if (runningStages.contains(failedStage)) {
    -          logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
    -            s"due to a fetch failure from $mapStage (${mapStage.name})")
    -          markStageAsFinished(failedStage, Some(failureMessage))
    -        }
    +        if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
    +          logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
    +            s" ${task.stageAttemptId}, which has already failed")
    +        } else {
     
    -        if (disallowStageRetryForTest) {
    -          abortStage(failedStage, "Fetch failure will not retry stage due to testing config")
    -        } else if (failedStages.isEmpty) {
    -          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
    -          // in that case the event will already have been scheduled.
    -          // TODO: Cancel running tasks in the stage
    -          logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    -            s"$failedStage (${failedStage.name}) due to fetch failure")
    -          messageScheduler.schedule(new Runnable {
    -            override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    -          }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
    -        }
    -        failedStages += failedStage
    -        failedStages += mapStage
    -        // Mark the map whose fetch failed as broken in the map stage
    -        if (mapId != -1) {
    -          mapStage.removeOutputLoc(mapId, bmAddress)
    -          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    -        }
    +          // It is likely that we receive multiple FetchFailed for a single stage (because we have
    +          // multiple tasks running concurrently on different executors). In that case, it is
    +          // possible the fetch failure has already been handled by the scheduler.
    +          if (runningStages.contains(failedStage)) {
    +            logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
    +              s"due to a fetch failure from $mapStage (${mapStage.name})")
    +            markStageAsFinished(failedStage, Some(failureMessage))
    +          } else {
    +            logInfo(s"Ignoring fetch failure from $task as it's from $failedStage, " +
    --- End diff ---
    
    woops, sorry I forgot about this one!  thanks, changed to the logDebug
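For context on the change under review: the new outer guard compares the reporting task's stageAttemptId against the stage's latest attempt, so a FetchFailed from a superseded attempt is logged and dropped rather than failing the stage (and scheduling a resubmit) a second time. Below is a minimal, self-contained Scala sketch of that guard pattern; TaskInfo, Stage, and handleFetchFailure here are illustrative stand-ins, not Spark's actual types, and only the control flow mirrors the patch above.

    // Illustrative sketch of the stale-attempt guard from the diff above.
    // These types are hypothetical stand-ins for Spark's real classes.
    object FetchFailureGuardSketch {
      case class TaskInfo(stageId: Int, stageAttemptId: Int)

      class Stage(val id: Int) {
        var latestAttemptId: Int = 0 // bumped each time the stage is resubmitted
      }

      def handleFetchFailure(task: TaskInfo, failedStage: Stage): Unit = {
        if (failedStage.latestAttemptId != task.stageAttemptId) {
          // The failure was reported by an attempt that has already been
          // superseded; acting on it would trigger a redundant resubmission.
          println(s"Ignoring fetch failure from stale attempt ${task.stageAttemptId}" +
            s" of stage ${failedStage.id}")
        } else {
          // Failure is from the current attempt: fail the stage so the
          // scheduler can resubmit it along with the map stage it reads from.
          println(s"Marking stage ${failedStage.id} as failed; scheduling resubmit")
        }
      }
    }

Note that the inner "Ignoring fetch failure" logInfo that the review marker points at was, per the comment above, changed to logDebug, since it can fire once per still-running task of the failed attempt.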


