flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously
Date Thu, 29 Sep 2016 16:12:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533206#comment-15533206 ]

ASF GitHub Bot commented on FLINK-4702:

GitHub user StephanEwen opened a pull request:


    [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls

    This fix is quite critical!
    While the KafkaConsumer is polling for new data (with a timeout), it holds the consumer lock.
    If no data arrives from Kafka, the lock is not released until the poll timeout is over.
    During that time, no offset commit can make progress, because it needs the consumer lock.
    The `notifyCheckpointComplete()` method of the Kafka consumer hence blocks until the poll
    timeout is over and the lock is released. For low-throughput Kafka topics, this can cause
    wildly long checkpoint delays.
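The contention described above can be reproduced in plain Java. This is a minimal sketch, not connector code: a `ReentrantLock` stands in for the consumer lock, `Thread.sleep()` stands in for a `poll()` on a topic that receives no data, and `commitProceedsWhilePolling` is a hypothetical helper that reports whether the simulated commit could acquire the lock within a given wait.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockContentionDemo {

    /**
     * Returns true if a simulated offset commit acquires the consumer lock within
     * commitWaitMillis while a simulated poll holds it for pollTimeoutMillis.
     */
    public static boolean commitProceedsWhilePolling(long pollTimeoutMillis, long commitWaitMillis) {
        final ReentrantLock consumerLock = new ReentrantLock();
        final CountDownLatch pollStarted = new CountDownLatch(1);

        // Simulated fetcher thread: holds the lock for the full poll timeout,
        // which is what happens when no records arrive.
        Thread fetcher = new Thread(() -> {
            consumerLock.lock();
            try {
                pollStarted.countDown();
                Thread.sleep(pollTimeoutMillis); // stand-in for poll(timeout) on an idle topic
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                consumerLock.unlock();
            }
        });
        fetcher.start();

        try {
            pollStarted.await(); // from here on, the fetcher holds the lock
            // Simulated notifyCheckpointComplete(): needs the same lock to commit offsets.
            boolean acquired = consumerLock.tryLock(commitWaitMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                consumerLock.unlock();
            }
            fetcher.join();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // 2 s poll timeout, commit willing to wait 100 ms: the commit cannot get the lock.
        System.out.println(commitProceedsWhilePolling(2000, 100)); // prints "false"
    }
}
```

With a 2 s poll timeout and a commit that waits at most 100 ms, `main` prints `false`: the commit has to sit out the whole poll timeout, which is exactly the delay that ends up in `notifyCheckpointComplete()`.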
    This changes `notifyCheckpointComplete()` to only "schedule" offsets to be committed,
    while the main fetcher thread actually kicks off the asynchronous offset commits. That way,
    there is no interference between the `notifyCheckpointComplete()` method (which is executed
    under the checkpoint lock) and the consumer lock.
    In fact, the only KafkaConsumer method accessed concurrently with the main fetcher thread
    is `wakeup()`, which is thread-safe (whereas the rest of the KafkaConsumer is not).
    The consumer lock was hence removed completely.
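A minimal sketch of the "schedule, then commit from the fetcher thread" handoff, assuming a single fetcher thread that calls `commitPendingIfAny()` between polls. The class name, the `String` partition keys, and the `asyncCommit` callback (a stand-in for the client's asynchronous commit call) are all hypothetical; this is not the code from the pull request, and it does not model `wakeup()`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class OffsetCommitHandoff {

    // Offsets handed over by the checkpointing thread. A newer hand-over simply
    // replaces offsets that have not been committed yet.
    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();

    // Hypothetical stand-in for the client's asynchronous commit call.
    private final Consumer<Map<String, Long>> asyncCommit;

    public OffsetCommitHandoff(Consumer<Map<String, Long>> asyncCommit) {
        this.asyncCommit = asyncCommit;
    }

    /** Called from notifyCheckpointComplete(): only swaps a reference, never waits on the fetcher. */
    public void scheduleCommit(Map<String, Long> offsets) {
        nextOffsetsToCommit.set(new HashMap<>(offsets));
    }

    /** Called by the fetcher thread, e.g. once per loop iteration between polls. */
    public void commitPendingIfAny() {
        Map<String, Long> toCommit = nextOffsetsToCommit.getAndSet(null);
        if (toCommit != null) {
            asyncCommit.accept(toCommit);
        }
    }

    public static void main(String[] args) {
        OffsetCommitHandoff handoff = new OffsetCommitHandoff(
                offsets -> System.out.println("committing " + offsets));
        handoff.scheduleCommit(Collections.singletonMap("topic-0", 10L)); // checkpoint 1 completes
        handoff.scheduleCommit(Collections.singletonMap("topic-0", 20L)); // checkpoint 2 completes first
        handoff.commitPendingIfAny(); // prints "committing {topic-0=20}"
        handoff.commitPendingIfAny(); // nothing pending, prints nothing
    }
}
```

Because `scheduleCommit()` only swaps a reference, the checkpoint thread never waits on the fetcher; if several checkpoints complete before the fetcher gets around to committing, only the most recent offsets are committed.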

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink kafka_09_fix

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2574
commit 0846fd907db7d52d7e5fb7d704c5e1c13462e331
Author: Stephan Ewen <sewen@apache.org>
Date:   2016-09-29T16:09:51Z

    [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls
    Letting the Kafka commit block on polls means that 'notifyCheckpointComplete()' may take
    a very long time. This is mostly relevant for low-throughput Kafka topics.


> Kafka consumer must commit offsets asynchronously
> -------------------------------------------------
>                 Key: FLINK-4702
>                 URL: https://issues.apache.org/jira/browse/FLINK-4702
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.2
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.2.0, 1.1.3
> The offset commit calls to Kafka may occasionally take a very long time.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time, and the KafkaConsumer
> cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in progress,
> so that commit requests do not pile up.
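One way to keep at most one commit in flight is a flag that is set when a commit starts and cleared by that commit's completion callback, with pending offsets coalesced in the meantime. The sketch below assumes that approach; `SingleInFlightCommitter` and its `BiConsumer`-based `asyncCommit` hook are hypothetical stand-ins for an asynchronous commit method with a completion callback, not the API of the Kafka client or of Flink.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

public class SingleInFlightCommitter {

    // Latest offsets waiting to be committed; newer offsets replace older ones.
    private final AtomicReference<Map<String, Long>> pending = new AtomicReference<>();

    // True while an asynchronous commit has been started but has not completed yet.
    private final AtomicBoolean commitInProgress = new AtomicBoolean(false);

    // Hypothetical async commit hook: receives the offsets and a callback to run on completion.
    private final BiConsumer<Map<String, Long>, Runnable> asyncCommit;

    public SingleInFlightCommitter(BiConsumer<Map<String, Long>, Runnable> asyncCommit) {
        this.asyncCommit = asyncCommit;
    }

    /** Called when a checkpoint completes; never blocks. */
    public void schedule(Map<String, Long> offsets) {
        pending.set(new HashMap<>(offsets));
    }

    /** Called by the fetcher thread; returns true if a new commit was started. */
    public boolean maybeCommit() {
        if (!commitInProgress.compareAndSet(false, true)) {
            return false; // previous commit still running; pending offsets stay queued
        }
        Map<String, Long> toCommit = pending.getAndSet(null);
        if (toCommit == null) {
            commitInProgress.set(false); // nothing to commit, release the slot
            return false;
        }
        // The completion callback frees the slot for the next commit.
        asyncCommit.accept(toCommit, () -> commitInProgress.set(false));
        return true;
    }
}
```

While a commit is outstanding, `maybeCommit()` returns `false` and newer offsets simply overwrite older pending ones, so requests cannot pile up; the first call after the completion callback has run commits the latest offsets.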

This message was sent by Atlassian JIRA
