spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver
Date Fri, 27 Feb 2015 02:43:24 GMT
Repository: spark
Updated Branches:
  refs/heads/master fbc469473 -> 18f209843


 [SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver

If a BlockManager has not sent a heartbeat for more than 120s, BlockManagerMasterActor will remove
it. But CoarseGrainedSchedulerBackend can only remove an executor after a DisassociatedEvent.
We should also expire dead hosts in HeartbeatReceiver.

Author: Hong Shen <hongshen@tencent.com>

Closes #4363 from shenh062326/my_change3 and squashes the following commits:

2c9a46a [Hong Shen] Change some code style.
1a042ff [Hong Shen] Change some code style.
2dc456e [Hong Shen] Change some code style.
d221493 [Hong Shen] Fix test failed
7448ac6 [Hong Shen] A minor change in sparkContext and heartbeatReceiver
b904aed [Hong Shen] Fix failed test
52725af [Hong Shen] Remove assert in SparkContext.killExecutors
5bedcb8 [Hong Shen] Remove assert in SparkContext.killExecutors
a858fb5 [Hong Shen] A minor change in HeartbeatReceiver
3e221d9 [Hong Shen] A minor change in HeartbeatReceiver
6bab7aa [Hong Shen] Change a code style.
07952f3 [Hong Shen] Change configs name and code style.
ce9257e [Hong Shen] Fix test failed
bccd515 [Hong Shen] Fix test failed
8e77408 [Hong Shen] Fix test failed
c1dfda1 [Hong Shen] Fix test failed
e197e20 [Hong Shen] Fix test failed
fb5df97 [Hong Shen] Remove ExpireDeadHosts in BlockManagerMessages
b5c0441 [Hong Shen] Remove expireDeadHosts in BlockManagerMasterActor
c922cb0 [Hong Shen] Add expireDeadHosts in HeartbeatReceiver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18f20984
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18f20984
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18f20984

Branch: refs/heads/master
Commit: 18f2098433e0bfef9497bacd601fdf098ed03eab
Parents: fbc4694
Author: Hong Shen <hongshen@tencent.com>
Authored: Thu Feb 26 18:43:23 2015 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Thu Feb 26 18:43:23 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/HeartbeatReceiver.scala    | 65 ++++++++++++++++++--
 .../scala/org/apache/spark/SparkContext.scala   | 15 +++--
 .../apache/spark/scheduler/TaskScheduler.scala  |  6 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  2 +-
 .../spark/storage/BlockManagerMasterActor.scala | 36 +----------
 .../spark/storage/BlockManagerMessages.scala    |  2 -
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +
 7 files changed, 79 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/18f20984/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 83ae57b..69178da 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,33 +17,86 @@
 
 package org.apache.spark
 
-import akka.actor.Actor
+import scala.concurrent.duration._
+import scala.collection.mutable
+
+import akka.actor.{Actor, Cancellable}
+
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
 import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
- * components to convey liveness or execution information for in-progress tasks.
+ * components to convey liveness or execution information for in-progress tasks. It will also
+ * expire the hosts that have not heartbeated for more than spark.network.timeout.
  */
 private[spark] case class Heartbeat(
     executorId: String,
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+private[spark] case object ExpireDeadHosts 
+    
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
   extends Actor with ActorLogReceive with Logging {
 
+  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  private val executorLastSeen = new mutable.HashMap[String, Long]
+  
+  private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout", 
+    sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000
+  
+  private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
+    sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000
+  
+  private var timeoutCheckingTask: Cancellable = null
+  
+  override def preStart(): Unit = {
+    import context.dispatcher
+    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+      checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
+    super.preStart()
+  }
+  
   override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val response = HeartbeatResponse(
-        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+      val unknownExecutor = !scheduler.executorHeartbeatReceived(
+        executorId, taskMetrics, blockManagerId)
+      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+      executorLastSeen(executorId) = System.currentTimeMillis()
       sender ! response
+    case ExpireDeadHosts =>
+      expireDeadHosts()
+  }
+
+  private def expireDeadHosts(): Unit = {
+    logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
+    val now = System.currentTimeMillis()
+    for ((executorId, lastSeenMs) <- executorLastSeen) {
+      if (now - lastSeenMs > executorTimeoutMs) {
+        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
+        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
+          "timed out after ${now - lastSeenMs} ms"))
+        if (sc.supportDynamicAllocation) {
+          sc.killExecutor(executorId)
+        }
+        executorLastSeen.remove(executorId)
+      }
+    }
+  }
+  
+  override def postStop(): Unit = {
+    if (timeoutCheckingTask != null) {
+      timeoutCheckingTask.cancel()
+    }
+    super.postStop()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/18f20984/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 930d4be..d3948d4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -351,7 +351,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] var (schedulerBackend, taskScheduler) =
     SparkContext.createTaskScheduler(this, master)
   private val heartbeatReceiver = env.actorSystem.actorOf(
-    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
+    Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
   @volatile private[spark] var dagScheduler: DAGScheduler = _
   try {
     dagScheduler = new DAGScheduler(this)
@@ -398,7 +398,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
   private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
     if (dynamicAllocationEnabled) {
-      assert(master.contains("yarn") || dynamicAllocationTesting,
+      assert(supportDynamicAllocation,
         "Dynamic allocation of executors is currently only supported in YARN mode")
       Some(new ExecutorAllocationManager(this, listenerBus, conf))
     } else {
@@ -1123,6 +1123,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   /**
+   * Return whether dynamically adjusting the amount of resources allocated to
+   * this application is supported. This is currently only available for YARN.
+   */
+  private[spark] def supportDynamicAllocation = 
+    master.contains("yarn") || dynamicAllocationTesting
+
+  /**
    * :: DeveloperApi ::
    * Register a listener to receive up-calls from events that happen during execution.
    */
@@ -1155,7 +1162,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   @DeveloperApi
   override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,
       "Requesting executors is currently only supported in YARN mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
@@ -1173,7 +1180,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   @DeveloperApi
   override def killExecutors(executorIds: Seq[String]): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,
       "Killing executors is currently only supported in YARN mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>

http://git-wip-us.apache.org/repos/asf/spark/blob/18f20984/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index f095915..ed34186 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,5 +73,9 @@ private[spark] trait TaskScheduler {
    * @return An application ID
    */
   def applicationId(): String = appId
-
+  
+  /**
+   * Process a lost executor
+   */
+  def executorLost(executorId: String, reason: ExecutorLossReason): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/18f20984/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 54f8fcf..7a9cf1c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -436,7 +436,7 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-  def executorLost(executorId: String, reason: ExecutorLossReason) {
+  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
     var failedExecutor: Option[String] = None
 
     synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/18f20984/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 6413346..787b0f9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
 import scala.concurrent.Future
 import scala.concurrent.duration._
 
-import akka.actor.{Actor, ActorRef, Cancellable}
+import akka.actor.{Actor, ActorRef}
 import akka.pattern.ask
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
@@ -52,19 +52,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
 
   private val akkaTimeout = AkkaUtils.askTimeout(conf)
 
-  val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000)
-
-  val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)
-
-  var timeoutCheckingTask: Cancellable = null
-
-  override def preStart() {
-    import context.dispatcher
-    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
-      checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
-    super.preStart()
-  }
-
   override def receiveWithLogging = {
     case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
       register(blockManagerId, maxMemSize, slaveActor)
@@ -118,14 +105,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
 
     case StopBlockManagerMaster =>
       sender ! true
-      if (timeoutCheckingTask != null) {
-        timeoutCheckingTask.cancel()
-      }
       context.stop(self)
 
-    case ExpireDeadHosts =>
-      expireDeadHosts()
-
     case BlockManagerHeartbeat(blockManagerId) =>
       sender ! heartbeatReceived(blockManagerId)
 
@@ -207,21 +188,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     logInfo(s"Removing block manager $blockManagerId")
   }
 
-  private def expireDeadHosts() {
-    logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
-    val now = System.currentTimeMillis()
-    val minSeenTime = now - slaveTimeout
-    val toRemove = new mutable.HashSet[BlockManagerId]
-    for (info <- blockManagerInfo.values) {
-      if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
-        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
-          + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
-        toRemove += info.blockManagerId
-      }
-    }
-    toRemove.foreach(removeBlockManager)
-  }
-
   private def removeExecutor(execId: String) {
     logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)

http://git-wip-us.apache.org/repos/asf/spark/blob/18f20984/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 3f32099..4824745 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -109,6 +109,4 @@ private[spark] object BlockManagerMessages {
     extends ToBlockManagerMaster
 
   case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
-  case object ExpireDeadHosts extends ToBlockManagerMaster
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/18f20984/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4bf7f9e..30119ce 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -96,6 +96,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     }
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
+    override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
   }
 
   /** Length of time to wait while draining listener events. */
@@ -386,6 +387,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
       override def defaultParallelism() = 2
       override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
         blockManagerId: BlockManagerId): Boolean = true
+      override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     }
     val noKillScheduler = new DAGScheduler(
       sc,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message