Date: Wed, 27 Jan 2016 16:34:39 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-3179) Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned

    [ https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15119716#comment-15119716 ]

ASF GitHub Bot commented on FLINK-3179:
---------------------------------------

GitHub user ramkrish86 opened a pull request:

    https://github.com/apache/flink/pull/1553

    FLINK-3179 Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned (Ram)

    Followed the guidance given in the description in order to fix this.
    Is the approach correct here? I am also using this to learn the code.

    Once we see that a partition node is the input of a reduce or group reduce node, we inject the combiner in front of the partition node, consuming the partition node's input (here, the data source node), and the reduce node keeps the actual partition node as its input. The resulting structure is DataSource->Combine->Partition->Reduce.

    Suggestions and feedback are welcome, as I am not sure I have covered all the cases here.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ramkrish86/flink FLINK-3179

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1553

----
commit ccf6e18418ac9cae1c27a5ff4399ea188b03bc0b
Author: ramkrishna
Date:   2016-01-27T16:27:17Z

    FLINK-3179 Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned (Ram)

----


> Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-3179
>                 URL: https://issues.apache.org/jira/browse/FLINK-3179
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.1
>            Reporter: Fabian Hueske
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or GroupReducer is explicitly partitioned, as in the following example:
> {code}
> DataSet> words = ...
> DataSet> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of keys or to use a different partitioning method (custom or range partitioning).
>
> This issue should be fixed by changing the {{instantiate()}} methods of the {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such that a combine is injected in front of a {{PartitionPlanNode}} if it is the input of a Reduce or GroupReduce operator. This should only happen if the Reducer is the only successor of the Partition operator.
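The plan rewrite proposed above, and the DataSource->Combine->Partition->Reduce shape described in the pull request, can be illustrated with a small, self-contained Java model. This is a sketch under stated assumptions, not Flink optimizer code: `Kind`, `Node`, `injectCombiner`, and `chain` are hypothetical stand-ins for `PlanNode`/`PartitionPlanNode` and the `instantiate()` logic. The sketch assumes the reduce or group reduce function is combinable and does not model that check, multi-input nodes, or ship strategies. It only shows the structural rule: insert a combine between the partition node and its input, and skip the rewrite when the partition node has more than one successor.

```java
import java.util.ArrayList;
import java.util.List;

public class CombinerInjectionSketch {

    // Hypothetical operator kinds for this sketch only.
    enum Kind { DATA_SOURCE, PARTITION, COMBINE, REDUCE, GROUP_REDUCE, MAP }

    // Hypothetical single-input plan node; not Flink's PlanNode class hierarchy.
    static final class Node {
        final Kind kind;
        Node input;
        final List<Node> successors = new ArrayList<>();

        Node(Kind kind, Node input) {
            this.kind = kind;
            this.input = input;
            if (input != null) {
                input.successors.add(this); // keep successor links in sync
            }
        }
    }

    // Injects a COMBINE node in front of the PARTITION node feeding `reducer`,
    // only if `reducer` is that PARTITION node's sole successor.
    // Returns true if the plan was rewritten.
    static boolean injectCombiner(Node reducer) {
        if (reducer.kind != Kind.REDUCE && reducer.kind != Kind.GROUP_REDUCE) {
            return false;
        }
        Node partition = reducer.input;
        if (partition == null || partition.kind != Kind.PARTITION || partition.input == null) {
            return false;
        }
        // Other consumers of the partitioned data must not see pre-combined records.
        if (partition.successors.size() != 1) {
            return false;
        }
        // Do not inject a second combiner if one is already in place.
        if (partition.input.kind == Kind.COMBINE) {
            return false;
        }
        Node upstream = partition.input;
        upstream.successors.remove(partition);
        Node combine = new Node(Kind.COMBINE, upstream); // registers itself as upstream's successor
        partition.input = combine;
        combine.successors.add(partition);
        return true;
    }

    // Renders the single-input chain ending at `sink`, e.g. "DATA_SOURCE->PARTITION->REDUCE".
    static String chain(Node sink) {
        StringBuilder sb = new StringBuilder(sink.kind.name());
        for (Node n = sink.input; n != null; n = n.input) {
            sb.insert(0, n.kind.name() + "->");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The reducer is the only successor of the partition: a combiner is injected.
        Node source = new Node(Kind.DATA_SOURCE, null);
        Node partition = new Node(Kind.PARTITION, source);
        Node reduce = new Node(Kind.REDUCE, partition);
        System.out.println(injectCombiner(reduce)); // true
        System.out.println(chain(reduce));          // DATA_SOURCE->COMBINE->PARTITION->REDUCE

        // The partition has a second successor: the plan is left unchanged.
        Node source2 = new Node(Kind.DATA_SOURCE, null);
        Node partition2 = new Node(Kind.PARTITION, source2);
        Node reduce2 = new Node(Kind.REDUCE, partition2);
        new Node(Kind.MAP, partition2);
        System.out.println(injectCombiner(reduce2)); // false
        System.out.println(chain(reduce2));          // DATA_SOURCE->PARTITION->REDUCE
    }
}
```

The single-successor guard reflects the condition in the issue: a combine placed in front of the partition changes what every consumer of the partitioned data receives, so any successor other than the reducer would see partially aggregated records.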