flink-user mailing list archives

From Kurt Young <ykt...@gmail.com>
Subject Re: Duplicate tasks for the same query
Date Mon, 06 Jan 2020 01:33:20 GMT
Another common skew case we've seen is null handling, where the value of the
join key is NULL. We will shuffle all rows with a NULL key into one task, even
though by definition the join condition can never hold for them.
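
To illustrate the NULL case, here is a minimal sketch in plain Java. It is not Flink's actual partitioner: `subtaskFor` is a hypothetical stand-in for "shuffle by join key" that assumes a simple hash-modulo assignment, and the class and method names are made up for this example. It only shows why every row with a NULL key lands on the same subtask.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class NullKeySkewDemo {
    // Hypothetical stand-in for "shuffle by join key": choose a subtask from the key's hash.
    // Objects.hashCode(null) is 0, so every NULL key maps to the same subtask.
    static int subtaskFor(String joinKey, int parallelism) {
        return Math.floorMod(Objects.hashCode(joinKey), parallelism);
    }

    // Count how many records each subtask would receive.
    static int[] countPerSubtask(List<String> joinKeys, int parallelism) {
        int[] counts = new int[parallelism];
        for (String key : joinKeys) {
            counts[subtaskFor(key, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Four of the seven records have a NULL join key.
        List<String> keys = Arrays.asList("u1", null, "u2", null, null, "u3", null);
        System.out.println(Arrays.toString(countPerSubtask(keys, 8)));
    }
}
```

In this sketch the four NULL-keyed records all go to one subtask, while the non-NULL keys spread out according to their hash.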

For DeDuplication, I just want to make sure this behavior meets your
requirement. In some other use cases, users are only interested in the
earliest record, because later updates for the same key are purely redundant,
for example when an upstream failure causes the same data to be processed
again. In that case, each key will have at most one record and you won't face
any join key skew issue.
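
The difference between keeping the earliest and the latest record can be sketched in plain Java. This is only an illustration under assumptions, not Flink's deduplication operator: the class name, the `{key, value}` record shape, and the "+"/"-" notation for emitted and retracted records are all made up for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DedupChangelogDemo {
    // Keep-first: emit a record only the first time its key is seen; later records are dropped.
    static List<String> keepFirst(List<String[]> records) {
        Map<String, String> seen = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String[] r : records) {
            if (seen.putIfAbsent(r[0], r[1]) == null) {
                out.add("+" + r[0] + "=" + r[1]);
            }
        }
        return out;
    }

    // Keep-last: each later record for a key retracts the previous one ("-"),
    // then emits the new one ("+"), so downstream operators see every update.
    static List<String> keepLast(List<String[]> records) {
        Map<String, String> latest = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String[] r : records) {
            String previous = latest.put(r[0], r[1]);
            if (previous != null) {
                out.add("-" + r[0] + "=" + previous);
            }
            out.add("+" + r[0] + "=" + r[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"u1", "a"});
        records.add(new String[] {"u1", "b"});
        records.add(new String[] {"u2", "c"});
        records.add(new String[] {"u1", "d"});
        System.out.println("keep-first: " + keepFirst(records));
        System.out.println("keep-last:  " + keepLast(records));
    }
}
```

With keep-first, a frequently updated key contributes a single record downstream; with keep-last, each update to that key produces a retraction plus a new record, all of which are routed to the same join subtask.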

Best,
Kurt


On Mon, Jan 6, 2020 at 6:55 AM RKandoji <rkandoji@gmail.com> wrote:

> Hi Kurt,
>
> I understand what you mean: some userIds may appear more frequently than
> others. But this distribution doesn't look proportionate to the data skew.
> Can you think of any other possible reasons, or anything I can try out to
> investigate this further?
>
> For DeDuplication, I query for the latest record. Sorry, I didn't follow
> the sentence above. Do you mean that for each update to the user table, the
> record(s) that were updated will be sent via the retract stream? I think
> that's expected, since I need to process the latest records, as long as it
> sends only the record(s) that have been updated.
>
> Thanks,
> RKandoji
>
> On Fri, Jan 3, 2020 at 9:57 PM Kurt Young <ykt836@gmail.com> wrote:
>
>> Hi RKandoji,
>>
>> It looks like you have a data skew issue with your input data. Some, or
>> maybe only one, "userId" appears more frequently than others. For the join
>> operator to work correctly, Flink applies "shuffle by join key" before the
>> operator, so the same "userId" goes to the same sub-task to perform the
>> join operation. In this case, I'm afraid there is not much you can do for
>> now.
>>
>> BTW, for the DeDuplicate, do you keep the latest record or the earliest?
>> If you keep the latest version, Flink will trigger a retraction and then
>> send the latest record again every time your user table changes.
>>
>> Best,
>> Kurt
>>
>>
>> On Sat, Jan 4, 2020 at 5:09 AM RKandoji <rkandoji@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks a ton for the help with earlier questions, I updated code to
>>> version 1.9 and started using Blink Planner (DeDuplication). This is
>>> working as expected!
>>>
>>> I have a new question, but thought of asking in the same email chain as
>>> this has more context about my use case etc.
>>>
>>> Workflow:
>>> Currently I'm reading from a couple of Kafka topics, DeDuplicating the
>>> input data, performing JOINs and writing the joined data to another Kafka
>>> topic.
>>>
>>> Issue:
>>> I set Parallelism to 8 and, on analyzing the subtasks, found that the data
>>> is not distributed well among the 8 parallel tasks for the last Join query.
>>> One of the subtasks is taking a huge load, whereas the others are taking a
>>> pretty low load.
>>>
>>> Tried a couple of things below, but no use. Not sure if they are
>>> actually related to the problem as I couldn't yet understand what's the
>>> issue here.
>>> 1. increasing the number of partitions of output Kafka topic.
>>> 2. tried adding keys to output so key partitioning happens at Kafka end.
>>>
>>> Below is a snapshot for reference:
>>> [image: image.png]
>>>
>>> Below are the config changes I made:
>>>
>>> taskmanager.numberOfTaskSlots: 8
>>> parallelism.default: 8
>>> jobmanager.heap.size: 5000m
>>> taskmanager.heap.size: 5000m
>>> state.backend: rocksdb
>>> state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
>>> state.backend.incremental: true
>>>
>>> I don't see any errors and job seems to be running smoothly (and
>>> slowly). I need to make it distribute the load well for faster processing,
>>> any pointers on what could be wrong and how to fix it would be very helpful.
>>>
>>> Thanks,
>>> RKandoji
>>>
>>>
>>> On Fri, Jan 3, 2020 at 1:06 PM RKandoji <rkandoji@gmail.com> wrote:
>>>
>>>> Thanks!
>>>>
>>>> On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li <jingsonglee0@gmail.com>
>>>> wrote:
>>>>
>>>>> Yes,
>>>>>
>>>>> 1.9.2, or the upcoming 1.10.
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Fri, Jan 3, 2020 at 12:43 AM RKandoji <rkandoji@gmail.com> wrote:
>>>>>
>>>>>> Ok thanks, does it mean version 1.9.2 is what I need to use?
>>>>>>
>>>>>> On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li <jingsonglee0@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The Blink planner was introduced in 1.9. We recommend using the Blink
>>>>>>> planner from 1.9 onward.
>>>>>>> After some bug fixes, I think the latest version of 1.9 is OK. It has
>>>>>>> also been set up in production environments in some places.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong Lee
>>>>>>>
>>>>>>> On Wed, Jan 1, 2020 at 3:24 AM RKandoji <rkandoji@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Jingsong and Kurt for more details.
>>>>>>>>
>>>>>>>> Yes, I'm planning to try out DeDuplication when I'm done upgrading
>>>>>>>> to version 1.9. Hopefully deduplication is done by only one task and
>>>>>>>> reused everywhere else.
>>>>>>>>
>>>>>>>> One more follow-up question: I see the warning "For production use
>>>>>>>> cases, we recommend the old planner that was present before Flink 1.9
>>>>>>>> for now." here:
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
>>>>>>>> This is actually the reason why I started with version 1.8. Could you
>>>>>>>> please let me know your opinion about this? And do you think there is
>>>>>>>> any production code running on version 1.9?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Reva
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young <ykt836@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> BTW, you could also deduplicate the user table more efficiently
>>>>>>>>> by using the Top-N feature [1].
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Kurt
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li <
>>>>>>>>> jingsonglee0@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi RKandoji,
>>>>>>>>>>
>>>>>>>>>> In theory, you don't need to do anything.
>>>>>>>>>> First, the optimizer will optimize away duplicate nodes.
>>>>>>>>>> Second, after SQL optimization, if the optimized plan still has
>>>>>>>>>> duplicate nodes, the planner will automatically reuse them.
>>>>>>>>>> There are config options to control whether plans are reused;
>>>>>>>>>> their default value is true, so you don't need to modify them.
>>>>>>>>>> - table.optimizer.reuse-sub-plan-enabled
>>>>>>>>>> - table.optimizer.reuse-source-enabled
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong Lee
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji <rkandoji@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Terry and Jingsong,
>>>>>>>>>>>
>>>>>>>>>>> Currently I'm on version 1.8, using the Flink planner for stream
>>>>>>>>>>> processing. I'll switch to version 1.9 to try out the Blink planner.
>>>>>>>>>>>
>>>>>>>>>>> Could you please point me to any examples (Java preferred) using
>>>>>>>>>>> SubplanReuser?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> RK
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <
>>>>>>>>>>> jingsonglee0@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi RKandoji,
>>>>>>>>>>>>
>>>>>>>>>>>> FYI: the Blink planner supports subplan reuse [1], available
>>>>>>>>>>>> since 1.9.
>>>>>>>>>>>>
>>>>>>>>>>>>        Join                      Join
>>>>>>>>>>>>      /      \                  /      \
>>>>>>>>>>>>  Filter1  Filter2          Filter1  Filter2
>>>>>>>>>>>>     |        |        =>       \     /
>>>>>>>>>>>>  Project1 Project2            Project1
>>>>>>>>>>>>     |        |                   |
>>>>>>>>>>>>   Scan1    Scan2               Scan1
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jingsong Lee
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <zjuwangg@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi RKandoji~
>>>>>>>>>>>>>
>>>>>>>>>>>>> Could you provide more info about your POC environment?
>>>>>>>>>>>>> Stream or batch? Flink planner or Blink planner?
>>>>>>>>>>>>> AFAIK, the Blink planner has done some optimization to deal with
>>>>>>>>>>>>> such duplicate tasks for the same query. You can give the Blink
>>>>>>>>>>>>> planner a try:
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Terry Wang
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Dec 30, 2019, at 03:07, RKandoji <rkandoji@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm doing a POC with Flink to understand if it's a good fit
>>>>>>>>>>>>> for my use case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As part of the process, I need to filter duplicate items, and I
>>>>>>>>>>>>> created the query below to get only the latest records based on
>>>>>>>>>>>>> timestamp. For instance, I have a "Users" table which may contain
>>>>>>>>>>>>> multiple messages for the same "userId", so I wrote the query below
>>>>>>>>>>>>> to get only the latest message for a given "userId":
>>>>>>>>>>>>>
>>>>>>>>>>>>> Table uniqueUsers = tEnv.sqlQuery(
>>>>>>>>>>>>>     "SELECT * FROM Users WHERE (userId, userTimestamp) IN "
>>>>>>>>>>>>>         + "(SELECT userId, MAX(userTimestamp) FROM Users GROUP BY userId)");
>>>>>>>>>>>>>
>>>>>>>>>>>>> The above query works as expected and contains only the latest
>>>>>>>>>>>>> users based on timestamp.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The issue is that when I use the "uniqueUsers" table multiple times
>>>>>>>>>>>>> in a JOIN operation, I see multiple tasks in the Flink dashboard for
>>>>>>>>>>>>> the same query that creates the "uniqueUsers" table. It simply
>>>>>>>>>>>>> creates as many tasks as the number of times I use the table.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Below is the JOIN query.
>>>>>>>>>>>>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
>>>>>>>>>>>>> Table joinOut = tEnv.sqlQuery(
>>>>>>>>>>>>>     "SELECT * FROM Car c"
>>>>>>>>>>>>>         + " LEFT JOIN uniqueUsersTbl aa ON c.userId = aa.userId"
>>>>>>>>>>>>>         + " LEFT JOIN uniqueUsersTbl ab ON c.ownerId = ab.userId"
>>>>>>>>>>>>>         + " LEFT JOIN uniqueUsersTbl ac ON c.sellerId = ac.userId"
>>>>>>>>>>>>>         + " LEFT JOIN uniqueUsersTbl ad ON c.buyerId = ad.userId");
>>>>>>>>>>>>>
>>>>>>>>>>>>> Could someone please help me understand how I can avoid these
>>>>>>>>>>>>> duplicate tasks?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> R Kandoji
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
