kafka-jira mailing list archives

From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5145) Remove task close() call from closeNonAssignedSuspendedTasks method
Date Sun, 25 Jun 2017 21:19:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16062424#comment-16062424 ]

Matthias J. Sax commented on KAFKA-5145:

[~mingleizhang] Thanks for starting to contribute to Kafka! I just realized that this is
a duplicate and that there is already a PR for the other Jira (cf. KAFKA-5485). There should be
other "newbie" Jiras, though, that you can pick up.

[~damianguy] Can you add [~mingleizhang] to the contributor list so he can assign Jiras to
himself? Thx.

> Remove task close() call from closeNonAssignedSuspendedTasks method
> -------------------------------------------------------------------
>                 Key: KAFKA-5145
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5145
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions:
>            Reporter: Narendra Kumar
>              Labels: newbie
>         Attachments: BugTest.java, DebugTransformer.java, logs.txt
> While rebalancing, ProcessorNode.close() can be called twice: once from StreamThread.suspendTasksAndState()
> and once from StreamThread.closeNonAssignedSuspendedTasks(). If ProcessorNode.close() throws
> an exception because close() was called multiple times (e.g. an IllegalStateException from
> a KafkaConsumer instance that a processor uses for lookups), we fail to close
> the task's state manager (i.e. the call to task.closeStateManager(true) fails). After the rebalance,
> if the same task id is launched on the same application instance but in a different thread,
> the task gets stuck because it fails to acquire the lock on the task's state directory.
> Since the processor's close() is already called from StreamThread.suspendTasksAndState(), we
> don't need to call it again from StreamThread.closeNonAssignedSuspendedTasks().
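
The failure sequence described in the issue can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions, not Kafka Streams code: the classes Processor and Task, the LOCKED_DIRS set standing in for the state directory lock, and both helper methods are hypothetical names invented for this sketch. It shows how an exception from a second close() can skip the step that releases the lock, and how leaving out the second close() (as the issue proposes) lets the lock be released.

```java
import java.util.HashSet;
import java.util.Set;

public class DoubleCloseSketch {

    /** Hypothetical stand-in for the per-task state directory locks. */
    static final Set<String> LOCKED_DIRS = new HashSet<>();

    /** Hypothetical processor whose close() is not idempotent. */
    static class Processor {
        private boolean closed = false;

        void close() {
            if (closed) {
                throw new IllegalStateException("already closed");
            }
            closed = true;
        }
    }

    /** Hypothetical task that holds a directory lock until its state manager is closed. */
    static class Task {
        final String id;
        final Processor processor = new Processor();

        Task(String id) {
            this.id = id;
            LOCKED_DIRS.add(id); // lock is taken when the task is created
        }

        void closeProcessor() {
            processor.close();
        }

        void closeStateManager() {
            LOCKED_DIRS.remove(id); // releasing the lock is part of this step
        }
    }

    /** Mirrors the reported sequence: the second close throws, so the lock release is skipped. */
    static boolean closeAgainThenReleaseLock(Task task) {
        try {
            task.closeProcessor();    // throws if the processor was closed during suspension
            task.closeStateManager(); // not reached when the line above throws
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    /** Follows the issue's suggestion: do not close the processor a second time. */
    static void releaseLockOnly(Task task) {
        task.closeStateManager();
    }

    public static void main(String[] args) {
        Task a = new Task("0_1");
        a.closeProcessor(); // first close, as during suspension
        closeAgainThenReleaseLock(a);
        System.out.println("lock held after double close: " + LOCKED_DIRS.contains("0_1"));

        Task b = new Task("0_2");
        b.closeProcessor(); // first close, as during suspension
        releaseLockOnly(b);
        System.out.println("lock held after single close: " + LOCKED_DIRS.contains("0_2"));
    }
}
```

In this sketch the first task's lock stays in LOCKED_DIRS, which corresponds to a later thread being unable to lock the same state directory, while the second task's lock is released.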

This message was sent by Atlassian JIRA
