flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3179) Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned
Date Fri, 29 Jan 2016 13:30:39 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15123471#comment-15123471 ]

ASF GitHub Bot commented on FLINK-3179:

Github user fhueske commented on the pull request:

    You identified the right classes and methods for the fix, but the place within the method
is not correct. Let me explain the issue.
    In the common case, for example in a WordCount program, the operators are arranged as follows:
    [Map] --hash-partition--> [Reduce]
    In this case, a combiner is appended to the Map to reduce the data before it is partitioned
over the network. This looks like:
    [Map] --local-fwd--> [Combine] --hash-partition--> [Reduce]
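    To make the benefit of the combiner concrete, here is a minimal, stdlib-only Java sketch of what the Combine step does on the sending side before the hash partitioning. It is not Flink code: `CombinerSketch` and its methods are hypothetical, and `targetPartition` assumes a plain modulo over `hashCode()` rather than Flink's actual hash partitioner.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of a combiner; not Flink code.
class CombinerSketch {

    // Records a mapper ships if it sends every (word, 1) pair as-is.
    static int shippedWithoutCombiner(List<String> words) {
        return words.size();
    }

    // Local pre-aggregation: sum counts per word on the sending side, so the
    // mapper ships one (word, partialCount) record per distinct word.
    static Map<String, Integer> combine(List<String> words) {
        Map<String, Integer> partial = new HashMap<>();
        for (String w : words) {
            partial.merge(w, 1, Integer::sum);
        }
        return partial;
    }

    // Assumed hash partitioning: target reducer for a key among numPartitions.
    static int targetPartition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        List<String> words = List.of("to", "be", "or", "not", "to", "be");
        Map<String, Integer> partial = combine(words);
        System.out.println("without combiner: " + shippedWithoutCombiner(words) + " records");
        System.out.println("with combiner:    " + partial.size() + " records");
        for (Map.Entry<String, Integer> e : partial.entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue()
                + " -> reducer " + targetPartition(e.getKey(), 4));
        }
    }
}
```

    For the six words in `main`, the mapper ships six `(word, 1)` records without a combiner but only four partial counts with it; the saving grows with the number of duplicate keys per sender.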
    In some cases, Flink knows that the data is already appropriately partitioned (e.g. after
a join):
    [Join] --local-fwd--> [Reduce]
    In this case, the data is already local and no combiner needs to be injected. The check is
based on the shipping strategy of the input channel (this is the `if` case in `instantiate()`).
    With an explicit partition operator, the operators look as follows:
    [Map] --partition--> [Partition] --local-fwd--> [Reduce]
    Hence, the code enters the `if` case because the input shipping strategy is `FORWARD`, and no combiner is injected.
    Instead we would like to inject a combiner between Map and Partition as follows:
    [Map] --local-fwd--> [Combine] --partition--> [Partition] --local-fwd--> [Reduce]
    Hence, we should adapt the condition so that a combiner is also injected if the input strategy of the Reduce
is `FORWARD` and its input operator is a `PartitionNode`.
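    The adapted condition can be sketched as a small decision function. This is a hypothetical, simplified model: `ShipStrategy`, `Node` and `needsCombiner` are illustrative names, not Flink's optimizer classes, and the `numSuccessors == 1` check reflects the restriction in the issue description that the Reducer must be the only successor of the Partition operator.

```java
// Hypothetical model of the combiner-injection decision; not Flink's optimizer API.
class CombinerDecisionSketch {

    // Illustrative subset of shipping strategies.
    enum ShipStrategy { FORWARD, PARTITION_HASH, PARTITION_RANGE, PARTITION_CUSTOM }

    // Minimal stand-in for the operator feeding the Reduce / GroupReduce.
    static final class Node {
        final String name;
        final boolean isPartitionNode; // true for an explicit partition operator
        final int numSuccessors;       // number of operators consuming its output

        Node(String name, boolean isPartitionNode, int numSuccessors) {
            this.name = name;
            this.isPartitionNode = isPartitionNode;
            this.numSuccessors = numSuccessors;
        }
    }

    // Returns true if a combiner should be injected for a Reduce whose input
    // arrives from 'input' via 'strategy'.
    static boolean needsCombiner(ShipStrategy strategy, Node input) {
        if (strategy != ShipStrategy.FORWARD) {
            // Data is shipped over the network: combine on the sending side,
            // [Map] --local-fwd--> [Combine] --hash-partition--> [Reduce].
            return true;
        }
        // FORWARD: normally the data is already local (e.g. after a Join) and no
        // combiner is needed. The exception is an explicit Partition operator that
        // feeds only this Reduce; the combiner then belongs in front of it:
        // [Map] --local-fwd--> [Combine] --partition--> [Partition] --local-fwd--> [Reduce].
        return input.isPartitionNode && input.numSuccessors == 1;
    }

    public static void main(String[] args) {
        Node map = new Node("Map", false, 1);
        Node join = new Node("Join", false, 1);
        Node partition = new Node("Partition", true, 1);
        System.out.println(needsCombiner(ShipStrategy.PARTITION_HASH, map)); // true
        System.out.println(needsCombiner(ShipStrategy.FORWARD, join));       // false
        System.out.println(needsCombiner(ShipStrategy.FORWARD, partition));  // true
    }
}
```

    The sketch only models the yes/no decision; placing the combiner in front of the Partition operator, rather than directly in front of the Reduce, is a separate step it does not cover.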
    We should add appropriate tests for this feature. I suggest:
    - a unit test case in `GroupReduceCompilationTest`
    - a unit test case in `ReduceCompilationTest`
    - an end-to-end integration test in `javaApiOperators.GroupReduceITCase` 
    - an end-to-end integration test in `javaApiOperators.ReduceITCase` 

> Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned
> ---------------------------------------------------------------------------------
>                 Key: FLINK-3179
>                 URL: https://issues.apache.org/jira/browse/FLINK-3179
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.1
>            Reporter: Fabian Hueske
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0, 0.10.2
> The optimizer does not inject a combiner if the input of a Reducer or GroupReducer is
explicitly partitioned, as in the following example:
> {code}
> DataSet<Tuple2<String,Integer>> words = ...
> DataSet<Tuple2<String,Integer>> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of keys or to
use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the {{ReduceProperties}}
and {{GroupReduceWithCombineProperties}} classes such that a combiner is injected in front
of a {{PartitionPlanNode}} if it is the input of a Reduce or GroupReduce operator. This should
only happen if the Reducer is the only successor of the Partition operator.
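Why explicit partitioning on a subset of the grouping keys is sufficient can be checked with a short stdlib-only Java sketch: records with equal `(k0, k1)` necessarily have equal `k0`, so partitioning on `k0` alone sends every `(k0, k1)` group to a single partition. `SubsetPartitioningSketch` and its methods are hypothetical, and the partition function is an assumed modulo over `hashCode()`, not Flink's partitioner.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative model, not Flink code: partition on k0, then group on (k0, k1).
class SubsetPartitioningSketch {

    static final class Rec {
        final String k0;
        final String k1;

        Rec(String k0, String k1) {
            this.k0 = k0;
            this.k1 = k1;
        }
    }

    // Assumed partition function: uses only k0, a subset of the grouping key.
    static int partitionOf(Rec r, int numPartitions) {
        return Math.floorMod(r.k0.hashCode(), numPartitions);
    }

    // True if every (k0, k1) group is contained in exactly one partition,
    // i.e. grouping on (k0, k1) needs no further data exchange.
    static boolean groupsAreLocal(List<Rec> recs, int numPartitions) {
        Map<List<String>, Set<Integer>> partitionsPerGroup = new HashMap<>();
        for (Rec r : recs) {
            partitionsPerGroup
                .computeIfAbsent(List.of(r.k0, r.k1), g -> new HashSet<>())
                .add(partitionOf(r, numPartitions));
        }
        for (Set<Integer> parts : partitionsPerGroup.values()) {
            if (parts.size() != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Rec> recs = List.of(
            new Rec("a", "x"), new Rec("a", "y"), new Rec("b", "x"),
            new Rec("a", "x"), new Rec("c", "z"), new Rec("b", "x"));
        System.out.println(groupsAreLocal(recs, 3)); // true
    }
}
```

Because the data is then already local for the grouping, the Reduce receives it via `FORWARD`, which is exactly the situation in which the optimizer currently skips the combiner.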

This message was sent by Atlassian JIRA
