spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From van...@apache.org
Subject spark git commit: [SPARK-11718][YARN][CORE] Fix explicitly killed executor dies silently issue
Date Mon, 16 Nov 2015 19:43:35 GMT
Repository: spark
Updated Branches:
  refs/heads/master ace0db471 -> 24477d270


[SPARK-11718][YARN][CORE] Fix explicitly killed executor dies silently issue

Currently, if dynamic allocation is enabled, explicitly killing an executor does not get a
response, so the executor metadata on the driver side is wrong, which makes dynamic allocation
on YARN fail to work.

The problem is that `disableExecutor` returns false for executors pending kill when
`onDisconnect` is detected, so no further processing is done.

One solution is to bypass these explicitly killed executors and use `super.onDisconnect` to
remove the executor. This is simple.

Another solution is to still query the loss reason for these explicitly killed executors.
Since an executor may get killed and reported in the same AM-RM communication, the current way
of adding a pending loss reason request does not work (the container completion is already
processed), so here we should store the loss reason for a later query.

Here this PR chooses solution 2.

Please help to review. vanzin I think this part is changed by you previously, would you please
help to review? Thanks a lot.

Author: jerryshao <sshao@hortonworks.com>

Closes #9684 from jerryshao/SPARK-11718.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24477d27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24477d27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24477d27

Branch: refs/heads/master
Commit: 24477d2705bcf2a851acc241deb8376c5450dc73
Parents: ace0db4
Author: jerryshao <sshao@hortonworks.com>
Authored: Mon Nov 16 11:43:18 2015 -0800
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Mon Nov 16 11:43:18 2015 -0800

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  1 +
 .../cluster/CoarseGrainedSchedulerBackend.scala |  6 ++--
 .../spark/deploy/yarn/YarnAllocator.scala       | 30 ++++++++++++++++----
 3 files changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24477d27/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 43d7d80..5f13669 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -473,6 +473,7 @@ private[spark] class TaskSchedulerImpl(
              // If the host mapping still exists, it means we don't know the loss reason
for the
              // executor. So call removeExecutor() to update tasks running on that executor
when
              // the real loss reason is finally known.
+             logError(s"Actual reason for lost executor $executorId: ${reason.message}")
              removeExecutor(executorId, reason)
 
            case None =>

http://git-wip-us.apache.org/repos/asf/spark/blob/24477d27/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f71d98f..3373caf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -269,7 +269,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val
rpcEnv: Rp
      * Stop making resource offers for the given executor. The executor is marked as lost
with
      * the loss reason still pending.
      *
-     * @return Whether executor was alive.
+     * @return Whether executor should be disabled
      */
     protected def disableExecutor(executorId: String): Boolean = {
       val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
@@ -277,7 +277,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val
rpcEnv: Rp
           executorsPendingLossReason += executorId
           true
         } else {
-          false
+          // Returns true for explicitly killed executors, we also need to get pending loss
reasons;
+          // For others return false.
+          executorsPendingToRemove.contains(executorId)
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/24477d27/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4d9e777..7e39c3e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.util.RackResolver
 
 import org.apache.log4j.{Level, Logger}
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
@@ -96,6 +96,10 @@ private[yarn] class YarnAllocator(
   // was lost.
   private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
 
+  // Maintain loss reasons for already released executors, it will be added when executor
loss
+  // reason is got from AM-RM call, and be removed after querying this loss reason.
+  private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
+
   // Keep track of which container is running which executor to remove the executors later
   // Visible for testing.
   private[yarn] val executorIdToContainer = new HashMap[String, Container]
@@ -202,8 +206,7 @@ private[yarn] class YarnAllocator(
    */
   def killExecutor(executorId: String): Unit = synchronized {
     if (executorIdToContainer.contains(executorId)) {
-      val container = executorIdToContainer.remove(executorId).get
-      containerIdToExecutorId.remove(container.getId)
+      val container = executorIdToContainer.get(executorId).get
       internalReleaseContainer(container)
       numExecutorsRunning -= 1
     } else {
@@ -514,9 +517,18 @@ private[yarn] class YarnAllocator(
 
       containerIdToExecutorId.remove(containerId).foreach { eid =>
         executorIdToContainer.remove(eid)
-        pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
-          // Notify application of executor loss reasons so it can decide whether it should
abort
-          pendingRequests.foreach(_.reply(exitReason))
+        pendingLossReasonRequests.remove(eid) match {
+          case Some(pendingRequests) =>
+            // Notify application of executor loss reasons so it can decide whether it should
abort
+            pendingRequests.foreach(_.reply(exitReason))
+
+          case None =>
+            // We cannot find executor for pending reasons. This is because completed container
+            // is processed before querying pending result. We should store it for later
query.
+            // This is usually happened when explicitly killing a container, the result will
be
+            // returned in one AM-RM communication. So query RPC will be later than this
completed
+            // container process.
+            releasedExecutorLossReasons.put(eid, exitReason)
         }
         if (!alreadyReleased) {
           // The executor could have gone away (like no route to host, node failure, etc)
@@ -538,8 +550,14 @@ private[yarn] class YarnAllocator(
     if (executorIdToContainer.contains(eid)) {
       pendingLossReasonRequests
         .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
+    } else if (releasedExecutorLossReasons.contains(eid)) {
+      // Executor is already released explicitly before getting the loss reason, so directly
send
+      // the pre-stored lost reason
+      context.reply(releasedExecutorLossReasons.remove(eid).get)
     } else {
       logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+      context.sendFailure(
+        new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message