flink-user mailing list archives

From Arvid Heise <ar...@ververica.com>
Subject Re: where does flink store the intermediate results of a join and what is the key?
Date Tue, 28 Jan 2020 11:18:54 GMT
Yes, the default is to write to an external system. If you want SQL in
particular, there is currently no way around it.

The drawbacks of writing to an external system are the maintenance burden of
an additional system and higher latency.
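
For reference, Jark's reply quoted further down describes the old planner's join state as `<col1, MapState<Row, Tuple2<count, expiredTime>>>`. The sketch below models that two-level structure in plain Java with no Flink dependency, purely as an illustration. The class name JoinStateModel, its methods, the retention parameter, and the choice to refresh the expiry time on every insert are assumptions made for this sketch; none of them are taken from the Flink source.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the two-level join state; not Flink code.
class JoinStateModel {

    // Per-row bookkeeping: number of occurrences and the time after which
    // the entry may be cleaned up.
    static final class CountAndExpiry {
        long count;
        long expiredTime;

        CountAndExpiry(long count, long expiredTime) {
            this.count = count;
            this.expiredTime = expiredTime;
        }
    }

    // First level: join key (col1). Second level: row -> (count, expiredTime).
    private final Map<String, Map<List<Object>, CountAndExpiry>> state = new HashMap<>();

    // Assumed retention period in milliseconds (illustrative parameter).
    private final long retentionMillis;

    JoinStateModel(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Record one more occurrence of a row under its join key.
    void accumulate(String joinKey, List<Object> row, long now) {
        Map<List<Object>, CountAndExpiry> rows =
                state.computeIfAbsent(joinKey, k -> new HashMap<>());
        CountAndExpiry entry = rows.get(row);
        if (entry == null) {
            rows.put(row, new CountAndExpiry(1, now + retentionMillis));
        } else {
            entry.count++;
            // Assumption of this sketch: an update refreshes the expiry time.
            entry.expiredTime = now + retentionMillis;
        }
    }

    // Retract one occurrence; the map lookup avoids scanning a list of rows.
    void retract(String joinKey, List<Object> row) {
        Map<List<Object>, CountAndExpiry> rows = state.get(joinKey);
        if (rows == null) {
            return;
        }
        CountAndExpiry entry = rows.get(row);
        if (entry == null) {
            return;
        }
        entry.count--;
        if (entry.count <= 0) {
            rows.remove(row);
        }
        if (rows.isEmpty()) {
            state.remove(joinKey);
        }
    }

    // Drop entries whose expiry time has passed, then drop empty join keys.
    void expire(long now) {
        for (Map<List<Object>, CountAndExpiry> rows : state.values()) {
            rows.values().removeIf(e -> e.expiredTime <= now);
        }
        state.values().removeIf(Map::isEmpty);
    }

    // Number of stored occurrences of a row under a join key (0 if absent).
    long count(String joinKey, List<Object> row) {
        Map<List<Object>, CountAndExpiry> rows = state.get(joinKey);
        if (rows == null) {
            return 0;
        }
        CountAndExpiry entry = rows.get(row);
        return entry == null ? 0 : entry.count;
    }

    public static void main(String[] args) {
        JoinStateModel model = new JoinStateModel(1_000L);
        List<Object> row = Arrays.asList("k1", "a");
        model.accumulate("k1", row, 0L);
        model.accumulate("k1", row, 10L);
        model.retract("k1", row);
        System.out.println(model.count("k1", row));
    }
}
```

Keying the inner map by the row itself makes a retraction a single lookup, which matches the reason Jark gives below for using a MapState instead of a list of rows.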

On Tue, Jan 28, 2020 at 11:49 AM kant kodali <kanth909@gmail.com> wrote:

> Hi Arvid,
>
> I am trying to understand your statement. I am new to Flink, so excuse me
> if I don't know something I should have known. A ProcessFunction just processes
> the records, right? If so, how is it better than writing to an external
> system? At the end of the day I want to be able to query it (it doesn't have
> to be through Queryable State, and I probably don't want to use
> Queryable State because of its limitations). Ideally I want to be able to
> query the intermediate state using SQL, and hopefully the store that
> maintains the intermediate state has some sort of index support so that
> read queries are faster than a full scan.
>
> Also, I hear that querying intermediate state just like one would in a database
> is a widely requested feature, so it's a bit surprising that this is not
> solved yet, but I am hopeful!
>
> Thanks!
>
>
>
> On Tue, Jan 28, 2020 at 12:45 AM Arvid Heise <arvid@ververica.com> wrote:
>
>> Hi Kant,
>>
>> Just wanted to mention the obvious: if you add a ProcessFunction right
>> after the join, you could maintain user state holding the same result. That
>> will of course blow up the data volume by a factor of 2, but it may still be
>> better than writing to an external system.
>>
>> On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <
>> benoit.paris@centraliens-lille.org> wrote:
>>
>>> Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines
>>> changed.
>>> Thanks for the details, Jark!
>>>
>>> On Mon, Jan 27, 2020 at 4:07 PM Jark Wu <imjark@gmail.com> wrote:
>>>
>>>> Hi Kant,
>>>> Implementing a custom state backend is very difficult and is not recommended.
>>>>
>>>> Hi Benoît,
>>>> Yes, the "Query on the intermediate state is on the roadmap" I
>>>> mentioned refers to integrating Table API & SQL with Queryable State.
>>>> We also have an early issue, FLINK-6968, that tracks this.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>
>>>> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
>>>> benoit.paris@centraliens-lille.org> wrote:
>>>>
>>>>> Hi all!
>>>>>
>>>>> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
>>>>> on the intermediate state is on the roadmap"?
>>>>> Are you referring to working on
>>>>> QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME
>>>>> AS OF" [2], or on other APIs/concepts (is there a FLIP?)?
>>>>>
>>>>> Cheers
>>>>> Ben
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>>>>
>>>>>
>>>>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali <kanth909@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Is it common practice to have a custom state backend? If so, what
>>>>>> would be a popular custom backend?
>>>>>>
>>>>>> Can I use Elasticsearch as a state backend?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <imjark@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Kant,
>>>>>>>
>>>>>>> 1) A list of rows is also sufficient in this case. A MapState is used
>>>>>>> in order to retract a row faster and to reduce the storage size.
>>>>>>>
>>>>>>> 2) The State Processor API is usually used to process savepoints. I'm
>>>>>>> afraid its performance is not good enough for querying.
>>>>>>>     On the other hand, AFAIK, the State Processor API requires the uid of
>>>>>>> the operator. However, operator uids are not set in Table API & SQL,
>>>>>>>     so I'm not sure whether it works or not.
>>>>>>>
>>>>>>> 3) You can have a custom state backend by implementing the
>>>>>>> org.apache.flink.runtime.state.StateBackend interface, and use it
>>>>>>> via `env.setStateBackend(…)`.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Wed, 22 Jan 2020 at 14:16, kant kodali <kanth909@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jark,
>>>>>>>>
>>>>>>>> 1) Shouldn't it be col1 to a list of rows? Multiple rows can have
>>>>>>>> the same join key, right?
>>>>>>>>
>>>>>>>> 2) Can I use the State Processor API
>>>>>>>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>>>>>>>> from an external application to query the intermediate results in near
>>>>>>>> real time? I thought querying RocksDB state was a widely requested feature.
>>>>>>>> It would be really great to consider this feature for 1.11.
>>>>>>>> 3) Is there any interface where I can implement my own state
>>>>>>>> backend?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <imjark@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Kant,
>>>>>>>>>
>>>>>>>>> 1) Yes, it will be stored in the RocksDB state backend.
>>>>>>>>> 2) In the old planner, the left state has the same structure as the right
>>>>>>>>> state; both are `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>>>>>>>>>     It is a two-level map structure, where `col1` is the join
>>>>>>>>> key and the first-level key of the state. The key of the MapState is the
>>>>>>>>> input row,
>>>>>>>>>     the `count` is the number of occurrences of this row, and the expiredTime
>>>>>>>>> indicates when to clean up this row (to avoid unbounded state size). You can
>>>>>>>>> find the source code here [1].
>>>>>>>>>     In the blink planner, the state structure is more complex and
>>>>>>>>> is determined by the meta-information of the upstream. You can see the
>>>>>>>>> source code of the blink planner here [2].
>>>>>>>>> 3) Currently, the intermediate state is not exposed to users.
>>>>>>>>> Usually, users should write the query result to an external system (like
>>>>>>>>> MySQL) and query the external system.
>>>>>>>>>     Query on the intermediate state is on the roadmap, but I guess
>>>>>>>>> it is not in the 1.11 plan.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> [1]:
>>>>>>>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>>>>>>>> [2]:
>>>>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Jan 21, 2020, at 18:01, kant kodali <kanth909@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> If I run a query like this
>>>>>>>>>
>>>>>>>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2
>>>>>>>>> on table1.col1 = table2.col1")
>>>>>>>>>
>>>>>>>>> 1) Where will Flink store the intermediate result? Imagine
>>>>>>>>> flink-conf.yaml says state.backend = 'rocksdb'
>>>>>>>>>
>>>>>>>>> 2) If the intermediate results are stored in RocksDB, then what are
>>>>>>>>> the key and value in this case (given the query above)?
>>>>>>>>>
>>>>>>>>> 3) What is the best way to query these intermediate results from
>>>>>>>>> an external application, both while the job is running and while
>>>>>>>>> it is not running?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>
>>>>> --
>>>>> Benoît Paris
>>>>> Ingénieur Machine Learning Explicable
>>>>> Tél : +33 6 60 74 23 00
>>>>> http://benoit.paris
>>>>> http://explicable.ml
>>>>>
>>>>
>>>
>>> --
>>> Benoît Paris
>>> Ingénieur Machine Learning Explicable
>>> Tél : +33 6 60 74 23 00
>>> http://benoit.paris
>>> http://explicable.ml
>>>
>>
