spark-issues mailing list archives

From "Stavros Kontopoulos (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-23423) Application declines any offers when killed+active executors reach spark.dynamicAllocation.maxExecutors
Date Wed, 14 Feb 2018 23:57:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-23423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364861#comment-16364861 ]

Stavros Kontopoulos edited comment on SPARK-23423 at 2/14/18 11:56 PM:
-----------------------------------------------------------------------

Hi [~igor.berman]. Looking at the code again, I think that when there is a status update, the task IDs of dead tasks are removed:

[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L732]

Slaves are not removed, but task IDs are, so maybe something else is not working. Do you have a log from the time of the issue that you could attach?
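Here is a hedged sketch of the bookkeeping I mean (the names follow the linked file, but this is a simplified approximation, not the verbatim code):

{code:scala}
import scala.collection.mutable

import org.apache.spark.TaskState

object ExecutorBookkeepingSketch {
  // Simplified model of the backend's per-agent state: one entry per agent
  // ever seen; only the task IDs are pruned when a terminal status arrives.
  class Slave(val hostname: String) {
    val taskIDs = new mutable.HashSet[String]()
  }

  val slaves = new mutable.HashMap[String, Slave]()

  // The executor count is the sum of live task IDs across all agents ever met.
  def numExecutors(): Int = slaves.values.map(_.taskIDs.size).sum

  // On a terminal state the task ID is dropped but the Slave entry stays, so
  // numExecutors() only goes back down if the status update actually arrives.
  def handleStatusUpdate(taskId: String, slaveId: String, state: TaskState.TaskState): Unit = {
    if (TaskState.isFinished(state)) {
      slaves.get(slaveId).foreach(_.taskIDs.remove(taskId))
    }
  }
}
{code}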

The test you have is OK, but I think it does not trigger deletion of the tasks in the failure case. I think you need to update the backend with the task status:

[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala#L102-L103]
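Concretely, the missing step is to feed the terminal status back to the backend before the second offer, exactly as in the passing test below:

{code:scala}
// Feed the TASK_KILLED status back so the backend drops the task ID for executor "0".
val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
backend.statusUpdate(driver, status)
{code}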

 

The following test passes:

{code:scala}
test("max executors registered stops to accept offers when dynamic allocation enabled") {
  setBackend(Map(
    "spark.dynamicAllocation.maxExecutors" -> "1",
    "spark.dynamicAllocation.enabled" -> "true",
    "spark.dynamicAllocation.testing" -> "true"))

  backend.doRequestTotalExecutors(1)

  val (mem, cpu) = (backend.executorMemory(sc), 4)

  val offer1 = createOffer("o1", "s1", mem, cpu)
  backend.resourceOffers(driver, List(offer1).asJava)
  verifyTaskLaunched(driver, "o1")

  backend.doKillExecutors(List("0"))
  verify(driver, times(1)).killTask(createTaskId("0"))

  val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
  backend.statusUpdate(driver, status)

  val offer2 = createOffer("o2", "s2", mem, cpu)
  backend.resourceOffers(driver, List(offer2).asJava)
  // verify(driver, times(1)).declineOffer(offer2.getId)
  val taskInfos = verifyTaskLaunched(driver, "o2")
  assert(taskInfos.length == 1)
}
{code}
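Note that the declineOffer verification is commented out: once the TASK_KILLED status has been processed, the backend launches a new task on the second offer instead of declining it, which is what the final two assertions check.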

By the way, the check against the upper limit on the number of executors that you are referring to is defined in two different places:

[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L354]

[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L573]

The latter has existed for a very long time; the former was added with SPARK-16944. Essentially they check the same thing.
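Both call sites effectively gate offer acceptance on the same comparison; a hedged sketch (simplified, not the verbatim code):

{code:scala}
// Both linked checks boil down to this predicate. If lost status updates
// leave stale task IDs behind, numExecutors never drops below executorLimit
// and every subsequent offer is declined.
def underExecutorLimit(numExecutors: Int, executorLimit: Int): Boolean =
  numExecutors < executorLimit
{code}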

 



> Application declines any offers when killed+active executors reach spark.dynamicAllocation.maxExecutors
> ------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23423
>                 URL: https://issues.apache.org/jira/browse/SPARK-23423
>             Project: Spark
>          Issue Type: Bug
>          Components: Mesos, Spark Core
>    Affects Versions: 2.2.1
>            Reporter: Igor Berman
>            Priority: Major
>
> Hi
> I've noticed rather strange behavior of MesosCoarseGrainedSchedulerBackend when running on Mesos with dynamic allocation on and the number of executors capped by spark.dynamicAllocation.maxExecutors.
> Suppose we have a long-running driver with a cyclic pattern of resource consumption (with some idle time in between); due to dynamic allocation it receives offers and then releases them after the current chunk of work is processed.
> Since at [https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L573] the backend compares numExecutors < executorLimit, and
> numExecutors is defined as slaves.values.map(_.taskIDs.size).sum, where slaves holds all slaves ever "met", i.e. both active and killed (see the comment at [https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L122]).
> On the other hand, the number of task IDs should be updated by statusUpdate, but suppose this update is lost (in fact, I don't see any 'is now TASK_KILLED' log lines), so this number of executors might be wrong.
>  
> I've created a test that "reproduces" this behavior; I'm not sure how good it is:
> {code:scala}
> //MesosCoarseGrainedSchedulerBackendSuite
> test("max executors registered stops to accept offers when dynamic allocation enabled") {
>   setBackend(Map(
>     "spark.dynamicAllocation.maxExecutors" -> "1",
>     "spark.dynamicAllocation.enabled" -> "true",
>     "spark.dynamicAllocation.testing" -> "true"))
>   backend.doRequestTotalExecutors(1)
>   val (mem, cpu) = (backend.executorMemory(sc), 4)
>   val offer1 = createOffer("o1", "s1", mem, cpu)
>   backend.resourceOffers(driver, List(offer1).asJava)
>   verifyTaskLaunched(driver, "o1")
>   backend.doKillExecutors(List("0"))
>   verify(driver, times(1)).killTask(createTaskId("0"))
>   val offer2 = createOffer("o2", "s2", mem, cpu)
>   backend.resourceOffers(driver, List(offer2).asJava)
>   verify(driver, times(1)).declineOffer(offer2.getId)
> }
> {code}
>  
>  
> Workaround: Don't set maxExecutors with dynamicAllocation on
>  
> Please advise.
> Igor
> Tagging you, folks, since you were the last to touch this piece of code and can probably advise something ([~vanzin], [~skonto], [~susanxhuynh])



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
