spark-commits mailing list archives

From pwend...@apache.org
Subject [08/20] git commit: Changed local backend to use Akka actor
Date Wed, 25 Dec 2013 00:35:39 GMT
Changed local backend to use Akka actor


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c64690d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c64690d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c64690d7

Branch: refs/heads/master
Commit: c64690d7252248df97bbe4b2bef8f540b977842d
Parents: 46f9c6b
Author: Kay Ousterhout <kayousterhout@gmail.com>
Authored: Thu Nov 14 09:34:56 2013 -0800
Committer: Kay Ousterhout <kayousterhout@gmail.com>
Committed: Thu Nov 14 09:34:56 2013 -0800

----------------------------------------------------------------------
 .../spark/scheduler/local/LocalBackend.scala    | 80 ++++++++++++++------
 1 file changed, 57 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c64690d7/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 3e9d31c..d9b941d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -21,21 +21,26 @@ import java.nio.ByteBuffer
 
 import akka.actor.{Actor, ActorRef, Props}
 
-import org.apache.spark.{SparkContext, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.scheduler.{SchedulerBackend, ClusterScheduler, WorkerOffer}
 
+private case class ReviveOffers()
+
+private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
+
+private case class KillTask(taskId: Long)
+
 /**
- * LocalBackend is used when running a local version of Spark where the executor, backend, and
- * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
- * on a single Executor (created by the LocalBackend) running locally.
- *
- * THREADING: Because methods can be called both from the Executor and the TaskScheduler, and
- * because the Executor class is not thread safe, all methods are synchronized.
+ * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
+ * LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend
+ * and the ClusterScheduler.
  */
-private[spark] class LocalBackend(scheduler: ClusterScheduler, private val totalCores: Int)
-  extends SchedulerBackend with ExecutorBackend {
+private[spark] class LocalActor(
+  scheduler: ClusterScheduler,
+  executorBackend: LocalBackend,
+  private val totalCores: Int) extends Actor with Logging {
 
   private var freeCores = totalCores
 
@@ -44,31 +49,60 @@ private[spark] class LocalBackend(scheduler: ClusterScheduler, private val total
 
   val executor = new Executor(localExecutorId, localExecutorHostname, Seq.empty, isLocal = true)
 
-  override def start() {
-  }
+  def receive = {
+    case ReviveOffers =>
+      reviveOffers()
 
-  override def stop() {
+    case StatusUpdate(taskId, state, serializedData) =>
+      scheduler.statusUpdate(taskId, state, serializedData)
+      if (TaskState.isFinished(state)) {
+        freeCores += 1
+        reviveOffers()
+      }
+
+    case KillTask(taskId) =>
+      executor.killTask(taskId)
   }
 
-  override def reviveOffers() = synchronized {
-   val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
+  def reviveOffers() {
+    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
     for (task <- scheduler.resourceOffers(offers).flatten) {
       freeCores -= 1
-      executor.launchTask(this, task.taskId, task.serializedTask)
+      executor.launchTask(executorBackend, task.taskId, task.serializedTask)
     }
   }
+}
+
+/**
+ * LocalBackend is used when running a local version of Spark where the executor, backend, and
+ * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
+ * on a single Executor (created by the LocalBackend) running locally.
+ */
+private[spark] class LocalBackend(scheduler: ClusterScheduler, private val totalCores: Int)
+  extends SchedulerBackend with ExecutorBackend {
+
+  var localActor: ActorRef = null
+
+  override def start() {
+    localActor = SparkEnv.get.actorSystem.actorOf(
+      Props(new LocalActor(scheduler, this, totalCores)),
+      "LocalBackendActor")
+  }
+
+  override def stop() {
+  }
+
+  override def reviveOffers() {
+   localActor ! ReviveOffers
+  }
 
   override def defaultParallelism() = totalCores
 
-  override def killTask(taskId: Long, executorId: String) = synchronized {
-    executor.killTask(taskId)
+  override def killTask(taskId: Long, executorId: String) {
+    localActor ! KillTask(taskId)
   }
 
-  override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) = synchronized {
-    scheduler.statusUpdate(taskId, state, serializedData)
-    if (TaskState.isFinished(state)) {
-      freeCores += 1
-      reviveOffers()
-    }
+  override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
+    localActor ! StatusUpdate(taskId, state, serializedData)
   }
 }
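
For context, here is a minimal, self-contained sketch of the pattern this commit adopts: funneling
all calls to a non-thread-safe backend through a single Akka actor with fire-and-forget messages,
instead of synchronizing every method. The sketch is illustrative only and not part of the commit;
the names (ToyLocalActor, Revive, Finished) are made up, and it assumes a recent classic-Akka
setup on the classpath.

----------------------------------------------------------------------
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Messages play the role of the commit's ReviveOffers / StatusUpdate / KillTask case classes.
case object Revive
case class Finished(taskId: Long)

// The actor owns the mutable state (freeCores), so no locks are needed: the mailbox
// serializes message handling, just as LocalActor serializes calls into LocalBackend.
class ToyLocalActor(totalCores: Int) extends Actor {
  private var freeCores = totalCores

  def receive = {
    case Revive =>
      // Hand out work while cores are free (in Spark this is where resourceOffers is called).
      while (freeCores > 0) {
        freeCores -= 1
        println(s"launching task, free cores now $freeCores")
      }
    case Finished(_) =>
      // A finished task returns its core and triggers another round of offers.
      freeCores += 1
      self ! Revive
  }
}

object ToyLocalBackend {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("toy")
    val actor: ActorRef = system.actorOf(Props(new ToyLocalActor(2)), "ToyLocalActor")

    // Fire-and-forget sends return immediately, so a caller such as the scheduler
    // never blocks on the backend and cannot deadlock against it.
    actor ! Revive
    actor ! Finished(1L)

    Thread.sleep(500)
    system.terminate()
  }
}
----------------------------------------------------------------------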

