flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-6109) Add "consumer lag" report metric to FlinkKafkaConsumer
Date Sat, 18 Mar 2017 09:06:41 GMT

     [ https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai updated FLINK-6109:
---------------------------------------
    Description: 
This feature was discussed on the user mailing list: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html.

As discussed, we can expose two kinds of "consumer lag" metrics for this:

 - *current consumer lag per partition:* the current difference between the latest offset of the partition and the offset of the last collected record. This metric is calculated and updated at a configurable interval. It serves as an indicator of how well the consumer is keeping up with the heads of the partitions. I propose to name this {{currentOffsetLag}}.

 - *consumer lag of the last checkpoint per partition:* the difference between the latest offset of the partition and the offset stored in the last completed checkpoint. This metric is only updated when a checkpoint completes. It serves as an indicator of how much data may need to be replayed in case of a failure. I propose to name this {{lastCheckpointedOffsetLag}}. A rough sketch of how both gauges could be registered follows this list.
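
To make the proposal concrete, here is a rough sketch (plain Java, not actual FlinkKafkaConsumer internals) of how the two gauges could be registered per partition through Flink's {{MetricGroup}} / {{Gauge}} API. The group names, field names, and the way the offsets get updated are illustrative assumptions only; the latest offset would have to be fetched from Kafka at the configurable interval mentioned above.

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Illustrative sketch only: registers the two proposed lag gauges for one
 * Kafka partition. Field and group names are assumptions, not existing
 * FlinkKafkaConsumer code.
 */
public class PartitionLagMetrics {

    // Updated by the consumer: head offset of the partition (refreshed at the
    // configurable interval), offset of the last collected record, and offset
    // stored in the last completed checkpoint.
    private final AtomicLong latestOffset = new AtomicLong();
    private final AtomicLong lastCollectedOffset = new AtomicLong();
    private final AtomicLong lastCheckpointedOffset = new AtomicLong();

    public void register(MetricGroup consumerGroup, String topic, int partition) {
        MetricGroup partitionGroup = consumerGroup
                .addGroup(topic)
                .addGroup(String.valueOf(partition));

        // Lag of the last collected record behind the partition head.
        partitionGroup.gauge("currentOffsetLag",
                (Gauge<Long>) () -> latestOffset.get() - lastCollectedOffset.get());

        // Lag of the last completed checkpoint's offset behind the head.
        partitionGroup.gauge("lastCheckpointedOffsetLag",
                (Gauge<Long>) () -> latestOffset.get() - lastCheckpointedOffset.get());
    }
}
{code}

Since the gauges read only offsets tracked inside the consumer itself, this naturally gives the per-FlinkKafkaConsumer granularity described further below, without any dependence on offsets committed to Kafka.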

I don't think it is reasonable to define a metric for whether or not a consumer has "caught up" with the HEAD, since that would imply some fixed threshold on the offset difference. We should leave the "caught up" decision to users, who can apply their own threshold when they query the metric (a user-side sketch follows).

The granularity of these metrics is per FlinkKafkaConsumer and per partition, independent of the consumer {{group.id}} used (the offsets used to calculate consumer lag come from the internal offset state of the FlinkKafkaConsumer, not from the consumer group's committed offsets in Kafka).

  was:
This is a feature discussed in this ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html.

As discussed, we can expose two kinds of "consumer lag" metrics for this:

 - *current consumer lag per partition:* the current difference between the latest offset
and the last collected record record of a partition. This metric is calculated and updated
at a configurable interval. This metric basically serves as an indicator of how the consumer
is keeping up with the head of partitions. I propose to name this {{currentOffsetLag}}.

 - *Consumer lag of last checkpoint per partition:* the difference between the latest offset
and the offset stored in the checkpoint of a partition. This metric is only updated when checkpoints
are completed. It serves as an indicator of how much data may need to be replayed in case
of a failure. I propose to name this {{lastCheckpointedOffsetLag}}.

I don't think it is reasonable to define a metric of whether or not a consumer has "caught
up" with the HEAD. That would imply a threshold for the offset difference. We should probably
leave this "caught up" logic for the user to determine themselves when they query this metric.

The granularity of the metric is per-FlinkKafkaConsumer, and independent of the consumer group.id
used (the offset used to calculate consumer lag is the internal offset state of the FlinkKafkaConsumer,
not the consumer group's committed offsets in Kafka).


> Add "consumer lag" report metric to FlinkKafkaConsumer
> ------------------------------------------------------
>
>                 Key: FLINK-6109
>                 URL: https://issues.apache.org/jira/browse/FLINK-6109
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
