tez-issues mailing list archives

From "Bikas Saha (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (TEZ-338) Determine reduce task parallelism
Date Mon, 05 Aug 2013 08:38:48 GMT

    [ https://issues.apache.org/jira/browse/TEZ-338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13729301#comment-13729301 ]

Bikas Saha edited comment on TEZ-338 at 8/5/13 8:38 AM:
--------------------------------------------------------

A typical approach that has worked elsewhere is for the user to over-partition and let the
framework reduce the number of partitions at runtime. Once the partition stage has executed,
reducing the number of partitions is easier than increasing it, because increasing it requires
re-running the partition stage.

The approach could be the following:
1) Record the data output of each task and track the aggregate for the partition stage.
2) Enhance the BipartiteSlowStartVertexScheduler to track the above data for the source (partition)
stage. BipartiteSlowStartVertexScheduler is the object that understands shuffle-based partitioning
semantics inside Tez, which is otherwise agnostic to logical data.
3) After, say, 25% of the source tasks have finished, extrapolate the data output so far to predict
the expected total data output of the source stage.
4) Divide the expected total data output by the desired data input per reduce vertex to determine
the number of reduce vertices. Create that many and start them when appropriate.
5) Run "user code" inside the BipartiteSlowStartVertexScheduler to generate an additional user
payload that tells each reduce task the range of indices it needs to read from the shuffle
service. The index ranges are determined by load balancing across the reduce vertex count
calculated above. Assigning consecutive indices to a reduce task may be better for sequential
reads during shuffle. The index information in the additional user payload may be used to
override the initial index information set by the user.
6) In the reduce task, use the additional configuration to read data from the shuffle service.
This may need changes to the existing shuffle input reader so that it reads a range of
indices instead of a single index.
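
The arithmetic in steps 3 to 5 can be sketched as follows. This is only an illustration in Python,
not Tez code: the function names, the linear extrapolation, and the half-open (start, end) range
format are assumptions made for the example. It also balances ranges by partition count only; a
real scheduler could weight them by observed partition sizes.

```python
import math


def estimate_total_output(bytes_per_finished_task, total_source_tasks):
    """Step 3 (assumed linear): scale the mean output of finished tasks to all source tasks."""
    finished = len(bytes_per_finished_task)
    if finished == 0:
        raise ValueError("need at least one finished source task")
    return sum(bytes_per_finished_task) / finished * total_source_tasks


def choose_reducer_count(expected_total_bytes, desired_bytes_per_reducer, num_partitions):
    """Step 4: expected output divided by desired input per reducer.

    The result is capped at num_partitions because partitions can be merged
    but not split without re-running the partition stage.
    """
    wanted = math.ceil(expected_total_bytes / desired_bytes_per_reducer)
    return max(1, min(num_partitions, wanted))


def assign_index_ranges(num_partitions, num_reducers):
    """Step 5: give each reducer a consecutive, half-open range [start, end) of indices."""
    base, extra = divmod(num_partitions, num_reducers)
    ranges = []
    start = 0
    for reducer in range(num_reducers):
        # The first `extra` reducers take one more partition each.
        size = base + (1 if reducer < extra else 0)
        ranges.append((start, start + size))
        start += size
    return ranges


MB = 2 ** 20
# Hypothetical job: 100 source tasks, 10 user-defined partitions,
# 25 tasks finished so far with 40 MB of output each.
expected = estimate_total_output([40 * MB] * 25, total_source_tasks=100)
reducers = choose_reducer_count(expected, 1000 * MB, num_partitions=10)
ranges = assign_index_ranges(10, reducers)
print(reducers, ranges)
```

With these made-up numbers the estimate is 4000 MB, so a 1000 MB target per reducer gives 4
reducers reading index ranges (0, 3), (3, 6), (6, 8) and (8, 10).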

                
                  
> Determine reduce task parallelism
> ---------------------------------
>
>                 Key: TEZ-338
>                 URL: https://issues.apache.org/jira/browse/TEZ-338
>             Project: Apache Tez
>          Issue Type: Sub-task
>            Reporter: Bikas Saha
>              Labels: TEZ-0.2.0
>
> Determine the parallelism of reduce tasks at runtime. This is important because it's difficult
> to determine this accurately before the job actually runs, due to unknown data reduction ratios
> in the intermediate stages.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
