flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject flink git commit: [FLINK-1767] [streaming] Make StreamExecutionEnvironment return JobExecutionResult instead of void.
Date Mon, 23 Mar 2015 16:53:18 GMT
Repository: flink
Updated Branches:
  refs/heads/master 00d22c396 -> e83d1ec10


[FLINK-1767] [streaming] Make StreamExecutionEnvironment return JobExecutionResult instead of void.

Conflicts:
	flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
	flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java

This closes #516


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e83d1ec1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e83d1ec1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e83d1ec1

Branch: refs/heads/master
Commit: e83d1ec102808cb9b3ef4208335f74e6f17bbe7a
Parents: 00d22c3
Author: Gabor Gevay <ggab90@gmail.com>
Authored: Sat Mar 21 10:28:23 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Mar 23 12:45:03 2015 +0100

----------------------------------------------------------------------
 .../api/environment/LocalStreamEnvironment.java       | 13 ++++++++-----
 .../api/environment/RemoteStreamEnvironment.java      | 14 ++++++++------
 .../api/environment/StreamContextEnvironment.java     |  9 +++++----
 .../api/environment/StreamExecutionEnvironment.java   | 10 ++++++----
 .../api/environment/StreamPlanEnvironment.java        |  7 ++++---
 .../org/apache/flink/streaming/util/ClusterUtil.java  | 10 ++++++----
 .../flink/streaming/util/TestStreamEnvironment.java   |  7 ++++---
 7 files changed, 41 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index e1b1453..07a552b 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.streaming.util.ClusterUtil;
 
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
@@ -26,10 +27,12 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	/**
 	 * Executes the JobGraph of the on a mini cluster of CLusterUtil with a
 	 * default name.
+	 *
+	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
 	@Override
-	public void execute() throws Exception {
-		ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism());
+	public JobExecutionResult execute() throws Exception {
+		return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism());
 	}
 
 	/**
@@ -38,10 +41,10 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 * 
 	 * @param jobName
 	 *            name of the job
+	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
 	@Override
-	public void execute(String jobName) throws Exception {
-		ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName),
-				getParallelism());
+	public JobExecutionResult execute(String jobName) throws Exception {
+		return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 3142bdd..4faa329 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -79,17 +80,17 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	}
 
 	@Override
-	public void execute() {
+	public JobExecutionResult execute() {
 
 		JobGraph jobGraph = streamGraph.getJobGraph();
-		executeRemotely(jobGraph);
+		return executeRemotely(jobGraph);
 	}
 
 	@Override
-	public void execute(String jobName) {
+	public JobExecutionResult execute(String jobName) {
 
 		JobGraph jobGraph = streamGraph.getJobGraph(jobName);
-		executeRemotely(jobGraph);
+		return executeRemotely(jobGraph);
 	}
 
 	/**
@@ -97,8 +98,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 * 
 	 * @param jobGraph
 	 *            jobGraph to execute
+	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	private void executeRemotely(JobGraph jobGraph) {
+	private JobExecutionResult executeRemotely(JobGraph jobGraph) {
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
@@ -112,7 +114,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 				JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()));
 
 		try {
-			client.run(jobGraph, true);
+			return client.run(jobGraph, true);
 		} catch (ProgramInvocationException e) {
 			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index f7dd0bf..b03ab0e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.environment;
 import java.io.File;
 import java.util.List;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -50,12 +51,12 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	}
 
 	@Override
-	public void execute() throws Exception {
-		execute(null);
+	public JobExecutionResult execute() throws Exception {
+		return execute(null);
 	}
 
 	@Override
-	public void execute(String jobName) throws Exception {
+	public JobExecutionResult execute(String jobName) throws Exception {
 		currentEnvironment = null;
 
 		JobGraph jobGraph;
@@ -70,7 +71,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 		}
 
 		try {
-			client.run(jobGraph, true);
+			return client.run(jobGraph, true);
 
 		} catch (Exception e) {
 			throw e;

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 9f2ccff..6dd7947 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.Serializer;
 
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -759,10 +760,11 @@ public abstract class StreamExecutionEnvironment {
 	 * <p>
 	 * The program execution will be logged and displayed with a generated
 	 * default name.
-	 * 
+	 *
+	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 * @throws Exception
 	 **/
-	public abstract void execute() throws Exception;
+	public abstract JobExecutionResult execute() throws Exception;
 
 	/**
 	 * Triggers the program execution. The environment will execute all parts of
@@ -773,10 +775,10 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 * @param jobName
 	 *            Desired name of the job
-	 * 
+	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 * @throws Exception
 	 **/
-	public abstract void execute(String jobName) throws Exception;
+	public abstract JobExecutionResult execute(String jobName) throws Exception;
 
 	/**
 	 * Getter of the {@link StreamGraph} of the streaming job.

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 592fa1a..02fccd0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
@@ -48,12 +49,12 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 	}
 
 	@Override
-	public void execute() throws Exception {
-		execute("");
+	public JobExecutionResult execute() throws Exception {
+		return execute("");
 	}
 
 	@Override
-	public void execute(String jobName) throws Exception {
+	public JobExecutionResult execute(String jobName) throws Exception {
 		currentEnvironment = null;
 
 		streamGraph.setJobName(jobName);

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index d04e7e6..77ac0c5 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
@@ -41,8 +42,9 @@ public class ClusterUtil {
 	 *            numberOfTaskTrackers
 	 * @param memorySize
 	 *            memorySize
+	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	public static void runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize)
+	public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize)
 			throws Exception {
 
 		Configuration configuration = jobGraph.getJobConfiguration();
@@ -59,7 +61,7 @@ public class ClusterUtil {
 			exec = new LocalFlinkMiniCluster(configuration, true);
 			ActorRef jobClient = exec.getJobClient();
 
-			JobClient.submitJobAndWait(jobGraph, true, jobClient, exec.timeout());
+			return JobClient.submitJobAndWait(jobGraph, true, jobClient, exec.timeout());
 
 		} catch (Exception e) {
 			throw e;
@@ -70,7 +72,7 @@ public class ClusterUtil {
 		}
 	}
 
-	public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
-		runOnMiniCluster(jobGraph, numOfSlots, -1);
+	public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
+		return runOnMiniCluster(jobGraph, numOfSlots, -1);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 5e785f9..a99e652 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -50,12 +50,12 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	}
 
 	@Override
-	public void execute() throws Exception {
-		execute(DEFAULT_JOBNAME);
+	public JobExecutionResult execute() throws Exception {
+		return execute(DEFAULT_JOBNAME);
 	}
 
 	@Override
-	public void execute(String jobName) throws Exception {
+	public JobExecutionResult execute(String jobName) throws Exception {
 		JobGraph jobGraph = streamGraph.getJobGraph(jobName);
 
 		if (internalExecutor) {
@@ -70,6 +70,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 		try {
 			ActorRef client = executor.getJobClient();
 			latestResult = JobClient.submitJobAndWait(jobGraph, false, client, executor.timeout());
+			return latestResult;
 		} catch(JobExecutionException e) {
 			if (e.getMessage().contains("GraphConversionException")) {
 				throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);


Mime
View raw message