spark-commits mailing list archives

From dongj...@apache.org
Subject [spark] branch master updated: [SPARK-26192][MESOS] Retrieve enableFetcherCache option from submission for driver URIs
Date Mon, 04 Mar 2019 20:11:21 GMT
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fd4d74  [SPARK-26192][MESOS] Retrieve enableFetcherCache option from submission for driver URIs
5fd4d74 is described below

commit 5fd4d7499c9f2925268d84b5d74ecafaebe2113d
Author: mwlon <mloncaric@hmc.edu>
AuthorDate: Mon Mar 4 12:10:48 2019 -0800

    [SPARK-26192][MESOS] Retrieve enableFetcherCache option from submission for driver URIs
    
    ## What changes were proposed in this pull request?
    
    Retrieve the enableFetcherCache option from the submission conf rather than the dispatcher
    conf. This resolves some confusing behavior where Spark drivers currently get this conf from
    the dispatcher, whereas Spark executors get it from the submission. After this change, the
    conf only needs to be specified once.
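    
    As a minimal sketch (the object and method names here are illustrative only; the real
    change is in MesosClusterScheduler.getDriverUris, shown in the diff below), the flag is
    now resolved from the submission's conf with the dispatcher's conf as a fallback, so
    setting it in either place enables the fetcher cache for the driver URIs:
    
        import org.apache.spark.SparkConf
    
        // Sketch only: mirrors the `desc.conf.get(...) || conf.get(...)` lookup in the diff.
        // The config key is assumed to be "spark.mesos.fetcherCache.enable" (default false),
        // i.e. the entry behind config.ENABLE_FETCHER_CACHE.
        object FetcherCacheFlag {
          def resolve(submissionConf: SparkConf, dispatcherConf: SparkConf): Boolean =
            submissionConf.getBoolean("spark.mesos.fetcherCache.enable", false) ||
              dispatcherConf.getBoolean("spark.mesos.fetcherCache.enable", false)
        }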
    
    ## How was this patch tested?
    
    With (updated) existing tests.
    
    Closes #23924 from mwlon/SPARK-26192.
    
    Authored-by: mwlon <mloncaric@hmc.edu>
    Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
---
 .../cluster/mesos/MesosClusterScheduler.scala      |  3 ++-
 .../cluster/mesos/MesosClusterSchedulerSuite.scala | 28 +++++++++++++++++++++-
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index cd5bd53..a527783 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -131,7 +131,6 @@ private[spark] class MesosClusterScheduler(
   private val queuedCapacity = conf.get(config.MAX_DRIVERS)
   private val retainedDrivers = conf.get(config.RETAINED_DRIVERS)
   private val maxRetryWaitTime = conf.get(config.CLUSTER_RETRY_WAIT_MAX_SECONDS)
-  private val useFetchCache = conf.get(config.ENABLE_FETCHER_CACHE)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
   // Keyed by submission id
@@ -428,6 +427,8 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
+    val useFetchCache = desc.conf.get(config.ENABLE_FETCHER_CACHE) ||
+        conf.get(config.ENABLE_FETCHER_CACHE)
     val confUris = (conf.get(config.URIS_TO_DOWNLOAD) ++
       desc.conf.get(config.URIS_TO_DOWNLOAD) ++
       desc.conf.get(SUBMIT_PYTHON_FILES)).toList
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index f26ff04..81b5250 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -256,6 +256,31 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("supports setting fetcher cache") {
+    setScheduler()
+
+    val mem = 1000
+    val cpu = 1
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", mem, cpu, true,
+        command,
+        Map(config.EXECUTOR_HOME.key -> "test",
+          config.ENABLE_FETCHER_CACHE.key -> "true",
+          "spark.app.name" -> "test"),
+        "s1",
+        new Date()))
+
+    assert(response.success)
+
+    val offer = Utils.createOffer("o1", "s1", mem, cpu)
+    scheduler.resourceOffers(driver, List(offer).asJava)
+
+    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+    val uris = launchedTasks.head.getCommand.getUrisList
+    assert(uris.asScala.forall(_.getCache))
+  }
+
+  test("supports setting fetcher cache on the dispatcher") {
     setScheduler(Map(config.ENABLE_FETCHER_CACHE.key -> "true"))
 
     val mem = 1000
@@ -280,7 +305,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("supports disabling fetcher cache") {
-    setScheduler(Map(config.ENABLE_FETCHER_CACHE.key -> "false"))
+    setScheduler()
 
     val mem = 1000
     val cpu = 1
@@ -289,6 +314,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
       new MesosDriverDescription("d1", "jar", mem, cpu, true,
         command,
         Map(config.EXECUTOR_HOME.key -> "test",
+          config.ENABLE_FETCHER_CACHE.key -> "false",
           "spark.app.name" -> "test"),
         "s1",
         new Date()))
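
With this change, a hedged usage example: the cache option only needs to accompany the
submission (host, port, main class, and jar path below are placeholders, and the key is
assumed to be spark.mesos.fetcherCache.enable, the entry behind config.ENABLE_FETCHER_CACHE):

    spark-submit \
      --master mesos://dispatcher-host:7077 \
      --deploy-mode cluster \
      --conf spark.mesos.fetcherCache.enable=true \
      --class org.example.MyApp \
      http://host/path/my-app.jar

Previously the dispatcher's own configuration also had to enable the option for driver URIs
to be cached; with this patch the submission-side setting covers both driver and executor
URIs, and a dispatcher-side setting still acts as a fallback.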


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

