spark-issues mailing list archives

From Juan Rodríguez Hortalá (JIRA) <>
Subject [jira] [Updated] (SPARK-22339) Push epoch updates to executors on fetch failure to avoid fetch retries for missing executors
Date Mon, 23 Oct 2017 23:41:00 GMT


Juan Rodríguez Hortalá updated SPARK-22339:
    Attachment: push_epoch_update-WIP.diff

> Push epoch updates to executors on fetch failure to avoid fetch retries for missing executors
> ---------------------------------------------------------------------------------------------
>                 Key: SPARK-22339
>                 URL:
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Juan Rodríguez Hortalá
>         Attachments: push_epoch_update-WIP.diff
> When a task fails with a fetch error, the DAGScheduler unregisters the shuffle blocks hosted by the serving executor (or by all executors on the failing host, when the external shuffle service and spark.files.fetchFailure.unRegisterOutputOnHost are enabled) from the shuffle block directory kept by MapOutputTracker, which then increments its epoch. This event is only signaled to the other executors when a task carrying the new epoch starts on each of them. As a result, executors reading from the failed executors keep retrying shuffle fetches against them, even though the driver already knows those executors are lost and the blocks are no longer available at those locations. This hurts job runtime, especially for long shuffles and for executor failures near the end of a stage, when the only pending tasks are shuffle reads.
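The timing problem above can be illustrated with a minimal, hypothetical Python model of the epoch mechanism (not Spark's actual Scala code; class and field names here are illustrative): the driver-side tracker bumps its epoch when an executor's outputs are unregistered, but an executor-side cache only refreshes when the new epoch reaches it, which today happens at the start of the next task.

```python
# Hypothetical model of the MapOutputTracker epoch mechanism described
# above -- not Spark's real implementation.

class DriverMapOutputTracker:
    def __init__(self):
        self.epoch = 0
        # shuffle_id -> {map_id: executor hosting the block}
        self.map_statuses = {1: {0: "exec-A", 1: "exec-B"}}

    def unregister_outputs_on_executor(self, executor_id):
        """What the DAGScheduler triggers after a fetch failure."""
        for statuses in self.map_statuses.values():
            for map_id, host in list(statuses.items()):
                if host == executor_id:
                    del statuses[map_id]
        self.epoch += 1  # invalidates executor-side caches


class ExecutorMapOutputTracker:
    def __init__(self, driver):
        self.driver = driver
        self.epoch = driver.epoch
        self.cached_statuses = dict(driver.map_statuses[1])

    def update_epoch(self, new_epoch):
        """Today this runs only when a new task starts on the executor."""
        if new_epoch > self.epoch:
            self.epoch = new_epoch
            self.cached_statuses = dict(self.driver.map_statuses[1])


driver = DriverMapOutputTracker()
executor = ExecutorMapOutputTracker(driver)

driver.unregister_outputs_on_executor("exec-A")
# Until the next task delivers the epoch, the executor's cache is stale,
# so fetch retries still target the lost executor exec-A.
print("exec-A" in executor.cached_statuses.values())  # stale: True
executor.update_epoch(driver.epoch)
print("exec-A" in executor.cached_statuses.values())  # refreshed: False
```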
> This could be improved by pushing the epoch update to the executors without waiting for a new task. The attached patch sketches a possible solution that sends the updated epoch from the driver to the executors by piggybacking on the executor heartbeat response. ShuffleBlockFetcherIterator, RetryingBlockFetcher and BlockFetchingListener are modified so that block locations are re-checked on each fetch retry. This introduces no additional traffic, since MapOutputTrackerWorker.mapStatuses is shared by all tasks running on the same executor, and the lookup of the new shuffle block directory would happen anyway when the new epoch is detected at the start of the next task.
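The proposal can be sketched with another small, hypothetical Python model (names and structures are illustrative, not Spark's real API): the heartbeat response carries the driver's current epoch, and the fetch-retry path consults the possibly refreshed block directory before retrying, instead of hammering a host the driver already knows is gone.

```python
# Hypothetical sketch of the proposed fix -- not Spark's real API.

current_locations = {"shuffle_0_0": "exec-B"}  # driver's view after failure
driver_epoch = 1                               # bumped by the unregistration


class ExecutorState:
    def __init__(self):
        self.epoch = 0
        self.locations = {"shuffle_0_0": "exec-A"}  # stale cache

    def on_heartbeat_response(self, response_epoch):
        # Proposed change: the epoch is pushed with the heartbeat reply,
        # so the executor refreshes without waiting for a new task.
        if response_epoch > self.epoch:
            self.epoch = response_epoch
            self.locations = dict(current_locations)  # re-fetch directory

    def location_for_retry(self, block_id):
        # Proposed change: the retry path re-checks block locations on
        # each attempt instead of reusing the ones cached at fetch start.
        return self.locations.get(block_id)


state = ExecutorState()
state.on_heartbeat_response(driver_epoch)
print(state.location_for_retry("shuffle_0_0"))  # retries against exec-B
```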
> I would like to know the opinion of the community on this approach. 

This message was sent by Atlassian JIRA
