flink-user mailing list archives

From kant kodali <kanth...@gmail.com>
Subject Re: where does flink store the intermediate results of a join and what is the key?
Date Thu, 23 Jan 2020 05:39:14 GMT
Is it common practice to have a custom state backend? If so, what would
be a popular custom backend?

Can I use Elasticsearch as a state backend?

Thanks!

On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <imjark@gmail.com> wrote:

> Hi Kant,
>
> 1) A list of rows would also be sufficient in this case. A MapState is used
> so that a row can be retracted faster and the storage size is smaller.
>
> 2) The State Processor API is usually used to process savepoints. I’m afraid
> its performance is not good enough for querying.
>     On the other hand, AFAIK, the State Processor API requires the uid of the
> operator. However, operator uids are not set in Table API & SQL,
>     so I’m not sure whether it works or not.
>
> 3) You can have a custom state backend by
> implementing the org.apache.flink.runtime.state.StateBackend interface, and
> using it via `env.setStateBackend(…)`.
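Point 1 above (a MapState retracts faster and stores less than a list of rows) can be sketched in plain Java. This is a hedged illustration with hypothetical names and no Flink dependency: duplicates collapse into a counter, so retracting one occurrence is a single hash lookup instead of a linear scan over a list.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink, hypothetical names): a row -> count map in
// place of a list of rows. Duplicate rows are stored once with a counter,
// and retracting one occurrence is O(1) instead of an O(n) list scan.
public class RetractSketch {

    // Collapse a sequence of rows into row -> occurrence count.
    static Map<String, Integer> toCounts(List<String> rows) {
        Map<String, Integer> counts = new HashMap<>();
        for (String row : rows) {
            counts.merge(row, 1, Integer::sum);
        }
        return counts;
    }

    // Retract one occurrence of `row`; drop the entry when the count hits 0.
    static void retract(Map<String, Integer> counts, String row) {
        Integer c = counts.get(row);
        if (c == null) {
            return; // nothing to retract
        }
        if (c == 1) {
            counts.remove(row);
        } else {
            counts.put(row, c - 1);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = toCounts(List.of("a", "a", "b"));
        retract(counts, "a"); // "a" survives with count 1
        retract(counts, "b"); // "b" is removed entirely
        System.out.println(counts); // {a=1}
    }
}
```

The same idea is what lets the join operator handle retraction streams without rescanning all buffered rows for the matching one.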
>
> Best,
> Jark
>
> On Wed, 22 Jan 2020 at 14:16, kant kodali <kanth909@gmail.com> wrote:
>
>> Hi Jark,
>>
>> 1) Shouldn't it be col1 to a list of rows? Multiple rows can have the same
>> join key, right?
>>
>> 2) Can I use the State Processor API
>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>> from an external application to query the intermediate results in near
>> real-time? I thought querying RocksDB state was a widely requested feature.
>> It would be really great to consider this feature for 1.11.
>>
>> 3) Is there any interface where I can implement my own state backend?
>>
>> Thanks!
>>
>>
>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <imjark@gmail.com> wrote:
>>
>>> Hi Kant,
>>>
>>> 1) Yes, it will be stored in the RocksDB state backend.
>>> 2) In the old planner, the left state has the same structure as the right
>>> state: both are `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>>>     It is a 2-level map structure: `col1` is the join key and serves as
>>> the first-level key of the state. The key of the MapState is the input
>>> row,
>>>     `count` is the number of occurrences of that row, and `expiredTime`
>>> indicates when to clean the row up (to avoid infinite state size). You can
>>> find the source code here [1].
>>>     In the blink planner, the state structure is more complex and is
>>> determined by the meta-information of the upstream operators. You can see
>>> the source code of the blink planner here [2].
>>> 3) Currently, the intermediate state is not exposed to users. Usually,
>>> users write the query result to an external system (like MySQL) and query
>>> that external system.
>>>     Querying the intermediate state is on the roadmap, but I guess it is
>>> not in the 1.11 plan.
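The 2-level map structure from point 2 can be sketched in plain Java. This is a hedged illustration with hypothetical names and no Flink dependency: the join key is the first-level key, and each input row maps to its occurrence count plus an expiration timestamp used to bound state size.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink, hypothetical names) of the 2-level join state:
// joinKey (col1) -> (input row -> [count, expiredTime]). `count` tracks how
// many times the row was seen; `expiredTime` says when it may be cleaned up.
public class JoinStateSketch {

    // joinKey -> (row -> {count, expiredTime})
    final Map<Object, Map<List<Object>, long[]>> state = new HashMap<>();

    // Record one occurrence of `row` under `joinKey`, refreshing its expiry.
    void add(Object joinKey, List<Object> row, long expiredTime) {
        Map<List<Object>, long[]> perKey =
                state.computeIfAbsent(joinKey, k -> new HashMap<>());
        long[] meta = perKey.get(row);
        if (meta == null) {
            perKey.put(row, new long[] {1, expiredTime});
        } else {
            meta[0]++;             // bump the row count
            meta[1] = expiredTime; // refresh the expiry
        }
    }

    // Drop expired rows under `joinKey` so the state cannot grow forever.
    void cleanup(Object joinKey, long now) {
        Map<List<Object>, long[]> perKey = state.get(joinKey);
        if (perKey != null) {
            perKey.values().removeIf(meta -> meta[1] <= now);
        }
    }
}
```

In the real operator the outer level is Flink's keyed-state partitioning by join key and the inner level is the MapState, but the shape of the data is the same.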
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>> [2]:
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>
>>>
>>> On Tue, 21 Jan 2020 at 18:01, kant kodali <kanth909@gmail.com> wrote:
>>>
>>> Hi All,
>>>
>>> If I run a query like this
>>>
>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
>>> table1.col1 = table2.col1")
>>>
>>> 1) Where will Flink store the intermediate result? Imagine
>>> flink-conf.yaml says state.backend = 'rocksdb'.
>>>
>>> 2) If the intermediate results are stored in RocksDB, then what are the
>>> key and value in this case (given the query above)?
>>>
>>> 3) What is the best way to query these intermediate results from an
>>> external application, both while the job is running and while it is not
>>> running?
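For the setup assumed in question 1, a minimal flink-conf.yaml fragment could look like the sketch below (option names taken from standard Flink configuration; the checkpoint path is illustrative):

```yaml
# Use RocksDB for keyed operator state, including join state.
state.backend: rocksdb
# Durable location for checkpoints of that state (path is illustrative).
state.checkpoints.dir: hdfs:///flink/checkpoints
```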
>>>
>>> Thanks!
>>>
>>>
>>>
