beam-commits mailing list archives

From "Kenneth Knowles (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (BEAM-912) Range join in Beam
Date Tue, 08 Nov 2016 05:11:58 GMT

    [ https://issues.apache.org/jira/browse/BEAM-912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646524#comment-15646524 ]

Kenneth Knowles edited comment on BEAM-912 at 11/8/16 5:11 AM:
---------------------------------------------------------------

Very good point. My idea does not work for you. Actually, the difficult part of your problem
isn't really windowing. Windows are how we decide when to trigger, and when to evict data once
allowed lateness has passed; that is why {{BoundedWindow}} only has a maximum timestamp. What
makes your problem challenging is doing a range join in Beam at all, since the SDK/model only
provides equi-joins on shared keys. A naive approach is to compute the full cross product and
then filter for the pairs that match. Some prominent big data products still do this, even
though it is very inefficient.
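
To make the cost of the naive approach concrete, here is a minimal plain-Java sketch (not Beam code) of a cross-product-then-filter range join. It assumes a hypothetical {{Event}} type holding a key and a millisecond timestamp, and it uses the predicate from the SQL in the issue description: equal keys, and the right-hand timestamp BETWEEN the left-hand timestamp and that timestamp plus a fixed width. Every pair is examined, so the cost is O(|left| * |right|) no matter how few pairs match.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical element type (an assumption for this sketch): a join key plus a timestamp in ms. */
final class Event {
  final String key;
  final long timestampMillis;

  Event(String key, long timestampMillis) {
    this.key = key;
    this.timestampMillis = timestampMillis;
  }
}

/** Naive range join: enumerate the full cross product, then filter it. */
final class NaiveRangeJoin {
  /**
   * Returns every (l, r) pair with equal keys and
   * l.timestampMillis <= r.timestampMillis <= l.timestampMillis + widthMillis (inclusive, like BETWEEN).
   * Every pair in left x right is examined, so the cost is O(|left| * |right|).
   */
  static List<Event[]> join(List<Event> left, List<Event> right, long widthMillis) {
    List<Event[]> out = new ArrayList<>();
    for (Event l : left) {
      for (Event r : right) {
        boolean sameKey = l.key.equals(r.key);
        boolean inRange =
            r.timestampMillis >= l.timestampMillis
                && r.timestampMillis <= l.timestampMillis + widthMillis;
        if (sameKey && inRange) {
          out.add(new Event[] {l, r});
        }
      }
    }
    return out;
  }
}
```

For example, with a width of 3,600,000 ms (one hour), an order {{o1}} at 0 ms matches a shipment {{o1}} at 1,800,000 ms but not one at 7,200,000 ms.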

In Beam, one way to make this more efficient is a side input (see https://s.apache.org/beam-side-inputs-1-pager)
that supports range queries. Collection B would then be the main input, and collection A would
be the side input. Each would be windowed just like the {{assignWindows}} above, but with no
merging. Because side inputs are only eventually consistent, you would want to trigger
collection B after some allowed lateness and then join it downstream with A; whichever elements
of A had arrived by then would be the ones that get joined. In cases where the side input is
small or bounded, you may be able to do this in memory or on local disk.
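
For the small or bounded case, a side input that supports range queries can be approximated in memory. The sketch below is plain Java, not Beam's side input API: the hypothetical {{RangeIndexedSideInput}} class indexes collection A by key and then by timestamp in a {{TreeMap}}, so each main-input element from B does a {{subMap}} range lookup instead of scanning all of A. We assume, hypothetically, that A is the Orders stream and B the Shipments stream from the issue's SQL, so {{s.rowtime BETWEEN o.rowtime AND o.rowtime + width}} becomes a lookup of orders with timestamps in {{[s.rowtime - width, s.rowtime]}}. Windowing, triggering, and the eventual consistency of real side inputs are not modeled.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical element type (an assumption for this sketch): a join key plus a timestamp in ms. */
final class Event {
  final String key;
  final long timestampMillis;

  Event(String key, long timestampMillis) {
    this.key = key;
    this.timestampMillis = timestampMillis;
  }
}

/** Hypothetical in-memory stand-in for a side input that supports range queries. */
final class RangeIndexedSideInput {
  // key -> (timestamp -> side elements at that timestamp), sorted by timestamp.
  private final Map<String, NavigableMap<Long, List<Event>>> index = new HashMap<>();

  RangeIndexedSideInput(Collection<Event> sideElements) {
    for (Event e : sideElements) {
      index.computeIfAbsent(e.key, k -> new TreeMap<>())
          .computeIfAbsent(e.timestampMillis, t -> new ArrayList<>())
          .add(e);
    }
  }

  /** Returns side elements with the given key and a timestamp in [lo, hi], both inclusive. */
  List<Event> lookup(String key, long lo, long hi) {
    List<Event> out = new ArrayList<>();
    NavigableMap<Long, List<Event>> byTime = index.get(key);
    if (byTime == null) {
      return out; // no side elements for this key
    }
    for (List<Event> bucket : byTime.subMap(lo, true, hi, true).values()) {
      out.addAll(bucket);
    }
    return out;
  }
}

final class SideInputRangeJoin {
  /** Joins each main-input element b with side elements a where a.ts is in [b.ts - width, b.ts]. */
  static List<Event[]> join(List<Event> mainInput, RangeIndexedSideInput side, long widthMillis) {
    List<Event[]> out = new ArrayList<>();
    for (Event b : mainInput) {
      for (Event a : side.lookup(b.key, b.timestampMillis - widthMillis, b.timestampMillis)) {
        out.add(new Event[] {a, b});
      }
    }
    return out;
  }
}
```

Each lookup costs a logarithmic search plus the number of matches for that key, rather than a pass over all of A; the trade-off is that the whole side input must fit in memory.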

Triggers cannot solve this problem; they are not designed to perform your main computation.
They only control when results are output (sometimes we say "materialized") from a
{{GroupByKey}} or {{Combine}}; see https://s.apache.org/beam-triggers.

The solution you describe is how you might use a stateful {{ParDo}} to implement a per-key
range join, though I think the same need for a data structure that supports range queries will
arise in order to handle out-of-order data. Your {{onElement}} becomes {{@ProcessElement}}, and
{{BagState}} stays the same. The state will be partitioned by key and window, so your example
would be globally windowed. Using state in your {{DoFn}} is something I am working on right
now, so you should follow BEAM-25 for updates on when it is ready for you to use, and check
out https://s.apache.org/beam-state.
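
The stateful approach can be sketched as a symmetric join over two per-key buffers. This is plain Java simulating what the per-key state would hold, not Beam's state API: the hypothetical {{PerKeyRangeJoin}} class keeps the orders and shipments for each key in {{TreeMap}}s sorted by timestamp, standing in for the range-queryable structure mentioned above (an unordered bag could only be filtered by scanning all of it). Each arriving element is buffered and then probes the other side's buffer, so every matching pair is emitted exactly once, when its later-arriving element is processed, whatever the arrival order. The map keyed by join key stands in for key-partitioned state in the global window; evicting old state is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical element type (an assumption for this sketch): a join key plus a timestamp in ms. */
final class Event {
  final String key;
  final long timestampMillis;

  Event(String key, long timestampMillis) {
    this.key = key;
    this.timestampMillis = timestampMillis;
  }
}

/** Hypothetical simulation of per-key state for a range join; not Beam's state API. */
final class PerKeyRangeJoin {
  private final long widthMillis;
  // Per-key buffers sorted by timestamp, so the opposite side can be range-queried.
  private final Map<String, NavigableMap<Long, List<Event>>> orderState = new HashMap<>();
  private final Map<String, NavigableMap<Long, List<Event>>> shipmentState = new HashMap<>();

  PerKeyRangeJoin(long widthMillis) {
    this.widthMillis = widthMillis;
  }

  /** Buffers an order, then emits it with every buffered shipment in [o.ts, o.ts + width]. */
  List<Event[]> processOrder(Event o) {
    add(orderState, o);
    List<Event[]> out = new ArrayList<>();
    for (Event s : range(shipmentState, o.key, o.timestampMillis, o.timestampMillis + widthMillis)) {
      out.add(new Event[] {o, s});
    }
    return out;
  }

  /** Buffers a shipment, then emits it with every buffered order in [s.ts - width, s.ts]. */
  List<Event[]> processShipment(Event s) {
    add(shipmentState, s);
    List<Event[]> out = new ArrayList<>();
    for (Event o : range(orderState, s.key, s.timestampMillis - widthMillis, s.timestampMillis)) {
      out.add(new Event[] {o, s});
    }
    return out;
  }

  private static void add(Map<String, NavigableMap<Long, List<Event>>> state, Event e) {
    state.computeIfAbsent(e.key, k -> new TreeMap<>())
        .computeIfAbsent(e.timestampMillis, t -> new ArrayList<>())
        .add(e);
  }

  /** Returns buffered elements for the key with a timestamp in [lo, hi], both inclusive. */
  private static List<Event> range(
      Map<String, NavigableMap<Long, List<Event>>> state, String key, long lo, long hi) {
    List<Event> out = new ArrayList<>();
    NavigableMap<Long, List<Event>> byTime = state.get(key);
    if (byTime != null) {
      for (List<Event> bucket : byTime.subMap(lo, true, hi, true).values()) {
        out.addAll(bucket);
      }
    }
    return out;
  }
}
```

For instance, if a shipment for {{o1}} at 1,800,000 ms arrives before the order {{o1}} at 0 ms (width one hour), processing the shipment emits nothing, and processing the order then emits the single matching pair.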


> Range join in Beam
> ------------------
>
>                 Key: BEAM-912
>                 URL: https://issues.apache.org/jira/browse/BEAM-912
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Jingsong Lee
>            Assignee: Kenneth Knowles
>         Attachments: betweenJoin.png
>
>
> 1. To support data-driven triggers, we need to expose the data in the OnElementContext
> of the onElement method.
> 2. To support more flexible joins, we need to expose the buffer tag in TriggerContext;
> currently this buffer tag is in SystemReduceFn.
> For example: SELECT STREAM * FROM Orders AS o JOIN Shipments AS s
> ON o.orderId = s.orderId AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
> link: https://issues.apache.org/jira/browse/BEAM-101



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
