kafka-jira mailing list archives

From "Soby Chacko (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
Date Thu, 01 Feb 2018 02:51:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Soby Chacko updated KAFKA-6502:
-------------------------------
    Description: 
See this StackOverflow issue: [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]

and this comment: [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]

I am trying to use the LogAndContinueExceptionHandler on deserialization. It works fine
when an error occurs: the handler logs the record and processing continues. However, on a
continuous stream of errors, these messages do not seem to be committed, and they reappear
after the application restarts. This is more problematic when I send the messages in error
to a DLQ: on a restart, they are sent to the DLQ again. As soon as a good record comes in,
the offset appears to move forward, and the already-logged messages are no longer seen after
a restart.

I reproduced this behavior by running the sample provided here: [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]

I changed the incoming value Serde to {{Serdes.Integer().getClass().getName()}} to force a
deserialization error on input and reduced the commit interval to 1 second. I also added
the following to the config:

{{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);}}
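For reference, the reproduction settings described above can be sketched as a plain java.util.Properties object. This is a sketch under assumptions: it uses the literal config keys behind the StreamsConfig constants (default.value.serde, commit.interval.ms, default.deserialization.exception.handler) so that it compiles without the Kafka jars, and the application id and bootstrap servers are placeholder values not taken from the report.

```java
import java.util.Properties;

public class DeserializationRepro {

    // Builds the settings described in the report. The literal keys below are the
    // values of the StreamsConfig constants, used so the sketch compiles without
    // kafka-streams on the classpath.
    static Properties reproConfig() {
        Properties props = new Properties();
        // Placeholder values (assumed, not from the report).
        props.put("application.id", "wordcount-deser-repro");
        props.put("bootstrap.servers", "localhost:9092");
        // DEFAULT_VALUE_SERDE_CLASS_CONFIG: equals Serdes.Integer().getClass().getName(),
        // so input values that are not 4-byte integers fail to deserialize.
        props.put("default.value.serde",
                  "org.apache.kafka.common.serialization.Serdes$IntegerSerde");
        // COMMIT_INTERVAL_MS_CONFIG: commit every second.
        props.put("commit.interval.ms", "1000");
        // DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG: log and skip bad records.
        props.put("default.deserialization.exception.handler",
                  "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }

    public static void main(String[] args) {
        Properties props = reproConfig();
        // Print the settings in a stable (sorted) order.
        props.stringPropertyNames().stream()
             .sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

In a real application these Properties would be passed to the KafkaStreams constructor together with the topology; Kafka's class-typed settings accept either a Class object (as in the snippet above) or its fully qualified name.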

It looks like when deserialization exceptions occur, this flag is never set to true
here: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
It only becomes true once processing succeeds. That might be why the commit does not
happen even after I manually call processorContext#commit().
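The hypothesis above can be illustrated with a toy, single-partition model. This is not Kafka's StreamTask code: every name in it (runUntilCrash, commitNeeded, the list-based DLQ) is hypothetical. It assumes the commit position only advances after a record is processed successfully, and shows that a run of records skipped by a log-and-continue handler is never committed, so those records are re-read (and re-sent to the DLQ) after a restart until a good record moves the offset forward.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SkippedOffsetModel {

    // Processes log[start..] for at most maxRecords records, then "crashes".
    // Returns the committed offset a restarted instance would resume from.
    // Failed records are appended to dlq, an external sink that survives restarts.
    static long runUntilCrash(List<String> log, long start, int maxRecords, List<Long> dlq) {
        long committed = start;        // last committed position (next offset to read)
        long lastProcessed = -1;       // offset of last successfully processed record
        boolean commitNeeded = false;  // hypothetical flag: set only on successful processing
        long end = Math.min(log.size(), start + maxRecords);
        for (long offset = start; offset < end; offset++) {
            if (deserialize(log.get((int) offset)) == null) {
                // Log-and-continue: skip the record and send it to the DLQ,
                // but leave lastProcessed and commitNeeded untouched.
                dlq.add(offset);
            } else {
                lastProcessed = offset;
                commitNeeded = true;
            }
            // Model a very short commit interval by attempting a commit after every record.
            if (commitNeeded) {
                committed = lastProcessed + 1;
                commitNeeded = false;
            }
        }
        return committed;
    }

    // Stands in for a value deserializer: returns null instead of throwing.
    static Integer deserialize(String raw) {
        try {
            return Integer.valueOf(raw);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "1", "d");
        List<Long> dlq = new ArrayList<>();
        long committed = runUntilCrash(log, 0, 3, dlq);    // only bad records before the crash
        committed = runUntilCrash(log, committed, 5, dlq); // restart from the committed offset
        // prints: committed=4 dlq=[0, 1, 2, 0, 1, 2, 4]
        System.out.println("committed=" + committed + " dlq=" + dlq);
    }
}
```

Offsets 0 to 2 are sent to the DLQ twice because nothing was committed before the first crash, while the good record at offset 3 moves the committed position past them, matching the symptoms in the report. The trailing bad record at offset 4 is again left uncommitted.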

> Kafka streams deserialization handler not committing offsets on error records
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-6502
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6502
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Soby Chacko
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
