Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 657A5200CFC for ; Thu, 28 Sep 2017 11:53:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 63F081609CD; Thu, 28 Sep 2017 09:53:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A90431609C2 for ; Thu, 28 Sep 2017 11:53:03 +0200 (CEST) Received: (qmail 68642 invoked by uid 500); 28 Sep 2017 09:53:02 -0000 Mailing-List: contact jira-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: jira@kafka.apache.org Delivered-To: mailing list jira@kafka.apache.org Received: (qmail 68631 invoked by uid 99); 28 Sep 2017 09:53:02 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Sep 2017 09:53:02 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 6585EC39C2 for ; Thu, 28 Sep 2017 09:53:02 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -99.202 X-Spam-Level: X-Spam-Status: No, score=-99.202 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id h7qiiPOyjxhj for ; Thu, 28 Sep 2017 09:53:01 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id 3973E60D9C for ; Thu, 28 Sep 2017 09:53:01 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id A8109E0E36 for ; Thu, 28 Sep 2017 09:53:00 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 3E21324286 for ; Thu, 28 Sep 2017 09:53:00 +0000 (UTC) Date: Thu, 28 Sep 2017 09:53:00 +0000 (UTC) From: "Andres Gomez Ferrer (JIRA)" To: jira@kafka.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Resolved] (KAFKA-5961) NullPointerException when consumer restore read messages with null key. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Thu, 28 Sep 2017 09:53:04 -0000 [ https://issues.apache.org/jira/browse/KAFKA-5961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andres Gomez Ferrer resolved KAFKA-5961. ---------------------------------------- Resolution: Fixed > NullPointerException when consumer restore read messages with null key. > ----------------------------------------------------------------------- > > Key: KAFKA-5961 > URL: https://issues.apache.org/jira/browse/KAFKA-5961 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.2.1 > Reporter: Andres Gomez Ferrer > Fix For: 0.11.0.1, 0.11.0.0 > > > If you have a kafka streams that use: > {code:java} > stream.table("topicA") > {code} > When the application is running if you send a message with a null key, it works fine. Later, if you restart the application when the restore consumer starts to read the topicA from the beginning, it crashes because doesn't filter the null key. > I know that isn't normal send a null key to a topic that is a table topic, but maybe sometimes can happen .. and I think that kafka streams could protect it self. > This is the stack trace: > {code} > ConsumerCoordinator [ERROR] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group my-cep-app_enricher failed on partition assignment > java.lang.NullPointerException > at org.rocksdb.RocksDB.put(RocksDB.java:488) > at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254) > at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67) > at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164) > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242) > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201) > at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99) > at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160) > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100) > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131) > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63) > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86) > at org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:141) > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)