flink-issues mailing list archives

From "Elias Levy (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-10348) Solve data skew when consuming data from kafka
Date Mon, 17 Sep 2018 17:49:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617899#comment-16617899 ]

Elias Levy edited comment on FLINK-10348 at 9/17/18 5:48 PM:
-------------------------------------------------------------

[~wind_ljy]

Re: 1.  The problem is timestamp alignment.  Settings like fetch sizes, max waits, etc. are
simply mechanisms you can use to attempt to influence the rate of processing to better align
the timestamps.  Those mechanisms are at least one level removed from the actual issue.  It
is best to address the issue directly by attempting to align timestamps during consumption.

Re: 2.  Internally the Kafka consumer behaves like a multiple input operator, merging watermarks
and messages from each partition, which it then forwards downstream.  The Kafka consumer
can also selectively forward messages from the partitions with the lowest watermark if they
are available.
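
To make the selective forwarding idea concrete, here is a minimal sketch of picking the
partition with the lowest watermark to emit from next.  This is not Flink's actual consumer
internals; the PartitionState and Record types are invented for illustration:

{code:java}
// Illustrative only: emit buffered records from the partition whose
// watermark lags furthest behind, so the merged (minimum) watermark
// keeps advancing evenly across partitions.
import java.util.ArrayDeque;
import java.util.List;

class Record {
    long timestamp;
    byte[] value;
}

class PartitionState {
    long watermark = Long.MIN_VALUE;      // per-partition watermark
    ArrayDeque<Record> buffer = new ArrayDeque<>();
}

class LowestWatermarkSelector {
    // Returns the partition to forward from next, or null if no
    // partition currently has buffered data.
    static PartitionState selectNext(List<PartitionState> partitions) {
        PartitionState lowest = null;
        for (PartitionState p : partitions) {
            if (p.buffer.isEmpty()) {
                continue;                 // nothing available for this partition
            }
            if (lowest == null || p.watermark < lowest.watermark) {
                lowest = p;
            }
        }
        return lowest;
    }
}
{code}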


> Solve data skew when consuming data from kafka
> ----------------------------------------------
>
>                 Key: FLINK-10348
>                 URL: https://issues.apache.org/jira/browse/FLINK-10348
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>    Affects Versions: 1.6.0
>            Reporter: Jiayi Liao
>            Assignee: Jiayi Liao
>            Priority: Major
>
> By using KafkaConsumer, our strategy is to send fetch requests to brokers with a fixed
> fetch size. Assume topic x has n partitions and there is data skew between the partitions.
> If we consume topic x from the earliest offset, each fetch request can return up to the
> max fetch size of data. The problem is that when a task consumes data from both "big"
> partitions and "small" partitions, the data in the "big" partitions may become late
> elements because the "small" partitions are consumed faster.
> *Solution:*
> I think we can leverage two parameters to control this.
> 1. data.skew.check // whether to check data skew
> 2. data.skew.check.interval // the interval between checks
> Every data.skew.check.interval, we check the latest offset of every partition, calculate
> (latest offset - current offset), and identify the partitions that need to slow down and
> redefine their fetch size (see the sketch below).
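
The proposed check maps naturally onto the plain Kafka client API.  Below is a minimal
sketch, assuming a KafkaConsumer-based fetch loop; the parameter names follow the proposal,
while the DataSkewChecker class, the 2x-average threshold, and the pause-based throttle
are invented for illustration:

{code:java}
// Hypothetical sketch of the proposed skew check, built on the real
// KafkaConsumer methods endOffsets(), position(), and pause().
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class DataSkewChecker {
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final boolean dataSkewCheck;          // proposed data.skew.check
    private final long dataSkewCheckIntervalMs;   // proposed data.skew.check.interval
    private long lastCheckMs = 0;

    DataSkewChecker(KafkaConsumer<byte[], byte[]> consumer,
                    boolean dataSkewCheck, long dataSkewCheckIntervalMs) {
        this.consumer = consumer;
        this.dataSkewCheck = dataSkewCheck;
        this.dataSkewCheckIntervalMs = dataSkewCheckIntervalMs;
    }

    // Call periodically from the fetch loop.  Partitions whose lag
    // (latest offset - current offset) is far above the average are
    // the "big" partitions that should be slowed down.
    void maybeCheck(Set<TopicPartition> assignment) {
        long now = System.currentTimeMillis();
        if (!dataSkewCheck || now - lastCheckMs < dataSkewCheckIntervalMs) {
            return;
        }
        lastCheckMs = now;

        Map<TopicPartition, Long> latest = consumer.endOffsets(assignment);
        Map<TopicPartition, Long> lags = new HashMap<>();
        long totalLag = 0;
        for (TopicPartition tp : assignment) {
            long lag = latest.get(tp) - consumer.position(tp);
            lags.put(tp, lag);
            totalLag += lag;
        }
        long avgLag = totalLag / Math.max(1, assignment.size());
        for (Map.Entry<TopicPartition, Long> e : lags.entrySet()) {
            if (e.getValue() > 2 * avgLag) {       // threshold is illustrative
                slowDown(e.getKey());              // hypothetical throttle hook
            }
        }
    }

    private void slowDown(TopicPartition tp) {
        // Placeholder: a real implementation would shrink this partition's
        // fetch size; pausing until the next check is the simplest stand-in.
        consumer.pause(Collections.singleton(tp));
    }
}
{code}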



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
