flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-2632] [client] Fix web client to respect the class loader of submitted jobs
Date Thu, 10 Sep 2015 13:36:47 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.10.0-milestone-1 ba44abf9c -> d81a89644


[FLINK-2632] [client] Fix web client to respect the class loader of submitted jobs

This closes #1114


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d81a8964
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d81a8964
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d81a8964

Branch: refs/heads/release-0.10.0-milestone-1
Commit: d81a8964420303392543a622ba4774e8634b5e58
Parents: ba44abf
Author: mjsax <mjsax@apache.org>
Authored: Thu Sep 10 13:40:52 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Sep 10 15:36:12 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  9 ++++-----
 .../flink/client/web/JobSubmissionServlet.java  | 21 ++++++++++----------
 2 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d81a8964/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index ea1a6e9..2ba7769 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -69,7 +69,6 @@ import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
@@ -139,7 +138,7 @@ public class CliFrontend {
 
 	private FlinkPlan optimizedPlan;
 
-	private JobGraph jobGraph;
+	private PackagedProgram packagedProgram;
 
 	/**
 	 *
@@ -382,7 +381,7 @@ public class CliFrontend {
 
 			if (webFrontend) {
 				this.optimizedPlan = flinkPlan;
-				this.jobGraph = client.getJobGraph(program, flinkPlan);
+				this.packagedProgram = program;
 			} else {
 				String jsonPlan = new PlanJSONDumpGenerator()
 						.getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan);
@@ -957,8 +956,8 @@ public class CliFrontend {
 		return this.optimizedPlan;
 	}
 
-	public JobGraph getJobGraph() {
-		return this.jobGraph;
+	public PackagedProgram getPackagedProgram() {
+		return this.packagedProgram;
 	}
 
 	public void shutdown() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d81a8964/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index fed3546..e43d7cc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -36,9 +36,11 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.plan.FlinkPlan;
@@ -46,7 +48,6 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,13 +87,13 @@ public class JobSubmissionServlet extends HttpServlet {
 
 	// ------------------------------------------------------------------------
 
-	private final File jobStoreDirectory;				// the directory containing the uploaded jobs
+	private final File jobStoreDirectory;										// the directory containing the uploaded
jobs
 
-	private final File planDumpDirectory;				// the directory to dump the optimizer plans to
+	private final File planDumpDirectory;										// the directory to dump the optimizer plans
to
 
-	private final Map<Long, JobGraph> submittedJobs;	// map from UIDs to the running jobs
+	private final Map<Long, Tuple2<PackagedProgram, FlinkPlan>> submittedJobs;	//
map from UIDs to the submitted jobs
 
-	private final Random rand;							// random number generator for UID
+	private final Random rand;													// random number generator for UID
 
 	private final CliFrontend cli;
 
@@ -103,7 +104,7 @@ public class JobSubmissionServlet extends HttpServlet {
 		this.jobStoreDirectory = jobDir;
 		this.planDumpDirectory = planDir;
 
-		this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, JobGraph>());
+		this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, Tuple2<PackagedProgram,
FlinkPlan>>());
 
 		this.rand = new Random(System.currentTimeMillis());
 	}
@@ -263,7 +264,7 @@ public class JobSubmissionServlet extends HttpServlet {
 					}
 				}
 				else {
-					this.submittedJobs.put(uid, this.cli.getJobGraph());
+					this.submittedJobs.put(uid, new Tuple2<PackagedProgram, FlinkPlan>(this.cli.getPackagedProgram(),
optPlan));
 				}
 
 				// redirect to the plan display page
@@ -304,7 +305,7 @@ public class JobSubmissionServlet extends HttpServlet {
 			}
 
 			// get the retained job
-			JobGraph job = submittedJobs.remove(uid);
+			Tuple2<PackagedProgram, FlinkPlan> job = submittedJobs.remove(uid);
 			if (job == null) {
 				resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
 					"No job with the given uid was retained for later submission.");
@@ -313,8 +314,8 @@ public class JobSubmissionServlet extends HttpServlet {
 
 			// submit the job
 			try {
-				Client client = new Client(GlobalConfiguration.getConfiguration(), getClass().getClassLoader());
-				client.run(job, false);
+				Client client = new Client(GlobalConfiguration.getConfiguration(), job.f0.getUserCodeClassLoader());
+				client.run(client.getJobGraph(job.f0, job.f1), false);
 			}
 			catch (Exception ex) {
 				LOG.error("Error submitting job to the job-manager.", ex);


Mime
View raw message