kafka-dev mailing list archives

From "Seweryn Habdank-Wojewodzki (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-5779) Single message may exploit application based on KStream
Date Thu, 24 Aug 2017 07:29:00 GMT
Seweryn Habdank-Wojewodzki created KAFKA-5779:

             Summary: Single message may exploit application based on KStream
                 Key: KAFKA-5779
                 URL: https://issues.apache.org/jira/browse/KAFKA-5779
             Project: Kafka
          Issue Type: Bug
    Affects Versions:,
            Reporter: Seweryn Habdank-Wojewodzki
            Priority: Critical

The context: in Kafka Streams I am *defining* a simple KStream processing pipeline:

    stringInput // line 54 of the SingleTopicStreamer class
        .filter( streamFilter::passOrFilterMessages )
        .map( normalizer )
        .to( outTopicName );

For some reason I received a bad message (I am still investigating what the problem is),
but in any case my service was brought down with a FATAL error:

2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled exception: Input record
ConsumerRecord(topic = XXX_topic, partition = 8, offset = 15, CreateTime = -1, serialized
key size = -1, serialized value size = 255, headers = RecordHeaders(headers = [], isReadOnly
= false), key = null, value = {"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"})
has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to
write this record to Kafka without embedding a timestamp, or because the input topic was created
before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process
this data.; [org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464), org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)] in thread

The cause suggested in the message, an old producer client, does not apply here, as we are using Kafka
and the topics were created with this same version of Kafka.
The sender application is the .NET client from Confluent.

The whole matter is made more problematic by this exception: it was suggested that it is thrown
while the stream is being initialized, but in fact it happened during processing. Adding
try {} catch {} around the stringInput statement therefore does not help, as the stream was defined
correctly and a single message sent later brought down the whole application.
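The point about try/catch can be illustrated without Kafka at all. The following is a minimal plain-Java sketch, assuming only that the failure is raised on a background thread after the code that defined and started the work has already returned; the class and method names are hypothetical and no Kafka classes are involved. A try/catch around start() never sees the failure, while a Thread.UncaughtExceptionHandler on the worker does.

```java
import java.util.concurrent.atomic.AtomicReference;

// Plain-Java illustration only; names are hypothetical and no Kafka API is used.
class UncaughtInWorkerDemo {

    // Starts a worker that fails while "processing" and returns the exception
    // seen by the worker's uncaught-exception handler (null if none was seen).
    static Throwable runFailingWorker() throws InterruptedException {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            // Stands in for a record that fails inside the processing loop.
            throw new IllegalStateException("invalid (negative) timestamp");
        }, "stream-thread-demo");
        // The handler runs on the dying worker thread, before it terminates.
        worker.setUncaughtExceptionHandler((thread, error) -> seen.set(error));
        try {
            worker.start(); // returns immediately; the failure happens later
        } catch (RuntimeException e) {
            // Not reached for exceptions thrown inside the worker's run().
            throw new AssertionError("unexpected", e);
        }
        worker.join(); // wait until the worker (and its handler) has finished
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handler saw: " + runFailingWorker());
    }
}
```

A handler of this kind only reports the failure; the worker thread still terminates. In Kafka Streams a similar hook can be registered on the KafkaStreams instance with setUncaughtExceptionHandler, but whether that is sufficient for this case is not established here.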

In my opinion KStream should be robust enough to catch all such exceptions and should protect
the application from dying because of a single corrupted message, especially when the timestamp is not embedded.
In such a case one could patch the message with the current timestamp without any loss of overall performance.
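The "patch with the current timestamp" idea can be sketched as follows. This is a self-contained illustration, not Kafka code: the Extractor interface and the wallClockOnInvalid helper are hypothetical stand-ins for the kind of hook the error message refers to as a TimestampExtractor, and the clock is passed in so the behaviour is easy to check.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in types; the real Kafka Streams interfaces are not used
// here so that the sketch compiles on its own.
class TimestampFallbackDemo {

    // Maps a record's embedded timestamp to the timestamp used for processing.
    interface Extractor {
        long extract(long embeddedTimestamp);
    }

    // Keeps valid (non-negative) timestamps and replaces invalid ones with the
    // current value of the supplied clock instead of failing.
    static Extractor wallClockOnInvalid(LongSupplier clock) {
        return embedded -> embedded >= 0 ? embedded : clock.getAsLong();
    }

    public static void main(String[] args) {
        Extractor extractor = wallClockOnInvalid(System::currentTimeMillis);
        // -1 mirrors the "CreateTime = -1" value shown in the log above.
        System.out.println("patched timestamp: " + extractor.extract(-1L));
    }
}
```

As the error text itself says, the failing behaviour comes from the configured extractor (FailOnInvalidTimestamp in the stack trace), so a real fix along these lines would mean configuring a different TimestampExtractor. Kafka Streams ships WallclockTimestampExtractor, which always uses the current time; whether it or a custom extractor fits this application is an open question.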

I would expect Kafka Streams to handle this.

I will continue to investigate what is wrong with the message, but that is quite hard
for me, as it happens internally in Kafka Streams in combination with the .NET producer.

I have already verified that this problem does not occur when I read these same messages
with an old-fashioned Kafka Consumer :-).
