flink-user mailing list archives

From RKandoji <rkand...@gmail.com>
Subject Re: Duplicate tasks for the same query
Date Thu, 02 Jan 2020 16:43:03 GMT
Ok thanks, does it mean version 1.9.2 is what I need to use?

On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li <jingsonglee0@gmail.com> wrote:

> The blink planner was introduced in 1.9. We recommend using the blink
> planner from 1.9 onwards.
> After some bug fixes, I think the latest 1.9 release is OK. It has also
> been put into production environments in some places.
>
> Best,
> Jingsong Lee
>
> On Wed, Jan 1, 2020 at 3:24 AM RKandoji <rkandoji@gmail.com> wrote:
>
>> Thanks Jingsong and Kurt for more details.
>>
>> Yes, I'm planning to try out deduplication when I'm done upgrading to
>> version 1.9. Hopefully the deduplication is done by only one task and
>> reused everywhere else.
>>
>> One more follow-up question: I see the warning "For production use cases,
>> we recommend the old planner that was present before Flink 1.9 for now."
>> here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
>> This is actually the reason why I started with version 1.8. Could you
>> please let me know your opinion about this, and do you think there is any
>> production code running on version 1.9?
>>
>> Thanks,
>> Reva
>>
>>
>>
>>
>> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young <ykt836@gmail.com> wrote:
>>
>>> BTW, you could also have a more efficient version of deduplicating the
>>> user table by using the top-n feature [1].
>>>
>>> Best,
>>> Kurt
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
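>>>
>>> For illustration, a minimal sketch of what such a top-n deduplication
>>> query might look like (the query shape follows the linked top-n docs; the
>>> "Users" table and its columns are assumed from the earlier messages in
>>> this thread):
>>>
>>> // Keep only the latest row per userId: top-n with n = 1.
>>> Table uniqueUsers = tEnv.sqlQuery(
>>>     "SELECT userId, userTimestamp FROM (" +
>>>     "  SELECT *, ROW_NUMBER() OVER (PARTITION BY userId" +
>>>     "    ORDER BY userTimestamp DESC) AS row_num FROM Users" +
>>>     ") WHERE row_num = 1");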
>>>
>>>
>>> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li <jingsonglee0@gmail.com>
>>> wrote:
>>>
>>>> Hi RKandoji,
>>>>
>>>> In theory, you don't need to do anything.
>>>> First, the optimizer will optimize the plan and merge duplicate nodes
>>>> where it can.
>>>> Second, after SQL optimization, if the optimized plan still has
>>>> duplicate nodes, the planner will automatically reuse them.
>>>> There are config options that control whether the plan should be reused;
>>>> their default value is true, so you don't need to modify them (a sketch of
>>>> setting them explicitly follows below):
>>>> - table.optimizer.reuse-sub-plan-enabled
>>>> - table.optimizer.reuse-source-enabled
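>>>>
>>>> For reference, a minimal sketch of setting these options explicitly on a
>>>> 1.9 blink-planner TableEnvironment (assuming TableConfig#getConfiguration()
>>>> is available, and a StreamTableEnvironment named tEnv as in the rest of
>>>> this thread; both options already default to true):
>>>>
>>>> import org.apache.flink.configuration.Configuration;
>>>>
>>>> // Defaults are true; shown only to make the settings explicit.
>>>> Configuration conf = tEnv.getConfig().getConfiguration();
>>>> conf.setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
>>>> conf.setBoolean("table.optimizer.reuse-source-enabled", true);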
>>>>
>>>> Best,
>>>> Jingsong Lee
>>>>
>>>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji <rkandoji@gmail.com> wrote:
>>>>
>>>>> Thanks Terry and Jingsong,
>>>>>
>>>>> Currently I'm on version 1.8 using the Flink planner for stream
>>>>> processing; I'll switch to version 1.9 to try out the blink planner.
>>>>>
>>>>> Could you please point me to any examples (Java preferred) using
>>>>> SubplanReuser?
>>>>>
>>>>> Thanks,
>>>>> RK
>>>>>
>>>>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <jingsonglee0@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi RKandoji,
>>>>>>
>>>>>> FYI: Blink-planner subplan reusing: [1] 1.9 available.
>>>>>>
>>>>>>        Join                      Join
>>>>>>      /      \                  /      \
>>>>>>  Filter1  Filter2          Filter1  Filter2
>>>>>>     |        |        =>       \     /
>>>>>>  Project1 Project2            Project1
>>>>>>     |        |                   |
>>>>>>   Scan1    Scan2               Scan1
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>>>>>
>>>>>> Best,
>>>>>> Jingsong Lee
>>>>>>
>>>>>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <zjuwangg@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi RKandoji~
>>>>>>>
>>>>>>> Could you provide more info about your POC environment?
>>>>>>> Stream or batch? Flink planner or blink planner?
>>>>>>> AFAIK, the blink planner has done some optimization to deal with such
>>>>>>> duplicate tasks for the same query. You can give the blink planner a
>>>>>>> try:
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
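>>>>>>>
>>>>>>> For reference, a minimal sketch of creating a streaming
>>>>>>> TableEnvironment with the blink planner on 1.9, following the linked
>>>>>>> page (class and method names come from that page, not from this
>>>>>>> thread):
>>>>>>>
>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>
>>>>>>> StreamExecutionEnvironment env =
>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> // Select the blink planner in streaming mode (Flink 1.9+).
>>>>>>> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>>     .useBlinkPlanner()
>>>>>>>     .inStreamingMode()
>>>>>>>     .build();
>>>>>>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);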
>>>>>>>
>>>>>>> Best,
>>>>>>> Terry Wang
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Dec 30, 2019, at 03:07, RKandoji <rkandoji@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> I'm doing a POC with Flink to understand if it's a good fit for my
>>>>>>> use case.
>>>>>>>
>>>>>>> As part of the process, I need to filter out duplicate items, and I
>>>>>>> created the query below to get only the latest records based on
>>>>>>> timestamp. For instance, I have a "Users" table which may contain
>>>>>>> multiple messages for the same "userId", so I wrote the query below to
>>>>>>> get only the latest message for a given "userId":
>>>>>>>
>>>>>>> Table uniqueUsers = tEnv.sqlQuery(
>>>>>>>     "SELECT * FROM Users WHERE (userId, userTimestamp) IN " +
>>>>>>>     "(SELECT userId, MAX(userTimestamp) FROM Users GROUP BY userId)");
>>>>>>>
>>>>>>> The above query works as expected and returns only the latest users
>>>>>>> based on timestamp.
>>>>>>>
>>>>>>> The issue is that when I use the "uniqueUsers" table multiple times in
>>>>>>> a JOIN operation, I see multiple tasks in the Flink dashboard for the
>>>>>>> same query that creates the "uniqueUsers" table. It simply creates as
>>>>>>> many tasks as the number of times I use the table.
>>>>>>>
>>>>>>> Below is the JOIN query.
>>>>>>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
>>>>>>> Table joinOut = tEnv.sqlQuery(
>>>>>>>     "SELECT * FROM Car c" +
>>>>>>>     " LEFT JOIN uniqueUsersTbl aa ON c.userId = aa.userId" +
>>>>>>>     " LEFT JOIN uniqueUsersTbl ab ON c.ownerId = ab.userId" +
>>>>>>>     " LEFT JOIN uniqueUsersTbl ac ON c.sellerId = ac.userId" +
>>>>>>>     " LEFT JOIN uniqueUsersTbl ad ON c.buyerId = ad.userId");
>>>>>>>
>>>>>>> Could someone please help me understand how I can avoid these
>>>>>>> duplicate tasks?
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> R Kandoji
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>
>
> --
> Best, Jingsong Lee
>
