kafka-dev mailing list archives

From "Narendra Kumar (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-5167) streams task gets stuck after re-balance due to LockException
Date Mon, 08 May 2017 12:07:04 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Narendra Kumar updated KAFKA-5167:
----------------------------------
    Attachment: BugTest.java
                DebugTransformer.java

> streams task gets stuck after re-balance due to LockException
> -------------------------------------------------------------
>
>                 Key: KAFKA-5167
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5167
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Narendra Kumar
>         Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> During a rebalance, the processor node's close() method is called twice: once from StreamThread.suspendTasksAndState()
> and once from StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field that I close in the processor's
> close() method, and that instance's close() method throws an exception if it is called more than once. Because of this
> exception, Kafka Streams never attempts to close the state manager, i.e., task.closeStateManager(true) is never called.
> When a task then moves from one thread to another on the same machine, the task blocks trying to acquire the lock on the
> state directory, which is still held by the unclosed state manager, and keeps throwing the following exception:
> 2017-04-30 12:34:17 WARN  StreamThread:1214 - Could not create task 0_1. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
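The failure chain described above (a second close() throws, so the state manager is never closed and the directory lock leaks) can be illustrated with a minimal, self-contained sketch. It does not use Kafka Streams at all, and every class name in it (CloseTwiceDemo, FragileResource, IdempotentCloser, TaskCloseModel) is hypothetical. IdempotentCloser shows one possible user-side workaround, assuming the processor's close() delegates to it: an AtomicBoolean guard so that only the first close() reaches the fragile resource. TaskCloseModel only models the idea that releasing the state lock in a finally block would survive a throwing processor close(); it is an assumption for illustration, not how StreamThread is actually implemented.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Demo entry point (declared first so source-file mode runs its main).
class CloseTwiceDemo {
    public static void main(String[] args) {
        FragileResource resource = new FragileResource();
        IdempotentCloser closer = new IdempotentCloser(resource);

        // Simulate the processor's close() being invoked twice during a rebalance.
        closer.close();
        closer.close();
        System.out.println("resource closed: " + resource.isClosed());

        // Calling the raw resource again throws, but the modeled lock is still released.
        TaskCloseModel task = new TaskCloseModel();
        try {
            task.closeTask(resource::close);
        } catch (IllegalStateException e) {
            System.out.println("processor close failed: " + e.getMessage());
        }
        System.out.println("state lock held: " + task.stateLockHeld);
    }
}

// Hypothetical stand-in for the reporter's resource: close() throws if called twice.
class FragileResource {
    private boolean closed = false;

    void close() {
        if (closed) {
            throw new IllegalStateException("already closed");
        }
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Hypothetical guard a processor could hold instead of the raw resource,
// making repeated close() calls harmless.
class IdempotentCloser {
    private final FragileResource resource;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    IdempotentCloser(FragileResource resource) {
        this.resource = resource;
    }

    void close() {
        // compareAndSet succeeds only for the first caller; later calls are no-ops.
        if (closed.compareAndSet(false, true)) {
            resource.close();
        }
    }
}

// Hypothetical model of a task-close sequence: the lock flag is cleared in
// finally, so an exception from the processor's close() cannot leak it.
class TaskCloseModel {
    boolean stateLockHeld = true;

    void closeTask(Runnable processorClose) {
        try {
            processorClose.run();
        } finally {
            stateLockHeld = false; // stands in for task.closeStateManager(true)
        }
    }
}
```

The guard keeps the workaround local to user code, while the try/finally shape shows why a throwing close() would not block the next thread from locking the state directory if cleanup were structured that way.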



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
