flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2237) Add hash-based Aggregation
Date Tue, 16 Feb 2016 14:15:18 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148641#comment-15148641 ]

ASF GitHub Bot commented on FLINK-2237:

Github user ggevay commented on the pull request:

    Hello Fabian,
    I did 3. (https://github.com/apache/flink/commit/ef644821d5417b992cfda366225bd5622faa9b2b),
because the machinery for that was already in place (see the condition in `compactOrThrow`).
I chose the threshold to be 5%. (This can probably stay the same in the solution set case,
because if record lengths change a lot, then we become very slow as the memory load approaches
the total memory, so it is probably better to signal the memory problem to the user with an
exception than to silently be very slow.)
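    For concreteness, the 5% fragmentation check could be sketched roughly like this (purely illustrative names; the real condition lives in the hash table's `compactOrThrow`, and its fields differ):

```java
// Hypothetical sketch of the fragmentation check described above.
// Names are illustrative, not Flink's actual API.
public class RecordArea {

    /** Fraction of the record area that may be dead space before we compact. */
    private static final double COMPACTION_THRESHOLD = 0.05; // 5%

    private final long totalAllocatedBytes;
    private long holeBytes; // bytes wasted by records that changed length and were moved

    public RecordArea(long totalAllocatedBytes) {
        this.totalAllocatedBytes = totalAllocatedBytes;
    }

    /** Called when an update changes a record's length, leaving a hole behind. */
    public void recordHole(long bytes) {
        holeBytes += bytes;
    }

    /** True once the wasted space exceeds the 5% threshold. */
    public boolean needsCompaction() {
        return totalAllocatedBytes > 0
            && (double) holeBytes / totalAllocatedBytes > COMPACTION_THRESHOLD;
    }
}
```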
    I also made some changes to the tests.
    For 2., the situation doesn't seem straightforward to me. For example, if there are few
length changes, then exactly the opposite should be done: we should emit from the end of the
record area (rather than the beginning), because if the data is skewed, then the more frequent
keys appear sooner, so they tend to sit near the beginning of the record area.
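    To illustrate the point about skew: in an append-only record area, each key's record lands at the position of its first occurrence, so under a skewed distribution the hot keys cluster near the front. A toy simulation (hypothetical, not Flink code):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SkewDemo {

    /**
     * Returns distinct keys in order of first occurrence, mimicking an
     * append-only record area where each key's record is placed when the
     * key is first seen.
     */
    static List<Integer> firstOccurrenceOrder(int[] input) {
        List<Integer> recordArea = new ArrayList<>();
        Set<Integer> seen = new HashSet<>();
        for (int key : input) {
            if (seen.add(key)) {
                recordArea.add(key);
            }
        }
        return recordArea;
    }

    public static void main(String[] args) {
        // Skewed input: key 0 is hot, keys 1..9 are rare.
        int[] input = {0, 0, 1, 0, 2, 0, 3, 0, 4, 5, 0, 6, 7, 0, 8, 9};
        // The hot key ends up at the very front, so emitting from the END of
        // the record area evicts mostly rare keys and keeps the hot key's
        // partial aggregate in memory.
        System.out.println(firstOccurrenceOrder(input));
        // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```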
    The other ideas are also interesting, and I would love to experiment with them, but
unfortunately I don't have much time for this at the moment. So I would suggest merging the
non-partitioned version; the partitioned version can be implemented later, when I or someone
else has more free time.
    (By the way, it would be very interesting to try machine learning techniques for
dynamically making these decisions, which involve complicated trade-offs, based on the actual data:
    - Have some switches which control these things like
      - what part of the record area to emit (begin or end; how much)
      - at how much fragmentation should we do compacting instead of emitting
      - what load factor should trigger a resize
      - size of bucket area
      - how to choose which partition to emit
      - maybe even spill in the combiner as well
      - whether to insert prefetch instructions for the random memory accesses that will probably
cause a CPU cache miss (the trade-off here is that you then have to work with several consecutive
input records at once, so you have to make extra copies if object reuse is enabled, which can
be costly) (I have actually experimented with this a little, and saw 20-35% speedups when
copies are cheap)
      - ... (it's easy to come up with many more)
    - Gather some statistics about what is happening, and turn them into features
      - avg. record size
      - #keys / #elements ratio
      - skew
      - time it takes to serialize a record
      - time it takes to run the ReduceFunction
      - ratio of updates that involve size changes
      - whether sizes grow or shrink on average
      - backpressure
        - that we are generating
        - that we get from our outputs (if this is large, e.g. because of a saturated network,
then we should set the switches to combine more aggressively)
      - how many CPU cache misses occur while looking up keys (e.g. to recognize the situation
where records with matching keys happen to be close to each other for some reason)
      - hash collisions (so that we can start with a simpler hash function (a few percent
speedup) and change it if it turns out to be bad)
      - ... (it's easy to come up with many more)
    - Train a machine learning model that figures out how to set the switches based on
the features
    I think a pretty good speedup could result from tuning all these things to the actual
data at hand.
    Maybe in a few years, when dataflow systems are more mature, this can become a reality.)
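    The tuning loop sketched above could be prototyped along these lines (entirely hypothetical; feature names, switch names, and thresholds are illustrative, and a hand-written rule stands in for the learned model):

```java
public class AdaptiveTuner {

    /** Runtime statistics collected by the operator (illustrative features). */
    static class Features {
        double sizeChangeRatio;        // fraction of updates that change record length
        double downstreamBackpressure; // 0..1, pressure coming from our outputs
    }

    /** Switch settings the hash table would consult (illustrative). */
    static class Switches {
        boolean emitFromEnd;        // which end of the record area to emit from
        double compactionThreshold; // fragmentation level that triggers compaction
    }

    /** A trivial hand-written "model"; a trained model would replace this. */
    static Switches decide(Features f) {
        Switches s = new Switches();
        // Few length changes: emit from the end of the record area so the
        // hot keys, which cluster near the front, stay in memory.
        s.emitFromEnd = f.sizeChangeRatio < 0.1;
        // Heavy backpressure from the outputs: combine more aggressively,
        // i.e. tolerate more fragmentation before giving memory back.
        s.compactionThreshold = f.downstreamBackpressure > 0.5 ? 0.10 : 0.05;
        return s;
    }
}
```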

> Add hash-based Aggregation
> --------------------------
>                 Key: FLINK-2237
>                 URL: https://issues.apache.org/jira/browse/FLINK-2237
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Rafiullah Momand
>            Assignee: Gabor Gevay
>            Priority: Minor
> Aggregation functions at the moment are implemented in a sort-based way.
> How can we implement hash based Aggregation for Flink?

This message was sent by Atlassian JIRA
