flink-commits mailing list archives

From rmetz...@apache.org
Subject flink git commit: [FLINK-4186] Use Flink metrics to report Kafka metrics
Date Fri, 15 Jul 2016 14:30:59 GMT
Repository: flink
Updated Branches:
  refs/heads/master 70094a181 -> 41f581822


[FLINK-4186] Use Flink metrics to report Kafka metrics

This commit also adds monitoring for the current and committed offsets

This closes #2236
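
For reference, the new metrics are on by default and can be switched off through the
connector properties. A minimal sketch of opting out, assuming a Kafka 0.9 setup
(topic name, group id, and schema below are placeholders, not part of this commit):

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "example-group");
    // "flink.disable-metrics" (FlinkKafkaConsumerBase.KEY_DISABLE_METRICS) defaults
    // to "false"; setting it to "true" disables the metric forwarding added here
    props.setProperty(FlinkKafkaConsumerBase.KEY_DISABLE_METRICS, "true");
    FlinkKafkaConsumer09<String> consumer =
            new FlinkKafkaConsumer09<>("example-topic", new SimpleStringSchema(), props);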


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41f58182
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41f58182
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41f58182

Branch: refs/heads/master
Commit: 41f58182289226850b23c61a32f01223485d4775
Parents: 70094a1
Author: Robert Metzger <rmetzger@apache.org>
Authored: Tue Jul 12 13:55:29 2016 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Fri Jul 15 16:30:45 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer08.java  |   4 +-
 .../kafka/internals/Kafka08Fetcher.java         |  38 +++--
 .../connectors/kafka/Kafka08ITCase.java         |   9 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   2 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |   2 -
 .../kafka/internal/Kafka09Fetcher.java          |  33 ++--
 .../connectors/kafka/Kafka09ITCase.java         |   9 +-
 .../kafka/FlinkKafkaConsumerBase.java           |   3 +
 .../kafka/FlinkKafkaProducerBase.java           |  19 +--
 .../kafka/internals/AbstractFetcher.java        |  64 +++++++-
 .../internals/KafkaTopicPartitionState.java     |  15 +-
 .../metrics/AvgKafkaMetricAccumulator.java      | 141 ----------------
 .../metrics/DefaultKafkaMetricAccumulator.java  | 159 -------------------
 .../internals/metrics/KafkaMetricWrapper.java   |  37 +++++
 .../metrics/MaxKafkaMetricAccumulator.java      |  57 -------
 .../metrics/MinKafkaMetricAccumulator.java      |  57 -------
 .../connectors/kafka/KafkaConsumerTestBase.java | 139 ++++++++++++++--
 .../AbstractFetcherTimestampsTest.java          |   4 +-
 .../kafka/testutils/MockRuntimeContext.java     |   8 +
 19 files changed, 311 insertions(+), 489 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index f9bfedf..c16629d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -213,10 +213,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext) throws Exception {
 
+		boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false"));
+
 		return new Kafka08Fetcher<>(sourceContext, thisSubtaskPartitions,
 				watermarksPeriodic, watermarksPunctuated,
 				runtimeContext, deserializer, kafkaProperties,
-				invalidOffsetBehavior, autoCommitInterval);
+				invalidOffsetBehavior, autoCommitInterval, useMetrics);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 91fdc71..aee3acc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -19,6 +19,8 @@
 package org.apache.flink.streaming.connectors.kafka.internals;
 
 import kafka.common.TopicAndPartition;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.kafka.common.Node;
 
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -65,11 +67,8 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	/** The properties that configure the Kafka connection */
 	private final Properties kafkaConfig;
 
-	/** The task name, to give more readable names to the spawned threads */
-	private final String taskName;
-
-	/** The class loader for dynamically loaded classes */
-	private final ClassLoader userCodeClassLoader;
+	/** The subtask's runtime context */
+	private final RuntimeContext runtimeContext;
 
 	/** The queue of partitions that are currently not assigned to a broker connection */
 	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;
@@ -78,7 +77,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	private final long invalidOffsetBehavior;
 
 	/** The interval in which to automatically commit (-1 if deactivated) */
-	private final long autoCommitInterval; 
+	private final long autoCommitInterval;
 
 	/** The handler that reads/writes offsets from/to ZooKeeper */
 	private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
@@ -96,14 +95,14 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long invalidOffsetBehavior,
-			long autoCommitInterval) throws Exception
+			long autoCommitInterval,
+			boolean useMetrics) throws Exception
 	{
-		super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
+		super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, useMetrics);
 
 		this.deserializer = checkNotNull(deserializer);
 		this.kafkaConfig = checkNotNull(kafkaProperties);
-		this.taskName = runtimeContext.getTaskNameWithSubtasks();
-		this.userCodeClassLoader = runtimeContext.getUserCodeClassLoader();
+		this.runtimeContext = runtimeContext;
 		this.invalidOffsetBehavior = invalidOffsetBehavior;
 		this.autoCommitInterval = autoCommitInterval;
 		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
@@ -161,6 +160,12 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 				periodicCommitter.start();
 			}
 
+			// register offset metrics
+			if (useMetrics) {
+				final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
+				addOffsetStateGauge(kafkaMetricGroup);
+			}
+
 			// Main loop polling elements from the unassignedPartitions queue to the threads
 			while (running) {
 				// re-throw any exception from the concurrent fetcher threads
@@ -324,6 +329,15 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 		if (zkHandler != null) {
 			zkHandler.writeOffsets(offsets);
 		}
+
+		// Set committed offsets in topic partition state
+		KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitions();
+		for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
+			Long offset = offsets.get(partition.getKafkaTopicPartition());
+			if (offset != null) {
+				partition.setCommittedOffset(offset);
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -338,7 +352,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 		// each thread needs its own copy of the deserializer, because the deserializer is
 		// not necessarily thread safe
 		final KeyedDeserializationSchema<T> clonedDeserializer =
-				InstantiationUtil.clone(deserializer, userCodeClassLoader);
+				InstantiationUtil.clone(deserializer, runtimeContext.getUserCodeClassLoader());
 
 		// seed thread with list of fetch partitions (otherwise it would shut down immediately again
 		SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>(
@@ -346,7 +360,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 				clonedDeserializer, invalidOffsetBehavior);
 
 		brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
-				taskName, leader.id(), leader.host(), leader.port()));
+				runtimeContext.getTaskName(), leader.id(), leader.host(), leader.port()));
 		brokerThread.setDaemon(true);
 		brokerThread.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 36e1151..467ccc5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -156,8 +156,13 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 	}
 
 	@Test(timeout=60000)
-	public void testMetricsAndEndOfStream() throws Exception {
-		runMetricsAndEndOfStreamTest();
+	public void testEndOfStream() throws Exception {
+		runEndOfStreamTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMetrics() throws Throwable {
+		runMetricsTest();
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 75ca9ed..864773a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -97,7 +97,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	@Override
 	public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
-		FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<T>(topic, serSchema, props, partitioner);
+		FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return prod;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index bc2904c..8c3eaf8 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -73,8 +73,6 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 	/**  Configuration key to change the polling timeout **/
 	public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
 
-	/** Boolean configuration key to disable metrics tracking **/
-	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
 
 	/** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
 	 * available. If 0, returns immediately with any records that are available now. */

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 6bad180..9c861c9 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka.internal;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
@@ -27,7 +28,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
@@ -73,9 +74,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 	/** The maximum number of milliseconds to wait for a fetch batch */
 	private final long pollTimeout;
 
-	/** Flag whether to register Kafka metrics as Flink accumulators */
-	private final boolean forwardKafkaMetrics;
-
 	/** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */
 	private final Object consumerLock = new Object();
 
@@ -99,15 +97,14 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
-			boolean forwardKafkaMetrics) throws Exception
+			boolean useMetrics) throws Exception
 	{
-		super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
+		super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, useMetrics);
 
 		this.deserializer = deserializer;
 		this.runtimeContext = runtimeContext;
 		this.kafkaProperties = kafkaProperties;
 		this.pollTimeout = pollTimeout;
-		this.forwardKafkaMetrics = forwardKafkaMetrics;
 
 		// if checkpointing is enabled, we are not automatically committing to Kafka.
 		kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
@@ -178,23 +175,18 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		try {
 			consumer.assign(convertKafkaPartitions(subscribedPartitions()));
 
-			// register Kafka metrics to Flink accumulators
-			if (forwardKafkaMetrics) {
+			if (useMetrics) {
+				final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
+				addOffsetStateGauge(kafkaMetricGroup);
+				// register Kafka metrics to Flink
 				Map<MetricName, ? extends Metric> metrics = consumer.metrics();
 				if (metrics == null) {
 					// MapR's Kafka implementation returns null here.
 					LOG.info("Consumer implementation does not support metrics");
 				} else {
-					// we have metrics, register them where possible
-					for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
-						String name = "KafkaConsumer-" + metric.getKey().name();
-						DefaultKafkaMetricAccumulator kafkaAccumulator =
-								DefaultKafkaMetricAccumulator.createFor(metric.getValue());
-
-						// best effort: we only add the accumulator if available.
-						if (kafkaAccumulator != null) {
-							runtimeContext.addAccumulator(name, kafkaAccumulator);
-						}
+					// we have Kafka metrics, register them
+					for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+						kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
 					}
 				}
 			}
@@ -286,7 +278,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
 			Long offset = offsets.get(partition.getKafkaTopicPartition());
 			if (offset != null) {
-				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset, ""));
+				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset));
+				partition.setCommittedOffset(offset);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index ef64171..957833d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -118,8 +118,13 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 	}
 
 	@Test(timeout = 60000)
-	public void testMetricsAndEndOfStream() throws Exception {
-		runMetricsAndEndOfStreamTest();
+	public void testEndOfStream() throws Exception {
+		runEndOfStreamTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMetrics() throws Throwable {
+		runMetricsTest();
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 96a1e0d..2b2c527 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -65,6 +65,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
 	public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
 
+	/** Boolean configuration key to disable metrics tracking **/
+	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
 	// ------------------------------------------------------------------------
 	//  configuration state, set on the client relevant for all subtasks
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 0e05f91..e63f033 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -21,10 +21,11 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -47,7 +48,6 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.UUID;
 
 import static java.util.Objects.requireNonNull;
 
@@ -100,11 +100,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	protected final KafkaPartitioner<IN> partitioner;
 
 	/**
-	 * Unique ID identifying the producer
-	 */
-	private final String producerId;
-
-	/**
 	 * Flag indicating whether to accept failures (and log them), or to fail on failures
 	 */
 	protected boolean logFailuresOnly;
@@ -152,7 +147,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		this.producerConfig = producerConfig;
 
 		// set the producer configuration properties.
-
 		if (!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
 			this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
 		} else {
@@ -179,7 +173,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		}
 
 		this.partitioner = customPartitioner;
-		this.producerId = UUID.randomUUID().toString();
 	}
 
 	// ---------------------------------- Properties --------------------------
@@ -239,13 +232,9 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 				// MapR's Kafka implementation returns null here.
 				LOG.info("Producer implementation does not support metrics");
 			} else {
+				final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
 				for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
-					String name = producerId + "-producer-" + metric.getKey().name();
-					DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue());
-					// best effort: we only add the accumulator if available.
-					if (kafkaAccumulator != null) {
-						getRuntimeContext().addAccumulator(name, kafkaAccumulator);
-					}
+					kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index f9d2e64..8ec26cc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
@@ -64,7 +66,10 @@ public abstract class AbstractFetcher<T, KPH> {
 
 	/** The mode describing whether the fetcher also generates timestamps and watermarks */
 	private final int timestampWatermarkMode;
-	
+
+	/** Flag whether to register metrics for the fetcher */
+	protected final boolean useMetrics;
+
 	/** Only relevant for punctuated watermarks: The current cross partition watermark */
 	private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
 
@@ -75,10 +80,11 @@ public abstract class AbstractFetcher<T, KPH> {
 			List<KafkaTopicPartition> assignedPartitions,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext) throws Exception
+			StreamingRuntimeContext runtimeContext, boolean useMetrics) throws Exception
 	{
 		this.sourceContext = checkNotNull(sourceContext);
 		this.checkpointLock = sourceContext.getCheckpointLock();
+		this.useMetrics = useMetrics;
 		
 		// figure out what we watermark mode we will be using
 		
@@ -389,8 +395,58 @@ public abstract class AbstractFetcher<T, KPH> {
 				throw new RuntimeException();
 		}
 	}
-	
-	// ------------------------------------------------------------------------
+
+	// ------------------------- Metrics ----------------------------------
+
+	/**
+	 * Add current and committed offsets to metric group
+	 *
+	 * @param metricGroup The metric group to use
+	 */
+	protected void addOffsetStateGauge(MetricGroup metricGroup) {
+		// register gauges for current and committed offsets
+		MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
+		MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
+		for(KafkaTopicPartitionState ktp: subscribedPartitions()){
+			currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
+			committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
+		}
+	}
+
+	/**
+	 * Gauge types
+	 */
+	private enum OffsetGaugeType {
+		CURRENT_OFFSET,
+		COMMITTED_OFFSET
+	}
+
+	/**
+	 * Gauge for getting the offset of a KafkaTopicPartitionState.
+	 */
+	private static class OffsetGauge implements Gauge<Long> {
+
+		private final KafkaTopicPartitionState ktp;
+		private final OffsetGaugeType gaugeType;
+
+		public OffsetGauge(KafkaTopicPartitionState ktp, OffsetGaugeType gaugeType) {
+			this.ktp = ktp;
+			this.gaugeType = gaugeType;
+		}
+
+		@Override
+		public Long getValue() {
+			switch(gaugeType) {
+				case COMMITTED_OFFSET:
+					return ktp.getCommittedOffset();
+				case CURRENT_OFFSET:
+					return ktp.getOffset();
+				default:
+					throw new RuntimeException("Unknown gauge type: " + gaugeType);
+			}
+		}
+	}
+ 	// ------------------------------------------------------------------------
 	
 	/**
 	 * The periodic watermark emitter. In its given interval, it checks all partitions for
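
For illustration, the registration in addOffsetStateGauge() boils down to the following
standalone pattern: one gauge per topic partition under the "current-offsets" and
"committed-offsets" subgroups. This is a sketch only; `partitionState` and the gauge
name are hypothetical stand-ins (assumes org.apache.flink.metrics.Gauge and
org.apache.flink.metrics.MetricGroup are imported):

    MetricGroup kafkaGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
    MetricGroup currentOffsets = kafkaGroup.addGroup("current-offsets");
    MetricGroup committedOffsets = kafkaGroup.addGroup("committed-offsets");
    // gauge names follow the "<topic>-<partition>" convention used above
    currentOffsets.gauge("exampleTopic-0", new Gauge<Long>() {
        @Override
        public Long getValue() {
            return partitionState.getOffset(); // hypothetical per-partition state holder
        }
    });
    committedOffsets.gauge("exampleTopic-0", new Gauge<Long>() {
        @Override
        public Long getValue() {
            return partitionState.getCommittedOffset();
        }
    });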

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
index 36612a4..7cb5f46 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -44,12 +44,16 @@ public class KafkaTopicPartitionState<KPH> {
 	/** The offset within the Kafka partition that we already processed */
 	private volatile long offset;
 
+	/** The offset of the Kafka partition that has been committed */
+	private volatile long committedOffset;
+
 	// ------------------------------------------------------------------------
 	
 	public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
 		this.partition = partition;
 		this.kafkaPartitionHandle = kafkaPartitionHandle;
 		this.offset = OFFSET_NOT_SET;
+		this.committedOffset = OFFSET_NOT_SET;
 	}
 
 	// ------------------------------------------------------------------------
@@ -90,10 +94,19 @@ public class KafkaTopicPartitionState<KPH> {
 	public final void setOffset(long offset) {
 		this.offset = offset;
 	}
-	
+
 	public final boolean isOffsetDefined() {
 		return offset != OFFSET_NOT_SET;
 	}
+
+	public final void setCommittedOffset(long offset) {
+		this.committedOffset = offset;
+	}
+
+	public final long getCommittedOffset() {
+		return committedOffset;
+	}
+
 	
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java
deleted file mode 100644
index a038711..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals.metrics;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.kafka.common.metrics.KafkaMetric;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.SampledStat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.List;
-
-public class AvgKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator {
-	private static final Logger LOG = LoggerFactory.getLogger(AvgKafkaMetricAccumulator.class);
-
-	/** The last sum/count before the serialization  **/
-	private AvgSumCount lastSumCount;
-
-	public AvgKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
-		super(kafkaMetric);
-	}
-
-	@Override
-	public void merge(Accumulator<Void, Double> other) {
-		if(!(other instanceof AvgKafkaMetricAccumulator)) {
-			throw new RuntimeException("Trying to merge incompatible accumulators: "+this+" with "+other);
-		}
-		AvgKafkaMetricAccumulator otherMetric = (AvgKafkaMetricAccumulator) other;
-
-		AvgSumCount thisAvg;
-		if(this.lastSumCount == null) {
-			Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
-			if (!(thisMeasurable instanceof Avg)) {
-				throw new RuntimeException("Must be of type Avg");
-			}
-			thisAvg = getAvgSumCount((Avg) thisMeasurable);
-		} else {
-			thisAvg = this.lastSumCount;
-		}
-
-		AvgSumCount otherAvg;
-		if(otherMetric.lastSumCount == null) {
-			Measurable otherMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(otherMetric.kafkaMetric);
-			if(!(otherMeasurable instanceof Avg) ) {
-				throw new RuntimeException("Must be of type Avg");
-			}
-			otherAvg = getAvgSumCount((Avg) otherMeasurable);
-		} else {
-			otherAvg = otherMetric.lastSumCount;
-		}
-
-		thisAvg.count += otherAvg.count;
-		thisAvg.sum += otherAvg.sum;
-		this.mergedValue = thisAvg.sum / thisAvg.count;
-	}
-
-	@Override
-	public Accumulator<Void, Double> clone() {
-		AvgKafkaMetricAccumulator clone = new AvgKafkaMetricAccumulator(kafkaMetric);
-		clone.lastSumCount = this.lastSumCount;
-		clone.isMerged = this.isMerged;
-		clone.mergedValue = this.mergedValue;
-		return clone;
-	}
-
-	// ------------ Utilities
-
-	private static class AvgSumCount implements Serializable {
-		double sum;
-		long count;
-
-		@Override
-		public String toString() {
-			return "AvgSumCount{" +
-					"sum=" + sum +
-					", count=" + count +
-					", avg="+(sum/count)+"}";
-		}
-	}
-
-	/**
-	 * Extracts sum and count from Avg using reflection
-	 *
-	 * @param avg Avg SampledStat from Kafka
-	 * @return A KV pair with the average's sum and count
-	 */
-	private static AvgSumCount getAvgSumCount(Avg avg) {
-		try {
-			Field samplesField = SampledStat.class.getDeclaredField("samples");
-			Field sampleValue = Class.forName("org.apache.kafka.common.metrics.stats.SampledStat$Sample").getDeclaredField("value");
-			Field sampleEventCount = Class.forName("org.apache.kafka.common.metrics.stats.SampledStat$Sample").getDeclaredField("eventCount");
-			samplesField.setAccessible(true);
-			sampleValue.setAccessible(true);
-			sampleEventCount.setAccessible(true);
-			List samples = (List) samplesField.get(avg);
-			AvgSumCount res = new AvgSumCount();
-			for(int i = 0; i < samples.size(); i++) {
-				res.sum += (double)sampleValue.get(samples.get(i));
-				res.count += (long)sampleEventCount.get(samples.get(i));
-			}
-			return res;
-		} catch(Throwable t) {
-			throw new RuntimeException("Unable to extract sum and count from Avg using reflection. " +
-					"You can turn off the metrics from Flink's Kafka connector if this issue persists.", t);
-		}
-	}
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
-		if(!(thisMeasurable instanceof Avg) ) {
-			throw new RuntimeException("Must be of type Avg");
-		}
-		this.lastSumCount = getAvgSumCount((Avg) thisMeasurable);
-		out.defaultWriteObject();
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java
deleted file mode 100644
index 06b7930..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals.metrics;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.metrics.KafkaMetric;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Min;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-
-public class DefaultKafkaMetricAccumulator implements Accumulator<Void, Double>, Serializable {
-
-	private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaMetricAccumulator.class);
-
-	protected boolean isMerged = false;
-	protected double mergedValue;
-	protected transient KafkaMetric kafkaMetric;
-
-
-	public static DefaultKafkaMetricAccumulator createFor(Metric metric) {
-		if(!(metric instanceof KafkaMetric)) {
-			return null;
-		}
-		KafkaMetric kafkaMetric = (KafkaMetric) metric;
-		Measurable measurable = getMeasurableFromKafkaMetric(kafkaMetric);
-		if(measurable == null) {
-			return null;
-		}
-		if (measurable instanceof Max) {
-			return new MaxKafkaMetricAccumulator(kafkaMetric);
-		} else if (measurable instanceof Min) {
-			return new MinKafkaMetricAccumulator(kafkaMetric);
-		} else if (measurable instanceof Avg) {
-			return new AvgKafkaMetricAccumulator(kafkaMetric);
-		} else {
-			// fallback accumulator. works for Rate, Total, Count.
-			return new DefaultKafkaMetricAccumulator(kafkaMetric);
-		}
-	}
-
-	/**
-	 * This utility method is using reflection to get the Measurable from the KafkaMetric.
-	 * Since Kafka 0.9, Kafka is exposing the Measurable properly, but Kafka 0.8.2 does not yet expose it.
-	 *
-	 * @param kafkaMetric the metric to extract the field form
-	 * @return Measurable type (or null in case of an error)
-	 */
-	protected static Measurable getMeasurableFromKafkaMetric(KafkaMetric kafkaMetric) {
-		try {
-			Field measurableField = kafkaMetric.getClass().getDeclaredField("measurable");
-			measurableField.setAccessible(true);
-			return (Measurable) measurableField.get(kafkaMetric);
-		} catch (Throwable e) {
-			LOG.warn("Unable to initialize Kafka metric: " + kafkaMetric, e);
-			return null;
-		}
-	}
-
-
-	DefaultKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
-		this.kafkaMetric = kafkaMetric;
-	}
-
-	@Override
-	public void add(Void value) {
-		// noop
-	}
-
-	@Override
-	public Double getLocalValue() {
-		if(isMerged && kafkaMetric == null) {
-			return mergedValue;
-		}
-		return kafkaMetric.value();
-	}
-
-	@Override
-	public void resetLocal() {
-		// noop
-	}
-
-	@Override
-	public void merge(Accumulator<Void, Double> other) {
-		if(!(other instanceof DefaultKafkaMetricAccumulator)) {
-			throw new RuntimeException("Trying to merge incompatible accumulators");
-		}
-		DefaultKafkaMetricAccumulator otherMetric = (DefaultKafkaMetricAccumulator) other;
-		if(this.isMerged) {
-			if(otherMetric.isMerged) {
-				this.mergedValue += otherMetric.mergedValue;
-			} else {
-				this.mergedValue += otherMetric.getLocalValue();
-			}
-		} else {
-			this.isMerged = true;
-			if(otherMetric.isMerged) {
-				this.mergedValue = this.getLocalValue() + otherMetric.mergedValue;
-			} else {
-				this.mergedValue = this.getLocalValue() + otherMetric.getLocalValue();
-			}
-
-		}
-	}
-
-	@Override
-	public Accumulator<Void, Double> clone() {
-		DefaultKafkaMetricAccumulator clone = new DefaultKafkaMetricAccumulator(this.kafkaMetric);
-		clone.isMerged = this.isMerged;
-		clone.mergedValue = this.mergedValue;
-		return clone;
-	}
-
-	@Override
-	public String toString() {
-		if(isMerged) {
-			return Double.toString(mergedValue);
-		}
-		if(kafkaMetric == null) {
-			return "null";
-		}
-		return Double.toString(kafkaMetric.value());
-	}
-
-	// -------- custom serialization methods
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		this.isMerged = true;
-		this.mergedValue = kafkaMetric.value();
-		out.defaultWriteObject();
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
new file mode 100644
index 0000000..cedb696
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * Gauge for getting the current value of a Kafka metric.
+ */
+public class KafkaMetricWrapper implements Gauge<Double> {
+	private final org.apache.kafka.common.Metric kafkaMetric;
+
+	public KafkaMetricWrapper(org.apache.kafka.common.Metric metric) {
+		this.kafkaMetric = metric;
+	}
+
+	@Override
+	public Double getValue() {
+		return kafkaMetric.value();
+	}
+}
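
For context, this wrapper is registered for every metric the Kafka client exposes. The
pattern, taken from the producer and fetcher changes in this commit (the group name
varies per connector, e.g. "KafkaConsumer" or "KafkaProducer"):

    MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
    Map<MetricName, ? extends Metric> metrics = producer.metrics(); // may be null (e.g. MapR's client)
    if (metrics != null) {
        for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
            kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
        }
    }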

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java
deleted file mode 100644
index c1770ff..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals.metrics;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.kafka.common.metrics.KafkaMetric;
-
-
-public class MaxKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator {
-	public MaxKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
-		super(kafkaMetric);
-	}
-
-	@Override
-	public void merge(Accumulator<Void, Double> other) {
-		if(!(other instanceof MaxKafkaMetricAccumulator)) {
-			throw new RuntimeException("Trying to merge incompatible accumulators");
-		}
-		MaxKafkaMetricAccumulator otherMetric = (MaxKafkaMetricAccumulator) other;
-		if(this.isMerged) {
-			if(otherMetric.isMerged) {
-				this.mergedValue = Math.max(this.mergedValue, otherMetric.mergedValue);
-			} else {
-				this.mergedValue = Math.max(this.mergedValue, otherMetric.getLocalValue());
-			}
-		} else {
-			this.isMerged = true;
-			if(otherMetric.isMerged) {
-				this.mergedValue = Math.max(this.getLocalValue(), otherMetric.mergedValue);
-			} else {
-				this.mergedValue = Math.max(this.getLocalValue(), otherMetric.getLocalValue());
-			}
-		}
-	}
-
-	@Override
-	public Accumulator<Void, Double> clone() {
-		MaxKafkaMetricAccumulator clone = new MaxKafkaMetricAccumulator(this.kafkaMetric);
-		clone.isMerged = this.isMerged;
-		clone.mergedValue = this.mergedValue;
-		return clone;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java
deleted file mode 100644
index 4794893..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals.metrics;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.kafka.common.metrics.KafkaMetric;
-
-public class MinKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator {
-
-	public MinKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
-		super(kafkaMetric);
-	}
-
-	@Override
-	public void merge(Accumulator<Void, Double> other) {
-		if(!(other instanceof MinKafkaMetricAccumulator)) {
-			throw new RuntimeException("Trying to merge incompatible accumulators");
-		}
-		MinKafkaMetricAccumulator otherMetric = (MinKafkaMetricAccumulator) other;
-		if(this.isMerged) {
-			if(otherMetric.isMerged) {
-				this.mergedValue = Math.min(this.mergedValue, otherMetric.mergedValue);
-			} else {
-				this.mergedValue = Math.min(this.mergedValue, otherMetric.getLocalValue());
-			}
-		} else {
-			this.isMerged = true;
-			if(otherMetric.isMerged) {
-				this.mergedValue = Math.min(this.getLocalValue(), otherMetric.mergedValue);
-			} else {
-				this.mergedValue = Math.min(this.getLocalValue(), otherMetric.getLocalValue());
-			}
-		}
-	}
-
-	@Override
-	public Accumulator<Void, Double> clone() {
-		MinKafkaMetricAccumulator clone = new MinKafkaMetricAccumulator(this.kafkaMetric);
-		clone.isMerged = this.isMerged;
-		clone.mergedValue = this.mergedValue;
-		return clone;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index d49b48b..920f15b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -40,6 +40,7 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.table.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -51,6 +52,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -66,6 +68,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -96,8 +99,11 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -108,6 +114,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
@@ -1173,12 +1180,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		producerProperties.setProperty("retries", "3");
 		kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
 
-		JobExecutionResult result = env.execute("Write deletes to Kafka");
-
-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// there are 37 accumulator results in Kafka 0.9
-		// and 34 in Kafka 0.8
-		Assert.assertTrue("Not enough accumulators from Kafka Producer: " + accuResults.size(), accuResults.size() > 33);
+		env.execute("Write deletes to Kafka");
 
 		// ----------- Read the data again -------------------
 
@@ -1209,12 +1211,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * Test that ensures that DeserializationSchema.isEndOfStream() is properly evaluated
-	 * and that the metrics for the consumer are properly reported.
+	 * Test that ensures that DeserializationSchema.isEndOfStream() is properly evaluated.
 	 *
 	 * @throws Exception
 	 */
-	public void runMetricsAndEndOfStreamTest() throws Exception {
+	public void runEndOfStreamTest() throws Exception {
 
 		final int ELEMENT_COUNT = 300;
 		final String topic = writeSequence("testEndOfStream", ELEMENT_COUNT, 1, 1);
@@ -1235,15 +1236,127 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");
 
-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// kafka 0.9 consumer: 39 results
-		if (kafkaServer.getVersion().equals("0.9")) {
-			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test that metrics are properly reported for the consumer.
+	 *
+	 * @throws Throwable
+	 */
+	public void runMetricsTest() throws Throwable {
+
+		// create a topic with 5 partitions
+		final String topic = "metricsStream";
+		createTestTopic(topic, 5, 1);
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					// start a job that writes to and reads from Kafka.
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env1.setParallelism(1);
+					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+					env1.getConfig().disableSysoutLogging();
+					env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+						@Override
+						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception { // no op
+						}
+					});
+
+					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+						volatile boolean running = true;
+
+						@Override
+						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+							int i = 0;
+							while (running) {
+								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+								Thread.sleep(1);
+							}
+						}
+
+						@Override
+						public void cancel() {
+							running = false;
+						}
+					});
+
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+
+					env1.execute("Metrics test job");
+				} catch (Throwable t) {
+					LOG.warn("Got exception during execution", t);
+					if (!(t.getCause() instanceof JobCancellationException)) { // the job is cancelled intentionally at the end of the test
+						error.f0 = t;
+					}
+				}
+			}
+		};
+		Thread jobThread = new Thread(job);
+		jobThread.start();
+
+		try {
+			// connect to JMX
+			MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+			// wait until we've found all 5 offset metrics
+			Set<ObjectName> offsetMetrics = mBeanServer.queryNames(new ObjectName("*:key7=current-offsets,*"), null);
+			while (offsetMetrics.size() < 5) { // the test will time out if metrics are not working properly
+				if (error.f0 != null) {
+					// fail test early
+					throw error.f0;
+				}
+				offsetMetrics = mBeanServer.queryNames(new ObjectName("*:key7=current-offsets,*"), null);
+				Thread.sleep(50);
+			}
+			Assert.assertEquals(5, offsetMetrics.size());
+			// We can't rely on the consumer having touched all partitions yet,
+			// so wait until all five partitions report a non-negative offset.
+			// The test will time out if the condition is never met.
+			while (true) {
+				int numPosOffsets = 0;
+				// check that offsets are correctly reported
+				for (ObjectName object : offsetMetrics) {
+					Object offset = mBeanServer.getAttribute(object, "Value");
+					if ((long) offset >= 0) {
+						numPosOffsets++;
+					}
+				}
+				if (numPosOffsets == 5) {
+					break;
+				}
+				// wait for the consumer to consume on all partitions
+				Thread.sleep(50);
+			}
+
+			// check if producer metrics are also available.
+			Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*:key6=KafkaProducer,*"), null);
+			Assert.assertTrue("No producer metrics found", producerMetrics.size() > 30);
+
+			LOG.info("Found all JMX metrics. Cancelling job.");
+		} finally {
+			// cancel
+			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		}
+
+		while (jobThread.isAlive()) {
+			Thread.sleep(50);
+		}
+		if (error.f0 != null) {
+			throw error.f0;
 		}
 
 		deleteTestTopic(topic);
 	}
 
+
 	public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {
 		
 		final int finalCount;

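The metrics test above discovers the consumer's offset gauges by polling the
platform MBeanServer. Below is a minimal, self-contained sketch of that polling
pattern, assuming (as the test does) that a JMX reporter exposes each gauge
with a "Value" attribute under an object name matching a pattern such as
"*:key7=current-offsets,*". The class name, timeout parameter, and failure
message are illustrative and not part of this commit.

import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.Set;

public final class JmxMetricPoller {

	/**
	 * Polls the platform MBeanServer until some metric matching the given
	 * object-name pattern reports a non-negative long "Value" attribute.
	 */
	public static long awaitNonNegative(String pattern, long timeoutMs) throws Exception {
		MBeanServer server = ManagementFactory.getPlatformMBeanServer();
		ObjectName query = new ObjectName(pattern);
		long deadline = System.currentTimeMillis() + timeoutMs;

		while (System.currentTimeMillis() < deadline) {
			Set<ObjectName> names = server.queryNames(query, null);
			for (ObjectName name : names) {
				Object value = server.getAttribute(name, "Value");
				if (value instanceof Long && (Long) value >= 0L) {
					return (Long) value;
				}
			}
			Thread.sleep(50); // metrics are registered asynchronously
		}
		throw new AssertionError("no metric matching " + pattern + " became non-negative within " + timeoutMs + " ms");
	}
}

For example, awaitNonNegative("*:key7=current-offsets,*", 30000L) mirrors the
wait loops in runMetricsTest() above.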
http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index c073a04..8c68fbe 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -190,7 +190,7 @@ public class AbstractFetcherTimestampsTest {
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				StreamingRuntimeContext runtimeContext) throws Exception
 		{
-			super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
+			super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, false);
 		}
 
 		@Override
@@ -231,7 +231,7 @@ public class AbstractFetcherTimestampsTest {
 
 		@Override
 		public void collectWithTimestamp(T element, long timestamp) {
-			this.latestElement = new StreamRecord<T>(element, timestamp);
+			this.latestElement = new StreamRecord<>(element, timestamp);
 		}
 
 		@Override

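The test fetcher above now forwards a boolean useMetrics flag (here false) to
the AbstractFetcher constructor. The following is only a sketch of the kind of
gated gauge registration such a flag suggests, not the committed implementation;
it assumes the partition-state accessors getTopic(), getPartition() and
getOffset(), and a "KafkaConsumer" group name.

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;

final class OffsetGauges {

	// Registers one current-offset gauge per partition when metrics are
	// enabled; with useMetrics == false (as in the test above), this is a no-op.
	static void maybeRegister(MetricGroup taskGroup, boolean useMetrics,
			KafkaTopicPartitionState<?>[] partitions) {
		if (!useMetrics) {
			return;
		}
		MetricGroup kafkaGroup = taskGroup.addGroup("KafkaConsumer");
		for (final KafkaTopicPartitionState<?> p : partitions) {
			kafkaGroup.gauge(p.getTopic() + "-" + p.getPartition() + ".current-offset",
					new Gauge<Long>() {
						@Override
						public Long getValue() {
							return p.getOffset(); // stays at the initial sentinel until the first record is read
						}
					});
		}
	}
}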
http://git-wip-us.apache.org/repos/asf/flink/blob/41f58182/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index f1bb157..2d5e2d8 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -33,8 +33,10 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
@@ -161,6 +163,11 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 	}
 
 	@Override
+	public MetricGroup getMetricGroup() {
+		return new UnregisteredTaskMetricsGroup.DummyIOMetricGroup();
+	}
+
+	@Override
 	public <RT> List<RT> getBroadcastVariable(String name) {
 		throw new UnsupportedOperationException();
 	}
@@ -200,6 +207,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 		throw new UnsupportedOperationException();
 	}
 
+	@Override
 	public long getCurrentProcessingTime() {
 		Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
 		return timerService.getCurrentProcessingTime();

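Since MockRuntimeContext now hands out a DummyIOMetricGroup, code under test
can register metrics unconditionally and the registrations are simply
swallowed. A small illustrative snippet (the gauge name and value are
placeholders), assuming the dummy group accepts registrations like any other
MetricGroup:

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;

public final class DummyMetricGroupExample {

	public static void main(String[] args) {
		// never reported anywhere: the dummy group is backed by no registry
		MetricGroup group = new UnregisteredTaskMetricsGroup.DummyIOMetricGroup();
		group.gauge("current-offset", new Gauge<Long>() {
			@Override
			public Long getValue() {
				return 0L; // placeholder value
			}
		});
	}
}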
