flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/2] flink git commit: [FLINK-3513] [runtime] Fix interplay of automatic Operator UID and Changing name of WindowOperator
Date Thu, 25 Feb 2016 19:04:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master a5ecb1886 -> c9cba2771


[FLINK-3513] [runtime] Fix interplay of automatic Operator UID and Changing name of WindowOperator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d90672fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d90672fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d90672fd

Branch: refs/heads/master
Commit: d90672fd647081ae82c53b42ca0080977465f176
Parents: a5ecb18
Author: Ufuk Celebi <uce@apache.org>
Authored: Thu Feb 25 16:55:04 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Feb 25 20:03:50 2016 +0100

----------------------------------------------------------------------
 .../api/graph/StreamingJobGraphGenerator.java   |  2 --
 .../StreamingJobGraphGeneratorNodeHashTest.java | 24 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d90672fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index da46424..e3e1ac6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -736,8 +736,6 @@ public class StreamingJobGraphGenerator {
 
 		hasher.putInt(node.getParallelism());
 
-		hasher.putString(node.getOperatorName(), Charset.forName("UTF-8"));
-
 		if (node.getOperator() instanceof AbstractUdfStreamOperator) {
 			String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
 					.getUserFunction().getClass().getName();

http://git-wip-us.apache.org/repos/asf/flink/blob/d90672fd/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index 98750d0..9e1e9b4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -31,9 +31,13 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.util.NoOpSink;
 import org.junit.Test;
 
+import java.lang.reflect.Field;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -319,6 +323,26 @@ public class StreamingJobGraphGeneratorNodeHashTest {
 		}
 	}
 
+	/**
+	 * Tests that a changed operator name does not affect the hash.
+	 */
+	@Test
+	public void testChangedOperatorName() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+		env.addSource(new NoOpSourceFunction(), "A").map(new NoOpMapFunction());
+		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+		JobVertexID expected = jobGraph.getVerticesAsArray()[0].getID();
+
+		env = StreamExecutionEnvironment.createLocalEnvironment();
+		env.addSource(new NoOpSourceFunction(), "B").map(new NoOpMapFunction());
+		jobGraph = env.getStreamGraph().getJobGraph();
+
+		JobVertexID actual = jobGraph.getVerticesAsArray()[0].getID();
+
+		assertEquals(expected, actual);
+	}
+
 	// ------------------------------------------------------------------------
 	// Manual hash assignment
 	// ------------------------------------------------------------------------


Mime
View raw message