flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/4] flink git commit: [streaming] delegate JobGraph generation to Client class
Date Sun, 01 Nov 2015 20:04:32 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7fdaa4e2f -> 0845529ca


[streaming] delegate JobGraph generation to Client class


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf29de98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf29de98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf29de98

Branch: refs/heads/master
Commit: bf29de981c2bcd5cb5d33c68b158c95c8820f43d
Parents: 7fdaa4e
Author: Maximilian Michels <mxm@apache.org>
Authored: Fri Oct 30 14:34:07 2015 +0100
Committer: Maximilian Michels <mxm@apache.org>
Committed: Sun Nov 1 19:00:04 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java |  4 +--
 .../org/apache/flink/storm/api/FlinkClient.java |  6 ++++-
 .../flink/storm/api/FlinkLocalCluster.java      |  6 ++++-
 .../api/environment/LocalStreamEnvironment.java |  8 ++++--
 .../environment/RemoteStreamEnvironment.java    | 26 ++++++--------------
 .../environment/StreamContextEnvironment.java   | 26 ++++++--------------
 .../flink/streaming/api/graph/StreamGraph.java  | 21 +++++-----------
 .../api/graph/StreamingJobGraphGenerator.java   |  2 +-
 .../streaming/util/TestStreamEnvironment.java   |  5 +++-
 9 files changed, 45 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf29de98/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 1cc1a54..ac668d3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -341,14 +341,14 @@ public class Client {
 	}
 	
 
-	public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
+	public JobExecutionResult runBlocking(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
 			ClassLoader classLoader) throws ProgramInvocationException
 	{
 		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths);
 		return runBlocking(job, classLoader);
 	}
 
-	public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
+	public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
 			ClassLoader classLoader) throws ProgramInvocationException
 	{
 		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths);

http://git-wip-us.apache.org/repos/asf/flink/blob/bf29de98/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 3607fad..c311f6c 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
 
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -185,7 +186,10 @@ public class FlinkClient {
 			topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
 		}
 
-		final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
+		final StreamGraph streamGraph = topology.getStreamGraph();
+		streamGraph.setJobName(name);
+
+		final JobGraph jobGraph = streamGraph.getJobGraph();
 		jobGraph.addJar(new Path(uploadedJarUri));
 
 		final Configuration configuration = jobGraph.getJobConfiguration();

http://git-wip-us.apache.org/repos/asf/flink/blob/bf29de98/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index 868801b..944c6cd 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +74,10 @@ public class FlinkLocalCluster {
 			topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
 		}
 
-		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
+		StreamGraph streamGraph = topology.getStreamGraph();
+		streamGraph.setJobName(topologyName);
+
+		JobGraph jobGraph = streamGraph.getJobGraph();
 		this.flink.submitJobDetached(jobGraph);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf29de98/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 35b3e59..d6ddcff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,8 +82,11 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		// transform the streaming program into a JobGraph
-		JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
-		
+		StreamGraph streamGraph = getStreamGraph();
+		streamGraph.setJobName(jobName);
+
+		JobGraph jobGraph = streamGraph.getJobGraph();
+
 		Configuration configuration = new Configuration();
 		configuration.addAll(jobGraph.getJobConfiguration());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf29de98/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 02c938e..9f43e62 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.environment;
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -35,9 +34,8 @@ import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -166,36 +164,28 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-		JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
+		StreamGraph streamGraph = getStreamGraph();
+		streamGraph.setJobName(jobName);
 		transformations.clear();
