flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject flink git commit: [FLINK-4875] [metrics] Use correct operator name
Date Tue, 25 Oct 2016 08:47:24 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 05a5f460b -> dc768d3b7


[FLINK-4875] [metrics] Use correct operator name

This closes #2676.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc768d3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc768d3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc768d3b

Branch: refs/heads/release-1.1
Commit: dc768d3b781eb0cd0baca13746da55a0bebef115
Parents: 05a5f46
Author: zentol <chesnay@apache.org>
Authored: Thu Oct 20 15:53:03 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Tue Oct 25 10:47:00 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/api/graph/StreamConfig.java  | 9 +++++++++
 .../streaming/api/graph/StreamingJobGraphGenerator.java     | 2 ++
 .../streaming/api/operators/AbstractStreamOperator.java     | 3 +--
 3 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc768d3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 783b3e2..eb31fda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -68,6 +68,7 @@ public class StreamConfig implements Serializable {
 	private static final String EDGES_IN_ORDER = "edgesInOrder";
 	private static final String OUT_STREAM_EDGES = "outStreamEdges";
 	private static final String IN_STREAM_EDGES = "inStreamEdges";
+	private static final String OPERATOR_NAME = "operatorName";
 
 	private static final String CHECKPOINTING_ENABLED = "checkpointing";
 	private static final String CHECKPOINT_MODE = "checkpointMode";
@@ -388,6 +389,14 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not instantiate configuration.", e);
 		}
 	}
+	
+	public void setOperatorName(String name) {
+		this.config.setString(OPERATOR_NAME,name);
+	}
+	
+	public String getOperatorName() {
+		return this.config.getString(OPERATOR_NAME, null);
+	}
 
 	public void setChainIndex(int index) {
 		this.config.setInteger(CHAIN_INDEX, index);

http://git-wip-us.apache.org/repos/asf/flink/blob/dc768d3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 71cc7f2..d6819e1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -213,6 +213,7 @@ public class StreamingJobGraphGenerator {
 
 				config.setChainStart();
 				config.setChainIndex(0);
+				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
 				config.setOutEdgesInOrder(transitiveOutEdges);
 				config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
 
@@ -230,6 +231,7 @@ public class StreamingJobGraphGenerator {
 					chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
 				}
 				config.setChainIndex(chainIndex);
+				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
 				chainedConfigs.get(startNodeId).put(currentNodeId, config);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dc768d3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 0269a34..d51c320 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -102,9 +102,8 @@ public abstract class AbstractStreamOperator<OUT>
 	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
 		this.container = containingTask;
 		this.config = config;
-		String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
 		
-		this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
+		this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
 		this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
 		this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
 


Mime
View raw message