Return-Path: X-Original-To: apmail-flink-issues-archive@minotaur.apache.org Delivered-To: apmail-flink-issues-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 898BF18A9B for ; Tue, 16 Feb 2016 14:15:18 +0000 (UTC) Received: (qmail 39590 invoked by uid 500); 16 Feb 2016 14:15:18 -0000 Delivered-To: apmail-flink-issues-archive@flink.apache.org Received: (qmail 39549 invoked by uid 500); 16 Feb 2016 14:15:18 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 39521 invoked by uid 99); 16 Feb 2016 14:15:18 -0000 Received: from arcas.apache.org (HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Feb 2016 14:15:18 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id 200812C14F2 for ; Tue, 16 Feb 2016 14:15:18 +0000 (UTC) Date: Tue, 16 Feb 2016 14:15:18 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-2237) Add hash-based Aggregation MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148641#comment-15148641 ] ASF GitHub Bot commented on FLINK-2237: --------------------------------------- Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-184696741 Hello Fabian, I did 3. (https://github.com/apache/flink/commit/ef644821d5417b992cfda366225bd5622faa9b2b), because the machinery for that was already in place (see the condition in `compactOrThrow`). I chose the threshold to be 5%. (This can probably be the same with the solution set case, because if lengths change a lot then we get very slow as memory load gets near the total memory, so it is probably better to indicate the memory problem to the user with an exception than to silently be very slow.) I also did some changes to the tests. For 2., the situation doesn't seem straightforward to me. For example, if there are not many length changes, then exactly the opposite should be done: we should emit from the end of the record area (rather than the beginning), because if there is skew in the data, then the more common keys will appear sooner, so they tend to appear near the beginning of the record area. The other ideas are also interesting, and I would love to experiment with them, but unfortunately I don't really have that much time for this at the moment. So I would suggest to merge the non-partitioned version, and then the partitioned version can be implemented later when I or someone else has a lot of free time on their hands. (Btw., it would be very interesting to try machine learning techniques for dynamically making these decisions that involve complicated trade-offs, based on the actual data: - Have some switches which control these things like - what part of the record area to emit (begin or end; how much) - at how much fragmentation should we do compacting instead of emitting - what load factor should trigger a resize - size of bucket area - how to choose which partition to emit - maybe even do spilling also in the combiner - whether to insert prefetch instructions for the random memory accesses that will probably involve a CPU cache miss (the trade-off here is that then you have to work with multiple consecutive input records at the same time, so you have to do extra copies if object reuse is enabled, which might cost a lot) (I have actually experimented with this a little, and there were 20-35% speedups, if copies are cheap) - ... (it's easy to come up with many more) - Gather some statistics about what is happening, and turn them into features - avg. record size - #keys / #elements ratio - skew - time it takes to serialize a record - time it takes to run the ReduceFunction - ratio of updates that involve size changes - size is changing up or down on average - backpressure - that we are generating - that we get from our outputs (if this is large (eg. because of a saturated network), then we should set the switches to do more aggressive combining) - how many CPU cache misses occur while looking up keys (eg. for recognizing the situation where records with matching keys are often close to each other for some reason) - hash collisions (so that we can start with a more simple hash function (few percent speedup), and change it, if it is bad) - ... (it's easy to come up with many more) - Train some machine learning model which will figure out how to set the switches based on the features I think a pretty good speedup could result from tuning all these things to the actual data at hand. Maybe in a few years, when data flow systems get more mature, then this can become a reality.) > Add hash-based Aggregation > -------------------------- > > Key: FLINK-2237 > URL: https://issues.apache.org/jira/browse/FLINK-2237 > Project: Flink > Issue Type: New Feature > Reporter: Rafiullah Momand > Assignee: Gabor Gevay > Priority: Minor > > Aggregation functions at the moment are implemented in a sort-based way. > How can we implement hash based Aggregation for Flink? -- This message was sent by Atlassian JIRA (v6.3.4#6332)