kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6085) Streams rebalancing may cause a first batch of fetched records to be dropped
Date Thu, 19 Oct 2017 05:11:01 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210580#comment-16210580 ]

ASF GitHub Bot commented on KAFKA-6085:

GitHub user guozhangwang reopened a pull request:


    [WIP] KAFKA-6085: Pause all partitions before tasks are initialized

    Mirror of #4085 against trunk. This PR contains two fixes (one major and one minor):
    Major: on rebalance, pause all partitions rather than only the partitions of tasks with state stores, so that no records are returned in the same `pollRecords()` call.
    Minor: during the restoration phase, while the thread state is still PARTITION_ASSIGNED, call consumer.poll with a hard-coded pollMs = 0.
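
A minimal sketch of the major fix, assuming a toy consumer rather than the real Kafka client: `ToyConsumer`, `pollAfterRebalance`, and the partition names are hypothetical and exist only for illustration. The real `KafkaConsumer` does expose `pause` and `resume` on collections of partitions, but this is not the code changed by the PR. The sketch only shows why pausing the stateful tasks' partitions alone still lets a stateless task's records through.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model (not the Kafka client): poll() skips paused partitions. */
public class PauseOnRebalanceSketch {

    /** Hypothetical stand-in for a consumer; records are just strings. */
    static class ToyConsumer {
        final Set<String> assigned = new HashSet<>();
        final Set<String> paused = new HashSet<>();

        void pause(Set<String> partitions) {
            paused.addAll(partitions);
        }

        /** Returns one fake record per assigned partition that is not paused. */
        List<String> poll() {
            List<String> records = new ArrayList<>();
            for (String partition : assigned) {
                if (!paused.contains(partition)) {
                    records.add("record-from-" + partition);
                }
            }
            return records;
        }
    }

    /**
     * Simulates the first poll after an assignment.
     * pauseAll = false: pause only the stateful task's partition (old behaviour).
     * pauseAll = true:  pause every assigned partition (the major fix).
     */
    public static List<String> pollAfterRebalance(boolean pauseAll) {
        ToyConsumer consumer = new ToyConsumer();
        Set<String> stateful = Collections.singleton("topicA-0");  // task with a state store
        Set<String> stateless = Collections.singleton("topicB-0"); // task without one
        consumer.assigned.addAll(stateful);
        consumer.assigned.addAll(stateless);

        consumer.pause(pauseAll ? consumer.assigned : stateful);
        return consumer.poll(); // anything returned here arrives before tasks are running
    }

    public static void main(String[] args) {
        System.out.println("pause stateful only: " + pollAfterRebalance(false));
        System.out.println("pause all:           " + pollAfterRebalance(true));
    }
}
```

With only the stateful partition paused, the stateless task's record is still returned by the first poll; with everything paused, the poll returns nothing until partitions are resumed.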

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/guozhangwang/kafka KHotfix-restore-only

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4086
commit 62bf4784779f7379e849289c4456363f352cb850
Author: Guozhang Wang <wangguoz@gmail.com>
Date:   2017-10-12T21:18:46Z


commit 5726e39cba8a79e6858e8b932c5116b60f2ae314
Author: Guozhang Wang <wangguoz@gmail.com>
Date:   2017-10-12T21:18:46Z

    fix issues
    Remove debugging information

commit 8214a3ee340791eb18f7e5fa77f2510470cf977a
Author: Matthias J. Sax <matthias@confluent.io>
Date:   2017-10-17T00:38:31Z

    MINOR: update exception message for KIP-120
    Author: Matthias J. Sax <matthias@confluent.io>
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
    Closes #4078 from mjsax/hotfix-streams

commit 637b76342801cf4a32c9e65aa89bfe0bf76c24a7
Author: Jason Gustafson <jason@confluent.io>
Date:   2017-10-17T00:49:35Z

    MINOR: A few javadoc fixes
    Author: Jason Gustafson <jason@confluent.io>
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
    Closes #4076 from hachikuji/javadoc-fixes

commit f57c505f6e714b891a6d30e5501b463f14316708
Author: Damian Guy <damian.guy@gmail.com>
Date:   2017-10-17T01:01:32Z

    MINOR: add equals to SessionWindows
    Author: Damian Guy <damian.guy@gmail.com>
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
    Closes #4074 from dguy/minor-session-window-equals

commit 2f1dd0d4da24eee352f20436902d825d7851c45b
Author: Guozhang Wang <wangguoz@gmail.com>
Date:   2017-10-18T01:27:35Z

    normal poll with zero during restoration

commit 043f28ac89b50f9145ac719449f03a427376dcde
Author: Guozhang Wang <wangguoz@gmail.com>
Date:   2017-10-19T04:58:36Z

    recheck state change


> Streams rebalancing may cause a first batch of fetched records to be dropped
> ----------------------------------------------------------------------------
>                 Key: KAFKA-6085
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6085
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions:
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 1.0.0
> This is a regression introduced in KAFKA-5152:
> Assume you have one task without any state stores (and hence no restoration needed for that task), and a rebalance happens during a {{records = pollRequests(pollTimeMs);}} call:
> 1. Call this `pollRequests` invocation A. Within call A the rebalance happens, which moves the thread state from RUNNING to PARTITION_REVOKED, and then from PARTITION_REVOKED to PARTITION_ASSIGNED. Assuming the same task gets assigned again, this task will be in the initialized set of tasks but NOT in the running tasks yet.
> 2. Within the same call A, a fetch request may be sent and a response with a batch of records may come back, in which case the batch is returned from `pollRequests`. At this point the thread state is PARTITION_ASSIGNED and the task is not "running" yet.
> 3. The bug is in this condition:
> {{!records.isEmpty() && taskManager.hasActiveRunningTasks()}}
> Since the task is not in the active running set yet, the returned set of records is skipped. Effectively these records are dropped on the floor and will never be consumed.
> 4. In the next run loop, `pollRequests` is called again; call this invocation B. After B returns, the thread state is set to RUNNING and the task is added to the running task set, but by then the previous batch of records will not be returned again.
> So the bug is that, within a single run loop of the stream thread, we may complete a rebalance with tasks assigned but not yet initialized, AND fetch a batch of records for that not-initialized task and drop them on the floor.
> Further investigation confirms that this bug is also the root cause of the new flaky test tracked in https://issues.apache.org/jira/browse/KAFKA-5140 . A recent PR, https://github.com/apache/kafka/pull/4086 , exposed the bug by making the reset integration test fail more frequently.
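
The four steps in the issue description can be reproduced in a toy model. The sketch below is not the actual StreamThread code: the class, its fields, and the record names are hypothetical, and the point at which the task is promoted to running is simplified (here it happens at the end of a loop iteration). It only illustrates how the condition from step 3 silently discards the batch returned by call A.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Toy model of the run loop in the issue description; not the real StreamThread. */
public class DroppedBatchSketch {

    enum State { RUNNING, PARTITION_REVOKED, PARTITION_ASSIGNED }

    private State state = State.RUNNING;
    private boolean taskRunning = true;       // task is in the running set
    private boolean rebalancePending = true;  // a rebalance fires inside the first poll
    private final Deque<List<String>> fetched = new ArrayDeque<>();

    /** Stand-in for pollRequests(): may rebalance, then returns the next batch. */
    private List<String> pollRequests() {
        if (rebalancePending) {
            rebalancePending = false;
            state = State.PARTITION_REVOKED;
            taskRunning = false;              // task leaves the running set
            state = State.PARTITION_ASSIGNED; // reassigned, but not running yet
        }
        List<String> batch = fetched.poll();  // null when nothing is left
        return batch == null ? Collections.<String>emptyList() : batch;
    }

    /** Runs two loop iterations (calls A and B) and returns the records processed. */
    public static List<String> run() {
        DroppedBatchSketch thread = new DroppedBatchSketch();
        thread.fetched.add(Arrays.asList("r1", "r2")); // batch returned by call A
        thread.fetched.add(Arrays.asList("r3"));       // batch returned by call B

        List<String> processed = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<String> records = thread.pollRequests();
            // Same shape as the condition quoted in step 3.
            if (!records.isEmpty() && thread.taskRunning) {
                processed.addAll(records);
            } // otherwise the batch is skipped and never seen again

            // Simplification: promote the task once the iteration is over.
            if (thread.state == State.PARTITION_ASSIGNED) {
                thread.taskRunning = true;
                thread.state = State.RUNNING;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("processed = " + run());
    }
}
```

In this model the records `r1` and `r2` from call A never reach `processed`; only the batch from call B does.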

This message was sent by Atlassian JIRA