flink-issues mailing list archives

From "Sihua Zhou (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8918) Introduce Runtime Filter Join
Date Tue, 15 May 2018 09:36:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16475549#comment-16475549 ]

Sihua Zhou commented on FLINK-8918:

Hi [~fhueske], I updated the description to explain the motivation in detail. Could you please
have a look at it (I ask because I notice that the 1.5 release is in its final stage now)? Do you
object to this feature?

> Introduce Runtime Filter Join
> -----------------------------
>                 Key: FLINK-8918
>                 URL: https://issues.apache.org/jira/browse/FLINK-8918
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.6.0
> In general, stream join is one of the most expensive tasks in terms of performance. For every record from
either side, we need to query the state of the other side, which leads to poor performance
when the state is huge. So, in production, we always need to spend a lot of slots to handle
stream joins. But we can improve on this, because there is a phenomenon of stream joins that
can be observed in production: the `joined ratio` of a stream join is often very low,
for example:
>  - stream join in promotion analysis: the job needs to join the promotion log with the action (click,
view, payment, collection, retweet) log on the `promotion_id` to analyze the effect of
the promotion.
>  - stream join in AD (advertising) attribution: the job needs to join the AD click log with
the item payment log on the `click_id` to find which click on which AD brought the payment,
in order to do attribution.
>  - stream join in click log analysis of docs: the job needs to join the view log (docs viewed by
users) with the click log (docs clicked by users) to analyze the reason for the click and the
properties of the users.
>  - … and so on
> All these cases have one property in common: the _joined ratio_ is very low. Here
is an example. Imagine that we have 10000 records from the left stream and
10000 records from the right stream, and we execute _select * from leftStream l join rightStream
r on l.id = r.id_ , but we only get 100 records in the result. That is a case of low _joined
ratio_. This is an example for an inner join, but the same applies to left & right joins.
> There are more examples of low _joined ratio_ that I can come up with, but the most important
point I want to make is that a low _joined ratio_ of stream joins in production is
a very common phenomenon (maybe the most common one in some companies; at least in
our company that is the case).
> *Then how to improve it?*
> We can see from the above case that joining 10000 records with 10000 records yields only 100 results.
That means we query the state 20000 times (10000 for the left stream and 10000 for the right
stream) but only 100 of those queries are meaningful! If we could reduce the number of useless queries,
then we could definitely improve the performance of stream join.
> The way we improve this is to introduce the _Runtime Filter Join_. The main
idea is that we build a _filter_ for the state on each side (left stream & right stream).
When we need to query the state on one side, we first check the corresponding _filter_ to see whether
the _key_ can possibly be in the state. If the _filter_ says "no, it cannot be in the state",
then we skip querying the state; if it says "hmm, it may be in the state", then we need to query
the state. As you can see, the best choice for the _filter_ is a _Bloom Filter_, which has all the
features that we expect: _extremely good performance_ and _no false negatives_.
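> To illustrate the two properties mentioned above, here is a minimal Bloom filter sketch. It is not Flink code: the class name `SimpleBloomFilter`, the `long` key type, the sizing parameters and the hashing scheme (double hashing over a MurmurHash3-style 64-bit mixer) are all assumptions made for this example.

```java
import java.util.BitSet;

/** Minimal Bloom filter sketch (hypothetical class, for illustration only). */
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int numBits, int numHashes) {
        if (numBits <= 0 || numHashes <= 0) {
            throw new IllegalArgumentException("numBits and numHashes must be positive");
        }
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    /** Records the key by setting numHashes bit positions. */
    void add(long key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(index(key, i));
        }
    }

    /**
     * Returns false if the key was definitely never added (no false negatives),
     * and true if the key may have been added (false positives are possible).
     */
    boolean mightContain(long key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(key, i))) {
                return false;
            }
        }
        return true;
    }

    // Double hashing: the i-th bit position is derived from two base hashes.
    private int index(long key, int i) {
        long h1 = mix(key);
        long h2 = mix(h1 ^ 0x9E3779B97F4A7C15L) | 1L; // force an odd step
        return (int) Long.remainderUnsigned(h1 + i * h2, numBits);
    }

    // 64-bit mixing function (same steps as the MurmurHash3 finalizer).
    private static long mix(long x) {
        x ^= x >>> 33;
        x *= 0xff51afd7ed558ccdL;
        x ^= x >>> 33;
        x *= 0xc4ceb9fe1a85ec53L;
        x ^= x >>> 33;
        return x;
    }
}
```

> Every key passed to `add` is afterwards reported by `mightContain`, because the same bits are checked that were set. A key that was never added is usually rejected, but may occasionally be accepted, in which case the join would simply fall back to querying the state.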
> *the simplest pseudo code for _Runtime Filter Join_(the comments inline are based on
> {code:java}
> // Baseline: every incoming record probes the state of the other side.
> void performJoinNormally(Record recordFromLeftStream) {
>     Iterator<Record> rightIterator = rightStreamState.iterator();
>     // perform the `seek()` on RocksDB and iterate one by one;
>     // this is an expensive operation, especially when the key can't be found in RocksDB.
>     while (rightIterator.hasNext()) {
>         Record recordFromRightState = rightIterator.next();
>         .......
>     }
> }
> // Runtime filter join: probe the state only if the filter says the key may be present.
> void performRuntimeFilterJoin(Record recordFromLeftStream) {
>     Iterator<Record> rightIterator = Collections.emptyIterator();
>     if (rightStreamFilter.containsCurrentKey()) {
>         rightIterator = rightStreamState.iterator();
>     }
>     // perform the `seek()` only when rightStreamFilter.containsCurrentKey() returns true
>     while (rightIterator.hasNext()) {
>         Record recordFromRightState = rightIterator.next();
>         .......
>     }
>     // add the current key to the filter of the left stream.
>     leftStreamFilter.addCurrentKey();
> }
> {code}
> A description of Runtime Filter Join for batch joins can be found [here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
(it was not originally designed for stream joins, but the idea carries over easily to `stream join`).
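> The 10000 x 10000 example and the pseudo code above can be combined into a small self-contained simulation. This is only a sketch under stated assumptions: a `HashMap` stands in for the RocksDB state, the filter is a plain `BitSet` indexed by the key itself (so it has no false positives for the small keys used here, unlike a real Bloom filter), and all names (`RuntimeFilterJoinSketch`, `Side`, `probe`, `run`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simulation sketch of a runtime filter join (hypothetical names, not Flink code). */
final class RuntimeFilterJoinSketch {
    static final int FILTER_BITS = 1 << 20;

    /** One side of the join: keyed state plus a filter over the keys it holds. */
    static final class Side {
        final Map<Long, List<String>> state = new HashMap<>(); // stand-in for RocksDB state
        final BitSet filter = new BitSet(FILTER_BITS);          // stand-in for a Bloom filter
        long stateLookups = 0;                                   // counts simulated state queries

        void add(long key, String value) {
            state.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
            filter.set(slot(key));
        }

        /** Returns matching records; the state is queried only if the filter allows it. */
        List<String> probe(long key, boolean useFilter) {
            if (useFilter && !filter.get(slot(key))) {
                return Collections.emptyList(); // key is definitely not in the state
            }
            stateLookups++;
            return state.getOrDefault(key, Collections.emptyList());
        }
    }

    // Assumption: keys are small and non-negative, so key mod FILTER_BITS never collides here.
    static int slot(long key) {
        return (int) Long.remainderUnsigned(key, FILTER_BITS);
    }

    /** Joins one record against the other side, then stores it on its own side. */
    static int process(Side own, Side other, long key, String value, boolean useFilter) {
        int joined = other.probe(key, useFilter).size();
        own.add(key, value);
        return joined;
    }

    /** Feeds n records per side with `matches` shared keys; returns {joinedRows, stateLookups}. */
    static long[] run(int n, int matches, boolean useFilter) {
        Side left = new Side();
        Side right = new Side();
        long joined = 0;
        for (int i = 0; i < n; i++) {
            long leftKey = i;                               // left keys: 0 .. n-1
            long rightKey = i < matches ? i : (long) n + i; // only the first `matches` keys overlap
            joined += process(left, right, leftKey, "L" + i, useFilter);
            joined += process(right, left, rightKey, "R" + i, useFilter);
        }
        return new long[] {joined, left.stateLookups + right.stateLookups};
    }

    public static void main(String[] args) {
        long[] plain = run(10000, 100, false);
        long[] filtered = run(10000, 100, true);
        System.out.println("without filter: joined=" + plain[0] + ", state lookups=" + plain[1]);
        System.out.println("with filter:    joined=" + filtered[0] + ", state lookups=" + filtered[1]);
    }
}
```

> With these inputs both runs produce 100 joined rows, but the run without the filter performs 20000 state lookups while the run with the filter performs 100. A real Bloom filter would let some additional lookups through as false positives, so the saving in practice depends on how the filter is sized.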

This message was sent by Atlassian JIRA
