flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject incubator-flink git commit: [streaming] Streaming local execution fix to create proper jobclient for minicluster
Date Fri, 19 Dec 2014 19:43:39 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 227e40fe1 -> 33964003c


[streaming] Streaming local execution fix to create proper jobclient for minicluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/33964003
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/33964003
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/33964003

Branch: refs/heads/master
Commit: 33964003c7f85d0884edac8150628907a327edc1
Parents: 227e40f
Author: Gyula Fora <gyfora@apache.org>
Authored: Fri Dec 19 08:46:26 2014 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Dec 19 08:46:26 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/util/ClusterUtil.java       | 29 ++++++++------------
 1 file changed, 11 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/33964003/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 8fb6554..4362510 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -17,17 +17,16 @@
 
 package org.apache.flink.streaming.util;
 
-import java.net.InetSocketAddress;
-
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import akka.actor.ActorRef;
+
 public class ClusterUtil {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class);
@@ -43,8 +42,8 @@ public class ClusterUtil {
 	 * @param memorySize
 	 *            memorySize
 	 */
-	public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism,
-										long memorySize) throws Exception  {
+	public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism, long memorySize)
+			throws Exception {
 
 		Configuration configuration = jobGraph.getJobConfiguration();
 
@@ -58,22 +57,16 @@ public class ClusterUtil {
 
 		try {
 			exec = new LocalFlinkMiniCluster(configuration, true);
+			ActorRef jobClient = exec.getJobClient();
+
+			JobClient.submitJobAndWait(jobGraph, false, jobClient, exec.timeout());
 
-			Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRPCPort()),
-					configuration, ClusterUtil.class.getClassLoader());
-			client.run(jobGraph, true);
-		} catch (ProgramInvocationException e) {
-			if (e.getMessage().contains("GraphConversionException")) {
-				throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
-			} else {
-				throw e;
-			}
 		} catch (Exception e) {
 			throw e;
 		} finally {
-				if(exec != null) {
-					exec.stop();
-				}
+			if (exec != null) {
+				exec.stop();
+			}
 		}
 	}
 


Mime
View raw message