spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iras...@apache.org
Subject [spark] branch master updated: [SPARK-32003][CORE] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost
Date Wed, 22 Jul 2020 14:53:57 GMT
This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e8c06af  [SPARK-32003][CORE] When external shuffle service is used, unregister outputs
for executor on fetch failure after executor is lost
e8c06af is described below

commit e8c06af7d1ab19b6478ac542cf876259cac5e13a
Author: Wing Yew Poon <wypoon@cloudera.com>
AuthorDate: Wed Jul 22 09:53:16 2020 -0500

    [SPARK-32003][CORE] When external shuffle service is used, unregister outputs for executor
on fetch failure after executor is lost
    
    ### What changes were proposed in this pull request?
    
    If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor
but does not unregister its outputs if the external shuffle service is used. However, if the
node on which the executor runs is lost, the shuffle service may not be able to serve the
shuffle files.
    In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler`
again removes the executor and by right, should unregister its outputs. It doesn't because
the epoch used to track the executor failure has not increased.
    
    We track the epoch for failed executors that result in lost file output separately, so
we can unregister the outputs in this scenario. The idea to track a second epoch is due to
Attila Zsolt Piros.
    
    ### Why are the changes needed?
    
    Without the changes, the loss of a node could require two stage attempts to recover instead
of one.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test. This test fails without the change and passes with it.
    
    Closes #28848 from wypoon/SPARK-32003.
    
    Authored-by: Wing Yew Poon <wypoon@cloudera.com>
    Signed-off-by: Imran Rashid <irashid@cloudera.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 100 ++++++++++++++-------
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |  98 ++++++++++++++++----
 2 files changed, 148 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index cb024d0..73c95d1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -170,13 +170,34 @@ private[spark] class DAGScheduler(
    */
   private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
 
-  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent
with
-  // every task. When we detect a node failing, we note the current epoch number and failed
-  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
-  //
-  // TODO: Garbage collect information about failure epochs when we know there are no more
-  //       stray messages to detect.
-  private val failedEpoch = new HashMap[String, Long]
+  /**
+   * Tracks the latest epoch of a fully processed error related to the given executor. (We
use
+   * the MapOutputTracker's epoch number, which is sent with every task.)
+   *
+   * When an executor fails, it can affect the results of many tasks, and we have to deal
with
+   * all of them consistently. We don't simply ignore all future results from that executor,
+   * as the failures may have been transient; but we also don't want to "overreact" to follow-
+   * on errors we receive. Furthermore, we might receive notification of a task success,
after
+   * we find out the executor has actually failed; we'll assume those successes are, in fact,
+   * simply delayed notifications and the results have been lost, if the tasks started in
the
+   * same or an earlier epoch. In particular, we use this to control when we tell the
+   * BlockManagerMaster that the BlockManager has been lost.
+   */
+  private val executorFailureEpoch = new HashMap[String, Long]
+
+  /**
+   * Tracks the latest epoch of a fully processed error where shuffle files have been lost
from
+   * the given executor.
+   *
+   * This is closely related to executorFailureEpoch. They only differ for the executor when
+   * there is an external shuffle service serving shuffle files and we haven't been notified
that
+   * the entire worker has been lost. In that case, when an executor is lost, we do not update
+   * the shuffleFileLostEpoch; we wait for a fetch failure. This way, if only the executor
+   * fails, we do not unregister the shuffle data as it can still be served; but if there
is
+   * a failure in the shuffle service (resulting in fetch failure), we unregister the shuffle
+   * data only once, even if we get many fetch failures.
+   */
+  private val shuffleFileLostEpoch = new HashMap[String, Long]
 
   private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
 
@@ -1566,7 +1587,8 @@ private[spark] class DAGScheduler(
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
-            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId))
{
+            if (executorFailureEpoch.contains(execId) &&
+                smt.epoch <= executorFailureEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
             } else {
               // The epoch of the task is acceptable (i.e., the task was launched after the
most
@@ -1912,12 +1934,8 @@ private[spark] class DAGScheduler(
    * modify the scheduler's internal state. Use executorLost() to post a loss event from
outside.
    *
    * We will also assume that we've lost all shuffle blocks associated with the executor
if the
-   * executor serves its own blocks (i.e., we're not using external shuffle), the entire
executor
-   * process is lost (likely including the shuffle service), or a FetchFailed occurred, in
which
-   * case we presume all shuffle data related to this executor to be lost.
-   *
-   * Optionally the epoch during which the failure was caught can be passed to avoid allowing
-   * stray fetch failures from possibly retriggering the detection of a node as lost.
+   * executor serves its own blocks (i.e., we're not using an external shuffle service),
or the
+   * entire Standalone worker is lost.
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
@@ -1933,29 +1951,44 @@ private[spark] class DAGScheduler(
       maybeEpoch = None)
   }
 
+  /**
+   * Handles removing an executor from the BlockManagerMaster as well as unregistering shuffle
+   * outputs for the executor or optionally its host.
+   *
+   * @param execId executor to be removed
+   * @param fileLost If true, indicates that we assume we've lost all shuffle blocks associated
+   *   with the executor; this happens if the executor serves its own blocks (i.e., we're
not
+   *   using an external shuffle service), the entire Standalone worker is lost, or a FetchFailed
+   *   occurred (in which case we presume all shuffle data related to this executor to be
lost).
+   * @param hostToUnregisterOutputs (optional) executor host if we're unregistering all the
+   *   outputs on the host
+   * @param maybeEpoch (optional) the epoch during which the failure was caught (this prevents
+   *   reprocessing for follow-on fetch failures)
+   */
   private def removeExecutorAndUnregisterOutputs(
       execId: String,
       fileLost: Boolean,
       hostToUnregisterOutputs: Option[String],
       maybeEpoch: Option[Long] = None): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
-    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
-      failedEpoch(execId) = currentEpoch
-      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
+    logDebug(s"Considering removal of executor $execId; " +
+      s"fileLost: $fileLost, currentEpoch: $currentEpoch")
+    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch)
{
+      executorFailureEpoch(execId) = currentEpoch
+      logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
       blockManagerMaster.removeExecutor(execId)
-      if (fileLost) {
-        hostToUnregisterOutputs match {
-          case Some(host) =>
-            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
-            mapOutputTracker.removeOutputsOnHost(host)
-          case None =>
-            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
-            mapOutputTracker.removeOutputsOnExecutor(execId)
-        }
-        clearCacheLocs()
-
-      } else {
-        logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
+      clearCacheLocs()
+    }
+    if (fileLost &&
+        (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch))
{
+      shuffleFileLostEpoch(execId) = currentEpoch
+      hostToUnregisterOutputs match {
+        case Some(host) =>
+          logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
+          mapOutputTracker.removeOutputsOnHost(host)
+        case None =>
+          logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
+          mapOutputTracker.removeOutputsOnExecutor(execId)
       }
     }
   }
@@ -1981,11 +2014,12 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = {
-    // remove from failedEpoch(execId) ?
-    if (failedEpoch.contains(execId)) {
+    // remove from executorFailureEpoch(execId) ?
+    if (executorFailureEpoch.contains(execId)) {
       logInfo("Host added was in lost list earlier: " + host)
-      failedEpoch -= execId
+      executorFailureEpoch -= execId
     }
+    shuffleFileLostEpoch -= execId
   }
 
   private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit
= {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 70138327..664cfc8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -25,6 +25,9 @@ import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 import scala.util.control.NonFatal
 
+import org.mockito.Mockito.spy
+import org.mockito.Mockito.times
+import org.mockito.Mockito.verify
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.time.SpanSugar._
@@ -235,6 +238,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with
TimeLi
 
   var sparkListener: EventInfoRecordingListener = null
 
+  var blockManagerMaster: BlockManagerMaster = null
   var mapOutputTracker: MapOutputTrackerMaster = null
   var broadcastManager: BroadcastManager = null
   var securityMgr: SecurityManager = null
@@ -248,17 +252,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext
with TimeLi
    */
   val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
   // stub out BlockManagerMaster.getLocations to use our cacheLocations
-  val blockManagerMaster = new BlockManagerMaster(null, null, conf, true) {
-      override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]]
= {
-        blockIds.map {
-          _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
-            getOrElse(Seq())
-        }.toIndexedSeq
-      }
-      override def removeExecutor(execId: String): Unit = {
-        // don't need to propagate to the driver, which we don't have
-      }
+  class MyBlockManagerMaster(conf: SparkConf) extends BlockManagerMaster(null, null, conf,
true) {
+    override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]]
= {
+      blockIds.map {
+        _.asRDDId.map { id => (id.rddId -> id.splitIndex)
+        }.flatMap { key => cacheLocations.get(key)
+        }.getOrElse(Seq())
+      }.toIndexedSeq
     }
+    override def removeExecutor(execId: String): Unit = {
+      // don't need to propagate to the driver, which we don't have
+    }
+  }
 
   /** The list of results that DAGScheduler has collected. */
   val results = new HashMap[Int, Any]()
@@ -276,6 +281,16 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext
with TimeLi
     override def jobFailed(exception: Exception): Unit = { failure = exception }
   }
 
+  class MyMapOutputTrackerMaster(
+      conf: SparkConf,
+      broadcastManager: BroadcastManager)
+    extends MapOutputTrackerMaster(conf, broadcastManager, true) {
+
+    override def sendTracker(message: Any): Unit = {
+      // no-op, just so we can stop this to avoid leaking threads
+    }
+  }
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     init(new SparkConf())
@@ -293,11 +308,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext
with TimeLi
     results.clear()
     securityMgr = new SecurityManager(conf)
     broadcastManager = new BroadcastManager(true, conf, securityMgr)
-    mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
-      override def sendTracker(message: Any): Unit = {
-        // no-op, just so we can stop this to avoid leaking threads
-      }
-    }
+    mapOutputTracker = spy(new MyMapOutputTrackerMaster(conf, broadcastManager))
+    blockManagerMaster = spy(new MyBlockManagerMaster(conf))
     scheduler = new DAGScheduler(
       sc,
       taskScheduler,
@@ -548,6 +560,56 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext
with TimeLi
     assert(mapStatus2(2).location.host === "hostB")
   }
 
+  test("SPARK-32003: All shuffle files for executor should be cleaned up on fetch failure")
{
+    // reset the test context with the right shuffle service config
+    afterEach()
+    val conf = new SparkConf()
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+    init(conf)
+
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker)
+
+    submit(reduceRdd, Array(0, 1, 2))
+    // Map stage completes successfully,
+    // two tasks are run on an executor on hostA and one on an executor on hostB
+    completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostA", "hostB"))
+    // Now the executor on hostA is lost
+    runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as
failed")))
+    // Executor is removed but shuffle files are not unregistered
+    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
+
+    // The MapOutputTracker has all the shuffle files
+    val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
+    assert(mapStatuses.count(_ != null) === 3)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostA-exec")
=== 2)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec")
=== 1)
+
+    // Now a fetch failure from the lost executor occurs
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)
+    ))
+    // blockManagerMaster.removeExecutor is not called again
+    // but shuffle files are unregistered
+    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
+
+    // Shuffle files for hostA-exec should be lost
+    assert(mapStatuses.count(_ != null) === 1)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostA-exec")
=== 0)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec")
=== 1)
+
+    // Additional fetch failure from the executor does not result in further call to
+    // mapOutputTracker.removeOutputsOnExecutor
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 1, 0, "ignored"), null)
+    ))
+    verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None
@@ -765,8 +827,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with
TimeLi
     complete(taskSets(1), Seq(
       (Success, 42),
       (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)))
-    // this will get called
-    // blockManagerMaster.removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
@@ -806,11 +867,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext
with TimeLi
       submit(reduceRdd, Array(0))
       completeShuffleMapStageSuccessfully(0, 0, 1)
       runEvent(ExecutorLost("hostA-exec", event))
+      verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
       if (expectFileLoss) {
+        verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
         intercept[MetadataFetchFailedException] {
           mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
         }
       } else {
+        verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
         assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
           HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message