kafka-jira mailing list archives

From "John Roesler (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6376) Improve Streams metrics for skipped records
Date Wed, 28 Mar 2018 19:54:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418023#comment-16418023 ]

John Roesler commented on KAFKA-6376:

The thread-level metric looks like it only counts this:

{quote}Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the
record is skipped and not added to the queue for processing{quote}

Ah, but it drills down into org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords,
where we have:
{quote}
final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext,
if (record == null) {
...
// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
{quote}
All other code paths in there either add the record or throw.

The former of these cases ({{record == null}}) is accounted for in org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}
catch (final Exception deserializationException) {
{quote}
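To make the two drop paths concrete, here is a minimal Java sketch that mimics the shape of the loop quoted above and counts each drop reason separately. Everything in it (SkipPathSketch, RawRecord, Queue, lenientParse, the counter fields) is hypothetical and is not the Kafka source: the payload is a String instead of bytes, and the timestamp is carried on the raw record rather than extracted from the deserialized one.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of the loop in RecordQueue#addRawRecords; NOT the Kafka code.
class SkipPathSketch {

    // Stand-in for a raw consumer record: payload plus an already-known timestamp (assumption).
    static final class RawRecord {
        final String payload;
        final long timestamp;

        RawRecord(String payload, long timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    static final class Queue {
        final Deque<Integer> buffer = new ArrayDeque<>();
        long skippedDeserializationError = 0; // deserializer returned null
        long skippedNegativeTimestamp = 0;    // timestamp < 0

        // Adds a batch and returns how many records are buffered afterwards.
        int addRawRecords(List<RawRecord> rawRecords, Function<String, Integer> deserializer) {
            for (RawRecord raw : rawRecords) {
                Integer value = deserializer.apply(raw.payload);
                if (value == null) {          // mirrors `if (record == null)`
                    skippedDeserializationError++;
                    continue;
                }
                if (raw.timestamp < 0) {      // mirrors "drop message if TS is invalid"
                    skippedNegativeTimestamp++;
                    continue;
                }
                buffer.add(value);
            }
            return buffer.size();
        }
    }

    // Lenient deserializer: swallows the parse error and returns null,
    // similar in spirit to a handler that chooses to continue.
    static Integer lenientParse(String payload) {
        try {
            return Integer.valueOf(payload);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        Queue queue = new Queue();
        int buffered = queue.addRawRecords(Arrays.asList(
                new RawRecord("1", 10L),
                new RawRecord("oops", 11L),
                new RawRecord("3", -1L),
                new RawRecord("4", 12L)),
                SkipPathSketch::lenientParse);
        System.out.println("buffered=" + buffered
                + " deserErrors=" + queue.skippedDeserializationError
                + " negativeTs=" + queue.skippedNegativeTimestamp);
    }
}
```

With the sample batch in main, one record is skipped on each path and two are buffered, so a counter fed only from the deserializer path would see just one of the two skips.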



> Improve Streams metrics for skipped records
> -------------------------------------------
>                 Key: KAFKA-6376
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6376
>             Project: Kafka
>          Issue Type: Improvement
>          Components: metrics, streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: John Roesler
>            Priority: Major
>              Labels: needs-kip
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during user-customized
> processing on errors."
> Btw: we also drop records with {{null}} key and/or value for certain DSL operations. This
> should be included as well.
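The description notes that the thread-level `skipped-records` only covers deserialization errors, while records are also dropped for elapsed windows (KAFKA-5784), on the producer side (KIP-210), and for {{null}} keys/values in some DSL operations. One way to picture a fix is bookkeeping keyed by processor node and drop reason, with the thread-level figure derived as a sum. The following is a minimal sketch under that assumption; SkippedRecordsSketch, SkipReason, recordSkip, nodeCount and threadTotal are hypothetical names, not the Kafka Streams metrics API.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping for skipped records; NOT the Kafka Streams metrics API.
class SkippedRecordsSketch {

    // Drop reasons mentioned in this ticket; names are illustrative only.
    enum SkipReason {
        DESERIALIZATION_ERROR,
        NEGATIVE_TIMESTAMP,
        WINDOW_ELAPSED,      // KAFKA-5784
        PRODUCER_ERROR,      // KIP-210
        NULL_KEY_OR_VALUE    // certain DSL operations
    }

    // processor-node name -> (reason -> count)
    private final Map<String, EnumMap<SkipReason, Long>> perNode = new HashMap<>();

    void recordSkip(String nodeName, SkipReason reason) {
        perNode.computeIfAbsent(nodeName, n -> new EnumMap<>(SkipReason.class))
               .merge(reason, 1L, Long::sum);
    }

    long nodeCount(String nodeName, SkipReason reason) {
        EnumMap<SkipReason, Long> counts = perNode.get(nodeName);
        return counts == null ? 0L : counts.getOrDefault(reason, 0L);
    }

    // Thread-level total for one reason, summed over all nodes.
    long threadTotal(SkipReason reason) {
        long total = 0;
        for (EnumMap<SkipReason, Long> counts : perNode.values()) {
            total += counts.getOrDefault(reason, 0L);
        }
        return total;
    }

    // Thread-level total over every node and every reason.
    long threadTotal() {
        long total = 0;
        for (EnumMap<SkipReason, Long> counts : perNode.values()) {
            for (long c : counts.values()) {
                total += c;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        SkippedRecordsSketch m = new SkippedRecordsSketch();
        m.recordSkip("source-node", SkipReason.DESERIALIZATION_ERROR);
        m.recordSkip("window-agg", SkipReason.WINDOW_ELAPSED);
        m.recordSkip("window-agg", SkipReason.NULL_KEY_OR_VALUE);
        System.out.println("thread total = " + m.threadTotal());
        System.out.println("deserialization only = "
                + m.threadTotal(SkipReason.DESERIALIZATION_ERROR));
    }
}
```

Keeping the reason as a dimension means the current behaviour (a thread-level count that is effectively the aggregate of `skippedDueToDeserializationError` across source nodes) falls out as `threadTotal(DESERIALIZATION_ERROR)`, while the overall total also covers the other drop paths.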

This message was sent by Atlassian JIRA
