kafka-jira mailing list archives

From "John Roesler (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6376) Improve Streams metrics for skipped records
Date Wed, 28 Mar 2018 19:54:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418023#comment-16418023 ]

John Roesler commented on KAFKA-6376:
-------------------------------------

The thread-level metric looks like it only counts this:

org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the
record is skipped and not added to the queue for processing
{quote}
 

Ah, but it drills down into org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords
where we have
{quote}final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
 continue;
}{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
 continue;
}{quote}
All other code paths in there either add the record or throw.
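
To make the two skip paths concrete, here is a minimal sketch of a queue that counts each one separately. It is not the actual RecordQueue code: SketchRecord, SkipCountingQueue, and the two counter names are hypothetical, and the deserializer is modelled as a plain function that returns null for a record it could not deserialize.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for a deserialized record; not Kafka's ConsumerRecord. */
final class SketchRecord {
    final String value;
    final long timestamp;

    SketchRecord(final String value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/** Hypothetical queue that counts the two skip paths separately. */
final class SkipCountingQueue {
    private final Deque<SketchRecord> fifo = new ArrayDeque<>();
    private final Function<byte[], SketchRecord> deserializer;
    private long skippedUndeserializable = 0;
    private long skippedInvalidTimestamp = 0;

    SkipCountingQueue(final Function<byte[], SketchRecord> deserializer) {
        this.deserializer = deserializer;
    }

    /** Adds raw records, skipping (and counting) the two droppable cases; returns the queue size. */
    int addRawRecords(final List<byte[]> rawRecords) {
        for (final byte[] raw : rawRecords) {
            final SketchRecord record = deserializer.apply(raw);
            if (record == null) {
                // Skip path 1: the deserializer gave up on this record.
                skippedUndeserializable++;
                continue;
            }
            if (record.timestamp < 0) {
                // Skip path 2: invalid (negative) timestamp.
                skippedInvalidTimestamp++;
                continue;
            }
            fifo.addLast(record);
        }
        return fifo.size();
    }

    long skippedUndeserializable() {
        return skippedUndeserializable;
    }

    long skippedInvalidTimestamp() {
        return skippedInvalidTimestamp;
    }
}
```

Keeping one counter per reason is only an assumption about what a finer-grained metric could look like; a single thread-level total would be the sum of the two.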

 

The former of these cases is accounted for in org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}...
catch (final Exception deserializationException) {{quote}
 

 

 

> Improve Streams metrics for skipped records
> -------------------------------------------
>
>                 Key: KAFKA-6376
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6376
>             Project: Kafka
>          Issue Type: Improvement
>          Components: metrics, streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: John Roesler
>            Priority: Major
>              Labels: needs-kip
>
> Copied from the KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during user-customized processing on errors."
> Btw: we also drop records with a {{null}} key and/or value for certain DSL operations. These should be included as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
