spark-commits mailing list archives

From: andrewo...@apache.org
Subject: spark git commit: [SPARK-8911] Fix local mode endless heartbeats
Date: Tue, 14 Jul 2015 19:47:15 GMT
Repository: spark
Updated Branches:
  refs/heads/master c4e98ff06 -> 8fb3a65cb


[SPARK-8911] Fix local mode endless heartbeats

As of #7173, the driver expects executors to properly register with it before it responds
to their heartbeats. Local mode did not match this behavior. This patch posts the missing
ExecutorAdded event for the local executor.
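
In short, the fix (the full diff is below) posts a SparkListenerExecutorAdded event for the
local executor when LocalBackend starts, so the driver-side heartbeat handling knows about
that executor before its first heartbeat arrives. A minimal sketch of the core call, using
the names introduced by the patch:

    // Sketch of the change; see the diff below for the actual code. listenerBus,
    // executorEndpoint and totalCores are members of LocalBackend in this patch.
    // Requires org.apache.spark.scheduler._ and
    // org.apache.spark.scheduler.cluster.ExecutorInfo (see the import changes below).
    listenerBus.post(SparkListenerExecutorAdded(
      System.currentTimeMillis,
      executorEndpoint.localExecutorId,   // SparkContext.DRIVER_IDENTIFIER in local mode
      new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))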

Author: Andrew Or <andrew@databricks.com>

Closes #7382 from andrewor14/fix-local-heartbeat and squashes the following commits:

1258bdf [Andrew Or] Post ExecutorAdded event to local executor


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8fb3a65c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8fb3a65c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8fb3a65c

Branch: refs/heads/master
Commit: 8fb3a65cbb714120d612e58ef9d12b0521a83260
Parents: c4e98ff
Author: Andrew Or <andrew@databricks.com>
Authored: Tue Jul 14 12:47:11 2015 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Tue Jul 14 12:47:11 2015 -0700

----------------------------------------------------------------------
 .../spark/scheduler/local/LocalBackend.scala    | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8fb3a65c/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 776e5d3..4d48fcf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -25,7 +25,8 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
-import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 private case class ReviveOffers()
 
@@ -50,8 +51,8 @@ private[spark] class LocalEndpoint(
 
   private var freeCores = totalCores
 
-  private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
-  private val localExecutorHostname = "localhost"
+  val localExecutorId = SparkContext.DRIVER_IDENTIFIER
+  val localExecutorHostname = "localhost"
 
   private val executor = new Executor(
     localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
@@ -99,8 +100,9 @@ private[spark] class LocalBackend(
   extends SchedulerBackend with ExecutorBackend with Logging {
 
   private val appId = "local-" + System.currentTimeMillis
-  var localEndpoint: RpcEndpointRef = null
+  private var localEndpoint: RpcEndpointRef = null
   private val userClassPath = getUserClasspath(conf)
+  private val listenerBus = scheduler.sc.listenerBus
 
   /**
    * Returns a list of URLs representing the user classpath.
@@ -113,9 +115,13 @@ private[spark] class LocalBackend(
   }
 
   override def start() {
-    localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
-      "LocalBackendEndpoint",
-      new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
+    val rpcEnv = SparkEnv.get.rpcEnv
+    val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
+    localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint)
+    listenerBus.post(SparkListenerExecutorAdded(
+      System.currentTimeMillis,
+      executorEndpoint.localExecutorId,
+      new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
   }
 
   override def stop() {



