flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: Revert "[FLINK-4913][yarn] include user jars in system class loader"
Date Fri, 25 Nov 2016 15:16:20 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 b9e6dcc3c -> 3b5d3c6f3


Revert "[FLINK-4913][yarn] include user jars in system class loader"

This reverts commit ea41b9c56fdc0af3c97d6dd48d04218db6176ec8.

This closes #2795


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b5d3c6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b5d3c6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b5d3c6f

Branch: refs/heads/release-1.1
Commit: 3b5d3c6f359dbbdfebcf0b7c034264a3ed9ad12c
Parents: b9e6dcc
Author: Ufuk Celebi <uce@apache.org>
Authored: Sat Nov 12 20:49:17 2016 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Fri Nov 25 16:14:52 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 14 ++----
 .../flink/client/cli/CustomCommandLine.java     | 14 +-----
 .../org/apache/flink/client/cli/DefaultCLI.java |  5 +-
 .../flink/client/program/ClusterClient.java     | 40 +++++-----------
 .../flink/client/program/PackagedProgram.java   | 46 ++++++-------------
 .../client/program/StandaloneClusterClient.java |  6 ---
 .../org/apache/flink/api/scala/FlinkShell.scala |  7 +--
 ...CliFrontendYarnAddressConfigurationTest.java |  5 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |  5 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 48 +-------------------
 .../apache/flink/yarn/YarnClusterClient.java    |  6 ---
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 10 +---
 12 files changed, 40 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 3a322dc..69963fe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -234,7 +234,7 @@ public class CliFrontend {
 		ClusterClient client = null;
 		try {
 
-			client = createClient(options, program);
+			client = createClient(options, program.getMainClassName());
 			client.setPrintStatusDuringExecution(options.getStdoutLogging());
 			client.setDetached(options.getDetachedMode());
 			LOG.debug("Client slots is set to {}", client.getMaxSlots());
@@ -871,12 +871,12 @@ public class CliFrontend {
 	/**
 	 * Creates a {@link ClusterClient} object from the given command line options and other
parameters.
 	 * @param options Command line options
-	 * @param program The program for which to create the client.
+	 * @param programName Program name
 	 * @throws Exception
 	 */
 	protected ClusterClient createClient(
 			CommandLineOptions options,
-			PackagedProgram program) throws Exception {
+			String programName) throws Exception {
 
 		// Get the custom command-line (e.g. Standalone/Yarn/Mesos)
 		CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
@@ -887,12 +887,8 @@ public class CliFrontend {
 			logAndSysout("Cluster configuration: " + client.getClusterIdentifier());
 		} catch (UnsupportedOperationException e) {
 			try {
-				String applicationName = "Flink Application: " + program.getMainClassName();
-				client = activeCommandLine.createCluster(
-					applicationName,
-					options.getCommandLine(),
-					config,
-					program.getAllLibraries());
+				String applicationName = "Flink Application: " + programName;
+				client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config);
 				logAndSysout("Cluster started: " + client.getClusterIdentifier());
 			} catch (UnsupportedOperationException e2) {
 				throw new IllegalConfigurationException(

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index c58c74c..aecdc7c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -22,9 +22,6 @@ import org.apache.commons.cli.Options;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 
-import java.net.URL;
-import java.util.List;
-
 
 /**
  * Custom command-line interface to load hooks for the command-line interface.
@@ -64,22 +61,15 @@ public interface CustomCommandLine<ClusterType extends ClusterClient>
{
 	 * @return Client if a cluster could be retrieved
 	 * @throws UnsupportedOperationException if the operation is not supported
 	 */
-	ClusterType retrieveCluster(
-			CommandLine commandLine,
-			Configuration config) throws UnsupportedOperationException;
+	ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws UnsupportedOperationException;
 
 	/**
 	 * Creates the client for the cluster
 	 * @param applicationName The application name to use
 	 * @param commandLine The command-line options parsed by the CliFrontend
 	 * @param config The Flink config to use
-	 * @param userJarFiles User jar files to include in the classpath of the cluster.
 	 * @return The client to communicate with the cluster which the CustomCommandLine brought
up.
 	 * @throws UnsupportedOperationException if the operation is not supported
 	 */
-	ClusterType createCluster(
-			String applicationName,
-			CommandLine commandLine,
-			Configuration config,
-			List<URL> userJarFiles) throws UnsupportedOperationException;
+	ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration
config) throws UnsupportedOperationException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 598c612..5f83c3d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -26,8 +26,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
 import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.List;
 
 import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig;
 
@@ -77,8 +75,7 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient>
{
 	public StandaloneClusterClient createCluster(
 			String applicationName,
 			CommandLine commandLine,
-			Configuration config,
-			List<URL> userJarFiles) throws UnsupportedOperationException {
+			Configuration config) throws UnsupportedOperationException {
 
 		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
 		return descriptor.deploy();

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 5e88af6..2d743fa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -307,27 +307,11 @@ public abstract class ClusterClient {
 	{
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
-
-			final JobWithJars jobWithJars;
-			if (hasUserJarsInClassPath(prog.getAllLibraries())) {
-				jobWithJars = prog.getPlanWithoutJars();
-			} else {
-				jobWithJars = prog.getPlanWithJars();
-			}
-
-			return run(jobWithJars, parallelism, prog.getSavepointSettings());
+			return run(prog.getPlanWithJars(), parallelism, prog.getSavepointSettings());
 		}
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
-
-			final List<URL> libraries;
-			if (hasUserJarsInClassPath(prog.getAllLibraries())) {
-				libraries = Collections.emptyList();
-			} else {
-				libraries = prog.getAllLibraries();
-			}
-
-			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
+			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
 					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
 					prog.getSavepointSettings());
 			ContextEnvironment.setAsContext(factory);
@@ -365,7 +349,7 @@ public abstract class ClusterClient {
 	 * Runs a program on the Flink cluster to which this client is connected. The call blocks
until the
 	 * execution is complete, and returns afterwards.
 	 *
-	 * @param jobWithJars The program to be executed.
+	 * @param program The program to be executed.
 	 * @param parallelism The default parallelism to use when running the program. The default
parallelism is used
 	 *                    when the program does not set a parallelism by itself.
 	 *
@@ -375,15 +359,15 @@ public abstract class ClusterClient {
 	 *                                    i.e. the job-manager is unreachable, or due to the
fact that the
 	 *                                    parallel execution failed.
 	 */
-	public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings
savepointSettings)
+	public JobSubmissionResult run(JobWithJars program, int parallelism, SavepointRestoreSettings
savepointSettings)
 			throws CompilerException, ProgramInvocationException {
-		ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
+		ClassLoader classLoader = program.getUserCodeClassLoader();
 		if (classLoader == null) {
 			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode
class loader.");
 		}
 
-		OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
-		return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader,
savepointSettings);
+		OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
+		return run(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointSettings);
 	}
 
 	public JobSubmissionResult run(
@@ -614,6 +598,10 @@ public abstract class ClusterClient {
 		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
 	}
 
+	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException
{
+		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null);
+	}
+
 	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings
savepointSettings) throws ProgramInvocationException {
 		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointSettings);
 	}
@@ -740,12 +728,6 @@ public abstract class ClusterClient {
 	public abstract int getMaxSlots();
 
 	/**
-	 * Returns true if the client already has the user jar and providing it again would
-	 * result in duplicate uploading of the jar.
-	 */
-	public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles);
-
-	/**
 	 * Calls the subclasses' submitJob method. It may decide to simply call one of the run methods
or it may perform
 	 * some custom job submission logic.
 	 * @param jobGraph The JobGraph to be submitted

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 8931a3e..daa5737 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -283,38 +283,23 @@ public class PackagedProgram {
 	}
 
 	/**
-	 * Returns the plan without the required jars when the files are already provided by the
cluster.
-	 *
-	 * @return The plan without attached jar files.
-	 * @throws ProgramInvocationException
-	 */
-	public JobWithJars getPlanWithoutJars() throws ProgramInvocationException {
-		if (isUsingProgramEntryPoint()) {
-			return new JobWithJars(getPlan(), Collections.<URL>emptyList(), classpaths, userCodeClassLoader);
-		} else {
-			throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName()
+
-				" for a program that is using the interactive mode.");
-		}
-	}
-
-	/**
 	 * Returns the plan with all required jars.
-	 *
+	 * 
 	 * @return The plan with attached jar files.
-	 * @throws ProgramInvocationException
+	 * @throws ProgramInvocationException 
 	 */
 	public JobWithJars getPlanWithJars() throws ProgramInvocationException {
 		if (isUsingProgramEntryPoint()) {
 			return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader);
 		} else {
-			throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName()
+
+			throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName()
+ 
 					" for a program that is using the interactive mode.");
 		}
 	}
 
 	/**
 	 * Returns the analyzed plan without any optimizations.
-	 *
+	 * 
 	 * @return
 	 *         the analyzed plan without any optimizations.
 	 * @throws ProgramInvocationException Thrown if an error occurred in the
@@ -324,7 +309,7 @@ public class PackagedProgram {
 	public String getPreviewPlan() throws ProgramInvocationException {
 		Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader());
 		List<DataSinkNode> previewPlan;
-
+		
 		if (isUsingProgramEntryPoint()) {
 			previewPlan = Optimizer.createPreOptimizedPlan(getPlan());
 		}
@@ -351,7 +336,7 @@ public class PackagedProgram {
 			finally {
 				env.unsetAsContext();
 			}
-
+			
 			if (env.previewPlan != null) {
 				previewPlan =  env.previewPlan;
 			} else {
@@ -375,7 +360,7 @@ public class PackagedProgram {
 	/**
 	 * Returns the description provided by the Program class. This
 	 * may contain a description of the plan itself and its arguments.
-	 *
+	 * 
 	 * @return The description of the PactProgram's input parameters.
 	 * @throws ProgramInvocationException
 	 *         This invocation is thrown if the Program can't be properly loaded. Causes
@@ -383,7 +368,7 @@ public class PackagedProgram {
 	 */
 	public String getDescription() throws ProgramInvocationException {
 		if (ProgramDescription.class.isAssignableFrom(this.mainClass)) {
-
+			
 			ProgramDescription descr;
 			if (this.program != null) {
 				descr = (ProgramDescription) this.program;
@@ -395,22 +380,22 @@ public class PackagedProgram {
 					return null;
 				}
 			}
-
+			
 			try {
 				return descr.getDescription();
 			}
 			catch (Throwable t) {
-				throw new ProgramInvocationException("Error while getting the program description" +
+				throw new ProgramInvocationException("Error while getting the program description" +

 						(t.getMessage() == null ? "." : ": " + t.getMessage()), t);
 			}
-
+			
 		} else {
 			return null;
 		}
 	}
-
+	
 	/**
-	 *
+	 * 
 	 * This method assumes that the context environment is prepared, or the execution
 	 * will be a local execution by default.
 	 */
@@ -433,16 +418,13 @@ public class PackagedProgram {
 
 	/**
 	 * Gets the {@link java.lang.ClassLoader} that must be used to load user code classes.
-	 *
+	 * 
 	 * @return The user code ClassLoader.
 	 */
 	public ClassLoader getUserCodeClassLoader() {
 		return this.userCodeClassLoader;
 	}
 
-	/**
-	 * Returns all provided libraries needed to run the program.
-	 */
 	public List<URL> getAllLibraries() {
 		List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 296ddc9..3343b69 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -28,7 +28,6 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 
 import java.io.IOException;
-import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
@@ -88,11 +87,6 @@ public class StandaloneClusterClient extends ClusterClient {
 	}
 
 	@Override
-	public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
-		return false;
-	}
-
-	@Override
 	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
 			throws ProgramInvocationException {
 		if (isDetached()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index f00013e..fb70280 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.api.scala
 
 import java.io._
-import java.util.Collections
 
 import org.apache.commons.cli.CommandLine
 import org.apache.flink.client.cli.CliFrontendParser
@@ -253,11 +252,7 @@ object FlinkShell {
     val config = frontend.getConfiguration
     val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
 
-    val cluster = customCLI.createCluster(
-      "Flink Scala Shell",
-      options.getCommandLine,
-      config,
-      Collections.emptyList())
+    val cluster = customCLI.createCluster("Flink Scala Shell", options.getCommandLine, config)
 
     val address = cluster.getJobManagerAddress.getAddress.getHostAddress
     val port = cluster.getJobManagerAddress.getPort

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 77d3149..8ba786f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -332,8 +331,8 @@ public class CliFrontendYarnAddressConfigurationTest {
 
 		@Override
 		// make method public
-		public ClusterClient createClient(CommandLineOptions options, PackagedProgram program)
throws Exception {
-			return super.createClient(options, program);
+		public ClusterClient createClient(CommandLineOptions options, String programName) throws
Exception {
+			return super.createClient(options, programName);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 78e16ed..7e612c4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -23,7 +23,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -668,9 +667,9 @@ public abstract class YarnTestBase extends TestLogger {
 		public TestingCLI() throws Exception {}
 
 		@Override
-		protected ClusterClient createClient(CommandLineOptions options, PackagedProgram program)
throws Exception {
+		protected ClusterClient createClient(CommandLineOptions options, String programName) throws
Exception {
 			// mock the returned ClusterClient to disable shutdown and verify shutdown behavior later
on
-			originalClusterClient = super.createClient(options, program);
+			originalClusterClient = super.createClient(options, programName);
 			spiedClusterClient = Mockito.spy(originalClusterClient);
 			Mockito.doNothing().when(spiedClusterClient).shutdown();
 			return spiedClusterClient;

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 000b2c1..ab1fbc1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -61,8 +61,6 @@ import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.security.PrivilegedExceptionAction;
-import java.net.URISyntaxException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -132,10 +130,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	private String zookeeperNamespace;
 
-	/** Optional Jar file to include in the system class loader of all application nodes
-	 * (for per-job submission) */
-	private Set<File> userJarFiles;
-
 	public AbstractYarnClusterDescriptor() {
 		// for unit tests only
 		if(System.getenv("IN_TESTS") != null) {
@@ -246,41 +240,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
 	}
 
-	/**
-	 * Returns true if the descriptor has the job jars to include in the classpath.
-	 */
-	public boolean hasUserJarFiles(List<URL> requiredJarFiles) {
-		if (userJarFiles == null || userJarFiles.size() != requiredJarFiles.size()) {
-			return false;
-		}
-		try {
-			for(URL jarFile : requiredJarFiles) {
-				if (!userJarFiles.contains(new File(jarFile.toURI()))) {
-					return false;
-				}
-			}
-		} catch (URISyntaxException e) {
-			return false;
-		}
-		return true;
-	}
-
-	/**
-	 * Sets the user jar which is included in the system classloader of all nodes.
-	 */
-	public void setProvidedUserJarFiles(List<URL> userJarFiles) {
-		Set<File> localUserJarFiles = new HashSet<>(userJarFiles.size());
-		for (URL jarFile : userJarFiles) {
-			try {
-				localUserJarFiles.add(new File(jarFile.toURI()));
-			} catch (URISyntaxException e) {
-				throw new IllegalArgumentException("Couldn't add local user jar: " + jarFile
-					+ " Currently only file:/// URLs are supported.");
-			}
-		}
-		this.userJarFiles = localUserJarFiles;
-	}
-
 	public String getDynamicPropertiesEncoded() {
 		return this.dynamicPropertiesEncoded;
 	}
@@ -596,11 +555,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback,
hasLog4j);
 
-		// add the user jar to the classpath of the to-be-created cluster
-		if (userJarFiles != null) {
-			effectiveShipFiles.addAll(userJarFiles);
-		}
-
 		// Set-up ApplicationSubmissionContext for the application
 		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
 
@@ -755,7 +709,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			try {
 				report = yarnClient.getApplicationReport(appId);
 			} catch (IOException e) {
-				throw new YarnDeploymentException("Failed to deploy the cluster.", e);
+				throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
 			}
 			YarnApplicationState appState = report.getYarnApplicationState();
 			switch(appState) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index cd447d7..79501b1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -55,7 +55,6 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -195,11 +194,6 @@ public class YarnClusterClient extends ClusterClient {
 	}
 
 	@Override
-	public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
-		return clusterDescriptor.hasUserJarFiles(userJarFiles);
-	}
-
-	@Override
 	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws
ProgramInvocationException {
 		if (isDetached()) {
 			if (newlyCreatedCluster) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 4823d35..28d8fb8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -29,7 +29,6 @@ import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -49,7 +48,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
-import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -517,16 +515,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	}
 
 	@Override
-	public YarnClusterClient createCluster(
-			String applicationName,
-			CommandLine cmdLine,
-			Configuration config,
-			List<URL> userJarFiles) {
-		Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
+	public YarnClusterClient createCluster(String applicationName, CommandLine cmdLine, Configuration
config) {
 
 		AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName,
cmdLine);
 		yarnClusterDescriptor.setFlinkConfiguration(config);
-		yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
 
 		try {
 			return yarnClusterDescriptor.deploy();


Mime
View raw message