kafka-jira mailing list archives

From "Seweryn Habdank-Wojewodzki (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5779) Single message may exploit application based on KStream
Date Thu, 31 Aug 2017 09:27:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148708#comment-16148708 ]

Seweryn Habdank-Wojewodzki commented on KAFKA-5779:


> Single message may exploit application based on KStream
> -------------------------------------------------------
>                 Key: KAFKA-5779
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5779
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions:
>            Reporter: Seweryn Habdank-Wojewodzki
>            Priority: Critical
> The context: in Kafka streaming I am *defining* a simple KStream processing pipeline:
> {code}
>     stringInput // line 54 of the SingleTopicStreamer class
>         .filter( streamFilter::passOrFilterMessages )
>         .map( normalizer )
>         .to( outTopicName );
> {code}
> For some reason I received a malformed message (I am still investigating what the problem is),
> but anyhow my service was brought down with a FATAL error:
> {code}
> 2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled exception: Input
record ConsumerRecord(topic = XXX_topic, partition = 8, offset = 15, CreateTime = -1, serialized
key size = -1, serialized value size = 255, headers = RecordHeaders(headers = [], isReadOnly
= false), key = null, value = {"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"})
has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to
write this record to Kafka without embedding a timestamp, or because the input topic was created
before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process
this data.; [org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464), org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)] in thread
> {code}
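The error message itself names the workaround: replace the default FailOnInvalidTimestamp extractor. Kafka Streams ships alternatives such as UsePreviousTimeOnInvalidTimestamp and LogAndSkipOnInvalidTimestamp, selected via the default.timestamp.extractor config (older Streams versions used the key timestamp.extractor). A minimal sketch of the configuration, using literal config strings so it compiles without Kafka dependencies; the application.id and bootstrap.servers values are placeholders:

```java
import java.util.Properties;

public class StreamerConfig {
    // Literal config key to keep this sketch free of Kafka dependencies;
    // application code would use the StreamsConfig.* constants instead.
    static final String DEFAULT_TIMESTAMP_EXTRACTOR = "default.timestamp.extractor";

    static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "single-topic-streamer");  // placeholder app id
        props.put("bootstrap.servers", "localhost:9092");      // placeholder broker
        // Swap the default FailOnInvalidTimestamp for an extractor that falls
        // back to the previously observed timestamp instead of throwing.
        props.put(DEFAULT_TIMESTAMP_EXTRACTOR,
                  "org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty(DEFAULT_TIMESTAMP_EXTRACTOR));
    }
}
```

These Properties would then be passed to the KafkaStreams constructor when building the topology shown above.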
> The reason suggested in the message (an old pre-0.10 producer) does not apply, as the topics had been created within the very version of Kafka we are running.
> The sending application is the Confluent .NET client.
> This exception is a bit problematic: it was suggested that it is thrown during
initialization of the stream, but effectively it happens during processing, so adding
try {} catch {} around the stringInput statement does not help. The stream was correctly defined,
but a single message sent later brought down the whole application.
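The try/catch cannot help because the record is handled on an internal stream thread, not on the thread that defined the topology. What KafkaStreams does offer (since 0.10.x) is setUncaughtExceptionHandler, which accepts a plain java.lang.Thread.UncaughtExceptionHandler, so the death of a stream thread can at least be observed. A minimal stdlib-only sketch of that mechanism, with an ordinary worker thread standing in for the internal Streams thread:

```java
import java.util.concurrent.atomic.AtomicReference;

public class HandlerDemo {
    // Runs a worker thread that dies with the given error message and returns
    // what the uncaught-exception handler observed.
    static String runWorkerAndCapture(String errorMessage) {
        AtomicReference<String> lastError = new AtomicReference<>();

        // Stand-in for an internal Streams processing thread hitting a bad record.
        Thread worker = new Thread(() -> {
            throw new IllegalStateException(errorMessage);
        });

        // KafkaStreams#setUncaughtExceptionHandler accepts this same
        // Thread.UncaughtExceptionHandler interface, so a dying stream thread
        // can be logged, alerted on, or used to trigger a restart.
        worker.setUncaughtExceptionHandler((t, e) -> lastError.set(e.getMessage()));

        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return lastError.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkerAndCapture(
            "Input record has invalid (negative) timestamp."));
    }
}
```

This only observes the failure after the fact; it does not keep the corrupted record from killing the stream thread in the first place.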
> In my opinion KStream shall be robust enough to catch such an exception and
protect the application from death due to a single corrupted message, especially when the
timestamp is simply not embedded. In such a case one could patch the message with the current
timestamp without any loss of overall performance.
> I would expect Kafka Streams to handle this.
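Patching a missing timestamp with wall-clock time is exactly what a custom timestamp extractor can do. Below is a self-contained sketch of the decision logic; the real implementation would live in a class implementing org.apache.kafka.streams.processor.TimestampExtractor (0.11-era signature shown in the comment), which is not on the classpath here, so the logic is kept as a pure, testable function:

```java
public class WallClockPatchingExtractor {
    // Core decision of the extractor: keep a valid embedded timestamp,
    // otherwise patch with the current wall clock instead of throwing.
    static long extract(long recordTimestamp, long wallClockMs) {
        return recordTimestamp >= 0 ? recordTimestamp : wallClockMs;
    }

    /*
     * In a real application this would be wired into Kafka Streams roughly as:
     *
     *   public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
     *       return extract(record.timestamp(), System.currentTimeMillis());
     *   }
     */

    public static void main(String[] args) {
        // CreateTime = -1, as in the FATAL log above, gets patched rather than killing the app.
        System.out.println(extract(-1L, 1503414524000L));
        System.out.println(extract(42L, 1503414524000L));
    }
}
```

With such an extractor configured, the record from the log above would flow through the topology with a patched timestamp instead of raising the fatal exception.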
> I will continue to investigate what is wrong with the message, but it is quite
hard for me, as it happens internally in Kafka Streams combined with the .NET producer.
> I have already verified that this problem does not occur when I consume these same
messages with an old-fashioned Kafka Consumer :-).

This message was sent by Atlassian JIRA
