kafka-jira mailing list archives

From "Seweryn Habdank-Wojewodzki (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-5167) streams task gets stuck after re-balance due to LockException
Date Wed, 28 Jun 2017 07:32:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066045#comment-16066045 ]

Seweryn Habdank-Wojewodzki edited comment on KAFKA-5167 at 6/28/17 7:31 AM:
----------------------------------------------------------------------------

Hi,

I think I found a much easier way to reproduce the same behaviour.
My code does more or less the following setup:

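// loop over the inTopicName(s); inTopicNames is a placeholder name for the list of input topics
for ( String inTopicName : inTopicNames ) {
    // one independent source -> filter -> map -> sink chain per input topic
    KStream<String, String> stringInput = kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName );
    stringInput.filter( streamFilter::passOrFilterMessages )
               .map( ndmNormalizer )
               .to( outTopicName );
}

streams = new KafkaStreams( kBuilder, streamsConfig );
streams.cleanUp();   // wipe local state before starting
streams.start();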
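A rough way to see why this setup produces so many tasks: each loop iteration builds an independent `stream().filter().map().to()` chain, and Kafka Streams turns each independent chain into its own sub-topology, creating one task per sub-topology and input partition (task IDs such as `8_7` read as `<sub-topology>_<partition>`). The sketch below only enumerates such IDs. The partition count per topic is an assumption (the report does not state it), and the numbering order is illustrative, not necessarily what Kafka Streams assigns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch: enumerate task IDs when every input topic gets its own
// sub-topology. IDs are formatted as <sub-topology>_<partition>.
public class TaskCountSketch {

    // partitionsPerTopic[i] = number of partitions of the i-th input topic (assumed values)
    static List<String> taskIds(int[] partitionsPerTopic) {
        List<String> ids = new ArrayList<>();
        for (int subtopology = 0; subtopology < partitionsPerTopic.length; subtopology++) {
            for (int partition = 0; partition < partitionsPerTopic[subtopology]; partition++) {
                ids.add(subtopology + "_" + partition);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        int topics = 20;      // 20 input topics, as in the report
        int partitions = 10;  // assumption: 10 partitions per topic (not stated in the report)
        int threads = 4;      // num.stream.threads=4
        int[] parts = new int[topics];
        Arrays.fill(parts, partitions);
        List<String> ids = taskIds(parts);
        System.out.println(ids.size() + " tasks, about " + (ids.size() / threads) + " per thread");
    }
}
```

Under these assumed numbers each of the 4 threads has to create roughly 50 tasks (and lock 50 state directories) during a single rebalance.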

If there are *_num.stream.threads=4_* but 20 or more inTopicNames (many topics to read),
the whole application startup blocks itself completely and endlessly logs:

2017-06-27 18:34:25 INFO  StreamThread:828 - stream-thread [StreamThread-3] Creating active task 11_5 with assigned partitions [[int62_topic-5]]
2017-06-27 18:34:25 WARN  StreamThread:1184 - Could not create task 8_7. Will retry.
org.apache.kafka.streams.errors.LockException: task [8_7] Failed to lock the state directory: /data/my-app/tmp/kafka-state/stream/8_7
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [my-app-stream.jar:?]
2017-06-27 18:34:25 INFO  StreamThread:828 - stream-thread [StreamThread-2] Creating active task 7_9 with assigned partitions [[c0206_topic-9]]
2017-06-27 18:34:26 WARN  StreamThread:1184 - Could not create task 6_9. Will retry.
org.apache.kafka.streams.errors.LockException: task [6_9] Failed to lock the state directory: /data/my-app/tmp/kafka-state/stream/6_9
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [my-app-stream.jar:?]
2017-06-27 18:34:26 WARN  StreamThread:1184 - Could not create task 15_1. Will retry.
org.apache.kafka.streams.errors.LockException: task [15_1] Failed to lock the state directory: /data/my-app/tmp/kafka-state/stream/15_1
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [my-app-stream.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [my-app-stream.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [my-app-stream.jar:?]
2017-06-27 18:34:26 INFO  StreamThread:828 - stream-thread [StreamThread-4] Creating active task 13_3 with assigned partitions [[a0291_topic-3]]
2017-06-27 18:34:26 INFO  StreamThread:828 - stream-thread [StreamThread-1] Creating active task 12_4 with assigned partitions [[int77_topic-4]]
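The repeated "Will retry." warnings come from a retry loop (`retryWithBackoff` in the trace) that keeps failing because the state directory lock is still held by another owner. A minimal sketch of that failure mode with plain `java.nio` file locks, assuming (not taken from Kafka's code) that the state directory lock behaves like an exclusive `FileChannel` lock on a file inside the task directory: while the previous owner holds the lock, every attempt from another channel in the same JVM fails; once it is released, the next attempt succeeds. The file name, attempt count and backoff values are made up, and unlike the loop in the log this one gives up after a fixed number of attempts.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class StateDirLockSketch {

    // Try once to take an exclusive lock; return null if another channel in this
    // JVM already holds an overlapping lock on the same file.
    static FileLock tryLock(FileChannel channel) throws IOException {
        try {
            return channel.tryLock();
        } catch (OverlappingFileLockException e) {
            return null;
        }
    }

    // Retry with exponential backoff, giving up after maxAttempts.
    static FileLock lockWithRetry(FileChannel channel, int maxAttempts, long initialBackoffMs)
            throws IOException, InterruptedException {
        long backoff = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            FileLock lock = tryLock(channel);
            if (lock != null) {
                return lock;
            }
            System.out.println("attempt " + attempt + ": could not lock, will retry");
            Thread.sleep(backoff);
            backoff *= 2;
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        Path lockFile = Files.createTempFile("task-8_7", ".lock"); // hypothetical lock file
        try (FileChannel oldOwner = FileChannel.open(lockFile, StandardOpenOption.WRITE);
             FileChannel newOwner = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
            FileLock held = oldOwner.lock();                  // previous owner never releases it...
            FileLock whileHeld = lockWithRetry(newOwner, 3, 10);
            System.out.println("locked while held? " + (whileHeld != null));
            held.release();                                   // ...until a proper close() does
            FileLock afterRelease = lockWithRetry(newOwner, 3, 10);
            System.out.println("locked after release? " + (afterRelease != null));
            afterRelease.release();
        } finally {
            Files.deleteIfExists(lockFile);
        }
    }
}
```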


> streams task gets stuck after re-balance due to LockException
> -------------------------------------------------------------
>
>                 Key: KAFKA-5167
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5167
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.11.0.0, 0.10.2.1
>            Reporter: Narendra Kumar
>            Assignee: Matthias J. Sax
>             Fix For: 0.11.1.0, 0.10.2.2, 0.11.0.1
>
>         Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> During a rebalance, the processor node's close() method gets called twice: once from StreamThread.suspendTasksAndState()
> and once from StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field which
> I close in the processor's close() method. This instance's close() method throws an exception
> if I call close() more than once. Because of this exception, Kafka Streams does not attempt
> to close the state manager, i.e. task.closeStateManager(true) is never called. When a task moves
> from one thread to another within the same machine, the task blocks trying to get the lock on the state
> directory, which is still held by the unclosed state manager, and keeps throwing the warning
> message below:
> 2017-04-30 12:34:17 WARN  StreamThread:1214 - Could not create task 0_1. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
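Based on the description above, one application-side mitigation is to make the processor's own close() idempotent, so that the second call during a rebalance cannot throw and stop the state manager from being closed. A minimal sketch in plain Java; `FragileResource` stands in for the reporter's instance field and `GuardedProcessor` for the body of a real `Processor.close()`, both hypothetical names. It only sidesteps the double close() on the application side and does not replace the fix in Kafka Streams itself.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentCloseSketch {

    // Hypothetical stand-in for the instance field: close() throws if called twice.
    static class FragileResource {
        boolean closed = false;

        void close() {
            if (closed) {
                throw new IllegalStateException("already closed");
            }
            closed = true;
        }
    }

    // Hypothetical processor: in a real app this guard would sit inside Processor.close().
    static class GuardedProcessor {
        final FragileResource resource = new FragileResource();
        private final AtomicBoolean closed = new AtomicBoolean(false);

        public void close() {
            // Only the first caller flips the flag and closes the resource;
            // later calls return immediately instead of throwing.
            if (closed.compareAndSet(false, true)) {
                resource.close();
            }
        }
    }

    public static void main(String[] args) {
        GuardedProcessor processor = new GuardedProcessor();
        processor.close(); // e.g. from suspendTasksAndState()
        processor.close(); // e.g. from closeNonAssignedSuspendedTasks(): now a no-op
        System.out.println("closed twice without exception");
    }
}
```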



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
