flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [01/17] flink git commit: [FLINK-5380] Fix task metrics reuse for single-operator chains
Date Fri, 20 Jan 2017 05:39:22 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 92e393500 -> 080617d28


[FLINK-5380] Fix task metrics reuse for single-operator chains


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/792f7e45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/792f7e45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/792f7e45

Branch: refs/heads/release-1.2
Commit: 792f7e45216377fa1d6f29dfc767d83cf1a84f37
Parents: 28c18e2
Author: zentol <chesnay@apache.org>
Authored: Thu Jan 5 14:37:03 2017 +0100
Committer: zentol <chesnay@apache.org>
Committed: Thu Jan 19 21:41:59 2017 +0100

----------------------------------------------------------------------
 .../api/graph/StreamingJobGraphGenerator.java   |  6 ++--
 .../graph/StreamingJobGraphGeneratorTest.java   | 38 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/792f7e45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1bfaf3f..f562b98 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -234,9 +234,9 @@ public class StreamingJobGraphGenerator {
 				config.setChainIndex(chainIndex);
 				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
 				chainedConfigs.get(startNodeId).put(currentNodeId, config);
-				if (chainableOutputs.isEmpty()) {
-					config.setChainEnd();
-				}
+			}
+			if (chainableOutputs.isEmpty()) {
+				config.setChainEnd();
 			}
 
 			return transitiveOutEdges;

http://git-wip-us.apache.org/repos/asf/flink/blob/792f7e45/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index b817c93..4d462d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,11 +32,13 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
@@ -170,4 +173,39 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
 		assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval());
 	}
+
+	/**
+	 * Verifies that the chain start/end is correctly set.
+	 */
+	@Test
+	public void testChainStartEndSetting() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// fromElements -> CHAIN(Map -> Print)
+		env.fromElements(1, 2, 3)
+			.map(new MapFunction<Integer, Integer>() {
+				@Override
+				public Integer map(Integer value) throws Exception {
+					return value;
+				}
+			})
+			.print();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+
+		JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
+		JobVertex mapPrintVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+
+		StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
+		StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());
+		Map<Integer, StreamConfig> chainedConfigs = mapConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader());
+		StreamConfig printConfig = chainedConfigs.get(3);
+
+		assertTrue(sourceConfig.isChainStart());
+		assertTrue(sourceConfig.isChainEnd());
+
+		assertTrue(mapConfig.isChainStart());
+		assertFalse(mapConfig.isChainEnd());
+
+		assertFalse(printConfig.isChainStart());
+		assertTrue(printConfig.isChainEnd());
+	}
 }


Mime
View raw message