spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiangxb1987 <...@git.apache.org>
Subject [GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Date Fri, 20 Jul 2018 17:03:49 GMT
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21758#discussion_r204108963
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1386,29 +1418,90 @@ class DAGScheduler(
                   )
                 }
               }
    -          // Mark the map whose fetch failed as broken in the map stage
    -          if (mapId != -1) {
    -            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    -          }
    +        }
     
    -          // TODO: mark the executor as failed only if there were lots of fetch failures
on it
    -          if (bmAddress != null) {
    -            val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled
&&
    -              unRegisterOutputOnHostOnFetchFailure) {
    -              // We had a fetch failure with the external shuffle service, so we
    -              // assume all shuffle data on the node is bad.
    -              Some(bmAddress.host)
    -            } else {
    -              // Unregister shuffle data just for one executor (we don't have any
    -              // reason to believe shuffle data has been lost for the entire host).
    -              None
    +      case failure: TaskFailedReason if task.isBarrier =>
    +        // Also handle the task failed reasons here.
    +        failure match {
    +          case Resubmitted =>
    +            logInfo("Resubmitted " + task + ", so marking it as still running")
    +            stage match {
    +              case sms: ShuffleMapStage =>
    +                sms.pendingPartitions += task.partitionId
    +
    +              case _ =>
    +                throw new SparkException("TaskSetManagers should only send Resubmitted
task " +
    +                  "statuses for tasks in ShuffleMapStages.")
                 }
    -            removeExecutorAndUnregisterOutputs(
    -              execId = bmAddress.executorId,
    -              fileLost = true,
    -              hostToUnregisterOutputs = hostToUnregisterOutputs,
    -              maybeEpoch = Some(task.epoch))
    +
    +          case _ => // Do nothing.
    +        }
    +
    +        // Always fail the current stage and retry all the tasks when a barrier task
fail.
    +        val failedStage = stageIdToStage(task.stageId)
    +        logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier
task " +
    +          "failed.")
    +        val message = s"Stage failed because barrier task $task finished unsuccessfully.
" +
    +          s"${failure.toErrorString}"
    +        try {
    +          // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +          taskScheduler.cancelTasks(stageId, interruptThread = false)
    +        } catch {
    +          case e: UnsupportedOperationException =>
    +            // Cannot continue with barrier stage if failed to cancel zombie barrier
tasks.
    +            logWarning(s"Could not cancel tasks for stage $stageId", e)
    +            abortStage(failedStage, "Could not cancel zombie barrier tasks for stage
" +
    +              s"$failedStage (${failedStage.name})", Some(e))
    +        }
    +        markStageAsFinished(failedStage, Some(message))
    +
    +        failedStage.failedAttemptIds.add(task.stageAttemptId)
    +        val shouldAbortStage =
    --- End diff --
    
    Could we do it in a follow-up to make the review of the current PR easier?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message