From: squito
To: reviews@spark.apache.org
Reply-To: reviews@spark.apache.org
Mailing-List: contact reviews-help@spark.apache.org; run by ezmlm
Subject: [GitHub] spark issue #17297: [SPARK-14649][CORE] DagScheduler should not run duplicat...
Message-Id: <20170318055219.34E5BDFD73@git1-us-west.apache.org>
Date: Sat, 18 Mar 2017 05:52:19 +0000 (UTC)

Github user squito commented on the issue:

    https://github.com/apache/spark/pull/17297

I'm a bit confused by the description:

> 1. When a fetch failure happens, the task set manager ask the dag scheduler to abort all the non-running tasks. However, the running tasks in the task set are not killed.

This is already true. When there is a fetch failure, the [TaskSetManager is marked as zombie](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala?squery=TaskSetManager#L755), and the DAGScheduler resubmits stages, but nothing actively kills running tasks.

> re-launches all tasks in the stage with the fetch failure that hadn't completed when the fetch failure occurred (the DAGScheduler re-lanches all of the tasks whose output data is not available -- which is equivalent to the set of tasks that hadn't yet completed).

I don't think it's true that it relaunches all tasks that hadn't completed _when the fetch failure occurred_. It relaunches all the tasks that haven't completed by the time the stage gets resubmitted. More tasks can complete between the time of the first failure and the time the stage is resubmitted.

But there are several other potential issues you may be trying to address. Say there are stage 0 and stage 1, each with 10 tasks. Stage 0 completes fine on the first attempt, then stage 1 starts. Tasks 0 & 1 in stage 1 complete, but then there is a fetch failure in task 2. Let's also say we have an abundance of cluster resources, so tasks 3 - 9 from stage 1, attempt 0 are still running. Stage 0 gets resubmitted as attempt 1, just to regenerate the map output for whatever executor had the data for the fetch failure -- perhaps it's just one task from stage 0 that needs to be resubmitted.
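Just to make the zombie behavior and this example concrete, here's a tiny self-contained Scala sketch. The names (`SimpleTaskSetManager`, `launch`, etc.) are hypothetical and not the real TaskSetManager API; the point is only that the fetch failure flags the attempt as a zombie while tasks 3 - 9 keep running.

```scala
// Hypothetical, simplified model of the behavior described above -- not
// Spark's actual TaskSetManager. On a fetch failure the attempt is only
// marked as a zombie (no new tasks launched from it); already-running
// tasks are left alone.
class SimpleTaskSetManager(val stageId: Int, val attempt: Int) {
  var isZombie: Boolean = false
  private val running = scala.collection.mutable.Set[Int]()
  private val succeeded = scala.collection.mutable.Set[Int]()

  def launch(partition: Int): Unit = if (!isZombie) running += partition

  def taskSucceeded(partition: Int): Unit = {
    running -= partition
    succeeded += partition
  }

  def onFetchFailed(partition: Int): Unit = {
    isZombie = true          // stop launching new tasks from this attempt...
    running -= partition     // ...but do NOT kill the other running tasks
    // (a real scheduler would now ask the DAGScheduler to resubmit the map stage)
  }

  def stillRunning: Set[Int] = running.toSet
}

object ZombieDemo extends App {
  val stage1Attempt0 = new SimpleTaskSetManager(stageId = 1, attempt = 0)
  (0 to 9).foreach(stage1Attempt0.launch)          // stage 1, attempt 0: tasks 0-9 launched
  Seq(0, 1).foreach(stage1Attempt0.taskSucceeded)  // tasks 0 & 1 complete
  stage1Attempt0.onFetchFailed(2)                  // task 2 hits a fetch failure
  // tasks 3-9 are still running even though the attempt is now a zombie
  println(s"zombie=${stage1Attempt0.isZombie}, still running: ${stage1Attempt0.stillRunning.toList.sorted}")
}
```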
Now, lots of different scenarios are possible:

(a) Tasks 3 - 9 from stage 1 attempt 0 all finish successfully while stage 0 attempt 1 is running. So when stage 0 attempt 1 finishes, stage 1 attempt 1 is submitted with just Task 2. If it completes successfully, we're done (no wasted work).

(b) Stage 0 attempt 1 finishes before tasks 3 - 9 from stage 1 attempt 0 have finished. So stage 1 gets submitted again as stage 1 attempt 1, with tasks 2 - 9, and there are now two copies running for tasks 3 - 9. Maybe all the tasks from attempt 0 actually finish shortly after attempt 1 starts. In this case, the stage is complete as soon as there is one complete attempt for each task. But even after the stage completes successfully, all the other tasks keep running anyway. (plenty of wasted work)

(c) Like (b), but shortly after stage 1 attempt 1 is submitted, we get another fetch failure in one of the old "zombie" tasks from stage 1 attempt 0. But the [DAGScheduler realizes it already has a more recent attempt for this stage](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1268), so it ignores the fetch failure. All the other tasks keep running as usual. If there aren't any other issues, the stage completes when there is one completed attempt for each task. (same amount of wasted work as (b))

(d) While stage 0 attempt 1 is running, we get another fetch failure from stage 1 attempt 0, say in Task 3, which has a failure from a *different executor*. Maybe it's from a completely different host (just by chance, or there may be cluster maintenance where multiple hosts are serviced at once); or maybe it's from another executor on the same host (at least, until we do something about your other PR on unregistering all shuffle files on a host). To be honest, I don't understand how things work in this scenario. We [mark stage 0 as failed](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1303), we [unregister some shuffle output](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1328), and [we resubmit stage 0](https://demo.fluentcode.com/source/spark/master/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala?squery=DAgScheduler#L1319). But stage 0 attempt 1 is still running, so I would have expected us to end up with conflicting task sets. Whatever the real behavior is here, it seems we're at risk of even more duplicated work from yet another attempt for stage 1.

etc.

So I think in (b) and (c), you are trying to avoid resubmitting tasks 3 - 9 on stage 1 attempt 1. The thing is, there is a strong reason to believe that the original versions of those tasks will fail: most likely, those tasks need map output from the same executor that caused the first fetch failure. So Kay is suggesting that we take the opposite approach, and instead actively kill the tasks from stage 1 attempt 0. OTOH, it's possible that (i) the issue was transient, or (ii) the tasks already finished fetching that data before the error occurred. We really have no idea.
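For what it's worth, the "complete as soon as there is one complete attempt for each task" rule in (b)/(c) is easy to see with a toy sketch. This is hypothetical bookkeeping, not the real DAGScheduler code: completion is tracked per partition, a success from either attempt counts, and whichever duplicate finishes second is pure wasted work.

```scala
// Toy sketch (hypothetical, not Spark's real DAGScheduler) of per-partition
// completion: the stage finishes once every partition has succeeded at least
// once, no matter which task-set attempt produced the result.
class StageProgress(numPartitions: Int) {
  private val pending = scala.collection.mutable.Set(0 until numPartitions: _*)

  // Called for a successful task from *any* attempt of this stage; the
  // attempt number is irrelevant to completion, and duplicates are ignored.
  def taskSucceeded(partition: Int, attempt: Int): Unit = pending -= partition

  def isComplete: Boolean = pending.isEmpty
}

object DuplicateAttemptDemo extends App {
  val stage1 = new StageProgress(numPartitions = 10)
  Seq(0, 1).foreach(p => stage1.taskSucceeded(p, attempt = 0))          // done before the fetch failure
  Seq(2, 3, 4).foreach(p => stage1.taskSucceeded(p, attempt = 1))       // finish first in attempt 1
  Seq(5, 6, 7, 8, 9).foreach(p => stage1.taskSucceeded(p, attempt = 0)) // zombie tasks finish first here
  // The stage is complete; any still-running duplicates of partitions 2-9 are wasted work.
  println(s"stage 1 complete: ${stage1.isComplete}")
}
```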
---

If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA.

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org