From: tzulitai@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Tue, 06 Feb 2018 19:03:07 -0000
Subject: [07/28] flink git commit: [FLINK-8419] [kafka] Move consumer metric group registration to FlinkKafkaConsumerBase

[FLINK-8419] [kafka] Move consumer metric group registration to FlinkKafkaConsumerBase

This commit is a refactor to move the registration of the consumer metric
group (user scope "KafkaConsumer") to FlinkKafkaConsumerBase. Previously,
the registration was scattered around in Kafka version-specific subclasses.
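For context, the pattern this commit centralizes can be sketched with the stock
Flink metrics API. The class below is not part of the commit and its names are
hypothetical; it only shows how a user-scoped subgroup such as "KafkaConsumer"
is derived once from the runtime context and used to register gauges:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Sketch only -- not part of this commit; class and metric names are hypothetical.
public class ConsumerMetricGroupSketch extends RichMapFunction<String, String> {

    private volatile long lastOffset = -1L; // hypothetical state exposed as a metric

    @Override
    public void open(Configuration parameters) {
        // The pattern the commit moves into FlinkKafkaConsumerBase:
        // derive the user-scoped subgroup once, then hand it to whatever
        // component actually registers the metrics.
        MetricGroup consumerMetricGroup =
            getRuntimeContext().getMetricGroup().addGroup("KafkaConsumer");
        consumerMetricGroup.gauge("currentOffset", (Gauge<Long>) () -> lastOffset);
    }

    @Override
    public String map(String value) {
        lastOffset++;
        return value;
    }
}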
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5ac4b08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5ac4b08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5ac4b08

Branch: refs/heads/release-1.4
Commit: e5ac4b08f75e785f257d0fe5010377000e5a0185
Parents: a951852
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Mon Jan 22 13:36:20 2018 +0100
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Feb 6 17:31:32 2018 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer010.java | 11 +++++-----
 .../kafka/internal/Kafka010Fetcher.java         |  6 ++++--
 .../kafka/internal/Kafka010FetcherTest.java     |  9 ++++++---
 .../connectors/kafka/FlinkKafkaConsumer08.java  | 12 +++++++----
 .../kafka/internals/Kafka08Fetcher.java         |  8 ++------
 .../connectors/kafka/FlinkKafkaConsumer09.java  | 16 +++++++++------
 .../kafka/internal/Kafka09Fetcher.java          | 13 ++++++------
 .../kafka/internal/KafkaConsumerThread.java     | 14 ++++++-------
 .../kafka/internal/Kafka09FetcherTest.java      |  9 ++++++---
 .../kafka/internal/KafkaConsumerThreadTest.java |  6 +++---
 .../kafka/FlinkKafkaConsumerBase.java           | 21 +++++++++++++++++---
 .../kafka/internals/AbstractFetcher.java        |  9 +++++----
 .../FlinkKafkaConsumerBaseMigrationTest.java    |  8 ++++++--
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  8 ++++++--
 .../kafka/internals/AbstractFetcherTest.java    |  2 ++
 15 files changed, 95 insertions(+), 57 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 6fb63e1..b102ee0 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;

 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -31,7 +32,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;

 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -178,9 +178,9 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception {
-
-		boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup consumerMetricGroup,
+			boolean useMetrics) throws Exception {

 		// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
 		// this overwrites whatever setting the user configured in the properties
@@ -197,10 +197,11 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 			runtimeContext.getUserCodeClassLoader(),
 			runtimeContext.getTaskNameWithSubtasks(),
-			runtimeContext.getMetricGroup(),
 			deserializer,
 			properties,
 			pollTimeout,
+			runtimeContext.getMetricGroup(),
+			consumerMetricGroup,
 			useMetrics);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index eb4dfee..c2639ca 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -53,10 +53,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
 			String taskNameWithSubtasks,
-			MetricGroup metricGroup,
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
+			MetricGroup subtaskMetricGroup,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		super(
 			sourceContext,
@@ -67,10 +68,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 			autoWatermarkInterval,
 			userCodeClassLoader,
 			taskNameWithSubtasks,
-			metricGroup,
 			deserializer,
 			kafkaProperties,
 			pollTimeout,
+			subtaskMetricGroup,
+			consumerMetricGroup,
 			useMetrics);
 	}
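A recurring theme in the hunks above and below: each fetcher constructor now
accepts two metric groups instead of deriving its own. The sketch below is
illustrative only (class and metric names are hypothetical, not Flink API) and
shows the division of labor implied by the two parameters:

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Illustrative only -- names are hypothetical, not part of the commit.
public class TwoGroupSketch {

    private final Counter recordsPolled;      // lives under the plain subtask scope
    private volatile long currentOffset = -1L;

    public TwoGroupSketch(MetricGroup subtaskMetricGroup, MetricGroup consumerMetricGroup) {
        // Kafka-shipped / per-subtask metrics stay on the subtask group ...
        this.recordsPolled = subtaskMetricGroup.counter("recordsPolled");
        // ... while offset state goes to the shared "KafkaConsumer" user scope,
        // mirroring what addOffsetStateGauge(...) does in AbstractFetcher.
        consumerMetricGroup.gauge("currentOffset", (Gauge<Long>) () -> currentOffset);
    }

    void onRecord(long offset) {
        recordsPolled.inc();
        currentOffset = offset;
    }
}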
http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
index 45ceadc..f57fbea 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -126,10 +126,11 @@ public class Kafka010FetcherTest {
 			10,
 			getClass().getClassLoader(),
 			"taskname-with-subtask",
-			new UnregisteredMetricsGroup(),
 			schema,
 			new Properties(),
 			0L,
+			new UnregisteredMetricsGroup(),
+			new UnregisteredMetricsGroup(),
 			false);

 		// ----- run the fetcher -----
@@ -262,10 +263,11 @@ public class Kafka010FetcherTest {
 			10,
 			getClass().getClassLoader(),
 			"taskname-with-subtask",
-			new UnregisteredMetricsGroup(),
 			schema,
 			new Properties(),
 			0L,
+			new UnregisteredMetricsGroup(),
+			new UnregisteredMetricsGroup(),
 			false);

 		// ----- run the fetcher -----
@@ -376,10 +378,11 @@ public class Kafka010FetcherTest {
 			10, /* watermark interval */
 			this.getClass().getClassLoader(),
 			"task_name",
-			new UnregisteredMetricsGroup(),
 			schema,
 			new Properties(),
 			0L,
+			new UnregisteredMetricsGroup(),
+			new UnregisteredMetricsGroup(),
 			false);

 		// ----- run the fetcher -----

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index f362046..af2efd1 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;

 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -43,6 +44,7 @@ import java.util.Properties;
 import java.util.regex.Pattern;

 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getBoolean;
 import static org.apache.flink.util.PropertiesUtil.getLong;

 /**
@@ -216,7 +218,8 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 			deserializer,
 			getLong(
 				checkNotNull(props, "props"),
-				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
+			!getBoolean(props, KEY_DISABLE_METRICS, false));

 		this.kafkaProperties = props;

@@ -234,9 +237,9 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception {
-
-		boolean useMetrics = !PropertiesUtil.getBoolean(kafkaProperties, KEY_DISABLE_METRICS, false);
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup consumerMetricGroup,
+			boolean useMetrics) throws Exception {

 		long autoCommitInterval = (offsetCommitMode == OffsetCommitMode.KAFKA_PERIODIC)
 				? PropertiesUtil.getLong(kafkaProperties, "auto.commit.interval.ms", 60000)
@@ -251,6 +254,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 			deserializer,
 			kafkaProperties,
 			autoCommitInterval,
+			consumerMetricGroup,
 			useMetrics);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 8bcd663..d8bdd22 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -95,6 +95,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long autoCommitInterval,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		super(
 			sourceContext,
@@ -104,6 +105,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 			runtimeContext.getProcessingTimeService(),
 			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 			runtimeContext.getUserCodeClassLoader(),
+			consumerMetricGroup,
 			useMetrics);

 		this.deserializer = checkNotNull(deserializer);
@@ -175,12 +177,6 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 			periodicCommitter.start();
 		}

-		// register offset metrics
-		if (useMetrics) {
-			final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
-			addOffsetStateGauge(kafkaMetricGroup);
-		}
-
 		// Main loop polling elements from the unassignedPartitions queue to the threads
 		while (running) {
 			// re-throw any exception from the concurrent fetcher threads
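As the FlinkKafkaConsumer08 hunks above show, the useMetrics decision is now
made once at construction time via PropertiesUtil. A self-contained sketch of
that evaluation; it assumes KEY_DISABLE_METRICS resolves to the string
"flink.disable-metrics":

import java.util.Properties;

import static org.apache.flink.util.PropertiesUtil.getBoolean;

// Sketch only; "flink.disable-metrics" is assumed to be the value of
// FlinkKafkaConsumerBase.KEY_DISABLE_METRICS.
public final class UseMetricsSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("flink.disable-metrics", "true");

        // Metrics are ON unless the user explicitly disables them.
        boolean useMetrics = !getBoolean(props, "flink.disable-metrics", false);
        System.out.println("useMetrics = " + useMetrics); // prints: useMetrics = false
    }
}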
http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 79be73c..fe75006 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;

 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -46,6 +47,7 @@ import java.util.Properties;
 import java.util.regex.Pattern;

 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getBoolean;
 import static org.apache.flink.util.PropertiesUtil.getLong;

 /**
@@ -207,7 +209,8 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 			deserializer,
 			getLong(
 				checkNotNull(props, "props"),
-				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
+			!getBoolean(props, KEY_DISABLE_METRICS, false));

 		this.properties = props;
 		setDeserializer(this.properties);
@@ -232,9 +235,9 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception {
-
-		boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup consumerMetricGroup,
+			boolean useMetrics) throws Exception {

 		// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
 		// this overwrites whatever setting the user configured in the properties
@@ -251,10 +254,11 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 			runtimeContext.getUserCodeClassLoader(),
 			runtimeContext.getTaskNameWithSubtasks(),
-			runtimeContext.getMetricGroup(),
 			deserializer,
 			properties,
 			pollTimeout,
+			runtimeContext.getMetricGroup(),
+			consumerMetricGroup,
 			useMetrics);
 	}

@@ -269,7 +273,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {

 	@Override
 	protected boolean getIsAutoCommitEnabled() {
-		return PropertiesUtil.getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
+		return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
 			PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 51f69cd..aca0da1 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -80,10 +80,11 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
 			String taskNameWithSubtasks,
-			MetricGroup metricGroup,
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
+			MetricGroup subtaskMetricGroup,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		super(
 			sourceContext,
@@ -92,25 +93,23 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 			watermarksPunctuated,
 			processingTimeProvider,
 			autoWatermarkInterval,
-			userCodeClassLoader,
+			userCodeClassLoader.getParent(),
+			consumerMetricGroup,
 			useMetrics);

 		this.deserializer = deserializer;
 		this.handover = new Handover();

-		final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
-		addOffsetStateGauge(kafkaMetricGroup);
-
 		this.consumerThread = new KafkaConsumerThread(
 			LOG,
 			handover,
 			kafkaProperties,
 			unassignedPartitionsQueue,
-			kafkaMetricGroup,
 			createCallBridge(),
 			getFetcherName() + " for " + taskNameWithSubtasks,
 			pollTimeout,
-			useMetrics);
+			useMetrics,
+			subtaskMetricGroup);
 	}

 	// ------------------------------------------------------------------------
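The getIsAutoCommitEnabled() hunk above folds two consumer properties into one
boolean. The standalone sketch below reproduces that check, with literal keys
standing in for Kafka's ConsumerConfig constants:

import java.util.Properties;

import static org.apache.flink.util.PropertiesUtil.getBoolean;
import static org.apache.flink.util.PropertiesUtil.getLong;

// Sketch of the check in getIsAutoCommitEnabled(): auto commit only counts
// as enabled if the flag is set AND the commit interval is positive.
public final class AutoCommitCheckSketch {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.commit.interval.ms", "0");

        boolean autoCommitEnabled =
            getBoolean(properties, "enable.auto.commit", true) &&
                getLong(properties, "auto.commit.interval.ms", 5000) > 0;

        // prints false: the flag is true, but the interval is not positive
        System.out.println("auto commit enabled = " + autoCommitEnabled);
    }
}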
http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index ee0a63b..e21bd5f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -78,9 +78,6 @@ public class KafkaConsumerThread extends Thread {
 	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
 	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;

-	/** We get this from the outside to publish metrics. **/
-	private final MetricGroup kafkaMetricGroup;
-
 	/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken. */
 	private final KafkaConsumerCallBridge consumerCallBridge;

@@ -90,6 +87,9 @@ public class KafkaConsumerThread extends Thread {
 	/** Flag whether to add Kafka's metrics to the Flink metrics. */
 	private final boolean useMetrics;

+	/** We get this from the outside to publish metrics. */
+	private final MetricGroup subtaskMetricGroup;
+
 	/** Reference to the Kafka consumer, once it is created. */
 	private volatile KafkaConsumer<byte[], byte[]> consumer;

@@ -116,11 +116,11 @@ public class KafkaConsumerThread extends Thread {
 			Handover handover,
 			Properties kafkaProperties,
 			ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
-			MetricGroup kafkaMetricGroup,
 			KafkaConsumerCallBridge consumerCallBridge,
 			String threadName,
 			long pollTimeout,
-			boolean useMetrics) {
+			boolean useMetrics,
+			MetricGroup subtaskMetricGroup) {
 		super(threadName);
 		setDaemon(true);

@@ -128,7 +128,7 @@ public class KafkaConsumerThread extends Thread {
 		this.log = checkNotNull(log);
 		this.handover = checkNotNull(handover);
 		this.kafkaProperties = checkNotNull(kafkaProperties);
-		this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
+		this.subtaskMetricGroup = checkNotNull(subtaskMetricGroup);
 		this.consumerCallBridge = checkNotNull(consumerCallBridge);
 		this.unassignedPartitionsQueue = checkNotNull(unassignedPartitionsQueue);

@@ -176,7 +176,7 @@ public class KafkaConsumerThread extends Thread {
 			} else {
 				// we have Kafka metrics, register them
 				for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
-					kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+					subtaskMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
index e4e276a..27b67f1 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
@@ -126,10 +126,11 @@ public class Kafka09FetcherTest {
 			10, /* watermark interval */
 			this.getClass().getClassLoader(),
 			"task_name",
-			new UnregisteredMetricsGroup(),
 			schema,
 			new Properties(),
 			0L,
+			new UnregisteredMetricsGroup(),
+			new UnregisteredMetricsGroup(),
 			false);

 		// ----- run the fetcher -----
@@ -261,10 +262,11 @@ public class Kafka09FetcherTest {
 			10, /* watermark interval */
 			this.getClass().getClassLoader(),
 			"task_name",
-			new UnregisteredMetricsGroup(),
 			schema,
 			new Properties(),
 			0L,
+			new UnregisteredMetricsGroup(),
+			new UnregisteredMetricsGroup(),
 			false);

 		// ----- run the fetcher -----
@@ -375,10 +377,11 @@ public class Kafka09FetcherTest {
 			10, /* watermark interval */
 			this.getClass().getClassLoader(),
 			"task_name",
-			new UnregisteredMetricsGroup(),
 			schema,
 			new Properties(),
 			0L,
+			new UnregisteredMetricsGroup(),
+			new UnregisteredMetricsGroup(),
 			false);

 		// ----- run the fetcher -----

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index 2368091..607a1a9 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;

 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -716,11 +716,11 @@ public class KafkaConsumerThreadTest {
 				handover,
 				new Properties(),
 				unassignedPartitionsQueue,
-				mock(MetricGroup.class),
 				new KafkaConsumerCallBridge(),
 				"test-kafka-consumer-thread",
 				0,
-				false);
+				false,
+				new UnregisteredMetricsGroup());

 			this.mockConsumer = mockConsumer;
 		}
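The KafkaConsumerThread hunks above keep the metric registration loop but point
it at the subtask metric group. The sketch below shows the same bridge pattern
in a generic form; java.util.function.Supplier stands in for Kafka's Metric
type, so this is an analogy, not the connector's actual code:

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import java.util.Map;
import java.util.function.Supplier;

// Generic sketch: third-party metrics are exposed to Flink by wrapping each
// one in a Gauge and registering it on the metric group handed in from outside.
public final class GaugeBridgeSketch {

    // 'metrics' stands in for KafkaConsumer#metrics(); the Supplier is a
    // simplification of Kafka's Metric type.
    static void registerAll(MetricGroup subtaskMetricGroup, Map<String, Supplier<Double>> metrics) {
        for (Map.Entry<String, Supplier<Double>> metric : metrics.entrySet()) {
            subtaskMetricGroup.gauge(metric.getKey(), (Gauge<Double>) () -> metric.getValue().get());
        }
    }
}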
http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index d71827f..2645ddc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -188,6 +189,13 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
 	//  internal metrics
 	// ------------------------------------------------------------------------

+	/**
+	 * Flag indicating whether or not metrics should be exposed.
+	 * If {@code true}, offset metrics (e.g. current offset, committed offset) and
+	 * Kafka-shipped metrics will be registered.
+	 */
+	private final boolean useMetrics;
+
 	/** Counter for successful Kafka offset commits. */
 	private transient Counter successfulCommits;

@@ -217,7 +225,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
 			List<String> topics,
 			Pattern topicPattern,
 			KeyedDeserializationSchema<T> deserializer,
-			long discoveryIntervalMillis) {
+			long discoveryIntervalMillis,
+			boolean useMetrics) {
 		this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
 		this.deserializer = checkNotNull(deserializer, "valueDeserializer");

@@ -225,6 +234,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
 			discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED || discoveryIntervalMillis >= 0,
 			"Cannot define a negative value for the topic / partition discovery interval.");
 		this.discoveryIntervalMillis = discoveryIntervalMillis;
+
+		this.useMetrics = useMetrics;
 	}

@@ -556,7 +567,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
 			periodicWatermarkAssigner,
 			punctuatedWatermarkAssigner,
 			(StreamingRuntimeContext) getRuntimeContext(),
-			offsetCommitMode);
+			offsetCommitMode,
+			getRuntimeContext().getMetricGroup().addGroup("KafkaConsumer"),
+			useMetrics);

 		if (!running) {
 			return;
@@ -835,7 +848,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception;
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup kafkaMetricGroup,
+			boolean useMetrics) throws Exception;

 	/**
 	 * Creates the partition discoverer that is used to find new partitions for this subtask.

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 5240326..441e660 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -83,9 +83,6 @@ public abstract class AbstractFetcher<T, KPH> {
 	/** The mode describing whether the fetcher also generates timestamps and watermarks. */
 	protected final int timestampWatermarkMode;

-	/** Flag whether to register metrics for the fetcher. */
-	protected final boolean useMetrics;
-
 	/**
 	 * Optional timestamp extractor / watermark generator that will be run per Kafka partition,
 	 * to exploit per-partition timestamp characteristics.
@@ -116,10 +113,10 @@ public abstract class AbstractFetcher<T, KPH> {
 			ProcessingTimeService processingTimeProvider,
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		this.sourceContext = checkNotNull(sourceContext);
 		this.checkpointLock = sourceContext.getCheckpointLock();
-		this.useMetrics = useMetrics;
 		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);

 		// figure out what we watermark mode we will be using
@@ -163,6 +160,10 @@ public abstract class AbstractFetcher<T, KPH> {
 			unassignedPartitionsQueue.add(partition);
 		}

+		if (useMetrics) {
+			addOffsetStateGauge(checkNotNull(consumerMetricGroup));
+		}
+
 		// if we have periodic watermarks, kick off the interval scheduler
 		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
 			@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 84f0e38..a533cca 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;

 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -376,7 +377,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 				Arrays.asList("dummy-topic"),
 				null,
 				(KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class),
-				discoveryInterval);
+				discoveryInterval,
+				false);

 			this.fetcher = fetcher;
 			this.partitions = partitions;
@@ -393,7 +395,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				StreamingRuntimeContext runtimeContext,
-				OffsetCommitMode offsetCommitMode) throws Exception {
+				OffsetCommitMode offsetCommitMode,
+				MetricGroup consumerMetricGroup,
+				boolean useMetrics) throws Exception {
 			return fetcher;
 		}
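The AbstractFetcher hunk above is where offset metrics are now registered
exactly once, guarded by useMetrics. A simplified sketch of that guard, with
addOffsetStateGauge() reduced to a single hypothetical gauge:

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import static org.apache.flink.util.Preconditions.checkNotNull;

// Sketch only; the class and gauge names are hypothetical.
public final class OffsetGaugeSketch {

    private volatile long committedOffset = -1L;

    void maybeRegister(MetricGroup consumerMetricGroup, boolean useMetrics) {
        if (useMetrics) {
            // fail fast if the caller forgot to pass the group while metrics are enabled
            checkNotNull(consumerMetricGroup, "consumerMetricGroup")
                .gauge("committedOffset", (Gauge<Long>) () -> committedOffset);
        }
    }
}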
http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index f091c08..b4ee378 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -639,7 +639,8 @@ public class FlinkKafkaConsumerBaseTest {
 				Collections.singletonList("dummy-topic"),
 				null,
 				(KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class),
-				PARTITION_DISCOVERY_DISABLED);
+				PARTITION_DISCOVERY_DISABLED,
+				false);

 			this.testFetcher = testFetcher;
 			this.testPartitionDiscoverer = testPartitionDiscoverer;
@@ -654,7 +655,9 @@ public class FlinkKafkaConsumerBaseTest {
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				StreamingRuntimeContext runtimeContext,
-				OffsetCommitMode offsetCommitMode) throws Exception {
+				OffsetCommitMode offsetCommitMode,
+				MetricGroup consumerMetricGroup,
+				boolean useMetrics) throws Exception {
 			return this.testFetcher;
 		}
@@ -737,6 +740,7 @@ public class FlinkKafkaConsumerBaseTest {
 				new TestProcessingTimeService(),
 				0,
 				MockFetcher.class.getClassLoader(),
+				new UnregisteredMetricsGroup(),
 				false);

 			this.stateSnapshotsToReturn.addAll(Arrays.asList(stateSnapshotsToReturn));

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 6fe1d6f..d276cfb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka.internals;

+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
@@ -411,6 +412,7 @@ public class AbstractFetcherTest {
 			processingTimeProvider,
 			autoWatermarkInterval,
 			TestFetcher.class.getClassLoader(),
+			new UnregisteredMetricsGroup(),
 			false);
 	}
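The test hunks all follow one pattern: every new MetricGroup parameter is
satisfied with an UnregisteredMetricsGroup, which implements the interface
without touching a real metrics registry. A minimal sketch of that usage (the
commented fetcher call is illustrative, not a complete argument list):

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

// Sketch of the test-side wiring used throughout the updated tests above.
public final class TestMetricGroupSketch {

    public static void main(String[] args) {
        MetricGroup subtaskMetricGroup = new UnregisteredMetricsGroup();
        MetricGroup consumerMetricGroup = new UnregisteredMetricsGroup();
        // Both groups are then passed to the fetcher under test, e.g.:
        // new Kafka09Fetcher<>(..., subtaskMetricGroup, consumerMetricGroup, false);
        System.out.println("created: " + subtaskMetricGroup + ", " + consumerMetricGroup);
    }
}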