spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tgra...@apache.org
Subject spark git commit: [SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl
Date Wed, 05 Sep 2018 21:10:53 GMT
Repository: spark
Updated Branches:
  refs/heads/master 925449283 -> 559b899ac


[SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl

Running a large Spark job with speculation turned on was causing executor heartbeats to time
out on the driver end after some time and eventually, after hitting the max number of executor
failures, the job would fail.

## What changes were proposed in this pull request?

The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread
was blocked waiting for the lock on the TaskSchedulerImpl object, which was being held by one of
the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in
TaskSetManager.scala. On further analysis of the heartbeat receiver method
executorHeartbeatReceived() in the TaskSchedulerImpl class, we found that instead of waiting to
acquire the lock on the TaskSchedulerImpl object, we can remove that lock and make the operations
on the global variables inside the code block atomic. The block of code in that method only uses
one global HashMap, taskIdToTaskSetManager. By making that map a ConcurrentHashMap, we ensure
atomicity of its operations and speed up the heartbeat receiver thread.

## How was this patch tested?

Screenshots of the thread dump have been attached below:
**heartbeat-receiver-event-loop-thread:**

<img width="1409" alt="screen shot 2018-08-24 at 9 19 57 am" src="https://user-images.githubusercontent.com/22228190/44593413-e25df780-a788-11e8-9520-176a18401a59.png">

**dispatcher-event-loop-thread:**

<img width="1409" alt="screen shot 2018-08-24 at 9 21 56 am" src="https://user-images.githubusercontent.com/22228190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png">

Closes #22221 from pgandhi999/SPARK-25231.

Authored-by: pgandhi <pgandhi@oath.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/559b899a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/559b899a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/559b899a

Branch: refs/heads/master
Commit: 559b899aceb160fcec3a57109c0b60a0ae40daeb
Parents: 9254492
Author: pgandhi <pgandhi@oath.com>
Authored: Wed Sep 5 16:10:49 2018 -0500
Committer: Thomas Graves <tgraves@apache.org>
Committed: Wed Sep 5 16:10:49 2018 -0500

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala  | 12 ++++++------
 .../cluster/CoarseGrainedSchedulerBackend.scala         |  2 +-
 .../spark/scheduler/SchedulerIntegrationSuite.scala     |  3 ++-
 .../apache/spark/scheduler/TaskSchedulerImplSuite.scala |  6 +++---
 4 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/559b899a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8992d7e..8b71170 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 import java.util.{Locale, Timer, TimerTask}
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
@@ -91,7 +91,7 @@ private[spark] class TaskSchedulerImpl(
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
 
   // Protected by `this`
-  private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
+  private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
   val taskIdToExecutorId = new HashMap[Long, String]
 
   @volatile private var hasReceivedTask = false
@@ -315,7 +315,7 @@ private[spark] class TaskSchedulerImpl(
           for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
             tasks(i) += task
             val tid = task.taskId
-            taskIdToTaskSetManager(tid) = taskSet
+            taskIdToTaskSetManager.put(tid, taskSet)
             taskIdToExecutorId(tid) = execId
             executorIdToRunningTaskIds(execId).add(tid)
             availableCpus(i) -= CPUS_PER_TASK
@@ -465,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
     var reason: Option[ExecutorLossReason] = None
     synchronized {
       try {
-        taskIdToTaskSetManager.get(tid) match {
+        Option(taskIdToTaskSetManager.get(tid)) match {
           case Some(taskSet) =>
             if (state == TaskState.LOST) {
               // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling
mode,
@@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl(
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
       blockManagerId: BlockManagerId): Boolean = {
     // (taskId, stageId, stageAttemptId, accumUpdates)
-    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized
{
+    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = {
       accumUpdates.flatMap { case (id, updates) =>
         val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
-        taskIdToTaskSetManager.get(id).map { taskSetMgr =>
+        Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
           (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/559b899a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 747e8c7..de7c0d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -290,7 +290,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val
rpcEnv: Rp
       for (task <- tasks.flatten) {
         val serializedTask = TaskDescription.encode(task)
         if (serializedTask.limit() >= maxRpcMessageSize) {
-          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
+          Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr
=>
             try {
               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: "
+
                 "spark.rpc.message.maxSize (%d bytes). Consider increasing " +

http://git-wip-us.apache.org/repos/asf/spark/blob/559b899a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index cea7f17..2d409d9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -400,7 +400,8 @@ private[spark] abstract class MockBackend(
       // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
       // tests from introducing a race if they need it.
       val newTasks = newTaskDescriptions.map { taskDescription =>
-        val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
+        val taskSet =
+          Option(taskScheduler.taskIdToTaskSetManager.get(taskDescription.taskId).taskSet).get
         val task = taskSet.tasks(taskDescription.index)
         (taskDescription, task)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/559b899a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 7a457a0..9e1d13e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -248,7 +248,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
with B
     taskScheduler.submitTasks(attempt2)
     val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten
     assert(1 === taskDescriptions3.length)
-    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get
+    val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId)).get
     assert(mgr.taskSet.stageAttemptId === 1)
     assert(!failedTaskSet)
   }
@@ -286,7 +286,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
with B
     assert(10 === taskDescriptions3.length)
 
     taskDescriptions3.foreach { task =>
-      val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get
+      val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(task.taskId)).get
       assert(mgr.taskSet.stageAttemptId === 1)
     }
     assert(!failedTaskSet)
@@ -724,7 +724,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
with B
     // only schedule one task because of locality
     assert(taskDescs.size === 1)
 
-    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId).get
+    val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId)).get
     assert(mgr.myLocalityLevels.toSet === Set(TaskLocality.NODE_LOCAL, TaskLocality.ANY))
     // we should know about both executors, even though we only scheduled tasks on one of
them
     assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0")))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message