Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AA0A8200C81 for ; Thu, 20 Apr 2017 14:17:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A8BCB160BB0; Thu, 20 Apr 2017 12:17:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CE969160BB4 for ; Thu, 20 Apr 2017 14:17:12 +0200 (CEST) Received: (qmail 60625 invoked by uid 500); 20 Apr 2017 12:17:12 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 60454 invoked by uid 99); 20 Apr 2017 12:17:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Apr 2017 12:17:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2BEB3DF999; Thu, 20 Apr 2017 12:17:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Thu, 20 Apr 2017 12:17:14 -0000 Message-Id: <13c6c8d3ee6d4376a32cf4f65fda56e8@git.apache.org> In-Reply-To: <98bdf3bc53454d2f807fc8ce41fe5ba0@git.apache.org> References: <98bdf3bc53454d2f807fc8ce41fe5ba0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/6] flink git commit: [hotfix] [streaming api] Make it clear that StreamingJobGraphGenerator is single use. archived-at: Thu, 20 Apr 2017 12:17:13 -0000 [hotfix] [streaming api] Make it clear that StreamingJobGraphGenerator is single use. This reduces the public API of the StreamingJobGraphGenerator to a static createJobGraph() method, which internally instantiates a generator, creates the job, and drops the generator. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cf78d68 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cf78d68 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cf78d68 Branch: refs/heads/master Commit: 4cf78d684700920ab0b0b58c5e2c3b6102da5008 Parents: 5f0d676 Author: Stephan Ewen Authored: Mon Mar 27 19:03:35 2017 +0200 Committer: Stephan Ewen Committed: Thu Apr 20 11:35:01 2017 +0200 ---------------------------------------------------------------------- .../flink/streaming/api/graph/StreamGraph.java | 4 +- .../api/graph/StreamingJobGraphGenerator.java | 47 ++++++++++++-------- .../graph/StreamingJobGraphGeneratorTest.java | 13 ++---- 3 files changed, 33 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4cf78d68/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index cc3ca9e..1acfc5b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -655,9 +655,7 @@ public class StreamGraph extends StreamingPlan { + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)"); } - StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this); - - return jobgraphGenerator.createJobGraph(); + return StreamingJobGraphGenerator.createJobGraph(this); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/4cf78d68/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 7d62273..9eca9a8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.commons.lang3.StringUtils; + import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; @@ -49,6 +50,7 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +64,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +/** + * The StreamingJobGraphGenerator converts a {@link StreamGraph} into a {@link JobGraph}. + */ @Internal public class StreamingJobGraphGenerator { @@ -73,32 +78,38 @@ public class StreamingJobGraphGenerator { */ private static final long DEFAULT_RESTART_DELAY = 10000L; - private StreamGraph streamGraph; + // ------------------------------------------------------------------------ + + public static JobGraph createJobGraph(StreamGraph streamGraph) { + return new StreamingJobGraphGenerator(streamGraph).createJobGraph(); + } + + // ------------------------------------------------------------------------ + + private final StreamGraph streamGraph; - private Map jobVertices; - private JobGraph jobGraph; - private Collection builtVertices; + private final Map jobVertices; + private final JobGraph jobGraph; + private final Collection builtVertices; - private List physicalEdgesInOrder; + private final List physicalEdgesInOrder; - private Map> chainedConfigs; + private final Map> chainedConfigs; - private Map vertexConfigs; - private Map chainedNames; + private final Map vertexConfigs; + private final Map chainedNames; - private Map chainedMinResources; - private Map chainedPreferredResources; + private final Map chainedMinResources; + private final Map chainedPreferredResources; private final StreamGraphHasher defaultStreamGraphHasher; private final List legacyStreamGraphHashers; - public StreamingJobGraphGenerator(StreamGraph streamGraph) { + private StreamingJobGraphGenerator(StreamGraph streamGraph) { this.streamGraph = streamGraph; this.defaultStreamGraphHasher = new StreamGraphHasherV2(); this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher()); - } - private void init() { this.jobVertices = new HashMap<>(); this.builtVertices = new HashSet<>(); this.chainedConfigs = new HashMap<>(); @@ -107,17 +118,15 @@ public class StreamingJobGraphGenerator { this.chainedMinResources = new HashMap<>(); this.chainedPreferredResources = new HashMap<>(); this.physicalEdgesInOrder = new ArrayList<>(); - } - - public JobGraph createJobGraph() { jobGraph = new JobGraph(streamGraph.getJobName()); + } + + private JobGraph createJobGraph() { // make sure that all vertices start immediately jobGraph.setScheduleMode(ScheduleMode.EAGER); - init(); - // Generate deterministic hashes for the nodes in order to identify them across // submission iff they didn't change. Map hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph); @@ -442,7 +451,7 @@ public class StreamingJobGraphGenerator { downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1); StreamPartitioner partitioner = edge.getPartitioner(); - JobEdge jobEdge = null; + JobEdge jobEdge; if (partitioner instanceof ForwardPartitioner) { jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, http://git-wip-us.apache.org/repos/asf/flink/blob/4cf78d68/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 2c71a07..8b5aef5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -61,7 +61,6 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { DataStream> input = env .fromElements("a", "b", "c", "d", "e", "f") .map(new MapFunction>() { - private static final long serialVersionUID = 471891682418382583L; @Override public Tuple2 map(String value) { @@ -73,8 +72,6 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { .keyBy(0) .map(new MapFunction, Tuple2>() { - private static final long serialVersionUID = 3583760206245136188L; - @Override public Tuple2 map(Tuple2 value) { return value; @@ -82,7 +79,6 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { }); result.addSink(new SinkFunction>() { - private static final long serialVersionUID = -5614849094269539342L; @Override public void invoke(Tuple2 value) {} @@ -115,8 +111,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { StreamGraph streamGraph = new StreamGraph(env); assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled()); - StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph); - JobGraph jobGraph = jobGraphGenerator.createJobGraph(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings(); assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval()); @@ -137,7 +132,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { } }) .print(); - JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); JobVertex sourceVertex = verticesSorted.get(0); @@ -224,7 +219,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { }); sinkMethod.invoke(sink, resource5); - JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); JobVertex sourceMapFilterVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0); JobVertex reduceSinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1); @@ -291,7 +286,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { }).disableChaining().name("test_sink"); sinkMethod.invoke(sink, resource5); - JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); for (JobVertex jobVertex : jobGraph.getVertices()) { if (jobVertex.getName().contains("test_source")) {