spark-reviews mailing list archives

From squito <...@git.apache.org>
Subject [GitHub] spark issue #17297: [SPARK-14649][CORE] DagScheduler should not run duplicat...
Date Sat, 18 Mar 2017 05:52:19 GMT
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/17297
  
    I'm a bit confused by the description:
    
    > 1. When a fetch failure happens, the task set manager ask the dag scheduler to abort
all the non-running tasks. However, the running tasks in the task set are not killed.
    
    This is already true: when there is a fetch failure, the [TaskSetManager is marked as
zombie](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala?squery=TaskSetManager#L755),
and the DAGScheduler resubmits stages, but nothing actively kills the running tasks.
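
    For reference, a rough sketch of what "zombie" means here -- this is a simplification to
illustrate the point, not the actual TaskSetManager code: once the flag is set, no new tasks
are handed out from that attempt, but nothing touches the tasks that are already running.

    ```scala
    // Simplified sketch of the "zombie" semantics -- not the real TaskSetManager.
    case class TaskDescription(taskId: Long, index: Int)

    class SimplifiedTaskSetManager {
      // flipped to true on the first fetch failure (among other reasons)
      var isZombie = false

      // called when an executor has a free slot
      def resourceOffer(execId: String, host: String): Option[TaskDescription] = {
        if (isZombie) None              // no *new* tasks are launched from this attempt...
        else dequeueTask(execId, host)
      }

      def handleFetchFailure(): Unit = {
        isZombie = true                 // ...but tasks already running are left alone, not killed
      }

      private def dequeueTask(execId: String, host: String): Option[TaskDescription] =
        None // stand-in for the real scheduling logic
    }
    ```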
    
    >  re-launches all tasks in the stage with the fetch failure that hadn't completed
when the fetch failure occurred (the DAGScheduler re-lanches all of the tasks whose output
data is not available -- which is equivalent to the set of tasks that hadn't yet completed).
    
    I don't think it's true that it relaunches all tasks that hadn't completed _when the fetch
failure occurred_.  It relaunches all the tasks that haven't completed by the time the stage gets
resubmitted.  More tasks can complete between the time of the first failure and the time the
stage is resubmitted.
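
    In other words, the set of tasks to re-run is only computed when the stage actually gets
resubmitted.  Very roughly -- illustrative names, not Spark's actual code -- it looks like this:

    ```scala
    // Hand-wavy sketch of how the re-run set is determined *at resubmission time*,
    // not at the moment of the fetch failure.
    import scala.collection.mutable

    class SimplifiedStage(val numPartitions: Int) {
      // successful completions keep getting recorded here, even from zombie attempts
      val finishedPartitions = mutable.BitSet.empty

      def markPartitionFinished(p: Int): Unit = finishedPartitions += p

      // evaluated only when the DAGScheduler resubmits the stage, so any task that
      // finished between the fetch failure and the resubmission is *not* re-launched
      def findMissingPartitions(): Seq[Int] =
        (0 until numPartitions).filterNot(finishedPartitions.contains)
    }
    ```

    So a task from the zombie attempt that finishes while the parent stage is being re-run simply
drops out of the missing set before the new attempt is built.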
    
    But there are several other potential issues you may be trying to address.
    
    Say there are stage 0 and stage 1, each with 10 tasks.  Stage 0 completes fine on the
first attempt, then stage 1 starts.  Tasks 0 & 1 in stage 1 complete, but then there is
a fetch failure in task 2.  Let's also say we have an abundance of cluster resources, so tasks
3 - 9 from stage 1, attempt 0 are still running.
    
    Stage 0 gets resubmitted as attempt 1, just to regenerate the map output for whatever executor
had the data for the fetch failure -- perhaps it's just one task from stage 0 that needs to be
resubmitted.  Now, lots of different scenarios are possible:
    
    (a) Tasks 3 - 9 from stage 1 attempt 0 all finish successfully while stage 0 attempt 1
is running.  So when stage 0 attempt 1 finishes, stage 1 attempt 1 is submitted with just
Task 2.  If it completes successfully, we're done (no wasted work).
    
    (b) Stage 0 attempt 1 finishes before tasks 3 - 9 from stage 1 attempt 0 have finished.
So stage 1 gets submitted again as stage 1 attempt 1, with tasks 2 - 9, and there are now
two copies running for tasks 3 - 9.  Maybe all the tasks from attempt 0 actually finish shortly
after attempt 1 starts.  In this case, the stage is complete as soon as there is one complete
attempt for each task.  But even after the stage completes successfully, all the other tasks
keep running anyway.  (Plenty of wasted work.)
    
    (c) Like (b), but shortly after stage 1 attempt 1 is submitted, we get another fetch failure
in one of the old "zombie" tasks from stage 1 attempt 0.  But the [DAGScheduler realizes it
already has a more recent attempt for this stage](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1268),
so it ignores the fetch failure.  All the other tasks keep running as usual.  If there aren't
any other issues, the stage completes when there is one completed attempt for each task.
(Same amount of wasted work as (b).)
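
    The check in (c) boils down to comparing the attempt id on the failed task against the stage's
latest attempt.  Roughly -- again just the shape of the idea, not the actual DAGScheduler code:

    ```scala
    // Sketch of how a fetch failure from a stale (zombie) attempt gets ignored.
    object FetchFailureHandling {
      def handleFetchFailure(stageLatestAttempt: Int, taskStageAttemptId: Int): Unit = {
        if (stageLatestAttempt != taskStageAttemptId) {
          // the failure came from an older attempt of this stage -- a newer attempt is
          // already running, so don't fail or resubmit anything because of this task
          println(s"Ignoring fetch failure from stale stage attempt $taskStageAttemptId")
        } else {
          // mark the stage as failed, unregister the bad map output,
          // and schedule a resubmit of the map stage
        }
      }
    }
    ```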
    
    (d) While stage 0 attempt 1 is running, we get another fetch failure from stage 1 attempt
0, say in Task 3, whose fetch failure is from a *different executor*.  Maybe it's from a completely
different host (just by chance, or there may be cluster maintenance where multiple hosts are
serviced at once); or maybe it's from another executor on the same host (at least, until we
do something about your other pr on unregistering all shuffle files on a host).  To be honest,
I don't understand how things work in this scenario.  We [mark stage 0 as failed](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1303),
we [unregister some shuffle output](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1328),
and [we resubmit stage 0](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1319).  But stage 0 attempt 1 is still
running, so I would have expected us to end up with conflicting task sets.  Whatever the real
behavior is here, it seems we're at risk of even more duplicated work for yet another
attempt of stage 1.
    
    etc.
    
    So I think in (b) and (c), you are trying to avoid resubmitting tasks 3 - 9 on stage 1 attempt
1.  The thing is, there is a strong reason to believe that the original version of those tasks
will fail.  Most likely, those tasks need map output from the same executor that caused the
first fetch failure.  So Kay is suggesting that we take the opposite approach, and instead
actively kill the tasks from stage 1 attempt 0.  OTOH, it's possible that (i) the issue may
have been transient, or (ii) the tasks already finished fetching that data before the error
occurred.  We really have no idea.
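
    Just to make that trade-off concrete, "actively kill" would be something along these lines.
This is purely illustrative -- the backend interface here is made up, and wiring a kill into the
fetch-failure path is exactly what's being debated:

    ```scala
    // Illustrative sketch of killing the still-running tasks from a zombie attempt.
    // KillBackend is a hypothetical stand-in for whatever kill mechanism the scheduler
    // backend exposes; it is not Spark's actual API.
    trait KillBackend {
      def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit
    }

    class ZombieAttemptKiller(backend: KillBackend) {
      // (taskId, executorId) pairs still running from the zombie attempt
      def killRunningTasks(runningTasks: Seq[(Long, String)]): Unit = {
        runningTasks.foreach { case (taskId, execId) =>
          // assumes their shuffle input is gone, so letting them run is wasted work --
          // but as noted above, that assumption can be wrong (transient failure, or the
          // fetch may have already finished before the error)
          backend.killTask(taskId, execId, interruptThread = true)
        }
      }
    }
    ```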


