From: ggevay
To: issues@flink.incubator.apache.org
Subject: [GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Date: Tue, 19 Jan 2016 18:19:23 +0000 (UTC)

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-172939642

Hello @fhueske, thanks for the comment and for shepherding the PR!

> You said the combiner flushes the full hash table if it runs out of memory. Do you think it would be possible to track the update frequency of buckets and only flush the bucket with the least updates (or the n buckets with the least updates)? This might improve the performance for skewed input data.

If we flush only some of the elements when the memory is full, the append position of the record area doesn't change; some holes just appear in the record area, so we immediately need to do a compaction. Since the compaction traverses the entire table, this idea could only work if we flush a substantial fraction of the elements (like half of them).

This is an interesting idea, but I'm finding it quite hard to form even an intuitive understanding of its performance effects. Doing this would have some overhead, and at the moment I can't see how much skew in the data would make it worth it (or what fraction of the data should be flushed at a time, etc.). I'll think about this some more, but getting the trade-offs right would probably require quite a lot of work: experimentation with different variants of the algorithm, with differently skewed data, and with different `distinct keys / total number of input elements` ratios. So we should probably postpone this until after this basic version is merged.
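To make the compaction issue a bit more concrete, here is a toy model of a hash table whose records live in an append-only record area. It is only a simplified sketch, not the code in this PR (the class and method names are made up), but it shows why punching holes with a partial flush reclaims no space until a compaction pass rewrites the live records to the front:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names, not the PR's code): records of a combiner
// hash table live in an append-only record area; the index maps a key to its
// slot. A full flush resets the append position, whereas a partial flush only
// leaves holes behind and therefore has to be followed by a compaction pass
// over the whole table before new records can be appended.
class RecordAreaModel {

    private final List<long[]> recordArea = new ArrayList<>(); // slot = {key, aggregate} or null (hole)
    private final Map<Long, Integer> index = new HashMap<>();  // key -> slot in the record area
    private final int capacity;

    RecordAreaModel(int capacity) {
        this.capacity = capacity;
    }

    // Combine 'value' into the entry for 'key'; unseen keys are appended at the end.
    void insertOrCombine(long key, long value) {
        Integer slot = index.get(key);
        if (slot != null) {
            recordArea.get(slot)[1] += value; // in-place update, no new space needed
            return;
        }
        if (recordArea.size() >= capacity) {
            fullFlush(); // what the current version does when memory runs out
        }
        index.put(key, recordArea.size());
        recordArea.add(new long[] {key, value});
    }

    // Emit everything; the append position goes back to 0, so no compaction is needed.
    void fullFlush() {
        for (long[] rec : recordArea) {
            if (rec != null) {
                emit(rec[0], rec[1]);
            }
        }
        recordArea.clear();
        index.clear();
    }

    // The reviewed idea: flush only some of the records (here simply the first
    // 'toFlush' slots, as a stand-in for the least-updated buckets). This only
    // punches holes; recordArea.size() does not shrink, so a compaction that
    // traverses the entire table is required to actually free up space.
    void partialFlushAndCompact(int toFlush) {
        for (int i = 0; i < toFlush && i < recordArea.size(); i++) {
            long[] rec = recordArea.get(i);
            if (rec != null) {
                emit(rec[0], rec[1]);
                index.remove(rec[0]);
                recordArea.set(i, null); // hole; no space reclaimed yet
            }
        }
        // Compaction: rewrite the live records to the front and rebuild the index.
        List<long[]> compacted = new ArrayList<>();
        index.clear();
        for (long[] rec : recordArea) {
            if (rec != null) {
                index.put(rec[0], compacted.size());
                compacted.add(rec);
            }
        }
        recordArea.clear();
        recordArea.addAll(compacted);
    }

    private void emit(long key, long value) {
        System.out.println(key + " -> " + value);
    }
}
```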
> I agree that the sort-based strategy for reduce combiners can be removed eventually, when the hash-based strategy proves to work well. I would like to give it a bit more exposure though, before we drop the code.

OK.

> Porting the built-in aggregation functions, distinct, etc. from GroupReduceFunctions to ReduceFunctions sounds good. I think the reason for this design decision was that for the sort-based strategies ReduceFunctions did not have a benefit over GroupReduceFunctions. Instead they caused more method invocations (once for each input record) compared to once per key.

I see, thanks! I'll keep this in mind for now, and open a JIRA when the dust settles around the hash-based combiner and we can see the performance differences more clearly.

> It would be great if you could open a JIRA and add a short design document / API proposal for the changes on the serializers that you talked about. This would allow the community to review and discuss your proposal.

OK, I will.

Best,
Gábor
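P.S. To make the invocation-count difference concrete: with the DataSet API, a `ReduceFunction` combines records pairwise, so `reduce` is called roughly once per input record, while a `GroupReduceFunction` gets the whole group at once and is called once per key. A minimal sketch (the class names are just made up for illustration):

```java
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Pairwise combining: reduce() is invoked once for every additional record
// of a key, i.e. roughly once per input record.
class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
        return new Tuple2<>(a.f0, a.f1 + b.f1);
    }
}

// Group-at-a-time combining: reduce() is invoked once per key, with an
// Iterable over all records of that key.
class SumGroupReducer
        implements GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> values,
                       Collector<Tuple2<String, Integer>> out) {
        String key = null;
        int sum = 0;
        for (Tuple2<String, Integer> v : values) {
            key = v.f0;
            sum += v.f1;
        }
        out.collect(new Tuple2<>(key, sum));
    }
}
```

In a job this would show up as `input.groupBy(0).reduce(new SumReducer())` versus `input.groupBy(0).reduceGroup(new SumGroupReducer())`; the result is the same, only the number of user-function calls differs.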