kafka-dev mailing list archives

From "Eno Thereska (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
Date Fri, 07 Apr 2017 18:54:41 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eno Thereska updated KAFKA-3758:
--------------------------------
    Resolution: Fixed
        Status: Resolved  (was: Patch Available)

> KStream job fails to recover after Kafka broker stopped
> -------------------------------------------------------
>
>                 Key: KAFKA-3758
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3758
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Greg Fodor
>            Assignee: Eno Thereska
>         Attachments: muon.log.1.gz
>
>
> We've been testing a fairly complex KStreams job, and under load the job appears to fail to rebalance and recover when we shut down one of the Kafka brokers. The test ran against a 3-node Kafka cluster in which each topic had a replication factor of at least 2, and we terminated one of the nodes.
> Attached is the full log. The root exception seems to be contention on the lock on the state directory. The job keeps trying to recover but repeatedly throws lock-related errors. Restarting the job itself resolves the problem.
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> Caused by: java.io.IOException: Failed to lock the state directory: /muon/state/job-stream_photon_messages-1/2_82
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
>         ... 32 more
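
For readers unfamiliar with this failure mode, the sketch below shows in plain Java how a second attempt to lock a directory fails while an earlier lock is still held, and succeeds once that lock is released. It is only an illustration of lock contention in general, not the Kafka Streams implementation: the class name StateDirLockSketch, the helper DirLock, and the ".lock" file name are all assumptions made for this example.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch; none of these names come from Kafka Streams.
public class StateDirLockSketch {

    /** Keeps the channel and its lock together so both are released at once. */
    static final class DirLock implements AutoCloseable {
        private final FileChannel channel;
        private final FileLock lock;

        DirLock(FileChannel channel, FileLock lock) {
            this.channel = channel;
            this.lock = lock;
        }

        @Override
        public void close() throws IOException {
            lock.release();
            channel.close();
        }
    }

    /** Returns a held lock, or null if the directory is already locked. */
    static DirLock tryLock(Path stateDir) throws IOException {
        Files.createDirectories(stateDir);
        // Assumption: a ".lock" file inside the directory stands in for the directory lock.
        FileChannel channel = FileChannel.open(stateDir.resolve(".lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            FileLock lock = channel.tryLock();
            if (lock == null) {
                // Another process holds the lock.
                channel.close();
                return null;
            }
            return new DirLock(channel, lock);
        } catch (OverlappingFileLockException e) {
            // Something else in this JVM already holds the lock.
            channel.close();
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("state-sketch").resolve("2_82");
        DirLock first = tryLock(dir);
        System.out.println("first attempt locked: " + (first != null));
        // The first lock is still held, so this attempt is refused.
        System.out.println("second attempt locked: " + (tryLock(dir) != null));
        first.close();
        DirLock third = tryLock(dir);
        System.out.println("after release locked: " + (third != null));
        if (third != null) {
            third.close();
        }
    }
}
```

If a lock of this kind is not released before the next acquisition attempt, every retry fails the same way until the process restarts, which would be consistent with the behaviour described in the report, though the sketch does not establish that this is the actual cause.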



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
