flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3179) Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned
Date Mon, 22 Feb 2016 17:28:18 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15157338#comment-15157338
] 

ASF GitHub Bot commented on FLINK-3179:
---------------------------------------

Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1553#discussion_r53659650
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
---
    @@ -87,19 +92,39 @@ public DriverStrategy getStrategy() {
     		return DriverStrategy.SORTED_GROUP_REDUCE;
     	}
     
    -	@Override
     	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
     		if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
     			// adjust a sort (changes grouping, so it must be for this driver to combining sort
    -			if (in.getLocalStrategy() == LocalStrategy.SORT) {
    -				if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
    -					throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
    +			if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
    +				// Inject a combiner before the partition node
    +				Channel toCombiner = new Channel(in.getSource());
    +				toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
    +				GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
    +				combinerNode.setParallelism(in.getSource().getParallelism());
    +				if(toCombiner.getSource().getInputs().iterator().hasNext()) {
    +					Channel source = toCombiner.getSource().getInputs().iterator().next();
    +					// A combiner plan node is created with the map as the input
    +					SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
    --- End diff --
    
    Thanks for all the comments. Checking one by one. I have few questions. Let me get back
to you after fixing the other comments. Sorry about the formatting things. I will take care
from next time so that they are not repeated. 


> Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-3179
>                 URL: https://issues.apache.org/jira/browse/FLINK-3179
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.1
>            Reporter: Fabian Hueske
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or GroupReducer is
explicitly partitioned as in the following example
> {code}
> DataSet<Tuple2<String,Integer>> words = ...
> DataSet<Tuple2<String,Integer>> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of keys or to
use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the {{ReduceProperties}}
and {{GroupReduceWithCombineProperties}} classes such that a combine is injected in front
of a {{PartitionPlanNode}} if it is the input of a Reduce or GroupReduce operator. This should
only happen, if the Reducer is the only successor of the Partition operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message