Date: Mon, 6 Feb 2017 21:34:41 +0000 (UTC)
From: "Jason Gustafson (JIRA)"
To: dev@kafka.apache.org
Subject: [jira] [Commented] (KAFKA-4740) Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop

    [ https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15854791#comment-15854791 ]

Jason Gustafson commented on KAFKA-4740:
----------------------------------------

That said, one thing I realized looking at the code is that we don't provide any information in the {{SerializationException}} about which topic partition and which offset had the failure. This seems worth fixing if we decide to preserve the current behavior.
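For illustration, something along these lines could carry that context (the class and accessor names below are hypothetical, nothing like this exists in the clients today):

{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

// Hypothetical subclass that records which partition and offset failed to
// deserialize, so the caller can log the bad record or seek past it.
public class RecordDeserializationException extends SerializationException {
    private final TopicPartition partition;
    private final long offset;

    public RecordDeserializationException(TopicPartition partition, long offset,
                                          String message, Throwable cause) {
        super(message, cause);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition partition() {
        return partition;
    }

    public long offset() {
        return offset;
    }
}
{code}

A consumer catching it could then decide per record whether to give up, log and skip, or seek past the failing offset.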
> Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4740
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4740
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>         Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>            Reporter: Sébastien Launay
>            Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws a {{SerializationException}} through {{MessageAndMetadata#key()}} and {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients versions older than 0.10.1.0, the exception is swallowed by the {{NetworkClient}} class and results in an infinite loop which the client has no control over, like:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0, but another issue still remains.
> Indeed, the client can now catch the {{SerializationException}}, but the next call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (full example available on GitHub \[2\] for most released kafka-clients versions):
> {code:java}
> try (KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
>     kafkaConsumer.subscribe(Arrays.asList("topic"));
>     // Will run till the shutdown hook is called
>     while (!doStop) {
>         try {
>             ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
>             if (!records.isEmpty()) {
>                 logger.info("Got {} messages", records.count());
>                 for (ConsumerRecord<String, Integer> record : records) {
>                     logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
>                         record.partition(), record.offset(), record.key(), record.value());
>                 }
>             } else {
>                 logger.info("No messages to consume");
>             }
>         } catch (SerializationException e) {
>             logger.warn("Failed polling some records", e);
>         }
>     }
> }
> {code}
> when run with the following records (the third record has an invalid Integer value):
> {noformat}
> printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> printf "\x00\x00\x00\n"     | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO  consumer.Consumer - Got 2 messages
> INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, value: 0
> INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, value: 1
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ...
> {noformat}
> I don't believe committing offsets would help, and even if it did, this could potentially result in a few well-formed records from that {{ConsumerRecords}} batch not being consumed (data loss).
> I have only seen a few mentions of this bug online \[3\], but I believe this is a critical issue: the new consumer API is not in beta anymore, yet if you do not control the producers (which can inject malformed values) or you use an advanced deserializer that throws such an exception (e.g. the schema-registry {{KafkaAvroDeserializer}}), you can end up with a consumer that is blocked from advancing in the stream.
> Current workarounds:
> - use a {{Deserializer}} that does not throw a {{SerializationException}} (e.g. {{ByteArrayDeserializer}}, {{StringDeserializer}})
> - wrap the {{Deserializer}} to catch and log the {{SerializationException}} but return {{null}}, and then check for {{null}} in the client code (see the sketch below; that's what we use on top of {{KafkaAvroDeserializer}} in case there is an issue reaching the schema registry or the Avro datum is invalid or not compatible with the reader's schema)
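> A minimal sketch of such a wrapping {{Deserializer}} (names are illustrative and logging is omitted for brevity):
> {code:java}
> import java.util.Map;
> import org.apache.kafka.common.errors.SerializationException;
> import org.apache.kafka.common.serialization.Deserializer;
>
> // Delegates to the wrapped Deserializer and returns null instead of
> // propagating a SerializationException, so the consumer keeps advancing.
> // Client code must then treat a null key/value as a malformed record.
> public class LenientDeserializer<T> implements Deserializer<T> {
>     private final Deserializer<T> delegate;
>
>     public LenientDeserializer(Deserializer<T> delegate) {
>         this.delegate = delegate;
>     }
>
>     @Override
>     public void configure(Map<String, ?> configs, boolean isKey) {
>         delegate.configure(configs, isKey);
>     }
>
>     @Override
>     public T deserialize(String topic, byte[] data) {
>         try {
>             return delegate.deserialize(topic, data);
>         } catch (SerializationException e) {
>             // Swallow the exception; a null result marks the record as malformed.
>             return null;
>         }
>     }
>
>     @Override
>     public void close() {
>         delegate.close();
>     }
> }
> {code}
> used for instance as {{new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new LenientDeserializer<>(new IntegerDeserializer()))}}.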
> Potential solutions:
> # continue to throw {{SerializationException}} when calling {{Consumer#poll(long)}} but skip the malformed record on the next {{Consumer#poll(long)}}
> # do not throw {{SerializationException}} when calling {{Consumer#poll(long)}} but expose information about invalid records in {{ConsumerRecords}}
> # do not throw {{SerializationException}} when calling {{Consumer#poll(long)}} but store the exception(s) in the {{ConsumerRecord}} object so that it is rethrown on {{ConsumerRecord#key()}} and {{ConsumerRecord#value()}}
> # do not deserialize records during {{Consumer#poll(long)}} but do it when calling {{ConsumerRecord#key()}} and {{ConsumerRecord#value()}} (similar to the old consumer)
> I believe any of those solutions breaks compatibility semantics-wise but not necessarily binary compatibility, as {{SerializationException}} is a {{RuntimeException}} and could therefore be "moved around".
> My preference goes to the last two, and I would be happy to contribute such a change as well as update the documentation on {{SerializationException}} to reflect that it is thrown for deserialization failures as well, not only when serializing records.
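> With something like the last two solutions, the {{for}} loop in the snippet above could handle a malformed record individually without losing the rest of the batch, roughly (hypothetical behaviour, not the current API):
> {code:java}
> for (ConsumerRecord<String, Integer> record : records) {
>     try {
>         // With solutions 3 or 4, a deserialization failure would surface here,
>         // per record, instead of failing the whole poll() call.
>         logger.info("key: {}, value: {}", record.key(), record.value());
>     } catch (SerializationException e) {
>         logger.warn("Skipping malformed record at partition {} offset {}",
>             record.partition(), record.offset(), e);
>     }
> }
> {code}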
> \[1\] https://github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/message/MessageAndMetadata.scala
> \[1\] http://docs.confluent.io/2.0.0/schema-registry/docs/serializer-formatter.html#serializer
> \[2\] https://github.com/slaunay/kafka-consumer-serialization-exception-example
> \[3\] https://groups.google.com/forum/#!topic/kafka-clients/KBSPmY69H94



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)