kafka-jira mailing list archives

From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4740) Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop
Date Fri, 08 Sep 2017 18:19:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16159062#comment-16159062 ]

Guozhang Wang commented on KAFKA-4740:
--------------------------------------

I'd agree with [~hachikuji] here. In the old consumer a single record is returned from the
iterator even though we buffer a batch of them behind the scenes, so when the serde fails it is
straightforward for users to choose whether to skip or handle the record. In the new consumer we
return a batch of records in one call (though we may also buffer more behind the scenes), and
when the serde fails later it is less intuitive which position to rewind to, since some of the
previous records may already have been processed or even committed.

> Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4740
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4740
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>         Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>            Reporter: Sébastien Launay
>            Assignee: Sébastien Launay
>            Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws a {{SerializationException}} through {{MessageAndMetadata#key()}} and {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients versions < 0.10.0.1, such an exception is swallowed by the {{NetworkClient}} class and results in an infinite loop that the client has no control over, like:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0, but another issue still remains.
> Indeed, the client can now catch the {{SerializationException}}, but the next call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (a full example is available on GitHub \[2\] for most released kafka-clients versions):
> {code:java}
> try (KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
>     kafkaConsumer.subscribe(Arrays.asList("topic"));
>     // Will run till the shutdown hook is called
>     while (!doStop) {
>         try {
>             ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
>             if (!records.isEmpty()) {
>                 logger.info("Got {} messages", records.count());
>                 for (ConsumerRecord<String, Integer> record : records) {
>                     logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
>                         record.partition(), record.offset(), record.key(), record.value());
>                 }
>             } else {
>                 logger.info("No messages to consume");
>             }
>         } catch (SerializationException e) {
>             logger.warn("Failed polling some records", e);
>         }
>     }
> }
> {code}
> when run with the following records (the third record has an invalid Integer value):
> {noformat}
>     printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
>     printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
>     printf "\x00\x00\x00\n"     | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
>     printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO  consumer.Consumer - Got 2 messages
> INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, value: 0
> INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, value: 1
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ...
> {noformat}
> I don't believe committing offsets would help, and even if it did, this could potentially result in a few well-formed records from that {{ConsumerRecords}} batch not being consumed (data loss).
> I have only seen a few mentions of this bug online \[3\], but I believe this is a critical issue: the new consumer API is not in beta anymore, yet if you do not control the producers (which can inject malformed values) or you use an advanced deserializer that throws such an exception (e.g. the schema-registry {{KafkaAvroDeserializer}}), you can end up blocking a consumer from advancing in the stream.
> Current workarounds:
> - use a {{Deserializer}} that does not throw a {{SerializationException}} (e.g. {{ByteArrayDeserializer}}, {{StringDeserializer}})
> - wrap the {{Deserializer}} to catch and log the {{SerializationException}} but return {{null}}, and then check for {{null}} in the client code; that's what we use on top of {{KafkaAvroDeserializer}} in case there is an issue reaching the schema registry or the Avro datum is invalid or not compatible with the reader's schema for some reason (a minimal sketch of such a wrapper is shown below)
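> A minimal sketch of the wrapping workaround, assuming a hypothetical class name ({{NullOnErrorDeserializer}} is illustrative, not an existing Kafka class); it delegates to any {{Deserializer}} and maps a {{SerializationException}} to {{null}}:
> {code:java}
> import java.util.Map;
> 
> import org.apache.kafka.common.errors.SerializationException;
> import org.apache.kafka.common.serialization.Deserializer;
> 
> // Hypothetical wrapper: delegates to another Deserializer and turns a
> // SerializationException into a null value that the client code must check for.
> // Caveat: a null result is then ambiguous with a genuinely null key/value.
> public class NullOnErrorDeserializer<T> implements Deserializer<T> {
> 
>     private final Deserializer<T> delegate;
> 
>     public NullOnErrorDeserializer(Deserializer<T> delegate) {
>         this.delegate = delegate;
>     }
> 
>     @Override
>     public void configure(Map<String, ?> configs, boolean isKey) {
>         delegate.configure(configs, isKey);
>     }
> 
>     @Override
>     public T deserialize(String topic, byte[] data) {
>         try {
>             return delegate.deserialize(topic, data);
>         } catch (SerializationException e) {
>             // Log the malformed record here, swallow the exception and signal it via null.
>             return null;
>         }
>     }
> 
>     @Override
>     public void close() {
>         delegate.close();
>     }
> }
> {code}
> It can then be passed to the consumer, e.g. {{new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new NullOnErrorDeserializer<>(new IntegerDeserializer()))}}, with a {{null}} check added to the processing loop.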
> Potential solutions:
> # continue to throw {{SerializationException}} when calling {{Consumer#poll(long)}} but skip the malformed record on the next {{Consumer#poll(long)}} (a client-side approximation is sketched after this list)
> # do not throw {{SerializationException}} when calling {{Consumer#poll(long)}} but expose information about invalid records in {{ConsumerRecords}}
> # do not throw {{SerializationException}} when calling {{Consumer#poll(long)}} but store the exception(s) in the {{ConsumerRecord}} object so that it is rethrown on {{ConsumerRecord#key()}} and {{ConsumerRecord#value()}}
> # do not deserialize records during {{Consumer#poll()}} but do it when calling {{ConsumerRecord#key()}} and {{ConsumerRecord#value()}} (similar to the old consumer)
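> As a stop-gap, clients can approximate the first option themselves: catch the {{SerializationException}} thrown by {{Consumer#poll(long)}}, recover the failing partition and offset from the exception message (the only place that information is exposed in these client versions, so the parsing is brittle and version-dependent), and seek past the malformed record. A hedged sketch under those assumptions (class, method and pattern names are illustrative):
> {code:java}
> import java.util.regex.Matcher;
> import java.util.regex.Pattern;
> 
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.errors.SerializationException;
> 
> public class SkippingPoller {
> 
>     // Matches messages like:
>     // "Error deserializing key/value for partition topic-0 at offset 2"
>     private static final Pattern DESERIALIZATION_ERROR =
>             Pattern.compile("for partition (.+) at offset (\\d+)");
> 
>     // Polls once and, on a deserialization failure, seeks past the malformed record
>     // and polls again. Note: the skipped record is lost, and parsing the message
>     // string is fragile since it is not part of any API contract.
>     static ConsumerRecords<String, Integer> pollSkippingMalformed(
>             KafkaConsumer<String, Integer> consumer, long timeoutMs) {
>         try {
>             return consumer.poll(timeoutMs);
>         } catch (SerializationException e) {
>             Matcher m = DESERIALIZATION_ERROR.matcher(String.valueOf(e.getMessage()));
>             if (!m.find()) {
>                 throw e; // unexpected message format, do not guess
>             }
>             String topicPartition = m.group(1);
>             int dash = topicPartition.lastIndexOf('-'); // topic names may contain dashes
>             TopicPartition tp = new TopicPartition(
>                     topicPartition.substring(0, dash),
>                     Integer.parseInt(topicPartition.substring(dash + 1)));
>             long badOffset = Long.parseLong(m.group(2));
>             consumer.seek(tp, badOffset + 1); // skip the malformed record
>             return consumer.poll(timeoutMs);  // may still throw for a later malformed record
>         }
>     }
> }
> {code}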
> I believe any of those solutions breaks compatibility semantics-wise, but not necessarily binary compatibility, as {{SerializationException}} is a {{RuntimeException}} and so it could be "moved around".
> My preference goes to the last two (the semantics of deferring deserialization are sketched below), and I would be happy to contribute such a change as well as update the documentation of {{SerializationException}} to reflect that it is not only used when serializing records.
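> To illustrate those semantics (this is not an actual Kafka API, just a sketch of how deferring deserialization keeps the rest of a batch consumable), a record could hold the raw bytes and only deserialize on access, so a failure surfaces per record rather than failing the whole {{poll}}:
> {code:java}
> import org.apache.kafka.common.serialization.Deserializer;
> 
> // Hypothetical lazily-deserialized record: poll() would never throw
> // SerializationException; the exception is thrown (per record) from key()/value(),
> // so the client can skip one malformed record and keep processing the batch.
> public final class LazyConsumerRecord<K, V> {
> 
>     private final String topic;
>     private final int partition;
>     private final long offset;
>     private final byte[] rawKey;
>     private final byte[] rawValue;
>     private final Deserializer<K> keyDeserializer;
>     private final Deserializer<V> valueDeserializer;
> 
>     public LazyConsumerRecord(String topic, int partition, long offset,
>                               byte[] rawKey, byte[] rawValue,
>                               Deserializer<K> keyDeserializer,
>                               Deserializer<V> valueDeserializer) {
>         this.topic = topic;
>         this.partition = partition;
>         this.offset = offset;
>         this.rawKey = rawKey;
>         this.rawValue = rawValue;
>         this.keyDeserializer = keyDeserializer;
>         this.valueDeserializer = valueDeserializer;
>     }
> 
>     public String topic() { return topic; }
> 
>     public int partition() { return partition; }
> 
>     public long offset() { return offset; }
> 
>     // May throw SerializationException, like MessageAndMetadata#key() in the old consumer.
>     public K key() { return keyDeserializer.deserialize(topic, rawKey); }
> 
>     // May throw SerializationException, like MessageAndMetadata#message() in the old consumer.
>     public V value() { return valueDeserializer.deserialize(topic, rawValue); }
> }
> {code}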
> \[1\] https://github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/message/MessageAndMetadata.scala
> \[1\] http://docs.confluent.io/2.0.0/schema-registry/docs/serializer-formatter.html#serializer
> \[2\] https://github.com/slaunay/kafka-consumer-serialization-exception-example
> \[3\] https://groups.google.com/forum/#!topic/kafka-clients/KBSPmY69H94



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
