flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [07/28] flink git commit: [FLINK-8419] [kafka] Move consumer metric group registration to FlinkKafkaConsumerBase
Date Tue, 06 Feb 2018 19:03:07 GMT
[FLINK-8419] [kafka] Move consumer metric group registration to FlinkKafkaConsumerBase

This commit is a refactor to move the registration of the consumer
metric group (user scope "KafkaConsumer") to FlinkKafkaConsumerBase.
Previously, the registration was scattered around in Kafka
version-specific subclasses.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5ac4b08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5ac4b08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5ac4b08

Branch: refs/heads/release-1.4
Commit: e5ac4b08f75e785f257d0fe5010377000e5a0185
Parents: a951852
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Mon Jan 22 13:36:20 2018 +0100
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Feb 6 17:31:32 2018 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer010.java | 11 +++++-----
 .../kafka/internal/Kafka010Fetcher.java         |  6 ++++--
 .../kafka/internal/Kafka010FetcherTest.java     |  9 ++++++---
 .../connectors/kafka/FlinkKafkaConsumer08.java  | 12 +++++++----
 .../kafka/internals/Kafka08Fetcher.java         |  8 ++------
 .../connectors/kafka/FlinkKafkaConsumer09.java  | 16 +++++++++------
 .../kafka/internal/Kafka09Fetcher.java          | 13 ++++++------
 .../kafka/internal/KafkaConsumerThread.java     | 14 ++++++-------
 .../kafka/internal/Kafka09FetcherTest.java      |  9 ++++++---
 .../kafka/internal/KafkaConsumerThreadTest.java |  6 +++---
 .../kafka/FlinkKafkaConsumerBase.java           | 21 +++++++++++++++++---
 .../kafka/internals/AbstractFetcher.java        |  9 +++++----
 .../FlinkKafkaConsumerBaseMigrationTest.java    |  8 ++++++--
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  8 ++++++--
 .../kafka/internals/AbstractFetcherTest.java    |  2 ++
 15 files changed, 95 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 6fb63e1..b102ee0 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -31,7 +32,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -178,9 +178,9 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T>
{
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception {
-
-		boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup consumerMetricGroup,
+			boolean useMetrics) throws Exception {
 
 		// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
 		// this overwrites whatever setting the user configured in the properties
@@ -197,10 +197,11 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T>
{
 				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 				runtimeContext.getUserCodeClassLoader(),
 				runtimeContext.getTaskNameWithSubtasks(),
-				runtimeContext.getMetricGroup(),
 				deserializer,
 				properties,
 				pollTimeout,
+				runtimeContext.getMetricGroup(),
+				consumerMetricGroup,
 				useMetrics);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index eb4dfee..c2639ca 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -53,10 +53,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T>
{
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
 			String taskNameWithSubtasks,
-			MetricGroup metricGroup,
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
+			MetricGroup subtaskMetricGroup,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		super(
 				sourceContext,
@@ -67,10 +68,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T>
{
 				autoWatermarkInterval,
 				userCodeClassLoader,
 				taskNameWithSubtasks,
-				metricGroup,
 				deserializer,
 				kafkaProperties,
 				pollTimeout,
+				subtaskMetricGroup,
+				consumerMetricGroup,
 				useMetrics);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
index 45ceadc..f57fbea 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -126,10 +126,11 @@ public class Kafka010FetcherTest {
 				10,
 				getClass().getClassLoader(),
 				"taskname-with-subtask",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----
@@ -262,10 +263,11 @@ public class Kafka010FetcherTest {
 				10,
 				getClass().getClassLoader(),
 				"taskname-with-subtask",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----
@@ -376,10 +378,11 @@ public class Kafka010FetcherTest {
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
 				"task_name",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index f362046..af2efd1 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -43,6 +44,7 @@ import java.util.Properties;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getBoolean;
 import static org.apache.flink.util.PropertiesUtil.getLong;
 
 /**
@@ -216,7 +218,8 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T>
{
 				deserializer,
 				getLong(
 					checkNotNull(props, "props"),
-					KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+					KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
+				!getBoolean(props, KEY_DISABLE_METRICS, false));
 
 		this.kafkaProperties = props;
 
@@ -234,9 +237,9 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T>
{
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception {
-
-		boolean useMetrics = !PropertiesUtil.getBoolean(kafkaProperties, KEY_DISABLE_METRICS, false);
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup consumerMetricGroup,
+			boolean useMetrics) throws Exception {
 
 		long autoCommitInterval = (offsetCommitMode == OffsetCommitMode.KAFKA_PERIODIC)
 				? PropertiesUtil.getLong(kafkaProperties, "auto.commit.interval.ms", 60000)
@@ -251,6 +254,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T>
{
 				deserializer,
 				kafkaProperties,
 				autoCommitInterval,
+				consumerMetricGroup,
 				useMetrics);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 8bcd663..d8bdd22 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -95,6 +95,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition>
{
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long autoCommitInterval,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		super(
 				sourceContext,
@@ -104,6 +105,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition>
{
 				runtimeContext.getProcessingTimeService(),
 				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 				runtimeContext.getUserCodeClassLoader(),
+				consumerMetricGroup,
 				useMetrics);
 
 		this.deserializer = checkNotNull(deserializer);
@@ -175,12 +177,6 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition>
{
 				periodicCommitter.start();
 			}
 
-			// register offset metrics
-			if (useMetrics) {
-				final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
-				addOffsetStateGauge(kafkaMetricGroup);
-			}
-
 			// Main loop polling elements from the unassignedPartitions queue to the threads
 			while (running) {
 				// re-throw any exception from the concurrent fetcher threads

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 79be73c..fe75006 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -46,6 +47,7 @@ import java.util.Properties;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getBoolean;
 import static org.apache.flink.util.PropertiesUtil.getLong;
 
 /**
@@ -207,7 +209,8 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>
{
 				deserializer,
 				getLong(
 					checkNotNull(props, "props"),
-					KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+					KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
+				!getBoolean(props, KEY_DISABLE_METRICS, false));
 
 		this.properties = props;
 		setDeserializer(this.properties);
@@ -232,9 +235,9 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>
{
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception {
-
-		boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup consumerMetricGroup,
+			boolean useMetrics) throws Exception {
 
 		// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
 		// this overwrites whatever setting the user configured in the properties
@@ -251,10 +254,11 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>
{
 				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 				runtimeContext.getUserCodeClassLoader(),
 				runtimeContext.getTaskNameWithSubtasks(),
-				runtimeContext.getMetricGroup(),
 				deserializer,
 				properties,
 				pollTimeout,
+				runtimeContext.getMetricGroup(),
+				consumerMetricGroup,
 				useMetrics);
 	}
 
@@ -269,7 +273,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>
{
 
 	@Override
 	protected boolean getIsAutoCommitEnabled() {
-		return PropertiesUtil.getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
true) &&
+		return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
 				PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000)
> 0;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 51f69cd..aca0da1 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -80,10 +80,11 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition>
{
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
 			String taskNameWithSubtasks,
-			MetricGroup metricGroup,
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
+			MetricGroup subtaskMetricGroup,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		super(
 				sourceContext,
@@ -92,25 +93,23 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition>
{
 				watermarksPunctuated,
 				processingTimeProvider,
 				autoWatermarkInterval,
-				userCodeClassLoader,
+				userCodeClassLoader.getParent(),
+				consumerMetricGroup,
 				useMetrics);
 
 		this.deserializer = deserializer;
 		this.handover = new Handover();
 
-		final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
-		addOffsetStateGauge(kafkaMetricGroup);
-
 		this.consumerThread = new KafkaConsumerThread(
 				LOG,
 				handover,
 				kafkaProperties,
 				unassignedPartitionsQueue,
-				kafkaMetricGroup,
 				createCallBridge(),
 				getFetcherName() + " for " + taskNameWithSubtasks,
 				pollTimeout,
-				useMetrics);
+				useMetrics,
+				subtaskMetricGroup);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index ee0a63b..e21bd5f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -78,9 +78,6 @@ public class KafkaConsumerThread extends Thread {
 	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
 	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>
unassignedPartitionsQueue;
 
-	/** We get this from the outside to publish metrics. **/
-	private final MetricGroup kafkaMetricGroup;
-
 	/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility
is broken. */
 	private final KafkaConsumerCallBridge consumerCallBridge;
 
@@ -90,6 +87,9 @@ public class KafkaConsumerThread extends Thread {
 	/** Flag whether to add Kafka's metrics to the Flink metrics. */
 	private final boolean useMetrics;
 
+	/** We get this from the outside to publish metrics. */
+	private final MetricGroup subtaskMetricGroup;
+
 	/** Reference to the Kafka consumer, once it is created. */
 	private volatile KafkaConsumer<byte[], byte[]> consumer;
 
@@ -116,11 +116,11 @@ public class KafkaConsumerThread extends Thread {
 			Handover handover,
 			Properties kafkaProperties,
 			ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
-			MetricGroup kafkaMetricGroup,
 			KafkaConsumerCallBridge consumerCallBridge,
 			String threadName,
 			long pollTimeout,
-			boolean useMetrics) {
+			boolean useMetrics,
+			MetricGroup subtaskMetricGroup) {
 
 		super(threadName);
 		setDaemon(true);
@@ -128,7 +128,7 @@ public class KafkaConsumerThread extends Thread {
 		this.log = checkNotNull(log);
 		this.handover = checkNotNull(handover);
 		this.kafkaProperties = checkNotNull(kafkaProperties);
-		this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
+		this.subtaskMetricGroup = checkNotNull(subtaskMetricGroup);
 		this.consumerCallBridge = checkNotNull(consumerCallBridge);
 
 		this.unassignedPartitionsQueue = checkNotNull(unassignedPartitionsQueue);
@@ -176,7 +176,7 @@ public class KafkaConsumerThread extends Thread {
 				} else {
 					// we have Kafka metrics, register them
 					for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
-						kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+						subtaskMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
index e4e276a..27b67f1 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
@@ -126,10 +126,11 @@ public class Kafka09FetcherTest {
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
 				"task_name",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----
@@ -261,10 +262,11 @@ public class Kafka09FetcherTest {
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
 				"task_name",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----
@@ -375,10 +377,11 @@ public class Kafka09FetcherTest {
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
 				"task_name",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index 2368091..607a1a9 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -716,11 +716,11 @@ public class KafkaConsumerThreadTest {
 					handover,
 					new Properties(),
 					unassignedPartitionsQueue,
-					mock(MetricGroup.class),
 					new KafkaConsumerCallBridge(),
 					"test-kafka-consumer-thread",
 					0,
-					false);
+					false,
+					new UnregisteredMetricsGroup());
 
 			this.mockConsumer = mockConsumer;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index d71827f..2645ddc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -188,6 +189,13 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	//  internal metrics
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Flag indicating whether or not metrics should be exposed.
+	 * If {@code true}, offset metrics (e.g. current offset, committed offset) and
+	 * Kafka-shipped metrics will be registered.
+	 */
+	private final boolean useMetrics;
+
 	/** Counter for successful Kafka offset commits. */
 	private transient Counter successfulCommits;
 
@@ -217,7 +225,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			List<String> topics,
 			Pattern topicPattern,
 			KeyedDeserializationSchema<T> deserializer,
-			long discoveryIntervalMillis) {
+			long discoveryIntervalMillis,
+			boolean useMetrics) {
 		this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
 		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
 
@@ -225,6 +234,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED || discoveryIntervalMillis >=
0,
 			"Cannot define a negative value for the topic / partition discovery interval.");
 		this.discoveryIntervalMillis = discoveryIntervalMillis;
+
+		this.useMetrics = useMetrics;
 	}
 
 	// ------------------------------------------------------------------------
@@ -556,7 +567,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				periodicWatermarkAssigner,
 				punctuatedWatermarkAssigner,
 				(StreamingRuntimeContext) getRuntimeContext(),
-				offsetCommitMode);
+				offsetCommitMode,
+				getRuntimeContext().getMetricGroup().addGroup("KafkaConsumer"),
+				useMetrics);
 
 		if (!running) {
 			return;
@@ -835,7 +848,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception;
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup kafkaMetricGroup,
+			boolean useMetrics) throws Exception;
 
 	/**
 	 * Creates the partition discoverer that is used to find new partitions for this subtask.

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 5240326..441e660 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -83,9 +83,6 @@ public abstract class AbstractFetcher<T, KPH> {
 	/** The mode describing whether the fetcher also generates timestamps and watermarks. */
 	protected final int timestampWatermarkMode;
 
-	/** Flag whether to register metrics for the fetcher. */
-	protected final boolean useMetrics;
-
 	/**
 	 * Optional timestamp extractor / watermark generator that will be run per Kafka partition,
 	 * to exploit per-partition timestamp characteristics.
@@ -116,10 +113,10 @@ public abstract class AbstractFetcher<T, KPH> {
 			ProcessingTimeService processingTimeProvider,
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		this.sourceContext = checkNotNull(sourceContext);
 		this.checkpointLock = sourceContext.getCheckpointLock();
-		this.useMetrics = useMetrics;
 		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
 
 		// figure out what we watermark mode we will be using
@@ -163,6 +160,10 @@ public abstract class AbstractFetcher<T, KPH> {
 			unassignedPartitionsQueue.add(partition);
 		}
 
+		if (useMetrics) {
+			addOffsetStateGauge(checkNotNull(consumerMetricGroup));
+		}
+
 		// if we have periodic watermarks, kick off the interval scheduler
 		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
 			@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 84f0e38..a533cca 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -376,7 +377,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 				Arrays.asList("dummy-topic"),
 				null,
 				(KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class),
-				discoveryInterval);
+				discoveryInterval,
+				false);
 
 			this.fetcher = fetcher;
 			this.partitions = partitions;
@@ -393,7 +395,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				StreamingRuntimeContext runtimeContext,
-				OffsetCommitMode offsetCommitMode) throws Exception {
+				OffsetCommitMode offsetCommitMode,
+				MetricGroup consumerMetricGroup,
+				boolean useMetrics) throws Exception {
 			return fetcher;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index f091c08..b4ee378 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -639,7 +639,8 @@ public class FlinkKafkaConsumerBaseTest {
 					Collections.singletonList("dummy-topic"),
 					null,
 					(KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class),
-					PARTITION_DISCOVERY_DISABLED);
+					PARTITION_DISCOVERY_DISABLED,
+					false);
 
 			this.testFetcher = testFetcher;
 			this.testPartitionDiscoverer = testPartitionDiscoverer;
@@ -654,7 +655,9 @@ public class FlinkKafkaConsumerBaseTest {
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				StreamingRuntimeContext runtimeContext,
-				OffsetCommitMode offsetCommitMode) throws Exception {
+				OffsetCommitMode offsetCommitMode,
+				MetricGroup consumerMetricGroup,
+				boolean useMetrics) throws Exception {
 			return this.testFetcher;
 		}
 
@@ -737,6 +740,7 @@ public class FlinkKafkaConsumerBaseTest {
 					new TestProcessingTimeService(),
 					0,
 					MockFetcher.class.getClassLoader(),
+					new UnregisteredMetricsGroup(),
 					false);
 
 			this.stateSnapshotsToReturn.addAll(Arrays.asList(stateSnapshotsToReturn));

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ac4b08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 6fe1d6f..d276cfb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
@@ -411,6 +412,7 @@ public class AbstractFetcherTest {
 				processingTimeProvider,
 				autoWatermarkInterval,
 				TestFetcher.class.getClassLoader(),
+				new UnregisteredMetricsGroup(),
 				false);
 		}
 


Mime
View raw message