kafka-dev mailing list archives

From "Lukas Gemela (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-5154) Kafka Streams throws NPE during rebalance
Date Tue, 02 May 2017 22:35:04 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993902#comment-15993902 ]

Lukas Gemela edited comment on KAFKA-5154 at 5/2/17 10:34 PM:
--------------------------------------------------------------

Hi,

We have a Kafka Streams app that consumes from one topic, aggregates the data on another, internal
topic (using an in-memory logged store) and then writes the results to an output topic.
The input topic has, I believe, 40 partitions, and therefore the internal topic has 40 partitions
as well.
We run 8 parallel nodes of this app at all times.
We use the low-level Kafka Streams API.
None of our code is blocking or asynchronous.
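With the figures above (40 input partitions, 8 instances), and assuming one task per input partition and one stream thread per instance, a perfectly even spread gives each instance 5 partitions. The stdlib-only sketch below makes that arithmetic concrete with a hypothetical `roundRobin` helper; it is not the Kafka Streams partition assignor. Assuming `poseidonIncidentFeed` is the 40-partition input topic, the attached log shows this node holding 4 and then 6 of its partitions, so the real assignment is not necessarily this even in every generation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EvenSpreadSketch {
    // Hypothetical helper: deal partitions 0..partitionCount-1 to instances in turn.
    static Map<Integer, List<Integer>> roundRobin(int partitionCount, int instanceCount) {
        Map<Integer, List<Integer>> assignment = new TreeMap<>();
        for (int i = 0; i < instanceCount; i++) {
            assignment.put(i, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(p % instanceCount).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Figures from the comment: 40 partitions, 8 application instances.
        // Each instance gets 5 partitions, e.g. instance 0 -> [0, 8, 16, 24, 32].
        roundRobin(40, 8).forEach((instance, partitions) ->
                System.out.println("instance " + instance + " -> " + partitions.size()
                        + " partitions " + partitions));
    }
}
```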

I'm not really sure what caused the rebalance, as all logs from all nodes are now lost. I've
only seen a couple of IllegalStateExceptions on other nodes, coming from the Kafka Streams stack,
but I'm not even sure whether they were thrown around the same time as the NPE happened on this node.


_______________________
Probably not important, but it might be related to the issue:
Please also note that in our exception handler we try to stop the Kafka Streams app with our own custom
timeout and schedule a Spring task to start it again. As far as I can tell, calling

boolean isClosed = streams.close(10, TimeUnit.SECONDS);

never actually succeeds: we always get isClosed = false.
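For context, `KafkaStreams#close(long, TimeUnit)` is documented to signal all stream threads to stop, wait up to the timeout for them to finish, and return `false` if they have not all stopped in time. The stdlib-only sketch below is a hypothetical model of that boolean contract, not Kafka code (the `Worker`, `close`, `closeFromOutside` and `closeFromInside` names are invented). It shows one general way a timed close can return `false` every time: if it is invoked on the very thread it waits for, as an uncaught-exception handler running on `StreamThread-1` would be (the attached log shows the handler logging from that thread), that thread cannot terminate while it is itself blocked in the wait. Whether 0.10.2 behaves exactly like this is an assumption, not something confirmed here.

```java
import java.util.concurrent.TimeUnit;

public class TimedCloseSketch {
    // Hypothetical stand-in for a stream thread: loops until asked to stop.
    static class Worker extends Thread {
        private volatile boolean running = true;

        Worker(String name) {
            super(name);
        }

        @Override
        public void run() {
            while (running) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        void requestStop() {
            running = false;
        }
    }

    // Hypothetical timed close: signal the worker, wait at most the timeout,
    // then report whether it actually terminated.
    static boolean close(Worker worker, long timeout, TimeUnit unit) throws InterruptedException {
        worker.requestStop();
        worker.join(unit.toMillis(timeout));
        return !worker.isAlive();
    }

    // Close issued from another thread: the worker can exit, so this returns true.
    static boolean closeFromOutside() {
        Worker worker = new Worker("worker-outside");
        worker.start();
        try {
            return close(worker, 1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Close issued on the worker's own thread, as an exception handler would be:
    // the worker cannot terminate while it waits on itself, so this returns false.
    static boolean closeFromInside() {
        final boolean[] result = new boolean[1];
        Worker worker = new Worker("worker-inside") {
            @Override
            public void run() {
                try {
                    result[0] = close(this, 200, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println("close from another thread: " + closeFromOutside());
        System.out.println("close from the worker's own thread: " + closeFromInside());
    }
}
```

Under this model, issuing the close from a separate thread (for example from the scheduled task rather than from the handler itself) would let the worker exit, and the call would return `true`.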

Hope it helps!

Lukas



> Kafka Streams throws NPE during rebalance
> -----------------------------------------
>
>                 Key: KAFKA-5154
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5154
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Lukas Gemela
>
> Please see the attached log. Kafka Streams throws a NullPointerException during rebalance, which is caught by our custom exception handler.
> {noformat}
> 2017-04-30T17:44:17,675 INFO  kafka-coordinator-heartbeat-thread | hades org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) dead for group hades
> 2017-04-30T17:44:27,395 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) for group hades.
> 2017-04-30T17:44:27,941 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare() @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:44:27,947 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete() @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:46:37,254 INFO  kafka-coordinator-heartbeat-thread | hades org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) dead for group hades
> 2017-04-30T18:04:25,993 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) for group hades.
> 2017-04-30T18:04:29,401 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare() @393 - Revoking previously assigned partitions [poseidonIncidentFeed-11, poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T18:05:10,877 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() @407 - (Re-)joining group hades
> 2017-05-01T00:01:55,707 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) dead for group hades
> 2017-05-01T00:01:59,027 INFO  StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) for group hades.
> 2017-05-01T00:01:59,031 ERROR StreamThread-1 org.apache.kafka.streams.processor.internals.StreamThread.run() @376 - stream-thread [StreamThread-1] Streams application error during processing:
>  java.lang.NullPointerException
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [kafka-streams-0.10.2.0.jar!/:?]
> 2017-05-01T00:02:00,038 INFO  StreamThread-1 org.apache.kafka.clients.producer.KafkaProducer.close() @689 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2017-05-01T00:02:00,949 WARN  StreamThread-1 org.apache.kafka.streams.processor.internals.StreamThread.setState() @160 - Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING
> 2017-05-01T00:02:00,951 ERROR StreamThread-1 com.williamhill.trading.platform.hades.kafka.KafkaStreamManager.uncaughtException() @104 - UncaughtException in thread StreamThread-1, stopping kafka streams
>  java.lang.NullPointerException
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619) ~[kafka-streams-0.10.2.0.jar!/:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) ~[kafka-streams-0.10.2.0.jar!/:?]
> 2017-05-01T00:02:01,076 WARN  kafka-streams-close-thread org.apache.kafka.streams.processor.internals.StreamThread.setState() @160 - Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN
> {noformat}
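The two WARN lines at the end of the log come from `StreamThread.setState()`. Judging from the log itself, an unexpected transition is only warned about and then applied: the second warning starts from `NOT_RUNNING`, the state the first warning flagged. The sketch below models that warn-and-continue pattern with a hypothetical `ThreadStateSketch` class; its transition table is an assumption chosen for illustration, not copied from the 0.10.2 source.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class ThreadStateSketch {
    // States named after those in the log; the allowed successors are assumed.
    enum State {
        RUNNING, PARTITIONS_REVOKED, ASSIGNING_PARTITIONS, PENDING_SHUTDOWN, NOT_RUNNING;

        Set<State> successors() {
            switch (this) {
                case RUNNING:
                    return EnumSet.of(RUNNING, PARTITIONS_REVOKED, PENDING_SHUTDOWN);
                case PARTITIONS_REVOKED:
                    return EnumSet.of(ASSIGNING_PARTITIONS);
                case ASSIGNING_PARTITIONS:
                    return EnumSet.of(RUNNING);
                case PENDING_SHUTDOWN:
                    return EnumSet.of(NOT_RUNNING);
                default:
                    return EnumSet.noneOf(State.class); // NOT_RUNNING is terminal
            }
        }
    }

    private State state = State.RUNNING;
    final List<String> warnings = new ArrayList<>();

    // Warn-and-continue: an unexpected transition is recorded, then applied anyway.
    synchronized void setState(State next) {
        if (!state.successors().contains(next)) {
            warnings.add("Unexpected state transition from " + state + " to " + next);
        }
        state = next;
    }

    synchronized State state() {
        return state;
    }

    public static void main(String[] args) {
        ThreadStateSketch thread = new ThreadStateSketch();
        thread.setState(State.PARTITIONS_REVOKED); // rebalance starts: allowed
        thread.setState(State.NOT_RUNNING);        // thread dies on the NPE mid-rebalance
        thread.setState(State.PENDING_SHUTDOWN);   // close() arrives afterwards
        thread.warnings.forEach(System.out::println);
    }
}
```

Replaying the sequence from the log produces exactly the two warnings shown there, and the final state is still `PENDING_SHUTDOWN` because each transition is applied regardless.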



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