-		return executeRemotely(jobGraph);
+		return executeRemotely(streamGraph);
 	}
 
 	/**
 	 * Executes the remote job.
 	 * 
-	 * @param jobGraph
-	 *            jobGraph to execute
+	 * @param streamGraph
+	 *            Stream Graph to execute
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	private JobExecutionResult executeRemotely(JobGraph jobGraph) throws ProgramInvocationException {
+	private JobExecutionResult executeRemotely(StreamGraph streamGraph) throws ProgramInvocationException {
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
 
-		for (URL jarFile : jarFiles) {
-			try {
-				jobGraph.addJar(new Path(jarFile.toURI()));
-			} catch (URISyntaxException e) {
-				throw new ProgramInvocationException("URL is invalid", e);
-			}
-		}
-
 		ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
 			getClass().getClassLoader());
 		
 		Configuration configuration = new Configuration();
-		configuration.addAll(jobGraph.getJobConfiguration());
 		configuration.addAll(this.config);
 		
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
@@ -211,7 +201,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 		}
 
 		try {
-			return client.runBlocking(jobGraph, usercodeClassLoader);
+			return client.runBlocking(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader);
 		}
 		catch (ProgramInvocationException e) {
 			throw e;

http://git-wip-us.apache.org/repos/asf/flink/blob/bf29de98/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 263cb89..240c9d2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -20,15 +20,15 @@ package org.apache.flink.streaming.api.environment;
 import java.net.URL;
 import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,33 +69,23 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	@Override
 	public JobExecutionResult execute() throws Exception {
-		return execute(null);
+		return execute(DEFAULT_JOB_NAME);
 	}
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
+		Preconditions.checkNotNull("Streaming Job name should not be null.");
 
-		JobGraph jobGraph;
-		if (jobName == null) {
-			jobGraph = this.getStreamGraph().getJobGraph();
-		} else {
-			jobGraph = this.getStreamGraph().getJobGraph(jobName);
-		}
+		StreamGraph streamGraph = this.getStreamGraph();
+		streamGraph.setJobName(jobName);
 
 		transformations.clear();
 
-		// attach all necessary jar files to the JobGraph
-		for (URL file : jars) {
-			jobGraph.addJar(new Path(file.toURI()));
-		}
-
-		jobGraph.setClasspaths(classpaths);
-
 		// execute the programs
 		if (wait) {
-			return client.runBlocking(jobGraph, userCodeClassLoader);
+			return client.runBlocking(streamGraph, jars, classpaths, userCodeClassLoader);
 		} else {
-			JobSubmissionResult result = client.runDetached(jobGraph, userCodeClassLoader);
+			JobSubmissionResult result = client.runDetached(streamGraph, jars, classpaths, userCodeClassLoader);
 			LOG.warn("Job was executed in detached mode, the results will be available on completion.");
 			return JobExecutionResult.fromJobSubmissionResult(result);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf29de98/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index be020d7..7d8f9f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -126,6 +126,10 @@ public class StreamGraph extends StreamingPlan {
 		return executionConfig;
 	}
 
+	public String getJobName() {
+		return jobName;
+	}
+
 	public void setJobName(String jobName) {
 		this.jobName = jobName;
 	}
@@ -554,20 +558,9 @@ public class StreamGraph extends StreamingPlan {
 	}
 
 	/**
-	 * Gets the assembled {@link JobGraph} and adds a default name for it.
+	 * Gets the assembled {@link JobGraph}.
 	 */
 	public JobGraph getJobGraph() {
-		return getJobGraph(jobName);
-	}
-
-	/**
-	 * Gets the assembled {@link JobGraph} and adds a user specified name for
-	 * it.
-	 * 
-	 * @param jobGraphName
-	 *            name of the jobGraph
-	 */
-	public JobGraph getJobGraph(String jobGraphName) {
 		// temporarily forbid checkpointing for iterative jobs
 		if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
 			throw new UnsupportedOperationException(
@@ -576,11 +569,9 @@ public class StreamGraph extends StreamingPlan {
 							+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
 		}
 
-		setJobName(jobGraphName);
-
 		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
 
-		return jobgraphGenerator.createJobGraph(jobGraphName);
+		return jobgraphGenerator.createJobGraph(jobName);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bf29de98/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 45cfff1..49cbd11 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -83,7 +83,7 @@ public class StreamingJobGraphGenerator {
 	}
 
 	public JobGraph createJobGraph(String jobName) {
-		jobGraph = new JobGraph(jobName);
+		jobGraph = new JobGraph(streamGraph.getJobName());
 
 		// make sure that all vertices start immediately
 		jobGraph.setScheduleMode(ScheduleMode.ALL);

http://git-wip-us.apache.org/repos/asf/flink/blob/bf29de98/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 8cd1e4a..e44815d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 /**
@@ -41,7 +42,9 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		final JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
+		final StreamGraph streamGraph = getStreamGraph();
+		streamGraph.setJobName(jobName);
+		final JobGraph jobGraph = streamGraph.getJobGraph();
 		return executor.submitJobAndWait(jobGraph, false);
 	}
 


Mime
View raw message