tez-issues mailing list archives

From "Rohini Palaniswamy (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (TEZ-3126) Log reason for not reducing parallelism
Date Tue, 23 Feb 2016 17:51:18 GMT

    [ https://issues.apache.org/jira/browse/TEZ-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15159291#comment-15159291
] 

Rohini Palaniswamy commented on TEZ-3126:
-----------------------------------------

bq. Wouldn't that break the max data per reducer limit? Ignoring the min data hint may be
fine but ignoring the max data limit could result in failure because it may break operator
assumptions (eg. size of hash table etc.). Say the reducer was designed to handle 1G of data
and we send it 1.7G instead.
  Currently the code blindly assumes that all reducers have equal data and combines consecutive
basePartitionRange reducers into one reducer. This already sends more than desiredTaskSize
data to some reducers, ignoring the max data limit, and sends empty data to other reducers
when filters are used and the data is skewed, which is very inefficient. The proper fix
is to bucket according to size, as discussed in previous comments, and combine reducers
based on that. basePartitionRange should be allowed to be a fraction to group better, but if
we are bucketing and grouping on size, basePartitionRange will no longer be required, since partitioning
is not based on ranges.
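A size-based grouping along the lines suggested above could look like the following sketch. This is a hypothetical illustration, not ShuffleVertexManager code: it walks per-partition sizes (assumed available from partition statistics) and closes a group whenever adding the next partition would push the group past the desired task input size.

```java
import java.util.ArrayList;
import java.util.List;

public class SizeBasedGrouper {
    // Groups consecutive partitions so each group's total size stays at or
    // below maxGroupSize where possible; a single oversized partition still
    // forms its own group. Hypothetical sketch, not actual Tez code.
    static List<List<Integer>> groupBySize(long[] partitionSizes, long maxGroupSize) {
        List<List<Integer>> groups = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentSize = 0;
        for (int i = 0; i < partitionSizes.length; i++) {
            if (!current.isEmpty() && currentSize + partitionSizes[i] > maxGroupSize) {
                groups.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(i);
            currentSize += partitionSizes[i];
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Skewed sizes in MB: a fixed basePartitionRange of 2 would pair
        // 900+50 and 800+30, regardless of skew; grouping by size instead
        // packs the small partitions together.
        long[] sizes = {900, 50, 10, 800, 30, 20};
        System.out.println(SizeBasedGrouper.groupBySize(sizes, 1000));
        // prints [[0, 1, 2], [3, 4, 5]]
    }
}
```

Note that unlike the current range-based combining, the resulting group sizes vary, which is exactly why a fixed integer basePartitionRange stops being meaningful.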

> Log reason for not reducing parallelism
> ---------------------------------------
>
>                 Key: TEZ-3126
>                 URL: https://issues.apache.org/jira/browse/TEZ-3126
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Jonathan Eagles
>            Assignee: Jonathan Eagles
>            Priority: Critical
>             Fix For: 0.7.1, 0.8.3
>
>         Attachments: TEZ-3126.1.patch, TEZ-3126.2.patch
>
>
> For example, when reducing parallelism from 36 to 22, the basePartitionRange will be
1 and the vertex will not be re-configured.
> {code:java|title=ShuffleVertexManager#determineParallelismAndApply|borderStyle=dashed|bgColor=lightgrey}
>     int desiredTaskParallelism = 
>         (int)(
>             (expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/
>             desiredTaskInputDataSize);
>     if(desiredTaskParallelism < minTaskParallelism) {
>       desiredTaskParallelism = minTaskParallelism;
>     }
>     
>     if(desiredTaskParallelism >= currentParallelism) {
>       return true;
>     }
>     
>     // most shufflers will be assigned this range
>     basePartitionRange = currentParallelism/desiredTaskParallelism;
>     
>     if (basePartitionRange <= 1) {
>       // nothing to do if range is equal 1 partition. shuffler does it by default
>       return true;
>     }
> {code}
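To make the quoted example concrete: with Java's truncating integer division, 36 / 22 yields 1, so the `basePartitionRange <= 1` early return fires and the vertex keeps all 36 tasks even though only 22 were desired, with no log line explaining why. A minimal illustration of just that arithmetic:

```java
public class ParallelismExample {
    public static void main(String[] args) {
        int currentParallelism = 36;
        int desiredTaskParallelism = 22;
        // Integer division truncates: 36 / 22 == 1
        int basePartitionRange = currentParallelism / desiredTaskParallelism;
        System.out.println("basePartitionRange = " + basePartitionRange);
        if (basePartitionRange <= 1) {
            // This is the silent bail-out path the patch adds logging for.
            System.out.println("parallelism left unchanged at " + currentParallelism);
        }
    }
}
```

Any reduction of less than 2x (desiredTaskParallelism greater than half of currentParallelism) hits this path, which is why the log message matters in practice.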



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
