spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-10124] [MESOS] Fix removing queued driver in mesos cluster mode.
Date Thu, 20 Aug 2015 02:43:37 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 16414dae0 -> a3ed2c31e


[SPARK-10124] [MESOS] Fix removing queued driver in mesos cluster mode.

Currently, Spark applications can be queued to the Mesos cluster dispatcher, but when multiple
jobs are in the queue we don't correctly handle removing jobs from the buffer while iterating
over it, which causes a NullPointerException.

This patch copies each buffer before iterating over it, so no exception is thrown when jobs
are removed.
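The pattern described above can be sketched in isolation. This is a hypothetical sketch, not Spark code: `Driver`, `removeById`, and `scheduleAll` are illustrative stand-ins for MesosDriverDescription, removeFromQueuedDrivers, and scheduleTasks. Assuming the Scala 2.10/2.11 collections of the Spark 1.5 era, `ArrayBuffer.foreach` appears to read the buffer's size once before looping, so removing elements mid-loop can leave the loop body reading emptied (null) slots, which would be consistent with the NullPointerException described above; other collections versions may fail differently. Iterating over a copy avoids the problem regardless of version.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for MesosDriverDescription: just an id and a CPU demand.
final case class Driver(id: String, cpus: Int)

object QueueDemo {
  // Same idea as the patch's copyBuffer: build a fresh buffer with the same
  // elements, so the caller can traverse it while the original is modified.
  def copyBuffer[A](buffer: ArrayBuffer[A]): ArrayBuffer[A] = {
    val newBuffer = new ArrayBuffer[A](buffer.size)
    newBuffer ++= buffer
    newBuffer
  }

  // Hypothetical removal callback, playing the role of removeFromQueuedDrivers:
  // drops the first queued driver with the given id.
  def removeById(queue: ArrayBuffer[Driver])(id: String): Boolean = {
    val index = queue.indexWhere(_.id == id)
    if (index >= 0) { queue.remove(index); true } else false
  }

  // Launches every queued driver that fits in the remaining CPUs and removes
  // each launched driver from `queue`. The loop walks a copy, so the removals
  // cannot shift or shrink the sequence being traversed.
  def scheduleAll(queue: ArrayBuffer[Driver], availableCpus: Int): List[String] = {
    var cpusLeft = availableCpus
    val launched = List.newBuilder[String]
    for (d <- copyBuffer(queue)) {
      if (d.cpus <= cpusLeft) {
        cpusLeft -= d.cpus
        launched += d.id
        removeById(queue)(d.id)
      }
    }
    launched.result()
  }
}
```

For example, with a queue of `Driver("a", 2)`, `Driver("b", 4)`, `Driver("c", 1)` and 3 available CPUs, `scheduleAll` launches "a" and "c" and leaves only "b" in the queue.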

Author: Timothy Chen <tnachen@gmail.com>

Closes #8322 from tnachen/fix_cluster_mode.

(cherry picked from commit 73431d8afb41b93888d2642a1ce2d011f03fb740)
Signed-off-by: Andrew Or <andrew@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3ed2c31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3ed2c31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3ed2c31

Branch: refs/heads/branch-1.5
Commit: a3ed2c31e60b11c09f815b42c0cd894be3150c67
Parents: 16414da
Author: Timothy Chen <tnachen@gmail.com>
Authored: Wed Aug 19 19:43:26 2015 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Wed Aug 19 19:43:34 2015 -0700

----------------------------------------------------------------------
 .../cluster/mesos/MesosClusterScheduler.scala    | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a3ed2c31/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 64ec2b8..1206f18 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -507,14 +507,16 @@ private[spark] class MesosClusterScheduler(
       val driversToRetry = pendingRetryDrivers.filter { d =>
         d.retryState.get.nextRetry.before(currentTime)
       }
+
       scheduleTasks(
-        driversToRetry,
+        copyBuffer(driversToRetry),
         removeFromPendingRetryDrivers,
         currentOffers,
         tasks)
+
       // Then we walk through the queued drivers and try to schedule them.
       scheduleTasks(
-        queuedDrivers,
+        copyBuffer(queuedDrivers),
         removeFromQueuedDrivers,
         currentOffers,
         tasks)
@@ -527,13 +529,14 @@ private[spark] class MesosClusterScheduler(
       .foreach(o => driver.declineOffer(o.getId))
   }
 
+  private def copyBuffer(
+      buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
+    val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
+    buffer.copyToBuffer(newBuffer)
+    newBuffer
+  }
+
   def getSchedulerState(): MesosClusterSchedulerState = {
-    def copyBuffer(
-        buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
-      val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
-      buffer.copyToBuffer(newBuffer)
-      newBuffer
-    }
     stateLock.synchronized {
       new MesosClusterSchedulerState(
         frameworkId,


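Judging from the context lines, `getSchedulerState` appears to build its state object inside `stateLock.synchronized`, using copies of the driver buffers so that readers get a snapshot rather than the live, mutable buffers. A minimal sketch of that snapshot-under-lock pattern follows; `DriverQueue`, `enqueue`, `dequeue`, and `snapshot` are hypothetical names, not Spark APIs.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical queue holder; all mutation and copying happens under one lock.
final class DriverQueue {
  private val stateLock = new Object
  private val queued = new ArrayBuffer[String]()

  def enqueue(id: String): Unit = stateLock.synchronized {
    queued += id
    ()
  }

  // Removes the first occurrence of `id`; returns whether anything was removed.
  def dequeue(id: String): Boolean = stateLock.synchronized {
    val index = queued.indexOf(id)
    if (index >= 0) { queued.remove(index); true } else false
  }

  // Returns an immutable copy taken while holding the lock, so callers (for
  // example a status page) never observe the buffer mid-modification.
  def snapshot(): List[String] = stateLock.synchronized { queued.toList }
}
```

A snapshot taken before a `dequeue` keeps its original contents, while later snapshots reflect the removal.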