spark-commits mailing list archives

From ma...@apache.org
Subject git commit: SPARK-1235: manage the DAGScheduler EventProcessActor with supervisor and refactor the DAGScheduler with Akka
Date Fri, 25 Apr 2014 23:04:57 GMT
Repository: spark
Updated Branches:
  refs/heads/master df6d81425 -> 027f1b85f


SPARK-1235: manage the DAGScheduler EventProcessActor with supervisor and refactor the DAGScheduler with Akka

https://spark-project.atlassian.net/browse/SPARK-1235

In the current implementation, a running job will hang if the DAGScheduler crashes for some
reason (e.g. eventProcessActor throws an exception in receive()).

The reason is that, under Akka's default supervision behaviour, the actor is automatically
restarted when an uncaught exception is thrown in receive(), so the failure is never surfaced
and the JobWaiters keep waiting for the tasks to complete.
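
A minimal stand-alone sketch of this Akka behaviour (not Spark code; the actor and message
names are made up for illustration): with the default supervisor strategy, an exception thrown
in receive() silently restarts the actor and drops the offending message, so a caller blocked
on a reply waits forever.

    import akka.actor._

    // Stand-in for eventProcessActor: every message makes receive() throw.
    class FlakyEventActor extends Actor {
      def receive = {
        case _ => throw new RuntimeException("boom") // default strategy: restart, message lost
      }
    }

    object DefaultSupervisionDemo extends App {
      val system = ActorSystem("demo")
      val actor = system.actorOf(Props[FlakyEventActor], "eventProcessActor")
      actor ! "JobSubmitted" // the exception never reaches the sender; a JobWaiter would hang
    }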

In this patch, I refactored the DAGScheduler with Akka and manage the eventProcessActor with a
supervisor actor, so that upon a failure of the eventProcessActor, the supervisor terminates
the eventProcessActor and closes the SparkContext.
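
A simplified sketch of this supervision pattern (the real implementation is the
DAGSchedulerActorSupervisor added in the diff below; the shutdown callback here is a
placeholder for cancelling the running jobs and stopping the SparkContext):

    import akka.actor._
    import akka.actor.SupervisorStrategy.Stop

    // Simplified supervisor: any exception escaping the child stops it (instead of the
    // default restart) and triggers the supplied shutdown logic.
    class EventActorSupervisor(shutdown: () => Unit) extends Actor {
      override val supervisorStrategy = OneForOneStrategy() {
        case e: Exception =>
          shutdown() // e.g. cancel all running jobs and stop the SparkContext
          Stop
      }

      def receive = {
        // Create children on request and hand the ActorRef back to the asker, which is how
        // the DAGScheduler obtains its eventProcessActor under the supervisor.
        case p: Props => sender ! context.actorOf(p)
      }
    }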

Thanks to @kayousterhout and @markhamstra for the hints in JIRA.

Author: CodingCat <zhunansjtu@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>

Closes #186 from CodingCat/SPARK-1235 and squashes the following commits:

a7fb0ee [CodingCat] throw Exception on failure of creating DAG
124d82d [CodingCat] blocking the constructor until event actor is ready
baf2d38 [CodingCat] fix the issue brought by non-blocking actorOf
35c886a [CodingCat] fix bug
82d08b3 [CodingCat] calling actorOf on system to ensure it is blocking
310a579 [CodingCat] style fix
cd02d9a [Nan Zhu] small fix
561cfbc [CodingCat] recover doCheckpoint
c048d0e [CodingCat] call submitWaitingStages for every event
a9eea039 [CodingCat] address Matei's comments
ac878ab [CodingCat] typo fix
5d1636a [CodingCat] re-trigger the test.....
9dfb033 [CodingCat] remove unnecessary changes
a7a2a97 [CodingCat] add StageCancelled message
fdf3b17 [CodingCat] just to retrigger the test......
089bc2f [CodingCat] address andrew's comments
228f4b0 [CodingCat] address comments from Mark
b68c1c7 [CodingCat] refactor DAGScheduler with Akka
810efd8 [Xiangrui Meng] akka solution


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/027f1b85
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/027f1b85
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/027f1b85

Branch: refs/heads/master
Commit: 027f1b85f961ce16ee069afe3d90a36dce009994
Parents: df6d814
Author: CodingCat <zhunansjtu@gmail.com>
Authored: Fri Apr 25 16:04:48 2014 -0700
Committer: Matei Zaharia <matei@databricks.com>
Committed: Fri Apr 25 16:04:48 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  20 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   6 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 419 +++++++++++--------
 .../spark/scheduler/DAGSchedulerEvent.scala     |   4 +-
 .../apache/spark/scheduler/TaskSetManager.scala |   2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |  58 ++-
 .../scheduler/TaskSchedulerImplSuite.scala      |   2 +-
 7 files changed, 290 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e9d2f57..eb14d87 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -300,10 +300,17 @@ class SparkContext(config: SparkConf) extends Logging {
 
   // Create and start the scheduler
   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
-  taskScheduler.start()
+  @volatile private[spark] var dagScheduler: DAGScheduler = _
+  try {
+    dagScheduler = new DAGScheduler(this)
+  } catch {
+    case e: Exception => throw
+      new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
+  }
 
-  @volatile private[spark] var dagScheduler = new DAGScheduler(this)
-  dagScheduler.start()
+  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
+  // constructor
+  taskScheduler.start()
 
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
@@ -1022,8 +1029,8 @@ class SparkContext(config: SparkConf) extends Logging {
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    partitions.foreach{ p =>
-      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    if (dagScheduler == null) {
+      throw new SparkException("SparkContext has been shutdown")
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
@@ -1132,9 +1139,6 @@ class SparkContext(config: SparkConf) extends Logging {
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
-    partitions.foreach{ p =>
-      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
-    }
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(

http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e8bbfbf..3b3524f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1142,9 +1142,9 @@ abstract class RDD[T: ClassTag](
   @transient private var doCheckpointCalled = false
 
   /**
-   * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
-   * after a job using this RDD has completed (therefore the RDD has been materialized and
-   * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
+   * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
+   * has completed (therefore the RDD has been materialized and potentially stored in memory).
+   * doCheckpoint() is called recursively on the parent RDDs.
    */
   private[spark] def doCheckpoint() {
     if (!doCheckpointCalled) {

http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dbde9b5..ff411e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -22,10 +22,16 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.language.postfixOps
 import scala.reflect.ClassTag
 
 import akka.actor._
+import akka.actor.OneForOneStrategy
+import akka.actor.SupervisorStrategy.Stop
+import akka.pattern.ask
+import akka.util.Timeout
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
@@ -47,14 +53,11 @@ import org.apache.spark.util.Utils
  * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
  * a small number of times before cancelling the whole stage.
  *
- * THREADING: This class runs all its logic in a single thread executing the run() method, to which
- * events are submitted using a synchronized queue (eventQueue). The public API methods, such as
- * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
- * should be private.
  */
 private[spark]
 class DAGScheduler(
-    taskScheduler: TaskScheduler,
+    private[scheduler] val sc: SparkContext,
+    private[scheduler] val taskScheduler: TaskScheduler,
     listenerBus: LiveListenerBus,
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
@@ -65,6 +68,7 @@ class DAGScheduler(
 
   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
     this(
+      sc,
       taskScheduler,
       sc.listenerBus,
       sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
@@ -74,8 +78,6 @@ class DAGScheduler(
 
   def this(sc: SparkContext) = this(sc, sc.taskScheduler)
 
-  private var eventProcessActor: ActorRef = _
-
   private[scheduler] val nextJobId = new AtomicInteger(0)
   private[scheduler] def numTotalJobs: Int = nextJobId.get()
   private val nextStageId = new AtomicInteger(0)
@@ -113,50 +115,31 @@ class DAGScheduler(
   //       stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]
 
-  taskScheduler.setDAGScheduler(this)
+  private val dagSchedulerActorSupervisor =
+    env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
 
-  /**
-   * Starts the event processing actor.  The actor has two responsibilities:
-   *
-   * 1. Waits for events like job submission, task finished, task failure etc., and calls
-   *    [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
-   * 2. Schedules a periodical task to resubmit failed stages.
-   *
-   * NOTE: the actor cannot be started in the constructor, because the periodical task references
-   * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus
-   * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed.
-   */
-  def start() {
-    eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
-      /**
-       * The main event loop of the DAG scheduler.
-       */
-      def receive = {
-        case event: DAGSchedulerEvent =>
-          logTrace("Got event of type " + event.getClass.getName)
-
-          /**
-           * All events are forwarded to `processEvent()`, so that the event processing logic
can
-           * easily tested without starting a dedicated actor.  Please refer to `DAGSchedulerSuite`
-           * for details.
-           */
-          if (!processEvent(event)) {
-            submitWaitingStages()
-          } else {
-            context.stop(self)
-          }
-      }
-    }))
+  private[scheduler] var eventProcessActor: ActorRef = _
+
+  private def initializeEventProcessActor() {
+    // blocking the thread until supervisor is started, which ensures eventProcessActor is
+    // not null before any job is submitted
+    implicit val timeout = Timeout(30 seconds)
+    val initEventActorReply =
+      dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
+    eventProcessActor = Await.result(initEventActorReply, timeout.duration).
+      asInstanceOf[ActorRef]
   }
 
+  initializeEventProcessActor()
+
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessActor ! BeginEvent(task, taskInfo)
   }
 
   // Called to report that a task has completed and results are being fetched remotely.
-  def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
-    eventProcessActor ! GettingResultEvent(task, taskInfo)
+  def taskGettingResult(taskInfo: TaskInfo) {
+    eventProcessActor ! GettingResultEvent(taskInfo)
   }
 
   // Called by TaskScheduler to report task completions or failures.
@@ -436,7 +419,7 @@ class DAGScheduler(
   {
     // Check to make sure we are not launching a task on a partition that does not exist.
     val maxPartitions = rdd.partitions.length
-    partitions.find(p => p >= maxPartitions).foreach { p =>
+    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
       throw new IllegalArgumentException(
         "Attempting to access a non-existent partition: " + p + ". " +
           "Total number of partitions: " + maxPartitions)
@@ -511,6 +494,15 @@ class DAGScheduler(
     eventProcessActor ! AllJobsCancelled
   }
 
+  private[scheduler] def doCancelAllJobs() {
+    // Cancel all running jobs.
+    runningStages.map(_.jobId).foreach(handleJobCancellation(_,
+      reason = "as part of cancellation of all jobs"))
+    activeJobs.clear() // These should already be empty by this point,
+    jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
+    submitWaitingStages()
+  }
+
   /**
    * Cancel all jobs associated with a running or scheduled stage.
    */
@@ -519,147 +511,29 @@ class DAGScheduler(
   }
 
   /**
-   * Process one event retrieved from the event processing actor.
-   *
-   * @param event The event to be processed.
-   * @return `true` if we should stop the event loop.
-   */
-  private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
-    event match {
-      case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
-        var finalStage: Stage = null
-        try {
-          // New stage creation may throw an exception if, for example, jobs are run on a HadoopRDD
-          // whose underlying HDFS files have been deleted.
-          finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
-        } catch {
-          case e: Exception =>
-            logWarning("Creating new stage failed due to exception - job: " + jobId, e)
-            listener.jobFailed(e)
-            return false
-        }
-        val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
-        clearCacheLocs()
-        logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length
+
-                " output partitions (allowLocal=" + allowLocal + ")")
-        logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
-        logInfo("Parents of final stage: " + finalStage.parents)
-        logInfo("Missing parents: " + getMissingParentStages(finalStage))
-        if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
-          // Compute very short actions like first() or take() with no parent stages locally.
-          listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
-          runLocally(job)
-        } else {
-          jobIdToActiveJob(jobId) = job
-          activeJobs += job
-          resultStageToJob(finalStage) = job
-          listenerBus.post(
-            SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties))
-          submitStage(finalStage)
-        }
-
-      case StageCancelled(stageId) =>
-        handleStageCancellation(stageId)
-
-      case JobCancelled(jobId) =>
-        handleJobCancellation(jobId)
-
-      case JobGroupCancelled(groupId) =>
-        // Cancel all jobs belonging to this job group.
-        // First finds all active jobs with this group id, and then kill stages for them.
-        val activeInGroup = activeJobs.filter(activeJob =>
-          groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
-        val jobIds = activeInGroup.map(_.jobId)
-        jobIds.foreach(jobId => handleJobCancellation(jobId,
-          "as part of cancelled job group %s".format(groupId)))
-
-      case AllJobsCancelled =>
-        // Cancel all running jobs.
-        runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId,
-          "as part of cancellation of all jobs"))
-        activeJobs.clear()      // These should already be empty by this point,
-        jobIdToActiveJob.clear()   // but just in case we lost track of some jobs...
-
-      case ExecutorAdded(execId, host) =>
-        handleExecutorAdded(execId, host)
-
-      case ExecutorLost(execId) =>
-        handleExecutorLost(execId)
-
-      case BeginEvent(task, taskInfo) =>
-        for (
-          stage <- stageIdToStage.get(task.stageId);
-          stageInfo <- stageToInfos.get(stage)
-        ) {
-          if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 &&
-              !stageInfo.emittedTaskSizeWarning) {
-            stageInfo.emittedTaskSizeWarning = true
-            logWarning(("Stage %d (%s) contains a task of very large " +
-              "size (%d KB). The maximum recommended task size is %d KB.").format(
-              task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN))
-          }
-        }
-        listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
-
-      case GettingResultEvent(task, taskInfo) =>
-        listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
-
-      case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
-        val stageId = task.stageId
-        val taskType = Utils.getFormattedClassName(task)
-        listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, taskMetrics))
-        handleTaskCompletion(completion)
-
-      case TaskSetFailed(taskSet, reason) =>
-        stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
-
-      case ResubmitFailedStages =>
-        if (failedStages.size > 0) {
-          // Failed stages may be removed by job cancellation, so failed might be empty even if
-          // the ResubmitFailedStages event has been scheduled.
-          resubmitFailedStages()
-        }
-
-      case StopDAGScheduler =>
-        // Cancel any active jobs
-        for (job <- activeJobs) {
-          val error = new SparkException("Job cancelled because SparkContext was shut down")
-          job.listener.jobFailed(error)
-          // Tell the listeners that all of the running stages have ended.  Don't bother
-          // cancelling the stages because if the DAG scheduler is stopped, the entire application
-          // is in the process of getting stopped.
-          val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
-          runningStages.foreach { stage =>
-            val info = stageToInfos(stage)
-            info.stageFailed(stageFailedMessage)
-            listenerBus.post(SparkListenerStageCompleted(info))
-          }
-          listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
-        }
-        return true
-    }
-    false
-  }
-
-  /**
    * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since
    * the last fetch failure.
    */
   private[scheduler] def resubmitFailedStages() {
-    logInfo("Resubmitting failed stages")
-    clearCacheLocs()
-    val failedStagesCopy = failedStages.toArray
-    failedStages.clear()
-    for (stage <- failedStagesCopy.sortBy(_.jobId)) {
-      submitStage(stage)
+    if (failedStages.size > 0) {
+      // Failed stages may be removed by job cancellation, so failed might be empty even if
+      // the ResubmitFailedStages event has been scheduled.
+      logInfo("Resubmitting failed stages")
+      clearCacheLocs()
+      val failedStagesCopy = failedStages.toArray
+      failedStages.clear()
+      for (stage <- failedStagesCopy.sortBy(_.jobId)) {
+        submitStage(stage)
+      }
     }
+    submitWaitingStages()
   }
 
   /**
    * Check for waiting or failed stages which are now eligible for resubmission.
    * Ordinarily run on every iteration of the event loop.
    */
-  private[scheduler] def submitWaitingStages() {
+  private def submitWaitingStages() {
     // TODO: We might want to run this less often, when we are sure that something has become
     // runnable that wasn't before.
     logTrace("Checking for newly runnable parent stages")
@@ -730,6 +604,102 @@ class DAGScheduler(
     }
   }
 
+  private[scheduler] def handleJobGroupCancelled(groupId: String) {
+    // Cancel all jobs belonging to this job group.
+    // First finds all active jobs with this group id, and then kill stages for them.
+    val activeInGroup = activeJobs.filter(activeJob =>
+      groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+    val jobIds = activeInGroup.map(_.jobId)
+    jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
+    submitWaitingStages()
+  }
+
+  private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
+    for (stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage)) {
+      if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 &&
+        !stageInfo.emittedTaskSizeWarning) {
+        stageInfo.emittedTaskSizeWarning = true
+        logWarning(("Stage %d (%s) contains a task of very large " +
+          "size (%d KB). The maximum recommended task size is %d KB.").format(
+            task.stageId, stageInfo.name, taskInfo.serializedSize / 1024,
+            DAGScheduler.TASK_SIZE_TO_WARN))
+      }
+    }
+    listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
+    submitWaitingStages()
+  }
+
+  private[scheduler] def handleTaskSetFailed(taskSet: TaskSet, reason: String) {
+    stageIdToStage.get(taskSet.stageId).foreach {abortStage(_, reason) }
+    submitWaitingStages()
+  }
+
+  private[scheduler] def cleanUpAfterSchedulerStop() {
+    for (job <- activeJobs) {
+      val error = new SparkException("Job cancelled because SparkContext was shut down")
+      job.listener.jobFailed(error)
+      // Tell the listeners that all of the running stages have ended.  Don't bother
+      // cancelling the stages because if the DAG scheduler is stopped, the entire application
+      // is in the process of getting stopped.
+      val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
+      runningStages.foreach { stage =>
+        val info = stageToInfos(stage)
+        info.stageFailed(stageFailedMessage)
+        listenerBus.post(SparkListenerStageCompleted(info))
+      }
+      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    }
+  }
+
+  private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
+    listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
+    submitWaitingStages()
+  }
+
+  private[scheduler] def handleJobSubmitted(jobId: Int,
+      finalRDD: RDD[_],
+      func: (TaskContext, Iterator[_]) => _,
+      partitions: Array[Int],
+      allowLocal: Boolean,
+      callSite: String,
+      listener: JobListener,
+      properties: Properties = null)
+  {
+    var finalStage: Stage = null
+    try {
+      // New stage creation may throw an exception if, for example, jobs are run on a
+      // HadoopRDD whose underlying HDFS files have been deleted.
+      finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
+    } catch {
+      case e: Exception =>
+        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+        listener.jobFailed(e)
+        return
+    }
+    if (finalStage != null) {
+      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
+      clearCacheLocs()
+      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
+        job.jobId, callSite, partitions.length, allowLocal))
+      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
+      logInfo("Parents of final stage: " + finalStage.parents)
+      logInfo("Missing parents: " + getMissingParentStages(finalStage))
+      if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
+        // Compute very short actions like first() or take() with no parent stages locally.
+        listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
+        runLocally(job)
+      } else {
+        jobIdToActiveJob(jobId) = job
+        activeJobs += job
+        resultStageToJob(finalStage) = job
+        listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,
+          properties))
+        submitStage(finalStage)
+      }
+    }
+    submitWaitingStages()
+  }
+
   /** Submits stage, but first recursively submits any missing parents. */
   private def submitStage(stage: Stage) {
     val jobId = activeJobForStage(stage)
@@ -819,9 +789,12 @@ class DAGScheduler(
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
    */
-  private def handleTaskCompletion(event: CompletionEvent) {
+  private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
     val task = event.task
-
+    val stageId = task.stageId
+    val taskType = Utils.getFormattedClassName(task)
+    listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+      event.taskMetrics))
     if (!stageIdToStage.contains(task.stageId)) {
       // Skip all the actions if the stage has been cancelled.
       return
@@ -964,6 +937,7 @@ class DAGScheduler(
         // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
         // will abort the job.
     }
+    submitWaitingStages()
   }
 
   /**
@@ -973,7 +947,7 @@ class DAGScheduler(
    * Optionally the epoch during which the failure was caught can be passed to avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node as lost.
    */
-  private def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
+  private[scheduler] def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
@@ -993,17 +967,19 @@ class DAGScheduler(
       logDebug("Additional executor lost message for " + execId +
                "(epoch " + currentEpoch + ")")
     }
+    submitWaitingStages()
   }
 
-  private def handleExecutorAdded(execId: String, host: String) {
+  private[scheduler] def handleExecutorAdded(execId: String, host: String) {
     // remove from failedEpoch(execId) ?
     if (failedEpoch.contains(execId)) {
       logInfo("Host added was in lost list earlier: " + host)
       failedEpoch -= execId
     }
+    submitWaitingStages()
   }
 
-  private def handleStageCancellation(stageId: Int) {
+  private[scheduler] def handleStageCancellation(stageId: Int) {
     if (stageIdToJobIds.contains(stageId)) {
       val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
       jobsThatUseStage.foreach(jobId => {
@@ -1012,22 +988,24 @@ class DAGScheduler(
     } else {
       logInfo("No active jobs to kill for Stage " + stageId)
     }
+    submitWaitingStages()
   }
 
-  private def handleJobCancellation(jobId: Int, reason: String = "") {
+  private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
       failJobAndIndependentStages(jobIdToActiveJob(jobId),
         "Job %d cancelled %s".format(jobId, reason), None)
     }
+    submitWaitingStages()
   }
 
   /**
    * Aborts all jobs depending on a particular Stage. This is called in response to a task set
    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
    */
-  private def abortStage(failedStage: Stage, reason: String) {
+  private[scheduler] def abortStage(failedStage: Stage, reason: String) {
     if (!stageIdToStage.contains(failedStage.id)) {
       // Skip all the actions if the stage has been removed.
       return
@@ -1156,13 +1134,88 @@ class DAGScheduler(
   }
 
   def stop() {
-    if (eventProcessActor != null) {
-      eventProcessActor ! StopDAGScheduler
-    }
+    logInfo("Stopping DAGScheduler")
+    dagSchedulerActorSupervisor ! PoisonPill
     taskScheduler.stop()
   }
 }
 
+private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
+  extends Actor with Logging {
+
+  override val supervisorStrategy =
+    OneForOneStrategy() {
+      case x: Exception =>
+        logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
+          .format(x.getMessage))
+        dagScheduler.doCancelAllJobs()
+        dagScheduler.sc.stop()
+        Stop
+    }
+
+  def receive = {
+    case p: Props => sender ! context.actorOf(p)
+    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
+  }
+}
+
+private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
+  extends Actor with Logging {
+
+  override def preStart() {
+    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
+    // valid when the messages arrive
+    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
+  }
+
+  /**
+   * The main event loop of the DAG scheduler.
+   */
+  def receive = {
+    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
+      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
+        listener, properties)
+
+    case StageCancelled(stageId) =>
+      dagScheduler.handleStageCancellation(stageId)
+
+    case JobCancelled(jobId) =>
+      dagScheduler.handleJobCancellation(jobId)
+
+    case JobGroupCancelled(groupId) =>
+      dagScheduler.handleJobGroupCancelled(groupId)
+
+    case AllJobsCancelled =>
+      dagScheduler.doCancelAllJobs()
+
+    case ExecutorAdded(execId, host) =>
+      dagScheduler.handleExecutorAdded(execId, host)
+
+    case ExecutorLost(execId) =>
+      dagScheduler.handleExecutorLost(execId)
+
+    case BeginEvent(task, taskInfo) =>
+      dagScheduler.handleBeginEvent(task, taskInfo)
+
+    case GettingResultEvent(taskInfo) =>
+      dagScheduler.handleGetTaskResult(taskInfo)
+
+    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
+      dagScheduler.handleTaskCompletion(completion)
+
+    case TaskSetFailed(taskSet, reason) =>
+      dagScheduler.handleTaskSetFailed(taskSet, reason)
+
+    case ResubmitFailedStages =>
+      dagScheduler.resubmitFailedStages()
+  }
+
+  override def postStop() {
+    // Cancel any active jobs in postStop hook
+    dagScheduler.cleanUpAfterSchedulerStop()
+  }
+}
+
 private[spark] object DAGScheduler {
   // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
   // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one

http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 0800c56..23f5744 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -57,7 +57,7 @@ private[scheduler]
 case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
 
 private[scheduler]
-case class GettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
+case class GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent
 
 private[scheduler] case class CompletionEvent(
     task: Task[_],
@@ -76,5 +76,3 @@ private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
 
 private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
-
-private[scheduler] case object StopDAGScheduler extends DAGSchedulerEvent

http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a81b834..f3bd079 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -465,7 +465,7 @@ private[spark] class TaskSetManager(
   def handleTaskGettingResult(tid: Long) = {
     val info = taskInfos(tid)
     info.markGettingResult()
-    sched.dagScheduler.taskGettingResult(tasks(info.index), info)
+    sched.dagScheduler.taskGettingResult(info)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index ff69eb7..d172dd1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -21,6 +21,8 @@ import scala.Tuple2
 import scala.collection.mutable.{HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 
+import akka.actor._
+import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark._
@@ -28,19 +30,16 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 
-/**
- * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
- * rather than spawning an event loop thread as happens in the real code. They use EasyMock
- * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are
- * submitted) and BlockManagerMaster (from which cache locations are retrieved and to which dead
- * host notifications are sent). In addition, tests may check for side effects on a non-mocked
- * MapOutputTracker instance.
- *
- * Tests primarily consist of running DAGScheduler#processEvent and
- * DAGScheduler#submitWaitingStages (via test utility functions like runEvent or respondToTaskSet)
- * and capturing the resulting TaskSets from the mock TaskScheduler.
- */
-class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
+class BuggyDAGEventProcessActor extends Actor {
+  val state = 0
+  def receive = {
+    case _ => throw new SparkException("error")
+  }
+}
+
+class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite
+  with ImplicitSender with BeforeAndAfter with LocalSparkContext {
+
   val conf = new SparkConf
   /** Set of TaskSets the DAGScheduler has requested executed. */
   val taskSets = scala.collection.mutable.Buffer[TaskSet]()
@@ -82,6 +81,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
 
   var mapOutputTracker: MapOutputTrackerMaster = null
   var scheduler: DAGScheduler = null
+  var dagEventProcessTestActor: TestActorRef[DAGSchedulerEventProcessActor] = null
 
   /**
    * Set of cache locations to return from our mock BlockManagerMaster.
@@ -121,6 +121,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     results.clear()
     mapOutputTracker = new MapOutputTrackerMaster(conf)
     scheduler = new DAGScheduler(
+        sc,
         taskScheduler,
         sc.listenerBus,
         mapOutputTracker,
@@ -131,10 +132,13 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
         runLocallyWithinThread(job)
       }
     }
+    dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
+      Props(classOf[DAGSchedulerEventProcessActor], scheduler))(system)
   }
 
-  after {
-    scheduler.stop()
+  override def afterAll() {
+    super.afterAll()
+    TestKit.shutdownActorSystem(system)
   }
 
   /**
@@ -178,8 +182,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
    * DAGScheduler event loop.
    */
   private def runEvent(event: DAGSchedulerEvent) {
-    assert(!scheduler.processEvent(event))
-    scheduler.submitWaitingStages()
+    dagEventProcessTestActor.receive(event)
   }
 
   /**
@@ -209,7 +212,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
       listener: JobListener = jobListener): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
     runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, listener))
-    return jobId
+    jobId
   }
 
   /** Sends TaskSetFailed to the scheduler. */
@@ -223,19 +226,17 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
   }
 
   test("zero split job") {
-    val rdd = makeRdd(0, Nil)
     var numResults = 0
     val fakeListener = new JobListener() {
       override def taskSucceeded(partition: Int, value: Any) = numResults += 1
       override def jobFailed(exception: Exception) = throw exception
     }
-    submit(rdd, Array(), listener = fakeListener)
+    submit(makeRdd(0, Nil), Array(), listener = fakeListener)
     assert(numResults === 0)
   }
 
   test("run trivial job") {
-    val rdd = makeRdd(1, Nil)
-    submit(rdd, Array(0))
+    submit(makeRdd(1, Nil), Array(0))
     complete(taskSets(0), List((Success, 42)))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty
@@ -529,6 +530,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assertDataStructuresEmpty
   }
 
+  test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes")
{
+    val actorSystem = ActorSystem("test")
+    val supervisor = actorSystem.actorOf(
+      Props(classOf[DAGSchedulerActorSupervisor], scheduler), "dagSupervisor")
+    supervisor ! Props[BuggyDAGEventProcessActor]
+    val child = expectMsgType[ActorRef]
+    watch(child)
+    child ! "hi"
+    expectMsgPF(){ case Terminated(child) => () }
+    assert(scheduler.sc.dagScheduler === null)
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
@@ -561,3 +574,4 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assert(scheduler.waitingStages.isEmpty)
   }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/027f1b85/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 2fb750d..a8b605c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -305,7 +305,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging
       override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
       override def executorAdded(execId: String, host: String) {}
     }
-
+    taskScheduler.setDAGScheduler(dagScheduler)
     // Give zero core offers. Should not generate any tasks
     val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
       new WorkerOffer("executor1", "host1", 0))

