kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4593) Task migration during rebalance callback process could lead the obsoleted task's IllegalStateException
Date Fri, 22 Sep 2017 20:14:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16177048#comment-16177048 ]

ASF GitHub Bot commented on KAFKA-4593:

GitHub user mjsax opened a pull request:


    KAFKA-4593: Don't throw IllegalStateException and die on task migration


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mjsax/kafka kafka-4593-illegal-state-exception-in-restore

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3948

commit 7001cf72781b71459f3dbf852c529f237edf804a
Author: Matthias J. Sax <matthias@confluent.io>
Date:   2017-09-22T20:12:33Z

    KAFKA-4593: Don't throw IllegalStateException and die on task migration


> Task migration during rebalance callback process could lead the obsoleted task's IllegalStateException
> ------------------------------------------------------------------------------------------------------
>                 Key: KAFKA-4593
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4593
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Matthias J. Sax
>              Labels: infrastructure
> 1. Assume two running threads, A and B, and a single task t1 for simplicity. Threads A and
> B are on different machines, so their local state directories are not shared.
> 2. The first rebalance is triggered and task t1 is assigned to A (B has no assigned task).
> 3. During the first rebalance callback, task t1's state store needs to be restored on
> thread A; this happens in "restoreActiveState", reached from "createStreamTask".
> 4. Now suppose thread A has a long GC pause that causes it to stall. A second rebalance is
> then triggered and kicks A out of the group. B gets task t1 and performs the same restoration
> process; afterwards, thread B continues to process data and update the state store while
> also writing more messages to the changelog (so its log end offset has advanced).
> 5. After a while, A resumes from the long GC pause, not knowing that it has been kicked out
> of the group and that task t1 is no longer assigned to it. It continues the restoration
> process but then realizes that the log end offset has advanced. When this happens, we see
> the following exception on thread A:
> {code}
> java.lang.IllegalStateException: task XXX Log end offset of YYY-table_stream-changelog-ZZ should not change while restoring: old end offset .., current offset ..
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
>         at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
>         at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
> {code}
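
The scenario in steps 1 to 5 can be sketched in isolation as follows. This is an illustrative sketch only, not the code from the pull request: the class RestoreCheckSketch, the exception TaskMigratedSketchException, and both methods are hypothetical names, and no Kafka classes are used. It assumes the intended behavior is the one suggested by the pull request title, namely that a changed log end offset during restoration is treated as a sign that the task has migrated, so the thread drops the task instead of dying with an IllegalStateException.

```java
import java.util.function.LongSupplier;

public class RestoreCheckSketch {

    /** Hypothetical signal that the task is now owned by another thread. */
    static class TaskMigratedSketchException extends RuntimeException {
        TaskMigratedSketchException(String message) {
            super(message);
        }
    }

    /**
     * Compares the log end offset captured before restoration with the
     * offset observed afterwards. A difference means another writer
     * (thread B in the scenario) has appended to the changelog.
     */
    static void verifyEndOffsetUnchanged(String taskId, long endOffsetAtStart, long endOffsetNow) {
        if (endOffsetNow != endOffsetAtStart) {
            throw new TaskMigratedSketchException(
                "task " + taskId + ": log end offset changed while restoring:"
                    + " old end offset " + endOffsetAtStart
                    + ", current offset " + endOffsetNow);
        }
    }

    /**
     * Returns true if the task finished restoring, or false if it was
     * dropped because it appears to have migrated to another thread.
     */
    static boolean restoreOrDrop(String taskId, long endOffsetAtStart, LongSupplier currentEndOffset) {
        try {
            // Replaying changelog records up to endOffsetAtStart would happen here.
            verifyEndOffsetUnchanged(taskId, endOffsetAtStart, currentEndOffset.getAsLong());
            return true;
        } catch (TaskMigratedSketchException e) {
            // Assumption: the caller closes the stale task and rejoins the group
            // rather than letting the exception terminate the thread.
            return false;
        }
    }

    public static void main(String[] args) {
        // No concurrent writer: the end offset is unchanged after restoration.
        System.out.println(restoreOrDrop("t1", 100L, () -> 100L));
        // Thread B appended to the changelog while thread A was stalled.
        System.out.println(restoreOrDrop("t1", 100L, () -> 125L));
    }
}
```

In the second call the end offset moves from 100 to 125 during the simulated restoration, which corresponds to step 5 above; the sketch reports the task as dropped rather than propagating the exception.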

This message was sent by Atlassian JIRA
