kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
Date Mon, 17 Jul 2017 17:19:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090111#comment-16090111
] 

Guozhang Wang commented on KAFKA-5545:
--------------------------------------

Hello [~yogeshbelur] sorry for the late reply.

I think I have found the root cause of the issue, which is indeed a deadlock that cause the
stream thread to be not able to exit its current loop call and hence not be able to shutdown.
More details are the following:

1. When {{stream.close()}} is called, it does nothing but just set a flag in the stream threads
and let the shutdown thread to wait until the stream threads have seen this flag and end the
while loop and shutdown themselves.
2. There is an edge case, that the stream thread is still in the {{onPartitionAssigned}} callback
of the current loop, and is trying to grab the state directory locks in order to create the
assigned tasks; however the state directory locks are not released by other threads who have
seen a disconnect / request timeout from the brokers since the broker IP has changed within
the same JVM, they will retry indefinitely as the rebalance timeout is default to {{Integer.MAX_VALUE}}.
So the thread will never complete its current iteration and will not see the shutdown flag
and will not shutdown itself.

I have started proposing a fix for the state directory deadlock now, but this will probably
only in 0.11.0.1 release and backported if we are having an 0.10.2.2 release. As for the current
version (0.10.2.1) version you are on. One walkaround is to set the {{ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG}}
to a smaller value instead of the default {{Integer.MAX_VALUE}} so that it will not cause
a dead lock, and making sure that for whatever this config value is set to, the corresponding
{{REQUEST_TIMEOUT_MS_CONFIG}} is set to be larger than it.


> Kafka Stream not able to successfully restart over new broker ip
> ----------------------------------------------------------------
>
>                 Key: KAFKA-5545
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5545
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: Yogesh BG
>            Priority: Critical
>         Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart the broker.
When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip changed and if
changed, we cleanup the stream, rebuild topology(tried with reusing topology) and start the
stream again. I end up with the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-38] Creating active task 0_5 with assigned partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-41] Creating active task 0_1 with assigned partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-34] Creating active task 0_7 with assigned partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-37] Creating active task 0_3 with assigned partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-45] Creating active task 0_0 with assigned partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-36] Creating active task 0_4 with assigned partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-43] Creating active task 0_6 with assigned partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-48] Creating active task 0_2 with assigned partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could not create
task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the state directory
for task 0_5
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.035 [StreamThread-41] WARN  o.a.k.s.p.internals.StreamThread - Could not create
task 0_1. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory
for task 0_1
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.037 [StreamThread-37] WARN  o.a.k.s.p.internals.StreamThread - Could not create
task 0_3. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_3] Failed to lock the state directory
for task 0_3
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-34] WARN  o.a.k.s.p.internals.StreamThread - Could not create
task 0_7. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_7] Failed to lock the state directory
for task 0_7
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-43] WARN  o.a.k.s.p.internals.StreamThread - Could not create
task 0_6. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_6] Failed to lock the state directory
for task 0_6
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-45] WARN  o.a.k.s.p.internals.StreamThread - Could not create
task 0_0. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory
for task 0_0
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-36] WARN  o.a.k.s.p.internals.StreamThread - Could not create
task 0_4. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock the state directory
for task 0_4
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-48] WARN  o.a.k.s.p.internals.StreamThread - Could not create
task 0_2. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_2] Failed to lock the state directory
for task 0_2
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:13.642 [StreamThread-44] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-44] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-47] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-47] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-42] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-42] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-46] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-46] Committing all tasks because the commit interval 10000ms has ela
> ]
> psed
> 11:04:13.646 [StreamThread-33] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-33] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:13.648 [StreamThread-40] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-40] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:13.655 [StreamThread-39] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-39] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:13.660 [StreamThread-35] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-35] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:23.663 [StreamThread-42] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-42] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:23.663 [StreamThread-46] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-46] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:23.663 [StreamThread-47] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-47] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:23.663 [StreamThread-44] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-44] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:23.671 [StreamThread-33] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-33] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:23.676 [StreamThread-40] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-40] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:23.677 [StreamThread-39] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-39] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:23.682 [StreamThread-35] INFO  o.a.k.s.p.internals.StreamThread - stream-thread
[StreamThread-35] Committing all tasks because the commit interval 10000ms has ela
> psed
> 11:04:29.025 [pool-4-thread-1]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message