flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kurt Young <ykt...@gmail.com>
Subject Re: Controlling the Materialization of JOIN updates
Date Mon, 06 Jan 2020 01:36:18 GMT
Good to hear that the patch resolved your issue, looking forward to hearing
more feedback from you!


On Mon, Jan 6, 2020 at 5:56 AM Benoît Paris <
benoit.paris@centraliens-lille.org> wrote:

> Hi Kurt,
> Thank you for your answer.
> Yes both fact tables and dimension tables are changing over time; it was
> to illustrate that they could change at the same time but that we could
> still make a JOIN basically ignore updates from one specified side. The SQL
> is not the actual one I'm using, and as you have said later on, I indeed
> don't deal with a time attribute and just want what's in the table at
> processing time.
> At the moment my problem seems to be in good way of being resolved, and it
> is going to be Option 4: "LATERAL TABLE table_function" on the Blink
> planner; as Jark Wu seems to be -elegantly- providing a patch for the
> FLINK-14200 NPE bug:
> https://github.com/apache/flink/pull/10763
> It was indeed about shenanigans on finding the proper RelOptSchema;  Ah,
> I wish I had dived sooner in the source code, and I could have had the
> pleasure opportunity to contribute to the Flink codebase.
> Anyway, shout out to Jark for resolving the bug and providing a patch! I
> believe this will be a real enabler for CQRS architectures on Flink (we had
> subscriptions with regular joins, and this patch enables querying the same
> thing with very minor SQL modifications)
> Kind regards
> Benoît
> On Sat, Jan 4, 2020 at 4:22 AM Kurt Young <ykt836@gmail.com> wrote:
>> Hi Benoît,
>> Before discussing all the options you listed, I'd like understand more
>> about your requirements.
>> The part I don't fully understand is, both your fact (Event) and
>> dimension (DimensionAtJoinTimeX) tables are
>> coming from the same table, Event or EventRawInput in your case. So it
>> will result that both your fact and
>> dimension tables are changing with time.
>> My understanding is, when your DimensionAtJoinTimeX table emit the
>> results, you don't want to change the
>> result again. You want the fact table only join whatever data currently
>> the dimension table have? I'm asking
>> because your dimension table was calculated with a window aggregation,
>> but your join logic seems doesn't
>> care about the time attribute (LEFT JOIN DimensionAtJoinTime1 d1 ON e.uid
>> = d1.uid). It's possible that
>> when a record with uid=x comes from Event table, but the dimension table
>> doesn't have any data around
>> uid=x yet due to the window aggregation. In this case, you don't want
>> them to join?
>> Best,
>> Kurt
>> On Fri, Jan 3, 2020 at 1:11 AM Benoît Paris <
>> benoit.paris@centraliens-lille.org> wrote:
>>> Hello all!
>>> I'm trying to design a stream pipeline, and have trouble controlling
>>> when a JOIN is triggering an update:
>>> Setup:
>>>    - The Event table; "probe side", "query side", the result of earlier
>>>    stream processing
>>>    - The DimensionAtJoinTimeX tables; of updating nature, "build side",
>>>    the results of earlier stream processing
>>> Joining them:
>>> SELECT    *
>>> FROM      Event e
>>> LEFT JOIN DimensionAtJoinTime1 d1
>>>   ON      e.uid = d1.uid
>>> LEFT JOIN DimensionAtJoinTime2 d2
>>>   ON      e.uid = d2.uid
>>> The DimensionAtJoinTimeX Tables being the result of earlier stream
>>> processing, possibly from the same Event table:
>>> SELECT   uid,
>>>          hop_start(...),
>>>          sum(...)
>>> FROM     Event e
>>> GROUP BY uid,
>>>          hop(...)
>>> The Event Table being:
>>> SELECT ...
>>> FROM   EventRawInput i
>>> WHERE  i.some_field = 'some_value'
>>> Requirements:
>>>    - I need the JOINs to only be executed once, only when a new line is
>>>    appended to the probe / query / Event table.
>>>    - I also need the full pipeline to be defined in SQL.
>>>    - I very strongly prefer the Blink planner (mainly for
>>>    Deduplication, TopN and LAST_VALUE features).
>>> Problem exploration so far:
>>>    - Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need to have the solution
>>>    in SQL: it doesn't work out. But I might explore the following: insert
>>>    DimensionAtJoinTimeX into a special Sink, and use it in a
>>>    LookupableTableSource (I'm at a loss on how to do that, though. Do I need
>>>    an external kv store?).
>>>    - Option 2, "FOR SYSTEM_TIME AS OF" [1], used in SQL: Is there a
>>>    version of "FOR SYSTEM_TIME AS OF" readily usable in SQL? I might have
>>>    missed something in the documentation.
>>>    - Option 3, "LATERAL TABLE table_function" [2], on the Legacy
>>>    planner: It does not work with two tables [3], and I don't get to have the
>>>    Blink planner features.
>>>    - Option 4, "LATERAL TABLE table_function" [2], on the Blink
>>>    planner: It does not work with the "probe side" being the results of
>>>    earlier stream processing [4].
>>>    - Option 5, let a regular JOIN materialize the updates, and somehow
>>>    find how to filter the ones coming from the build sides (I'm at a loss on
>>>    how to do that).
>>>    - Option 6, "TVR": I read this paper [5], which mentions
>>>    "Time-Varying Relation"s; Speculating here: could there be a way, to say
>>>    that the build side is not a TVR. Aka declare the stream as being somehow
>>>    "static", while still being updated (but I guess we're back to "FOR
>>>    - Option 7: Is there some features being developed, or hints, or
>>>    workarounds to control the JOIN updates that I have not considered so far?
>>>    - Remark 1: I believe that FLINK-15112 and FLINK-14200 are of the
>>>    same bug nature, even though they occur in different situations on
>>>    different planners (same Exception Stack Trace on files that have the same
>>>    historical parent before the Blink fork). FLINK-15112 has a workaround, but
>>>    FLINK-14200 does not. The existence of that workaround IMHO signals that
>>>    there is a simple fix for both bugs. I have tried to find it in Flink for
>>>    few days, but no success so far. If you guys have pointers helping me
>>>    provide a fix, I'll gladly listen. So far I have progressed to: It revolves
>>>    around Calcite-based Flink streaming rules transforming a temporal table
>>>    function correlate into a Join on 2*Scan, and crashes when it encounters
>>>    something that is not a table that can be readily scanned. Also, there are
>>>    shenanigans on trying to find the right schema in the Catalog. But I am
>>>    blocked now, and not accustomed to the Flink internal code (would like to
>>>    though, if Alibaba/Ververica are recruiting remote workers, wink wink,
>>>    nudge nudge).
>>> All opinions very much welcomed on all Options and Remarks!
>>> Cheers, and a happy new year to all,
>>> Benoît
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#processing-time-temporal-joins
>>> [3] https://issues.apache.org/jira/browse/FLINK-15112
>>> [4] https://issues.apache.org/jira/browse/FLINK-14200
>>> [5] https://arxiv.org/pdf/1905.12133.pdf
> --
> Benoît Paris
> Ingénieur Machine Learning Explicable
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml

View raw message