beam-commits mailing list archives

From "Kenneth Knowles (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-912) Range join in Beam
Date Wed, 09 Nov 2016 21:24:58 GMT

    [ https://issues.apache.org/jira/browse/BEAM-912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652072#comment-15652072 ]

Kenneth Knowles commented on BEAM-912:
--------------------------------------

After a conversation with a colleague, we have another suggestion that I think is quite simple
and effective: use a variation on non-merging {{FixedWindows}} of some small granularity,
replicating your B items into each window that they overlap, then do the naive full Cartesian
product within each window and filter out the pairs that are not actually in range.

{code}
class OverlapWindowFn extends WindowFn {

  // Size of each fixed window; a tuning knob.
  private final Duration granularity = Duration.millis(GRANULARITY_TUNED_FOR_PERFORMANCE);

  void assignWindows(...) {
    // Assign each element of B to every window that it overlaps.
    // Assign each element of A to the single window that it falls into.
  }
}
{code}
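
To make the window assignment and the filtered cross product concrete, here is a minimal plain-Java sketch with no Beam dependency. It is an illustration under stated assumptions, not the proposed implementation: A elements are taken to be point timestamps, B elements closed ranges {{[lo, hi]}} with lo <= hi, and the names {{OverlapWindowSketch}}, {{GRANULARITY_MS}}, {{windowStart}}, {{windowsForRange}} and {{rangeJoin}} are all hypothetical. The equality part of a join key (such as orderId) is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OverlapWindowSketch {
  // Hypothetical window size (10 minutes); stands in for the tuned granularity.
  static final long GRANULARITY_MS = 10 * 60 * 1000L;

  /** Start of the fixed window that contains timestamp ts. */
  static long windowStart(long ts) {
    return Math.floorDiv(ts, GRANULARITY_MS) * GRANULARITY_MS;
  }

  /** B side: starts of every fixed window overlapping the closed range [lo, hi]. */
  static List<Long> windowsForRange(long lo, long hi) {
    List<Long> starts = new ArrayList<>();
    for (long w = windowStart(lo); w <= hi; w += GRANULARITY_MS) {
      starts.add(w);
    }
    return starts;
  }

  /**
   * Range join via per-window cross product plus filter.
   * points[i] is an A timestamp; ranges[j] = {lo, hi} is a B range.
   * Returns pairs {i, j} such that lo <= points[i] <= hi.
   */
  static List<long[]> rangeJoin(long[] points, long[][] ranges) {
    // Replicate each B element into every window it overlaps.
    Map<Long, List<Integer>> rangesByWindow = new HashMap<>();
    for (int j = 0; j < ranges.length; j++) {
      for (long w : windowsForRange(ranges[j][0], ranges[j][1])) {
        rangesByWindow.computeIfAbsent(w, k -> new ArrayList<>()).add(j);
      }
    }
    // Each A element lands in exactly one window, so each match is emitted once.
    List<long[]> out = new ArrayList<>();
    for (int i = 0; i < points.length; i++) {
      List<Integer> candidates =
          rangesByWindow.getOrDefault(windowStart(points[i]), Collections.emptyList());
      for (int j : candidates) {
        // Sharing a window is necessary but not sufficient, hence the filter.
        if (ranges[j][0] <= points[i] && points[i] <= ranges[j][1]) {
          out.add(new long[] {i, j});
        }
      }
    }
    return out;
  }
}
```

Because only the B side is replicated, no pair can be produced twice, and the filter removes candidates that share a window without actually being in range.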

Note that these windows never need to merge, which is much more efficient and allows optimizations.
This approach is also much higher level than implementing a join yourself with state.

This separates two of our classic questions: "what are you computing?" (a range join via filtered
cross product) and "where is your data in event time?" (when do we think we can garbage collect
it).

> Range join in Beam
> ------------------
>
>                 Key: BEAM-912
>                 URL: https://issues.apache.org/jira/browse/BEAM-912
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Jingsong Lee
>            Assignee: Kenneth Knowles
>         Attachments: betweenJoin.png
>
>
> 1. We can support some data-driven triggers, so we need to expose data in the OnElementContext of the onElement method.
> 2. We can support more flexible joins, so we need to expose the buffer tag in TriggerContext; currently this buffer tag is in SystemReduceFn.
> For example: SELECT STREAM * FROM Orders AS o JOIN Shipments AS s
> ON o.orderId = s.orderId AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
> Link: https://issues.apache.org/jira/browse/BEAM-101



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
