kafka-jira mailing list archives

From "Seweryn Habdank-Wojewodzki (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-5779) Single message may exploit application based on KStream
Date Tue, 29 Aug 2017 06:41:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144808#comment-16144808 ]

Seweryn Habdank-Wojewodzki edited comment on KAFKA-5779 at 8/29/17 6:40 AM:
----------------------------------------------------------------------------

Additional comment. There is something logically contradictory in the code.

{code}
// RecordQueue.addRawRecords
        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
            ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord);
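            // NOTE: with the default FailOnInvalidTimestamp extractor, the extract()
            // call below throws before the "drop" check is ever reached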
            long timestamp = timestampExtractor.extract(record, timeTracker.get());
            log.trace("Source node {} extracted timestamp {} for record {}", source.name(),
timestamp, record);

            // drop message if TS is invalid, i.e., negative
            if (timestamp < 0) {
                continue;
            }

            StampedRecord stampedRecord = new StampedRecord(record, timestamp);
            fifoQueue.addLast(stampedRecord);
            timeTracker.addElement(stampedRecord);
        } 
{code}

The problem is the following:

In the timestamp extractor (ExtractRecordMetadataTimestamp.extract + FailOnInvalidTimestamp.onInvalidTimestamp) an exception is thrown when the timestamp is negative.
In the code of RecordQueue.addRawRecords the message is skipped instead - see the comment "drop message if TS is invalid, i.e., negative".

So in fact this "if" in RecordQueue.addRawRecords is dead code when the default FailOnInvalidTimestamp extractor is used: the exception is thrown earlier and never caught, and both actions test the same logical condition.

I would prefer that no exception is thrown and the message is only skipped (perhaps with a LOG at debug level). Does this sound reasonable?
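
For illustration, a minimal sketch (assuming the 0.10.2/0.11.0 Streams API; application id and bootstrap servers are placeholders): configuring the built-in LogAndSkipOnInvalidTimestamp extractor instead of the default FailOnInvalidTimestamp makes the "if (timestamp < 0)" branch above live, so invalid records are logged and dropped instead of killing the stream thread.

{code}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

public class SkipInvalidTimestampConfig {
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "restreamer");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Replace the default FailOnInvalidTimestamp: a record with a negative
        // timestamp is then logged and dropped by the "if (timestamp < 0)" check
        // in RecordQueue.addRawRecords instead of crashing the stream thread.
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  LogAndSkipOnInvalidTimestamp.class.getName());
        return props;
    }
}
{code}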


> Single message may exploit application based on KStream
> -------------------------------------------------------
>
>                 Key: KAFKA-5779
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5779
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.2.1, 0.11.0.0
>            Reporter: Seweryn Habdank-Wojewodzki
>            Priority: Critical
>
> The context: in Kafka streaming I am *defining* a simple KStream processing topology:
> {code}
>     stringInput // line 54 of the SingleTopicStreamer class
>         .filter( streamFilter::passOrFilterMessages )
>         .map( normalizer )
>         .to( outTopicName );
> {code}
> For some reason I got a wrong message (I am still investigating what the problem is), but anyhow my service was killed with a FATAL error:
> {code}
> 2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled exception: Input record ConsumerRecord(topic = XXX_topic, partition = 8, offset = 15, CreateTime = -1, serialized key size = -1, serialized value size = 255, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.;
> [org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
> org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61),
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46),
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85),
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117),
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464),
> org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556),
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
> in thread restreamer-d4e77d18-6e7b-4708-8436-7fea0d4b1cdf-StreamThread-3
> {code}
> The possible reason given in the message (a pre-0.10 producer) is false, as we are using Kafka 0.10.2.1 and 0.11.0.0, and the topics had been created within these versions of Kafka.
> The sender application is the .NET client from Confluent.
> The whole matter is a bit problematic with this exception: it was suggested it is thrown during initialization of the stream, but effectively it happens during processing, so adding try {} catch {} around the stringInput statement does not help. The stream was correctly defined, but a single message sent later brought the whole app down.
> In my opinion KStream shall be robust enough to catch all such exceptions and shall protect the application from death due to a single corrupted message, especially when the timestamp is not embedded. In such a case one can patch the message with the current timestamp without loss of overall performance (see the sketch below).
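> A hedged sketch of that idea (the class name is illustrative, not part of Kafka): a custom TimestampExtractor that patches an invalid (negative) timestamp with the current wall-clock time instead of throwing:
> {code}
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.streams.processor.TimestampExtractor;
>
> // Illustrative sketch: fall back to wall-clock time when the embedded
> // timestamp is invalid, so a single corrupted message cannot kill the app.
> public class PatchWithWallclockTimestamp implements TimestampExtractor {
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
>         final long ts = record.timestamp();
>         return ts < 0 ? System.currentTimeMillis() : ts;
>     }
> }
> {code}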
> I would expect Kafka Streams to handle this.
> I will continue to investigate what the problem with the message is, but it is quite hard for me, as it happens internally in Kafka Streams combined with the .NET producer.
> And I have already tested that this problem does not occur when I consume these concrete messages with an old-fashioned Kafka Consumer :-).



