flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [5/6] flink git commit: [FLINK-3274] Make accumulator names of Kafka connector unique
Date Mon, 25 Jan 2016 17:35:30 GMT
[FLINK-3274] Make accumulator names of Kafka connector unique

This closes #1541


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36d6572d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36d6572d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36d6572d

Branch: refs/heads/master
Commit: 36d6572d16c17adf3ced31f7dbf531481509bdf8
Parents: cf75f42
Author: Robert Metzger <rmetzger@apache.org>
Authored: Fri Jan 22 11:10:08 2016 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Jan 25 18:28:11 2016 +0100

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaConsumer09.java    | 8 +++++++-
 .../streaming/connectors/kafka/FlinkKafkaProducerBase.java  | 9 ++++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36d6572d/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 9faa249..6a92e6d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -46,6 +46,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -95,6 +96,9 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>
{
 	/** Ordered list of all partitions available in all subscribed partitions **/
 	private final List<KafkaTopicPartition> partitionInfos;
 
+	/** Unique ID identifying the consumer */
+	private final String consumerId;
+
 	// ------  Runtime State  -------
 
 	/** The partitions actually handled by this consumer at runtime */
@@ -223,6 +227,8 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>
{
 		if (LOG.isInfoEnabled()) {
 			logPartitionInfo(partitionInfos);
 		}
+
+		this.consumerId = UUID.randomUUID().toString();
 	}
 
 
@@ -283,7 +289,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>
{
 				LOG.info("Consumer implementation does not support metrics");
 			} else {
 				for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
-					String name = "consumer-" + metric.getKey().name();
+					String name = consumerId + "-consumer-" + metric.getKey().name();
 					DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue());
 					// best effort: we only add the accumulator if available.
 					if (kafkaAccumulator != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/36d6572d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index ebc02c9..1eeb074 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 
 
 /**
@@ -92,6 +93,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 	protected final KafkaPartitioner<IN> partitioner;
 
 	/**
+	 * Unique ID identifying the producer
+	 */
+	private final String producerId;
+
+	/**
 	 * Flag indicating whether to accept failures (and log them), or to fail on failures
 	 */
 	protected boolean logFailuresOnly;
@@ -155,6 +161,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 		}
 
 		this.partitioner = customPartitioner;
+		this.producerId = UUID.randomUUID().toString();
 	}
 
 	// ---------------------------------- Properties --------------------------
@@ -197,7 +204,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 				LOG.info("Producer implementation does not support metrics");
 			} else {
 				for(Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
-					String name = "producer-" + metric.getKey().name();
+					String name = producerId + "-producer-" + metric.getKey().name();
 					DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue());
 					// best effort: we only add the accumulator if available.
 					if(kafkaAccumulator != null) {


Mime
View raw message