flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/6] flink git commit: [hotfix] [streaming api] Make it clear that StreamingJobGraphGenerator is single use.
Date Thu, 20 Apr 2017 12:17:14 GMT
[hotfix] [streaming api] Make it clear that StreamingJobGraphGenerator is single use.

This reduces the public API of the StreamingJobGraphGenerator to a static createJobGraph() method,
which internally instantiates a generator, creates the job graph, and drops the generator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cf78d68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cf78d68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cf78d68

Branch: refs/heads/master
Commit: 4cf78d684700920ab0b0b58c5e2c3b6102da5008
Parents: 5f0d676
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Mar 27 19:03:35 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Apr 20 11:35:01 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/graph/StreamGraph.java  |  4 +-
 .../api/graph/StreamingJobGraphGenerator.java   | 47 ++++++++++++--------
 .../graph/StreamingJobGraphGeneratorTest.java   | 13 ++----
 3 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4cf78d68/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index cc3ca9e..1acfc5b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -655,9 +655,7 @@ public class StreamGraph extends StreamingPlan {
 							+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling:
env.enableCheckpointing(interval,true)");
 		}
 
-		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
-
-		return jobgraphGenerator.createJobGraph();
+		return StreamingJobGraphGenerator.createJobGraph(this);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4cf78d68/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 7d62273..9eca9a8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.commons.lang3.StringUtils;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -49,6 +50,7 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +64,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+/**
+ * The StreamingJobGraphGenerator converts a {@link StreamGraph} into a {@link JobGraph}.
+ */
 @Internal
 public class StreamingJobGraphGenerator {
 
@@ -73,32 +78,38 @@ public class StreamingJobGraphGenerator {
 	 */
 	private static final long DEFAULT_RESTART_DELAY = 10000L;
 
-	private StreamGraph streamGraph;
+	// ------------------------------------------------------------------------
+
+	public static JobGraph createJobGraph(StreamGraph streamGraph) {
+		return new StreamingJobGraphGenerator(streamGraph).createJobGraph();
+	}
+
+	// ------------------------------------------------------------------------
+
+	private final StreamGraph streamGraph;
 
-	private Map<Integer, JobVertex> jobVertices;
-	private JobGraph jobGraph;
-	private Collection<Integer> builtVertices;
+	private final Map<Integer, JobVertex> jobVertices;
+	private final JobGraph jobGraph;
+	private final Collection<Integer> builtVertices;
 
-	private List<StreamEdge> physicalEdgesInOrder;
+	private final List<StreamEdge> physicalEdgesInOrder;
 
-	private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
+	private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
 
-	private Map<Integer, StreamConfig> vertexConfigs;
-	private Map<Integer, String> chainedNames;
+	private final Map<Integer, StreamConfig> vertexConfigs;
+	private final Map<Integer, String> chainedNames;
 
-	private Map<Integer, ResourceSpec> chainedMinResources;
-	private Map<Integer, ResourceSpec> chainedPreferredResources;
+	private final Map<Integer, ResourceSpec> chainedMinResources;
+	private final Map<Integer, ResourceSpec> chainedPreferredResources;
 
 	private final StreamGraphHasher defaultStreamGraphHasher;
 	private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
-	public StreamingJobGraphGenerator(StreamGraph streamGraph) {
+	private StreamingJobGraphGenerator(StreamGraph streamGraph) {
 		this.streamGraph = streamGraph;
 		this.defaultStreamGraphHasher = new StreamGraphHasherV2();
 		this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
-	}
 
-	private void init() {
 		this.jobVertices = new HashMap<>();
 		this.builtVertices = new HashSet<>();
 		this.chainedConfigs = new HashMap<>();
@@ -107,17 +118,15 @@ public class StreamingJobGraphGenerator {
 		this.chainedMinResources = new HashMap<>();
 		this.chainedPreferredResources = new HashMap<>();
 		this.physicalEdgesInOrder = new ArrayList<>();
-	}
-
-	public JobGraph createJobGraph() {
 
 		jobGraph = new JobGraph(streamGraph.getJobName());
+	}
+
+	private JobGraph createJobGraph() {
 
 		// make sure that all vertices start immediately
 		jobGraph.setScheduleMode(ScheduleMode.EAGER);
 
-		init();
-
 		// Generate deterministic hashes for the nodes in order to identify them across
 		// submission iff they didn't change.
 		Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
@@ -442,7 +451,7 @@ public class StreamingJobGraphGenerator {
 		downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
 
 		StreamPartitioner<?> partitioner = edge.getPartitioner();
-		JobEdge jobEdge = null;
+		JobEdge jobEdge;
 		if (partitioner instanceof ForwardPartitioner) {
 			jobEdge = downStreamVertex.connectNewDataSetAsInput(
 				headVertex,

http://git-wip-us.apache.org/repos/asf/flink/blob/4cf78d68/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 2c71a07..8b5aef5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -61,7 +61,6 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		DataStream<Tuple2<String, String>> input = env
 				.fromElements("a", "b", "c", "d", "e", "f")
 				.map(new MapFunction<String, Tuple2<String, String>>() {
-					private static final long serialVersionUID = 471891682418382583L;
 
 					@Override
 					public Tuple2<String, String> map(String value) {
@@ -73,8 +72,6 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 				.keyBy(0)
 				.map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>()
{
 
-					private static final long serialVersionUID = 3583760206245136188L;
-
 					@Override
 					public Tuple2<String, String> map(Tuple2<String, String> value) {
 						return value;
@@ -82,7 +79,6 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 				});
 
 		result.addSink(new SinkFunction<Tuple2<String, String>>() {
-			private static final long serialVersionUID = -5614849094269539342L;
 
 			@Override
 			public void invoke(Tuple2<String, String> value) {}
@@ -115,8 +111,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		StreamGraph streamGraph = new StreamGraph(env);
 		assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
 
-		StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph);
-		JobGraph jobGraph = jobGraphGenerator.createJobGraph();
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
 		JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings();
 		assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval());
@@ -137,7 +132,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 				}
 			})
 			.print();
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
 		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
 		JobVertex sourceVertex = verticesSorted.get(0);
@@ -224,7 +219,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		});
 		sinkMethod.invoke(sink, resource5);
 
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
 		JobVertex sourceMapFilterVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
 		JobVertex reduceSinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
@@ -291,7 +286,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		}).disableChaining().name("test_sink");
 		sinkMethod.invoke(sink, resource5);
 
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
 		for (JobVertex jobVertex : jobGraph.getVertices()) {
 			if (jobVertex.getName().contains("test_source")) {


Mime
View raw message