spark-issues mailing list archives

From "jin xing (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-19263) DAGScheduler should avoid sending conflicting task set.
Date Tue, 07 Feb 2017 10:59:41 GMT

     [ https://issues.apache.org/jira/browse/SPARK-19263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jin xing updated SPARK-19263:
-----------------------------
    Description: 
In the current *DAGScheduler handleTaskCompletion* code, when *event.reason* is *Success*, it
first does *stage.pendingPartitions -= task.partitionId*, which may be a bug when a *FetchFailed*
happens. Consider the following scenario:

1. Stage 0 runs and generates shuffle output data.
2. Stage 1 reads the output from stage 0 and generates more shuffle data. It has two tasks:
ShuffleMapTask1 and ShuffleMapTask2, and these tasks are launched on executorA.
3. ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the driver. The
driver marks executorA as lost and updates failedEpoch.
4. The driver resubmits stage 0 so the missing output can be regenerated and, once it
completes, resubmits stage 1 with ShuffleMapTask1x and ShuffleMapTask2x.
5. ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes on executorA
and sends Success back to the driver. This causes DAGScheduler::handleTaskCompletion to remove
partition 2 from stage.pendingPartitions (line 1149), but it does not add the partition to
the set of output locations (line 1192), because the task’s epoch is not greater than the failure
epoch recorded for executorA after the earlier fetch failure.
6. ShuffleMapTask1x successfully finishes on executorB, causing the driver to remove partition
1 from stage.pendingPartitions. Combined with the previous step, this leaves no pending
partitions for the stage, so the DAGScheduler marks the stage as finished (line 1196).
However, the shuffle stage is not available (line 1215), because the completion of
ShuffleMapTask2 was ignored due to its epoch, so the DAGScheduler resubmits the stage.
7. ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is called for
the resubmitted stage, it throws an error because an active task set already exists for
that stage.
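
The bookkeeping in steps 5 and 6 can be sketched with a small stand-alone model. This is not the DAGScheduler code: `StageModel`, `Completion`, `handleSuccess` and the concrete epoch values are hypothetical names and numbers chosen for illustration. Only the order of operations (remove the pending partition first, then apply the epoch check) follows the description above.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the per-stage bookkeeping described above.
// None of these types are Spark classes.
object PendingPartitionsSketch {
  final case class Completion(task: String, partition: Int, executor: String, epoch: Long)

  final class StageModel(partitions: Set[Int]) {
    val pendingPartitions: mutable.Set[Int] = mutable.Set(partitions.toSeq: _*)
    val outputLocs: mutable.Map[Int, String] = mutable.Map.empty[Int, String]
    // The stage is available only when every partition has a registered output.
    def isAvailable: Boolean = outputLocs.keySet == partitions
  }

  // Mirrors the order of operations in the report: the partition leaves
  // pendingPartitions unconditionally, but its output is registered only if
  // the task's epoch is newer than the executor's failure epoch.
  // Returns true when the stage looks finished but is not available,
  // which is the condition that leads to a resubmission.
  def handleSuccess(stage: StageModel, failedEpoch: Map[String, Long], c: Completion): Boolean = {
    stage.pendingPartitions -= c.partition
    val ignored = failedEpoch.get(c.executor).exists(failed => c.epoch <= failed)
    if (!ignored) stage.outputLocs(c.partition) = c.executor
    stage.pendingPartitions.isEmpty && !stage.isAvailable
  }

  // Replays steps 5 and 6 with assumed epoch values.
  def run(): (Boolean, Boolean, Set[Int]) = {
    val stage = new StageModel(Set(1, 2))
    val failedEpoch = Map("executorA" -> 1L) // set when executorA was marked lost
    // Step 5: the original ShuffleMapTask2 finishes late on executorA.
    val afterTask2 = handleSuccess(stage, failedEpoch, Completion("ShuffleMapTask2", 2, "executorA", 0L))
    // Step 6: ShuffleMapTask1x finishes on executorB.
    val afterTask1x = handleSuccess(stage, failedEpoch, Completion("ShuffleMapTask1x", 1, "executorB", 2L))
    (afterTask2, afterTask1x, stage.outputLocs.keySet.toSet)
  }

  def main(args: Array[String]): Unit = {
    val (afterTask2, afterTask1x, registered) = run()
    println(s"resubmit after step 5: $afterTask2")
    println(s"resubmit after step 6: $afterTask1x")
    println(s"partitions with registered output: $registered")
  }
}
```

In this model the stage ends up with no pending partitions while partition 2 has no registered output, and ShuffleMapTask2x (not modelled here) would still be running when the resubmission happens.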

To reproduce the bug:
1. Modify *ShuffleBlockFetcherIterator* to check whether the task's index in *TaskSetManager*
and the stage attempt are both 0; if so, throw FetchFailedException.
2. Rebuild Spark, then submit the following job:
{code}
    val rdd = sc.parallelize(List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
    rdd.reduceByKey {
      (v1, v2) => {
        Thread.sleep(10000)
        v1 + v2
      }
    }.map {
      keyAndValue => {
        (keyAndValue._1 % 2, keyAndValue._2)
      }
    }.reduceByKey {
      (v1, v2) => {
        Thread.sleep(10000)
        v1 + v2
      }
    }.collect
{code}

  was:
In the current *DAGScheduler handleTaskCompletion* code, when *event.reason* is *Success*, it
first does *stage.pendingPartitions -= task.partitionId*, which may be a bug when a *FetchFailed*
happens. Consider the following scenario:
1. There are two executors, A and B; executorA is assigned ShuffleMapTask1 and ShuffleMapTask2.
2. ShuffleMapTask1 tries to fetch blocks locally but fails.
3. The driver receives the *FetchFailed* caused by ShuffleMapTask1 on executorA, marks executorA
as lost, and updates *failedEpoch*.
4. The driver resubmits the stages, which contain ShuffleMapTask1x and ShuffleMapTask2x.
5. ShuffleMapTask2 finishes successfully on executorA and sends *Success* back to the driver.
6. The driver receives *Success* and does *stage.pendingPartitions -= task.partitionId*, but then
finds that the task's epoch is not large enough (*<= failedEpoch(execId)*), treats the completion
as bogus, and does not add the *MapStatus* to the stage.
7. ShuffleMapTask1x finishes successfully on executorB.
8. The driver receives *Success* from ShuffleMapTask1x on executorB and does *stage.pendingPartitions
-= task.partitionId*, leaving no pending partitions, but then finds that not all partitions are
available because of step 6.
9. The driver resubmits the stage, but at this moment ShuffleMapTask2x is still running;
*TaskSchedulerImpl submitTasks* finds a *conflictingTaskSet* and throws *IllegalStateException*.
10. The job fails.
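
The conflict in step 9 can be illustrated with a small model of the check. This is a sketch under assumptions, not the *TaskSchedulerImpl* code: `TaskSetModel`, `SchedulerModel` and the exception message are hypothetical, and the `isZombie` flag is an assumed stand-in for "this attempt no longer launches new tasks".

```scala
import scala.collection.mutable

// Hypothetical model of a "one active task set per stage" check.
// These are not Spark classes.
object ConflictingTaskSetSketch {
  final case class TaskSetModel(stageId: Int, attempt: Int, var isZombie: Boolean = false)

  final class SchedulerModel {
    // stageId -> (stage attempt -> task set)
    private val taskSetsByStage =
      mutable.Map.empty[Int, mutable.Map[Int, TaskSetModel]]

    def submitTasks(ts: TaskSetModel): Unit = {
      val forStage =
        taskSetsByStage.getOrElseUpdate(ts.stageId, mutable.Map.empty[Int, TaskSetModel])
      // A conflict exists when another attempt of the same stage is still active.
      val conflicting =
        forStage.values.exists(other => other.attempt != ts.attempt && !other.isZombie)
      if (conflicting) {
        throw new IllegalStateException(
          s"more than one active task set for stage ${ts.stageId}")
      }
      forStage(ts.attempt) = ts
    }
  }

  // Replays the submissions from the scenario; returns true if the last one is rejected.
  def run(): Boolean = {
    val scheduler = new SchedulerModel
    val original = TaskSetModel(stageId = 1, attempt = 0)
    scheduler.submitTasks(original)
    original.isZombie = true // assumed effect of the FetchFailed on the original attempt
    scheduler.submitTasks(TaskSetModel(stageId = 1, attempt = 1)) // ShuffleMapTask1x and 2x
    // The stage is resubmitted while attempt 1 (with ShuffleMapTask2x) is still active.
    try {
      scheduler.submitTasks(TaskSetModel(stageId = 1, attempt = 2))
      false
    } catch {
      case _: IllegalStateException => true
    }
  }

  def main(args: Array[String]): Unit =
    println(s"third submission rejected: ${run()}")
}
```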

To reproduce the bug:
1. Modify *ShuffleBlockFetcherIterator* to check whether the task's index in *TaskSetManager*
and the stage attempt are both 0; if so, throw FetchFailedException.
2. Rebuild Spark, then submit the following job:
{code}
    val rdd = sc.parallelize(List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
    rdd.reduceByKey {
      (v1, v2) => {
        Thread.sleep(10000)
        v1 + v2
      }
    }.map {
      keyAndValue => {
        (keyAndValue._1 % 2, keyAndValue._2)
      }
    }.reduceByKey {
      (v1, v2) => {
        Thread.sleep(10000)
        v1 + v2
      }
    }.collect
{code}


> DAGScheduler should avoid sending conflicting task set.
> -------------------------------------------------------
>
>                 Key: SPARK-19263
>                 URL: https://issues.apache.org/jira/browse/SPARK-19263
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: jin xing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
