kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ted Yu (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6085) Streams rebalancing may cause a first batch of fetched records to be dropped
Date Thu, 19 Oct 2017 03:40:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210527#comment-16210527

Ted Yu commented on KAFKA-6085:

        final ConsumerRecords<byte[], byte[]> records = pollRequests();

        if (state == State.PARTITIONS_ASSIGNED) {
Can the if block be lifted ahead of the pollRequests() call ?
When there is no active running task at the end of the if block, runOnce() can return early
without calling pollRequests().

> Streams rebalancing may cause a first batch of fetched records to be dropped
> ----------------------------------------------------------------------------
>                 Key: KAFKA-6085
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6085
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions:
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 1.0.0
> This is a regression introduced in KAFKA-5152:
> Assuming you have one task without any state stores (and hence no restoration needed
for that task), and a rebalance happened in a {{records = pollRequests(pollTimeMs);}} call:
> 1. We name this `pollRequests` call A. And within call A the rebalance will happen, which
put the thread state from RUNNING to PARTITION_REVOKED, and then from PARITION_REVOKED to
PARTITION_ASSIGNED. Assume the same task gets assigned again, this task will be in the initialized
set of tasks but NOT in the running tasks yet.
> 2. Within the same call A, a fetch request may be sent and a response with a batch of
records could be returned, and it will be returned from `pollRequests`. At this time the thread
state become PARTITION_ASSIGNED and the task is not "running" yet.
> 3. Now the bug comes in this line:
> {{!records.isEmpty() && taskManager.hasActiveRunningTasks()}}
> Since the task is not ing the active running set yet, this returned set of records would
be skipped. Effectively these records are dropped on the floor and would never be consumed
> 4. In the next run loop, the same `pollRequest()` will be called again. Let's call it
B. After B is called we will set the thread state to RUNNING and put the task to the running
task set. But at this point the previous batch of records will not be returned any more.
> So the bug lies in the fact that within a single run loop of the stream thread. We may
complete a rebalance with tasks assigned but not yet initialized, AND we can fetch a bunch
of records for that not-initialized task and drop on the floor.
> With further investigation I can confirm that the new flaky test https://issues.apache.org/jira/browse/KAFKA-5140
's root cause is also this bug. And a recent PR https://github.com/apache/kafka/pull/4086
exposed this bug by failing the reset integration test more frequently.

This message was sent by Atlassian JIRA

View raw message