kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Andres Gomez Ferrer (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (KAFKA-5961) NullPointerException when consumer restore read messages with null key.
Date Thu, 28 Sep 2017 09:53:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-5961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Andres Gomez Ferrer resolved KAFKA-5961.
----------------------------------------
    Resolution: Fixed

> NullPointerException when consumer restore read messages with null key.
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-5961
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5961
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: Andres Gomez Ferrer
>             Fix For: 0.11.0.1, 0.11.0.0
>
>
> If you have a kafka streams that use:
> {code:java}
> stream.table("topicA")
> {code}
> When the application is running if you send a message with a null key, it works fine.
Later, if you restart the application when the restore consumer starts to read the topicA
from the beginning, it crashes because doesn't filter the null key.
> I know that isn't normal send a null key to a topic that is a table topic, but maybe
sometimes can happen .. and I think that kafka streams could protect it self.
> This is the stack trace:
> {code}
> ConsumerCoordinator [ERROR] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1
for group my-cep-app_enricher failed on partition assignment
> java.lang.NullPointerException
> 	at org.rocksdb.RocksDB.put(RocksDB.java:488)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
> 	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message