spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-20832][CORE] Standalone master should explicitly inform drivers of worker deaths and invalidate external shuffle service outputs
Date Thu, 22 Jun 2017 12:48:19 GMT
Repository: spark
Updated Branches:
  refs/heads/master 97b307c87 -> 2dadea95c


[SPARK-20832][CORE] Standalone master should explicitly inform drivers of worker deaths and
invalidate external shuffle service outputs

## What changes were proposed in this pull request?

In standalone mode, the master should explicitly inform each active driver of any worker death,
so that the now-invalid external shuffle service outputs on the lost host are removed from the
shuffle mapStatus and future `FetchFailure`s are avoided.

## How was this patch tested?
Manually tested by the following steps:
1. Start a standalone Spark cluster with one driver node and two worker nodes;
2. Run a job with a ShuffleMapStage and ensure its outputs are distributed across each worker;
3. Run another job that makes all executors exit while the workers all stay alive;
4. Kill one of the workers;
5. Run rdd.collect(). Before this change, we should see `FetchFailure`s and failed stages;
after the change, the job should complete without failure.

Before the change:
![image](https://user-images.githubusercontent.com/4784782/27335366-c251c3d6-55fe-11e7-99dd-d1fdcb429210.png)

After the change:
![image](https://user-images.githubusercontent.com/4784782/27335393-d1c71640-55fe-11e7-89ed-bd760f1f39af.png)

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #18362 from jiangxb1987/removeWorker.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2dadea95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2dadea95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2dadea95

Branch: refs/heads/master
Commit: 2dadea95c8e2c727e97fca91b0060f666fc0c65b
Parents: 97b307c
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Authored: Thu Jun 22 20:48:12 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Thu Jun 22 20:48:12 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/DeployMessage.scala |  2 ++
 .../deploy/client/StandaloneAppClient.scala     |  4 +++
 .../client/StandaloneAppClientListener.scala    |  8 ++++--
 .../org/apache/spark/deploy/master/Master.scala | 15 ++++++----
 .../apache/spark/scheduler/DAGScheduler.scala   | 30 ++++++++++++++++++++
 .../spark/scheduler/DAGSchedulerEvent.scala     |  3 ++
 .../apache/spark/scheduler/TaskScheduler.scala  |  5 ++++
 .../spark/scheduler/TaskSchedulerImpl.scala     |  5 ++++
 .../cluster/CoarseGrainedClusterMessage.scala   |  3 ++
 .../cluster/CoarseGrainedSchedulerBackend.scala | 25 +++++++++++++---
 .../cluster/StandaloneSchedulerBackend.scala    |  5 ++++
 .../spark/deploy/client/AppClientSuite.scala    |  2 ++
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 ++
 .../scheduler/ExternalClusterManagerSuite.scala |  1 +
 14 files changed, 98 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index c1a91c2..49a319a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -158,6 +158,8 @@ private[deploy] object DeployMessages {
 
   case class ApplicationRemoved(message: String)
 
+  case class WorkerRemoved(id: String, host: String, message: String)
+
   // DriverClient <-> Master
 
   case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index 93f58ce..757c930 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -182,6 +182,10 @@ private[spark] class StandaloneAppClient(
           listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
         }
 
+      case WorkerRemoved(id, host, message) =>
+        logInfo("Master removed worker %s: %s".format(id, message))
+        listener.workerRemoved(id, host, message)
+
       case MasterChanged(masterRef, masterWebUiUrl) =>
         logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
         master = Some(masterRef)

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index 64255ec..d8bc1a8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.deploy.client
 
 /**
- * Callbacks invoked by deploy client when various events happen. There are currently four
events:
- * connecting to the cluster, disconnecting, being given an executor, and having an executor
- * removed (either due to failure or due to revocation).
+ * Callbacks invoked by deploy client when various events happen. There are currently five
events:
+ * connecting to the cluster, disconnecting, being given an executor, having an executor
removed
+ * (either due to failure or due to revocation), and having a worker removed.
  *
  * Users of this API should *not* block inside the callback methods.
  */
@@ -38,4 +38,6 @@ private[spark] trait StandaloneAppClientListener {
 
   def executorRemoved(
       fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
+
+  def workerRemoved(workerId: String, host: String, message: String): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f10a412..c192a0c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -498,7 +498,7 @@ private[deploy] class Master(
   override def onDisconnected(address: RpcAddress): Unit = {
     // The disconnected client could've been either a worker or an app; remove whichever
it was
     logInfo(s"$address got disassociated, removing it.")
-    addressToWorker.get(address).foreach(removeWorker)
+    addressToWorker.get(address).foreach(removeWorker(_, s"${address} got disassociated"))
     addressToApp.get(address).foreach(finishApplication)
     if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery()
}
   }
@@ -544,7 +544,8 @@ private[deploy] class Master(
     state = RecoveryState.COMPLETING_RECOVERY
 
     // Kill off any workers and apps that didn't respond to us.
-    workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
+    workers.filter(_.state == WorkerState.UNKNOWN).foreach(
+      removeWorker(_, "Not responding for recovery"))
     apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
 
     // Update the state of recovered apps to RUNNING
@@ -755,7 +756,7 @@ private[deploy] class Master(
       if (oldWorker.state == WorkerState.UNKNOWN) {
         // A worker registering from UNKNOWN implies that the worker was restarted during
recovery.
         // The old worker must thus be dead, so we will remove it and accept the new worker.
-        removeWorker(oldWorker)
+        removeWorker(oldWorker, "Worker replaced by a new worker with same address")
       } else {
         logInfo("Attempted to re-register worker at same address: " + workerAddress)
         return false
@@ -771,7 +772,7 @@ private[deploy] class Master(
     true
   }
 
-  private def removeWorker(worker: WorkerInfo) {
+  private def removeWorker(worker: WorkerInfo, msg: String) {
     logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
     worker.setState(WorkerState.DEAD)
     idToWorker -= worker.id
@@ -795,6 +796,10 @@ private[deploy] class Master(
         removeDriver(driver.id, DriverState.ERROR, None)
       }
     }
+    logInfo(s"Telling app of lost worker: " + worker.id)
+    apps.filterNot(completedApps.contains(_)).foreach { app =>
+      app.driver.send(WorkerRemoved(worker.id, worker.host, msg))
+    }
     persistenceEngine.removeWorker(worker)
   }
 
@@ -979,7 +984,7 @@ private[deploy] class Master(
       if (worker.state != WorkerState.DEAD) {
         logWarning("Removing %s because we got no heartbeat in %d seconds".format(
           worker.id, WORKER_TIMEOUT_MS / 1000))
-        removeWorker(worker)
+        removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds")
       } else {
         if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS))
{
           workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough;
cull it

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fafe9ca..3422a5f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -260,6 +260,13 @@ class DAGScheduler(
   }
 
   /**
+   * Called by TaskScheduler implementation when a worker is removed.
+   */
+  def workerRemoved(workerId: String, host: String, message: String): Unit = {
+    eventProcessLoop.post(WorkerRemoved(workerId, host, message))
+  }
+
+  /**
    * Called by TaskScheduler implementation when a host is added.
    */
   def executorAdded(execId: String, host: String): Unit = {
@@ -1432,6 +1439,26 @@ class DAGScheduler(
     }
   }
 
+  /**
+   * Responds to a worker being removed. This is called inside the event loop, so it assumes
it can
+   * modify the scheduler's internal state. Use workerRemoved() to post a loss event from
outside.
+   *
+   * We will assume that we've lost all shuffle blocks associated with the host if a worker
is
+   * removed, so we will remove them all from MapStatus.
+   *
+   * @param workerId identifier of the worker that is removed.
+   * @param host host of the worker that is removed.
+   * @param message the reason why the worker is removed.
+   */
+  private[scheduler] def handleWorkerRemoved(
+      workerId: String,
+      host: String,
+      message: String): Unit = {
+    logInfo("Shuffle files lost for worker %s on host %s".format(workerId, host))
+    mapOutputTracker.removeOutputsOnHost(host)
+    clearCacheLocs()
+  }
+
   private[scheduler] def handleExecutorAdded(execId: String, host: String) {
     // remove from failedEpoch(execId) ?
     if (failedEpoch.contains(execId)) {
@@ -1727,6 +1754,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler:
DAGScheduler
       }
       dagScheduler.handleExecutorLost(execId, workerLost)
 
+    case WorkerRemoved(workerId, host, message) =>
+      dagScheduler.handleWorkerRemoved(workerId, host, message)
+
     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index cda0585..3f8d563 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -86,6 +86,9 @@ private[scheduler] case class ExecutorAdded(execId: String, host: String)
extend
 private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossReason)
   extends DAGSchedulerEvent
 
+private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String)
+  extends DAGSchedulerEvent
+
 private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])
   extends DAGSchedulerEvent

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 3de7d1f..90644fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -90,6 +90,11 @@ private[spark] trait TaskScheduler {
   def executorLost(executorId: String, reason: ExecutorLossReason): Unit
 
   /**
+   * Process a removed worker
+   */
+  def workerRemoved(workerId: String, host: String, message: String): Unit
+
+  /**
    * Get an application's attempt ID associated with the job.
    *
    * @return An application's Attempt ID

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 629cfc7..bba0b29 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -569,6 +569,11 @@ private[spark] class TaskSchedulerImpl private[scheduler](
     }
   }
 
+  override def workerRemoved(workerId: String, host: String, message: String): Unit = {
+    logInfo(s"Handle removed worker $workerId: $message")
+    dagScheduler.workerRemoved(workerId, host, message)
+  }
+
   private def logExecutorLoss(
       executorId: String,
       hostPort: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 6b49bd6..89a9ad6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -85,6 +85,9 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
     extends CoarseGrainedClusterMessage
 
+  case class RemoveWorker(workerId: String, host: String, message: String)
+    extends CoarseGrainedClusterMessage
+
   case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage
 
   // Exchanged between the driver and the AM in Yarn client mode

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index dc82bb7..0b396b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -219,6 +219,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val
rpcEnv: Rp
         removeExecutor(executorId, reason)
         context.reply(true)
 
+      case RemoveWorker(workerId, host, message) =>
+        removeWorker(workerId, host, message)
+        context.reply(true)
+
       case RetrieveSparkAppConfig =>
         val reply = SparkAppConfig(sparkProperties,
           SparkEnv.get.securityManager.getIOEncryptionKey())
@@ -231,8 +235,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val
rpcEnv: Rp
       val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
         // Filter out executors under killing
         val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
-        val workOffers = activeExecutors.map { case (id, executorData) =>
-          new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
+        val workOffers = activeExecutors.map {
+          case (id, executorData) =>
+            new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
         }.toIndexedSeq
         scheduler.resourceOffers(workOffers)
       }
@@ -331,6 +336,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val
rpcEnv: Rp
       }
     }
 
+    // Remove a lost worker from the cluster
+    private def removeWorker(workerId: String, host: String, message: String): Unit = {
+      logDebug(s"Asked to remove worker $workerId with reason $message")
+      scheduler.workerRemoved(workerId, host, message)
+    }
+
     /**
      * Stop making resource offers for the given executor. The executor is marked as lost
with
      * the loss reason still pending.
@@ -449,8 +460,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val
rpcEnv: Rp
    */
   protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     // Only log the failure since we don't care about the result.
-    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure { case t =>
-      logError(t.getMessage, t)
+    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure {
+      case t => logError(t.getMessage, t)
+    }(ThreadUtils.sameThread)
+  }
+
+  protected def removeWorker(workerId: String, host: String, message: String): Unit = {
+    driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure {
+      case t => logError(t.getMessage, t)
     }(ThreadUtils.sameThread)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 0529fe9..fd8e644 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -161,6 +161,11 @@ private[spark] class StandaloneSchedulerBackend(
     removeExecutor(fullId.split("/")(1), reason)
   }
 
+  override def workerRemoved(workerId: String, host: String, message: String): Unit = {
+    logInfo("Worker %s removed: %s".format(workerId, message))
+    removeWorker(workerId, host, message)
+  }
+
   override def sufficientResourcesRegistered(): Boolean = {
     totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 936639b..a1707e6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -214,6 +214,8 @@ class AppClientSuite
         id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
= {
       execRemovedList.add(id)
     }
+
+    def workerRemoved(workerId: String, host: String, message: String): Unit = {}
   }
 
   /** Create AppClient and supporting objects */

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index ddd3281..453be26 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -131,6 +131,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with
Timeou
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+    override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
   }
 
@@ -632,6 +633,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with
Timeou
           accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
           blockManagerId: BlockManagerId): Boolean = true
       override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+      override def workerRemoved(workerId: String, host: String, message: String): Unit =
{}
       override def applicationAttemptId(): Option[String] = None
     }
     val noKillScheduler = new DAGScheduler(

http://git-wip-us.apache.org/repos/asf/spark/blob/2dadea95/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index ba56af8..a4e4ea7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -84,6 +84,7 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
   override def defaultParallelism(): Int = 2
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+  override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
   override def applicationAttemptId(): Option[String] = None
   def executorHeartbeatReceived(
       execId: String,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message