Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 90B6A11399 for ; Fri, 25 Apr 2014 23:04:58 +0000 (UTC) Received: (qmail 6024 invoked by uid 500); 25 Apr 2014 23:04:57 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 5989 invoked by uid 500); 25 Apr 2014 23:04:57 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@spark.apache.org Delivered-To: mailing list commits@spark.apache.org Received: (qmail 5981 invoked by uid 99); 25 Apr 2014 23:04:57 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Apr 2014 23:04:57 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 13D0D952845; Fri, 25 Apr 2014 23:04:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: matei@apache.org To: commits@spark.apache.org Message-Id: <6bef81d4fc5a444caed32c66eba4b81f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: SPARK-1235: manage the DAGScheduler EventProcessActor with supervisor and refactor the DAGScheduler with Akka Date: Fri, 25 Apr 2014 23:04:57 +0000 (UTC) Repository: spark Updated Branches: refs/heads/master df6d81425 -> 027f1b85f SPARK-1235: manage the DAGScheduler EventProcessActor with supervisor and refactor the DAGScheduler with Akka https://spark-project.atlassian.net/browse/SPARK-1235 In the current implementation, the running job will hang if the DAGScheduler crashes for some reason (eventProcessActor throws exception in receive() ) The reason is that the actor will automatically restart when the exception is thrown during the running but is not captured properly (Akka behaviour), and the JobWaiters are still waiting there for the completion of the tasks In this patch, I refactored the DAGScheduler with Akka and manage the eventProcessActor with supervisor, so that upon the failure of a eventProcessActor, the supervisor will terminate the EventProcessActor and close the SparkContext thanks for @kayousterhout and @markhamstra to give the hints in JIRA Author: CodingCat Author: Xiangrui Meng Author: Nan Zhu Closes #186 from CodingCat/SPARK-1235 and squashes the following commits: a7fb0ee [CodingCat] throw Exception on failure of creating DAG 124d82d [CodingCat] blocking the constructor until event actor is ready baf2d38 [CodingCat] fix the issue brought by non-blocking actorOf 35c886a [CodingCat] fix bug 82d08b3 [CodingCat] calling actorOf on system to ensure it is blocking 310a579 [CodingCat] style fix cd02d9a [Nan Zhu] small fix 561cfbc [CodingCat] recover doCheckpoint c048d0e [CodingCat] call submitWaitingStages for every event a9eea039 [CodingCat] address Matei's comments ac878ab [CodingCat] typo fix 5d1636a [CodingCat] re-trigger the test..... 9dfb033 [CodingCat] remove unnecessary changes a7a2a97 [CodingCat] add StageCancelled message fdf3b17 [CodingCat] just to retrigger the test...... 089bc2f [CodingCat] address andrew's comments 228f4b0 [CodingCat] address comments from Mark b68c1c7 [CodingCat] refactor DAGScheduler with Akka 810efd8 [Xiangrui Meng] akka solution Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/027f1b85 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/027f1b85 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/027f1b85 Branch: refs/heads/master Commit: 027f1b85f961ce16ee069afe3d90a36dce009994 Parents: df6d814 Author: CodingCat Authored: Fri Apr 25 16:04:48 2014 -0700 Committer: Matei Zaharia Committed: Fri Apr 25 16:04:48 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 20 +- .../main/scala/org/apache/spark/rdd/RDD.scala | 6 +- .../apache/spark/scheduler/DAGScheduler.scala | 419 +++++++++++-------- .../spark/scheduler/DAGSchedulerEvent.scala | 4 +- .../apache/spark/scheduler/TaskSetManager.scala | 2 +- .../spark/scheduler/DAGSchedulerSuite.scala | 58 ++- .../scheduler/TaskSchedulerImplSuite.scala | 2 +- 7 files changed, 290 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e9d2f57..eb14d87 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -300,10 +300,17 @@ class SparkContext(config: SparkConf) extends Logging { // Create and start the scheduler private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master) - taskScheduler.start() + @volatile private[spark] var dagScheduler: DAGScheduler = _ + try { + dagScheduler = new DAGScheduler(this) + } catch { + case e: Exception => throw + new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage)) + } - @volatile private[spark] var dagScheduler = new DAGScheduler(this) - dagScheduler.start() + // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's + // constructor + taskScheduler.start() private[spark] val cleaner: Option[ContextCleaner] = { if (conf.getBoolean("spark.cleaner.referenceTracking", true)) { @@ -1022,8 +1029,8 @@ class SparkContext(config: SparkConf) extends Logging { partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { - partitions.foreach{ p => - require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p") + if (dagScheduler == null) { + throw new SparkException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) @@ -1132,9 +1139,6 @@ class SparkContext(config: SparkConf) extends Logging { resultHandler: (Int, U) => Unit, resultFunc: => R): SimpleFutureAction[R] = { - partitions.foreach{ p => - require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p") - } val cleanF = clean(processPartition) val callSite = getCallSite val waiter = dagScheduler.submitJob( http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/main/scala/org/apache/spark/rdd/RDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e8bbfbf..3b3524f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1142,9 +1142,9 @@ abstract class RDD[T: ClassTag]( @transient private var doCheckpointCalled = false /** - * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler - * after a job using this RDD has completed (therefore the RDD has been materialized and - * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs. + * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD + * has completed (therefore the RDD has been materialized and potentially stored in memory). + * doCheckpoint() is called recursively on the parent RDDs. */ private[spark] def doCheckpoint() { if (!doCheckpointCalled) { http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dbde9b5..ff411e2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -22,10 +22,16 @@ import java.util.Properties import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} +import scala.concurrent.Await import scala.concurrent.duration._ +import scala.language.postfixOps import scala.reflect.ClassTag import akka.actor._ +import akka.actor.OneForOneStrategy +import akka.actor.SupervisorStrategy.Stop +import akka.pattern.ask +import akka.util.Timeout import org.apache.spark._ import org.apache.spark.executor.TaskMetrics @@ -47,14 +53,11 @@ import org.apache.spark.util.Utils * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task * a small number of times before cancelling the whole stage. * - * THREADING: This class runs all its logic in a single thread executing the run() method, to which - * events are submitted using a synchronized queue (eventQueue). The public API methods, such as - * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods - * should be private. */ private[spark] class DAGScheduler( - taskScheduler: TaskScheduler, + private[scheduler] val sc: SparkContext, + private[scheduler] val taskScheduler: TaskScheduler, listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, @@ -65,6 +68,7 @@ class DAGScheduler( def this(sc: SparkContext, taskScheduler: TaskScheduler) = { this( + sc, taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], @@ -74,8 +78,6 @@ class DAGScheduler( def this(sc: SparkContext) = this(sc, sc.taskScheduler) - private var eventProcessActor: ActorRef = _ - private[scheduler] val nextJobId = new AtomicInteger(0) private[scheduler] def numTotalJobs: Int = nextJobId.get() private val nextStageId = new AtomicInteger(0) @@ -113,50 +115,31 @@ class DAGScheduler( // stray messages to detect. private val failedEpoch = new HashMap[String, Long] - taskScheduler.setDAGScheduler(this) + private val dagSchedulerActorSupervisor = + env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this))) - /** - * Starts the event processing actor. The actor has two responsibilities: - * - * 1. Waits for events like job submission, task finished, task failure etc., and calls - * [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them. - * 2. Schedules a periodical task to resubmit failed stages. - * - * NOTE: the actor cannot be started in the constructor, because the periodical task references - * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus - * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed. - */ - def start() { - eventProcessActor = env.actorSystem.actorOf(Props(new Actor { - /** - * The main event loop of the DAG scheduler. - */ - def receive = { - case event: DAGSchedulerEvent => - logTrace("Got event of type " + event.getClass.getName) - - /** - * All events are forwarded to `processEvent()`, so that the event processing logic can - * easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite` - * for details. - */ - if (!processEvent(event)) { - submitWaitingStages() - } else { - context.stop(self) - } - } - })) + private[scheduler] var eventProcessActor: ActorRef = _ + + private def initializeEventProcessActor() { + // blocking the thread until supervisor is started, which ensures eventProcessActor is + // not null before any job is submitted + implicit val timeout = Timeout(30 seconds) + val initEventActorReply = + dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this)) + eventProcessActor = Await.result(initEventActorReply, timeout.duration). + asInstanceOf[ActorRef] } + initializeEventProcessActor() + // Called by TaskScheduler to report task's starting. def taskStarted(task: Task[_], taskInfo: TaskInfo) { eventProcessActor ! BeginEvent(task, taskInfo) } // Called to report that a task has completed and results are being fetched remotely. - def taskGettingResult(task: Task[_], taskInfo: TaskInfo) { - eventProcessActor ! GettingResultEvent(task, taskInfo) + def taskGettingResult(taskInfo: TaskInfo) { + eventProcessActor ! GettingResultEvent(taskInfo) } // Called by TaskScheduler to report task completions or failures. @@ -436,7 +419,7 @@ class DAGScheduler( { // Check to make sure we are not launching a task on a partition that does not exist. val maxPartitions = rdd.partitions.length - partitions.find(p => p >= maxPartitions).foreach { p => + partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions) @@ -511,6 +494,15 @@ class DAGScheduler( eventProcessActor ! AllJobsCancelled } + private[scheduler] def doCancelAllJobs() { + // Cancel all running jobs. + runningStages.map(_.jobId).foreach(handleJobCancellation(_, + reason = "as part of cancellation of all jobs")) + activeJobs.clear() // These should already be empty by this point, + jobIdToActiveJob.clear() // but just in case we lost track of some jobs... + submitWaitingStages() + } + /** * Cancel all jobs associated with a running or scheduled stage. */ @@ -519,147 +511,29 @@ class DAGScheduler( } /** - * Process one event retrieved from the event processing actor. - * - * @param event The event to be processed. - * @return `true` if we should stop the event loop. - */ - private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { - event match { - case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => - var finalStage: Stage = null - try { - // New stage creation may throw an exception if, for example, jobs are run on a HadoopRDD - // whose underlying HDFS files have been deleted. - finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) - } catch { - case e: Exception => - logWarning("Creating new stage failed due to exception - job: " + jobId, e) - listener.jobFailed(e) - return false - } - val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) - clearCacheLocs() - logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length + - " output partitions (allowLocal=" + allowLocal + ")") - logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + getMissingParentStages(finalStage)) - if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { - // Compute very short actions like first() or take() with no parent stages locally. - listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties)) - runLocally(job) - } else { - jobIdToActiveJob(jobId) = job - activeJobs += job - resultStageToJob(finalStage) = job - listenerBus.post( - SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties)) - submitStage(finalStage) - } - - case StageCancelled(stageId) => - handleStageCancellation(stageId) - - case JobCancelled(jobId) => - handleJobCancellation(jobId) - - case JobGroupCancelled(groupId) => - // Cancel all jobs belonging to this job group. - // First finds all active jobs with this group id, and then kill stages for them. - val activeInGroup = activeJobs.filter(activeJob => - groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) - val jobIds = activeInGroup.map(_.jobId) - jobIds.foreach(jobId => handleJobCancellation(jobId, - "as part of cancelled job group %s".format(groupId))) - - case AllJobsCancelled => - // Cancel all running jobs. - runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId, - "as part of cancellation of all jobs")) - activeJobs.clear() // These should already be empty by this point, - jobIdToActiveJob.clear() // but just in case we lost track of some jobs... - - case ExecutorAdded(execId, host) => - handleExecutorAdded(execId, host) - - case ExecutorLost(execId) => - handleExecutorLost(execId) - - case BeginEvent(task, taskInfo) => - for ( - stage <- stageIdToStage.get(task.stageId); - stageInfo <- stageToInfos.get(stage) - ) { - if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && - !stageInfo.emittedTaskSizeWarning) { - stageInfo.emittedTaskSizeWarning = true - logWarning(("Stage %d (%s) contains a task of very large " + - "size (%d KB). The maximum recommended task size is %d KB.").format( - task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN)) - } - } - listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo)) - - case GettingResultEvent(task, taskInfo) => - listenerBus.post(SparkListenerTaskGettingResult(taskInfo)) - - case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => - val stageId = task.stageId - val taskType = Utils.getFormattedClassName(task) - listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, taskMetrics)) - handleTaskCompletion(completion) - - case TaskSetFailed(taskSet, reason) => - stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) } - - case ResubmitFailedStages => - if (failedStages.size > 0) { - // Failed stages may be removed by job cancellation, so failed might be empty even if - // the ResubmitFailedStages event has been scheduled. - resubmitFailedStages() - } - - case StopDAGScheduler => - // Cancel any active jobs - for (job <- activeJobs) { - val error = new SparkException("Job cancelled because SparkContext was shut down") - job.listener.jobFailed(error) - // Tell the listeners that all of the running stages have ended. Don't bother - // cancelling the stages because if the DAG scheduler is stopped, the entire application - // is in the process of getting stopped. - val stageFailedMessage = "Stage cancelled because SparkContext was shut down" - runningStages.foreach { stage => - val info = stageToInfos(stage) - info.stageFailed(stageFailedMessage) - listenerBus.post(SparkListenerStageCompleted(info)) - } - listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) - } - return true - } - false - } - - /** * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since * the last fetch failure. */ private[scheduler] def resubmitFailedStages() { - logInfo("Resubmitting failed stages") - clearCacheLocs() - val failedStagesCopy = failedStages.toArray - failedStages.clear() - for (stage <- failedStagesCopy.sortBy(_.jobId)) { - submitStage(stage) + if (failedStages.size > 0) { + // Failed stages may be removed by job cancellation, so failed might be empty even if + // the ResubmitFailedStages event has been scheduled. + logInfo("Resubmitting failed stages") + clearCacheLocs() + val failedStagesCopy = failedStages.toArray + failedStages.clear() + for (stage <- failedStagesCopy.sortBy(_.jobId)) { + submitStage(stage) + } } + submitWaitingStages() } /** * Check for waiting or failed stages which are now eligible for resubmission. * Ordinarily run on every iteration of the event loop. */ - private[scheduler] def submitWaitingStages() { + private def submitWaitingStages() { // TODO: We might want to run this less often, when we are sure that something has become // runnable that wasn't before. logTrace("Checking for newly runnable parent stages") @@ -730,6 +604,102 @@ class DAGScheduler( } } + private[scheduler] def handleJobGroupCancelled(groupId: String) { + // Cancel all jobs belonging to this job group. + // First finds all active jobs with this group id, and then kill stages for them. + val activeInGroup = activeJobs.filter(activeJob => + groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) + val jobIds = activeInGroup.map(_.jobId) + jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId))) + submitWaitingStages() + } + + private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { + for (stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage)) { + if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 && + !stageInfo.emittedTaskSizeWarning) { + stageInfo.emittedTaskSizeWarning = true + logWarning(("Stage %d (%s) contains a task of very large " + + "size (%d KB). The maximum recommended task size is %d KB.").format( + task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, + DAGScheduler.TASK_SIZE_TO_WARN)) + } + } + listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo)) + submitWaitingStages() + } + + private[scheduler] def handleTaskSetFailed(taskSet: TaskSet, reason: String) { + stageIdToStage.get(taskSet.stageId).foreach {abortStage(_, reason) } + submitWaitingStages() + } + + private[scheduler] def cleanUpAfterSchedulerStop() { + for (job <- activeJobs) { + val error = new SparkException("Job cancelled because SparkContext was shut down") + job.listener.jobFailed(error) + // Tell the listeners that all of the running stages have ended. Don't bother + // cancelling the stages because if the DAG scheduler is stopped, the entire application + // is in the process of getting stopped. + val stageFailedMessage = "Stage cancelled because SparkContext was shut down" + runningStages.foreach { stage => + val info = stageToInfos(stage) + info.stageFailed(stageFailedMessage) + listenerBus.post(SparkListenerStageCompleted(info)) + } + listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) + } + } + + private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) { + listenerBus.post(SparkListenerTaskGettingResult(taskInfo)) + submitWaitingStages() + } + + private[scheduler] def handleJobSubmitted(jobId: Int, + finalRDD: RDD[_], + func: (TaskContext, Iterator[_]) => _, + partitions: Array[Int], + allowLocal: Boolean, + callSite: String, + listener: JobListener, + properties: Properties = null) + { + var finalStage: Stage = null + try { + // New stage creation may throw an exception if, for example, jobs are run on a + // HadoopRDD whose underlying HDFS files have been deleted. + finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite)) + } catch { + case e: Exception => + logWarning("Creating new stage failed due to exception - job: " + jobId, e) + listener.jobFailed(e) + return + } + if (finalStage != null) { + val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) + clearCacheLocs() + logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format( + job.jobId, callSite, partitions.length, allowLocal)) + logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") + logInfo("Parents of final stage: " + finalStage.parents) + logInfo("Missing parents: " + getMissingParentStages(finalStage)) + if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { + // Compute very short actions like first() or take() with no parent stages locally. + listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties)) + runLocally(job) + } else { + jobIdToActiveJob(jobId) = job + activeJobs += job + resultStageToJob(finalStage) = job + listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, + properties)) + submitStage(finalStage) + } + } + submitWaitingStages() + } + /** Submits stage, but first recursively submits any missing parents. */ private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) @@ -819,9 +789,12 @@ class DAGScheduler( * Responds to a task finishing. This is called inside the event loop so it assumes that it can * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside. */ - private def handleTaskCompletion(event: CompletionEvent) { + private[scheduler] def handleTaskCompletion(event: CompletionEvent) { val task = event.task - + val stageId = task.stageId + val taskType = Utils.getFormattedClassName(task) + listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo, + event.taskMetrics)) if (!stageIdToStage.contains(task.stageId)) { // Skip all the actions if the stage has been cancelled. return @@ -964,6 +937,7 @@ class DAGScheduler( // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler // will abort the job. } + submitWaitingStages() } /** @@ -973,7 +947,7 @@ class DAGScheduler( * Optionally the epoch during which the failure was caught can be passed to avoid allowing * stray fetch failures from possibly retriggering the detection of a node as lost. */ - private def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) { + private[scheduler] def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { failedEpoch(execId) = currentEpoch @@ -993,17 +967,19 @@ class DAGScheduler( logDebug("Additional executor lost message for " + execId + "(epoch " + currentEpoch + ")") } + submitWaitingStages() } - private def handleExecutorAdded(execId: String, host: String) { + private[scheduler] def handleExecutorAdded(execId: String, host: String) { // remove from failedEpoch(execId) ? if (failedEpoch.contains(execId)) { logInfo("Host added was in lost list earlier: " + host) failedEpoch -= execId } + submitWaitingStages() } - private def handleStageCancellation(stageId: Int) { + private[scheduler] def handleStageCancellation(stageId: Int) { if (stageIdToJobIds.contains(stageId)) { val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray jobsThatUseStage.foreach(jobId => { @@ -1012,22 +988,24 @@ class DAGScheduler( } else { logInfo("No active jobs to kill for Stage " + stageId) } + submitWaitingStages() } - private def handleJobCancellation(jobId: Int, reason: String = "") { + private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") { if (!jobIdToStageIds.contains(jobId)) { logDebug("Trying to cancel unregistered job " + jobId) } else { failJobAndIndependentStages(jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason), None) } + submitWaitingStages() } /** * Aborts all jobs depending on a particular Stage. This is called in response to a task set * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside. */ - private def abortStage(failedStage: Stage, reason: String) { + private[scheduler] def abortStage(failedStage: Stage, reason: String) { if (!stageIdToStage.contains(failedStage.id)) { // Skip all the actions if the stage has been removed. return @@ -1156,13 +1134,88 @@ class DAGScheduler( } def stop() { - if (eventProcessActor != null) { - eventProcessActor ! StopDAGScheduler - } + logInfo("Stopping DAGScheduler") + dagSchedulerActorSupervisor ! PoisonPill taskScheduler.stop() } } +private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) + extends Actor with Logging { + + override val supervisorStrategy = + OneForOneStrategy() { + case x: Exception => + logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" + .format(x.getMessage)) + dagScheduler.doCancelAllJobs() + dagScheduler.sc.stop() + Stop + } + + def receive = { + case p: Props => sender ! context.actorOf(p) + case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor") + } +} + +private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler) + extends Actor with Logging { + + override def preStart() { + // set DAGScheduler for taskScheduler to ensure eventProcessActor is always + // valid when the messages arrive + dagScheduler.taskScheduler.setDAGScheduler(dagScheduler) + } + + /** + * The main event loop of the DAG scheduler. + */ + def receive = { + case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => + dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, + listener, properties) + + case StageCancelled(stageId) => + dagScheduler.handleStageCancellation(stageId) + + case JobCancelled(jobId) => + dagScheduler.handleJobCancellation(jobId) + + case JobGroupCancelled(groupId) => + dagScheduler.handleJobGroupCancelled(groupId) + + case AllJobsCancelled => + dagScheduler.doCancelAllJobs() + + case ExecutorAdded(execId, host) => + dagScheduler.handleExecutorAdded(execId, host) + + case ExecutorLost(execId) => + dagScheduler.handleExecutorLost(execId) + + case BeginEvent(task, taskInfo) => + dagScheduler.handleBeginEvent(task, taskInfo) + + case GettingResultEvent(taskInfo) => + dagScheduler.handleGetTaskResult(taskInfo) + + case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => + dagScheduler.handleTaskCompletion(completion) + + case TaskSetFailed(taskSet, reason) => + dagScheduler.handleTaskSetFailed(taskSet, reason) + + case ResubmitFailedStages => + dagScheduler.resubmitFailedStages() + } + + override def postStop() { + // Cancel any active jobs in postStop hook + dagScheduler.cleanUpAfterSchedulerStop() + } +} + private[spark] object DAGScheduler { // The time, in millis, to wait for fetch failure events to stop coming in after one is detected; // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 0800c56..23f5744 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -57,7 +57,7 @@ private[scheduler] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent private[scheduler] -case class GettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent +case class GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent private[scheduler] case class CompletionEvent( task: Task[_], @@ -76,5 +76,3 @@ private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent - -private[scheduler] case object StopDAGScheduler extends DAGSchedulerEvent http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a81b834..f3bd079 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -465,7 +465,7 @@ private[spark] class TaskSetManager( def handleTaskGettingResult(tid: Long) = { val info = taskInfos(tid) info.markGettingResult() - sched.dagScheduler.taskGettingResult(tasks(info.index), info) + sched.dagScheduler.taskGettingResult(info) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index ff69eb7..d172dd1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -21,6 +21,8 @@ import scala.Tuple2 import scala.collection.mutable.{HashSet, HashMap, Map} import scala.language.reflectiveCalls +import akka.actor._ +import akka.testkit.{ImplicitSender, TestKit, TestActorRef} import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark._ @@ -28,19 +30,16 @@ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} -/** - * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler - * rather than spawning an event loop thread as happens in the real code. They use EasyMock - * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are - * submitted) and BlockManagerMaster (from which cache locations are retrieved and to which dead - * host notifications are sent). In addition, tests may check for side effects on a non-mocked - * MapOutputTracker instance. - * - * Tests primarily consist of running DAGScheduler#processEvent and - * DAGScheduler#submitWaitingStages (via test utility functions like runEvent or respondToTaskSet) - * and capturing the resulting TaskSets from the mock TaskScheduler. - */ -class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { +class BuggyDAGEventProcessActor extends Actor { + val state = 0 + def receive = { + case _ => throw new SparkException("error") + } +} + +class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite + with ImplicitSender with BeforeAndAfter with LocalSparkContext { + val conf = new SparkConf /** Set of TaskSets the DAGScheduler has requested executed. */ val taskSets = scala.collection.mutable.Buffer[TaskSet]() @@ -82,6 +81,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont var mapOutputTracker: MapOutputTrackerMaster = null var scheduler: DAGScheduler = null + var dagEventProcessTestActor: TestActorRef[DAGSchedulerEventProcessActor] = null /** * Set of cache locations to return from our mock BlockManagerMaster. @@ -121,6 +121,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont results.clear() mapOutputTracker = new MapOutputTrackerMaster(conf) scheduler = new DAGScheduler( + sc, taskScheduler, sc.listenerBus, mapOutputTracker, @@ -131,10 +132,13 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont runLocallyWithinThread(job) } } + dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor]( + Props(classOf[DAGSchedulerEventProcessActor], scheduler))(system) } - after { - scheduler.stop() + override def afterAll() { + super.afterAll() + TestKit.shutdownActorSystem(system) } /** @@ -178,8 +182,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont * DAGScheduler event loop. */ private def runEvent(event: DAGSchedulerEvent) { - assert(!scheduler.processEvent(event)) - scheduler.submitWaitingStages() + dagEventProcessTestActor.receive(event) } /** @@ -209,7 +212,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont listener: JobListener = jobListener): Int = { val jobId = scheduler.nextJobId.getAndIncrement() runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, listener)) - return jobId + jobId } /** Sends TaskSetFailed to the scheduler. */ @@ -223,19 +226,17 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont } test("zero split job") { - val rdd = makeRdd(0, Nil) var numResults = 0 val fakeListener = new JobListener() { override def taskSucceeded(partition: Int, value: Any) = numResults += 1 override def jobFailed(exception: Exception) = throw exception } - submit(rdd, Array(), listener = fakeListener) + submit(makeRdd(0, Nil), Array(), listener = fakeListener) assert(numResults === 0) } test("run trivial job") { - val rdd = makeRdd(1, Nil) - submit(rdd, Array(0)) + submit(makeRdd(1, Nil), Array(0)) complete(taskSets(0), List((Success, 42))) assert(results === Map(0 -> 42)) assertDataStructuresEmpty @@ -529,6 +530,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assertDataStructuresEmpty } + test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") { + val actorSystem = ActorSystem("test") + val supervisor = actorSystem.actorOf( + Props(classOf[DAGSchedulerActorSupervisor], scheduler), "dagSupervisor") + supervisor ! Props[BuggyDAGEventProcessActor] + val child = expectMsgType[ActorRef] + watch(child) + child ! "hi" + expectMsgPF(){ case Terminated(child) => () } + assert(scheduler.sc.dagScheduler === null) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. @@ -561,3 +574,4 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assert(scheduler.waitingStages.isEmpty) } } + http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 2fb750d..a8b605c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -305,7 +305,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} override def executorAdded(execId: String, host: String) {} } - + taskScheduler.setDAGScheduler(dagScheduler) // Give zero core offers. Should not generate any tasks val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0), new WorkerOffer("executor1", "host1", 0))