flink-user mailing list archives

From Hequn Cheng <chenghe...@gmail.com>
Subject Re: How to join stream and batch data in Flink?
Date Tue, 25 Sep 2018 13:45:31 GMT
Hi vino,

There are no ordering problems with stream-stream joins in Flink. No matter
what order the elements arrive in, a stream-stream join in Flink will output
results that are consistent with standard SQL semantics. I haven't read the
book you mentioned. A join does not guarantee the order of its output, though.
You have to use orderBy if you want ordered results.
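
The point above can be illustrated with a toy model. This is only a sketch in plain Python, not Flink code: `symmetric_hash_join` and the sample events are hypothetical, and the model assumes append-only inputs with no retractions. It replays the same events in every possible arrival order and compares the results.

```python
from collections import defaultdict
from itertools import permutations


def symmetric_hash_join(events):
    """Toy non-windowed inner join of two append-only streams.

    `events` is an iterable of (side, key, value) with side "L" or "R".
    Every event is stored in state and probed against the other side.
    """
    state = {"L": defaultdict(list), "R": defaultdict(list)}
    output = []
    for side, key, value in events:
        other = "R" if side == "L" else "L"
        state[side][key].append(value)
        # Probe everything seen so far on the opposite side for this key.
        for match in state[other][key]:
            left, right = (value, match) if side == "L" else (match, value)
            output.append((key, left, right))
    return output


# Hypothetical sample data: three "L" rows and two "R" rows.
events = [
    ("L", 1, "order-a"), ("R", 1, "user-x"),
    ("L", 2, "order-b"), ("R", 2, "user-y"),
    ("L", 1, "order-c"),
]

# Replay the same input in every possible arrival order.
runs = [symmetric_hash_join(p) for p in permutations(events)]
distinct_result_sets = {frozenset(r) for r in runs}
distinct_emission_orders = {tuple(r) for r in runs}

# An explicit sort plays the role of orderBy in this toy model.
ordered = sorted(runs[0])
```

In this model `distinct_result_sets` has exactly one member, so the set of joined rows does not depend on arrival order, while `distinct_emission_orders` has several members, which is why an explicit sort is needed for ordered output.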

Best, Hequn

On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1127@gmail.com> wrote:

> Hi Fabian,
>
> I may not have stated it clearly: there is no semantic problem at the
> Flink implementation level. Rather, there may be a “time dependence” here. [1]
>
> Yes, my initial answer was not to use this form of join in this scenario,
> but Henry said he converted the table into a stream table and asked about
> the feasibility of other methods.
>
> [1]: "Designing Data-Intensive Applications" by Martin Kleppmann, Part 3:
> Derived Data, Chapter 11: Stream Processing, Stream Joins.
>
> An excerpt:
>
> *If the ordering of events across streams is undetermined, the join
> becomes nondeterministic [87], which means you cannot rerun the same job
> on the same input and necessarily get the same result: the events on the
> input streams may be interleaved in a different way when you run the job
> again.*
>
>
> On Tue, Sep 25, 2018 at 8:08 PM Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi,
>>
>> I don't think that using the current join implementation in the Table API
>> / SQL will work.
>> The non-windowed join fully materializes *both* input tables in state.
>> This is necessary, because the join needs to be able to process updates on
>> either side.
>> While this is not a problem for the fixed-size MySQL table,
>> materializing the append-only table (aka stream) is probably not what you
>> want.
>> You also cannot limit idle state retention, because that would remove the
>> MySQL table from state at some point.
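
The trade-off described above can be sketched with a toy model. This is plain Python, not Flink code: `RetainingJoin`, its `retention` parameter, and the sample data are hypothetical, and expiry is modeled simply as "drop rows older than `retention` time units", which only approximates how Flink's idle state retention behaves.

```python
from collections import defaultdict


class RetainingJoin:
    """Toy two-sided join that keeps both inputs in state."""

    def __init__(self, retention=None):
        self.retention = retention  # None means: keep state forever
        # side -> key -> list of (timestamp, value)
        self.state = {"dim": defaultdict(list), "fact": defaultdict(list)}

    def _expire(self, now):
        if self.retention is None:
            return
        for side in self.state.values():
            for key in list(side):
                side[key] = [(ts, v) for ts, v in side[key]
                             if now - ts <= self.retention]
                if not side[key]:
                    del side[key]

    def process(self, side, key, value, now):
        """Store one row and return the (key, fact, dim) rows it produces."""
        self._expire(now)
        other = "fact" if side == "dim" else "dim"
        self.state[side][key].append((now, value))
        matches = [v for _, v in self.state[other].get(key, [])]
        if side == "fact":
            return [(key, value, m) for m in matches]
        return [(key, m, value) for m in matches]

    def state_size(self):
        return sum(len(rows) for side in self.state.values()
                   for rows in side.values())


def run(retention):
    join = RetainingJoin(retention)
    # The dimension row is loaded once, at time 0.
    join.process("dim", "u1", "Alice", now=0)
    out = []
    # The fact stream keeps growing.
    for t in range(1, 101):
        out += join.process("fact", "u1", f"click-{t}", now=t)
    return out, join.state_size()


unbounded_out, unbounded_size = run(retention=None)
bounded_out, bounded_size = run(retention=10)
```

Without retention every fact row finds its dimension row, but the state grows with the stream. With retention the state stays small, but the dimension row is eventually dropped and later fact rows no longer match.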
>>
>> The only way to make it work is using a user-defined TableFunction that
>> queries the MySQL table via JDBC.
>> However, please note that these calls would be synchronous, blocking
>> calls.
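
The lookup approach can be sketched as follows. This is a plain Python illustration, not a Flink TableFunction: an in-memory SQLite database stands in for MySQL over JDBC, and the `users` table, `lookup_user`, and `enrich` are hypothetical names chosen for the example.

```python
import sqlite3

# Stand-in for the MySQL dimension table (assumption: a small users table).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "Alice"), (2, "Bob")])


def lookup_user(user_id):
    """Table-function-style lookup: yields zero or more rows for one key.

    Each call is a synchronous query, so the caller blocks until it returns.
    """
    cursor = conn.execute("SELECT name FROM users WHERE id = ?", (user_id,))
    for (name,) in cursor:
        yield name


def enrich(facts):
    """Join each fact row with the dimension rows returned by the lookup."""
    for user_id, action in facts:
        for name in lookup_user(user_id):
            yield (user_id, action, name)


facts = [(1, "click"), (3, "view"), (2, "buy")]
enriched = list(enrich(facts))

# A later change to the dimension table is visible to subsequent lookups.
conn.execute("UPDATE users SET name = 'Robert' WHERE id = 2")
enriched_after_update = list(enrich([(2, "buy")]))
```

Only the stream side flows through the job here, so nothing has to be materialized in state. The cost is one blocking query per input row, which is the drawback noted above.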
>>
>> @Vino: Why do you think that the stream & stream join is not mature and
>> which problems do you see in the semantics?
>> The semantics are correct (standard SQL semantics) and in my opinion the
>> implementation is also mature.
>> However, you should not use the non-windowed join if either of the input
>> tables is ever-growing, because both sides must be held in state. This is
>> not an issue of the semantics.
>>
>> Cheers,
>> Fabian
>>
>> On Tue, Sep 25, 2018 at 2:00 PM vino yang <yanghua1127@gmail.com> wrote:
>>
>>> Hi Henry,
>>>
>>> 1) I don't particularly recommend this method, but you said that you want
>>> to convert the MySQL table to a stream and then to a Flink table. Under this
>>> premise, I said that you can do this by joining two stream tables. But as
>>> you know, this join depends on how long the state is retained.
>>> To make it equivalent to a dimension table, you must permanently retain the
>>> state of the stream table that is defined as the "dimension table." I only
>>> said that modifying the relevant configuration in Flink can do this, not
>>> that it can be set for a single table.
>>>
>>> 2) Imagine that there are one million records in the two tables. The
>>> records of both tables have only just begun streaming into Flink, and the
>>> records of the dimension table have not all arrived yet. Therefore, your
>>> matching results may not be as accurate as directly querying MySQL.
>>>
>>> In fact, the current stream & stream join is not very mature and there are
>>> some problems in its semantics. I personally recommend that you go back to
>>> a stream/batch (MySQL) join. For more on the underlying principles, I
>>> recommend reading the book known as "DDIA".
>>>
>>> Thanks, vino.
>>>
>>> On Tue, Sep 25, 2018 at 5:48 PM 徐涛 <happydexutao@gmail.com> wrote:
>>>
>>>> Hi Vino,
>>>> I do not quite understand some of the sentences below. Would you please
>>>> explain them in a bit more detail?
>>>> 1. “*such as setting the state retention time of one of the tables to
>>>> be permanent*”: as far as I know, the state retention time is a global
>>>> config, so I cannot set this property per table.
>>>> 2. “*you may not be able to match the results, because the data
>>>> belonging to the mysql table is just beginning to play as a stream*”:
>>>> why would the results fail to match?
>>>>
>>>> Best
>>>> Henry
>>>>
>>>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1127@gmail.com> wrote:
>>>>
>>>> Hi Henry,
>>>>
>>>> If you have converted the MySQL table to a Flink stream table, a
>>>> stream-stream join in Flink Table/SQL can also do this, such as setting the
>>>> state retention time of one of the tables to be permanent. But when the job
>>>> has just started running, you may not be able to match the results, because
>>>> the data belonging to the mysql table is just beginning to play as a stream.
>>>>
>>>> Thanks, vino.
>>>>
>>>> On Tue, Sep 25, 2018 at 5:10 PM 徐涛 <happydexutao@gmail.com> wrote:
>>>>
>>>>> Hi Vino & Hequn,
>>>>> I am now using the Table/SQL API. If I import the MySQL table as a
>>>>> stream and then convert it into a table, it seems that this can also be a
>>>>> workaround for batch/streaming joins. May I ask how this differs from
>>>>> the UDTF method? Does this implementation have any drawbacks?
>>>>> Best
>>>>> Henry
>>>>>
>>>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghequn@gmail.com> wrote:
>>>>>
>>>>> Hi
>>>>>
>>>>> +1 for vino's answer.
>>>>> Also, this kind of join will be supported in FLINK-9712
>>>>> <https://issues.apache.org/jira/browse/FLINK-9712>. You can find
>>>>> more details in the Jira issue.
>>>>>
>>>>> Best, Hequn
>>>>>
>>>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1127@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Henry,
>>>>>>
>>>>>> There are three ways I can think of:
>>>>>>
>>>>>> 1) use the DataStream API and implement a flatMap UDF to access the
>>>>>> dimension table;
>>>>>> 2) use the Table/SQL API and implement a UDTF to access the dimension table;
>>>>>> 3) customize the implementation of the Table/SQL join API/statement (and
>>>>>> change the physical plan).
>>>>>>
>>>>>> Thanks, vino.
>>>>>>
>>>>>> On Fri, Sep 21, 2018 at 4:43 PM 徐涛 <happydexutao@gmail.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>         Sometimes a “dimension table” needs to be joined with the
>>>>>>> "fact table", if the data are not joined before being sent to Kafka.
>>>>>>>         So if the data are joined in Flink, does the “dimension
>>>>>>> table” have to be imported as a stream, or are there other ways to
>>>>>>> achieve it?
>>>>>>>         Thanks a lot!
>>>>>>>
>>>>>>> Best
>>>>>>> Henry
>>>>>>
>>>>>>
>>>>>
>>>>
