flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/9] flink git commit: [FLINK-2787] [core] Prevent instantiation of RemoteEnvironment when running a program through the command line.
Date Thu, 01 Oct 2015 09:54:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master c04a77042 -> 846ad7064


[FLINK-2787] [core] Prevent instantiation of RemoteEnvironment when running a program through
the command line.

This also cleans up the use of context environments.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891db5e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891db5e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891db5e3

Branch: refs/heads/master
Commit: 891db5e3e7037f5c889dae29d0182e6b50fc746c
Parents: c04a770
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Sep 29 21:31:26 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Oct 1 09:00:42 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/LocalExecutor.java  |  9 +--
 .../org/apache/flink/client/program/Client.java |  6 +-
 .../client/program/ContextEnvironment.java      | 11 +++-
 .../program/OptimizerPlanEnvironment.java       | 12 ++--
 .../flink/client/program/PackagedProgram.java   | 15 ++---
 .../client/program/PreviewPlanEnvironment.java  |  7 ++-
 .../flink/api/java/ExecutionEnvironment.java    | 61 ++++++++++++--------
 .../apache/flink/api/java/LocalEnvironment.java | 27 ++++-----
 .../flink/api/java/RemoteEnvironment.java       | 38 +++++++++---
 9 files changed, 109 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/891db5e3/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 7928e53..25da5c7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -20,7 +20,6 @@ package org.apache.flink.client;
 
 import java.util.List;
 
-import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
@@ -31,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -63,7 +61,7 @@ public class LocalExecutor extends PlanExecutor {
 	private LocalFlinkMiniCluster flink;
 
 	/** Custom user configuration for the execution */
-	private Configuration configuration;
+	private final Configuration configuration;
 
 	/** Config value for how many slots to provide in the local cluster */
 	private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
@@ -78,11 +76,6 @@ public class LocalExecutor extends PlanExecutor {
 	}
 
 	public LocalExecutor(Configuration conf) {
-		if (!ExecutionEnvironment.localExecutionIsAllowed()) {
-			throw new InvalidProgramException(
-					"The LocalEnvironment cannot be used when submitting a program through a client.");
-		}
-
 		this.configuration = conf != null ? conf : new Configuration();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/891db5e3/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 6c886fe..c72681d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -283,14 +283,13 @@ public class Client {
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
 			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, true);
-			ContextEnvironment.enableLocalExecution(false);
 
 			// invoke here
 			try {
 				prog.invokeInteractiveModeForExecution();
 			}
 			finally {
-				ContextEnvironment.enableLocalExecution(true);
+				ContextEnvironment.unsetContext();
 			}
 
 			return JobExecutionResult.fromJobSubmissionResult(new JobSubmissionResult(lastJobID));
@@ -310,14 +309,13 @@ public class Client {
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
 			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, false);
-			ContextEnvironment.enableLocalExecution(false);
 
 			// invoke here
 			try {
 				prog.invokeInteractiveModeForExecution();
 			}
 			finally {
-				ContextEnvironment.enableLocalExecution(true);
+				ContextEnvironment.unsetContext();
 			}
 
 			return new JobSubmissionResult(lastJobID);

http://git-wip-us.apache.org/repos/asf/flink/blob/891db5e3/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index ad14a06..e33a05d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -115,12 +115,17 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		initializeContextEnvironment(factory);
 	}
 	
-	protected static void enableLocalExecution(boolean enabled) {
-		ExecutionEnvironment.enableLocalExecution(enabled);
+	static void unsetContext() {
+		resetContextEnvironment();
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
+	/**
+	 * The factory that instantiates the environment to be used when running jobs that are
+	 * submitted through a pre-configured client connection.
+	 * This happens for example when a job is submitted from the command line.
+	 */
 	public static class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 		
 		private final Client client;

http://git-wip-us.apache.org/repos/asf/flink/blob/891db5e3/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
index c9c3b45..a5a9362 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -66,8 +66,7 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 	}
 
 	public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
-		setAsContext();
-
+		
 		// temporarily write syserr and sysout to a byte array.
 		PrintStream originalOut = System.out;
 		PrintStream originalErr = System.err;
@@ -75,8 +74,9 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 		System.setOut(new PrintStream(baos));
 		ByteArrayOutputStream baes = new ByteArrayOutputStream();
 		System.setErr(new PrintStream(baes));
+
+		setAsContext();
 		try {
-			ContextEnvironment.enableLocalExecution(false);
 			prog.invokeInteractiveModeForExecution();
 		}
 		catch (ProgramInvocationException e) {
@@ -91,7 +91,7 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 			}
 		}
 		finally {
-			ContextEnvironment.enableLocalExecution(true);
+			unsetAsContext();
 			System.setOut(originalOut);
 			System.setErr(originalErr);
 			System.err.println(baes);
@@ -115,6 +115,10 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 		};
 		initializeContextEnvironment(factory);
 	}
+	
+	private void unsetAsContext() {
+		resetContextEnvironment();
+	}
 
 	// ------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/891db5e3/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 091a959..0cd4d07 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -259,7 +259,6 @@ public class PackagedProgram {
 			PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 			env.setAsContext();
 			try {
-				ContextEnvironment.enableLocalExecution(false);
 				invokeInteractiveModeForExecution();
 			}
 			catch (ProgramInvocationException e) {
@@ -276,7 +275,7 @@ public class PackagedProgram {
 				}
 			}
 			finally {
-				ContextEnvironment.enableLocalExecution(true);
+				env.unsetAsContext();
 			}
 			
 			if (env.previewPlan != null) {
@@ -292,12 +291,8 @@ public class PackagedProgram {
 
 		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
 		StringWriter string = new StringWriter(1024);
-		PrintWriter pw = null;
-		try {
-			pw = new PrintWriter(string);
+		try (PrintWriter pw = new PrintWriter(string)) {
 			jsonGen.dumpPactPlanAsJSON(previewPlan, pw);
-		} finally {
-			pw.close();
 		}
 		return string.toString();
 
@@ -455,9 +450,9 @@ public class PackagedProgram {
 	}
 
 	private static String getEntryPointClassNameFromJar(File jarFile) throws ProgramInvocationException {
-		JarFile jar = null;
-		Manifest manifest = null;
-		String className = null;
+		JarFile jar;
+		Manifest manifest;
+		String className;
 
 		// Open jar file
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/891db5e3/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
index c5ced37..0051e60 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
@@ -57,7 +57,7 @@ public final class PreviewPlanEnvironment extends ExecutionEnvironment {
 	}
 
 	@Override
-	public void startNewSession() throws Exception {
+	public void startNewSession() {
 	}
 
 	public void setAsContext() {
@@ -69,6 +69,11 @@ public final class PreviewPlanEnvironment extends ExecutionEnvironment {
 		};
 		initializeContextEnvironment(factory);
 	}
+	
+	public void unsetAsContext() {
+		resetContextEnvironment();
+	}
+	
 
 	public void setPreview(String preview) {
 		this.preview = preview;

http://git-wip-us.apache.org/repos/asf/flink/blob/891db5e3/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 0f61d88..084e608 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -103,9 +103,6 @@ public abstract class ExecutionEnvironment {
 	/** The default parallelism used by local environments */
 	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
 	
-	/** flag to disable local executor when using the ContextEnvironment */
-	private static boolean allowLocalExecution = true;
-	
 	// --------------------------------------------------------------------------------------------
 
 	private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
@@ -1127,9 +1124,7 @@ public abstract class ExecutionEnvironment {
 	 * @return A local execution environment with the specified parallelism.
 	 */
 	public static LocalEnvironment createLocalEnvironment(Configuration customConfiguration) {
-		LocalEnvironment lee = new LocalEnvironment();
-		lee.setConfiguration(customConfiguration);
-		return lee;
+		return new LocalEnvironment(customConfiguration);
 	}
 	
 	/**
@@ -1159,16 +1154,15 @@ public abstract class ExecutionEnvironment {
 	 *
 	 * @param host The host name or address of the master (JobManager), where the program should be executed.
 	 * @param port The port of the master (JobManager), where the program should be executed.
-	 * @param clientConfiguration Pass a custom configuration to the Client.
+	 * @param clientConfiguration Configuration used by the client that connects to the cluster.
 	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
 	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
 	 *                 provided in the JAR files.
 	 * @return A remote environment that executes the program on a cluster.
 	 */
-	public static ExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfiguration, String... jarFiles) {
-		RemoteEnvironment rec = new RemoteEnvironment(host, port, jarFiles);
-		rec.setClientConfiguration(clientConfiguration);
-		return rec;
+	public static ExecutionEnvironment createRemoteEnvironment(
+			String host, int port, Configuration clientConfiguration, String... jarFiles) {
+		return new RemoteEnvironment(host, port, clientConfiguration, jarFiles);
 	}
 
 	/**
@@ -1201,23 +1195,40 @@ public abstract class ExecutionEnvironment {
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	//  Methods to control the context and local environments for execution from packaged programs
+	//  Methods to control the context environment and creation of explicit environments other
+	//  than the context environment
 	// --------------------------------------------------------------------------------------------
-	
+
+	/**
+	 * Sets a context environment factory, that creates the context environment for running
programs
+	 * with pre-configured environments. Examples are running programs from the command line,
and
+	 * running programs in the Scala shell.
+	 * 
+	 * <p>When the context environment factory is set, no other environments can be explicitly used.
+	 * 
+	 * @param ctx The context environment factory.
+	 */
 	protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
-		contextEnvironmentFactory = ctx;
+		contextEnvironmentFactory = Preconditions.checkNotNull(ctx);
 	}
-	
-	protected static boolean isContextEnvironmentSet() {
-		return contextEnvironmentFactory != null;
-	}
-	
-	protected static void enableLocalExecution(boolean enabled) {
-		allowLocalExecution = enabled;
-	}
-	
-	public static boolean localExecutionIsAllowed() {
-		return allowLocalExecution;
+
+	/**
+	 * Un-sets the context environment factory. After this method is called, the call to
+	 * {@link #getExecutionEnvironment()} will again return a default local execution environment, and
+	 * it is possible to explicitly instantiate the LocalEnvironment and the RemoteEnvironment.
+	 */
+	protected static void resetContextEnvironment() {
+		contextEnvironmentFactory = null;
 	}
 
+	/**
+	 * Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment
+	 * or a RemoteEnvironment.
+	 * 
+	 * @return True, if it is possible to explicitly instantiate a LocalEnvironment or a
+	 *         RemoteEnvironment, false otherwise.
+	 */
+	public static boolean areExplicitEnvironmentsAllowed() {
+		return contextEnvironmentFactory == null;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/891db5e3/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 8163244..93050e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -29,17 +29,17 @@ import org.apache.flink.configuration.Configuration;
  * An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the
  * environment is instantiated.
  * 
- * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. Teh default
- * parallelism can be set via {@link #setParallelism(int)}.</p>
+ * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
+ * parallelism can be set via {@link #setParallelism(int)}.
  * 
  * <p>Local environments can also be instantiated through {@link ExecutionEnvironment#createLocalEnvironment()}
  * and {@link ExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a
- * default parallelism equal to the number of hardware contexts in the local machine.</p>
+ * default parallelism equal to the number of hardware contexts in the local machine.
  */
 public class LocalEnvironment extends ExecutionEnvironment {
 	
 	/** The user-defined configuration for the local execution */
-	private Configuration configuration;
+	private final Configuration configuration;
 
 	/** Create lazily upon first use */
 	private PlanExecutor executor;
@@ -53,20 +53,21 @@ public class LocalEnvironment extends ExecutionEnvironment {
 	 * Creates a new local environment.
 	 */
 	public LocalEnvironment() {
-		if (!ExecutionEnvironment.localExecutionIsAllowed()) {
-			throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client.");
-		}
-		this.configuration = new Configuration();
+		this(new Configuration());
 	}
 
 	/**
-	 * Sets a configuration used to configure the local Flink executor.
-	 * If {@code null} is passed, then the default configuration will be used.
+	 * Creates a new local environment that configures its local executor with the given configuration.
 	 * 
-	 * @param customConfiguration The configuration to be used for the local execution.
+	 * @param config The configuration used to configure the local executor.
 	 */
-	public void setConfiguration(Configuration customConfiguration) {
-		this.configuration = customConfiguration != null ? customConfiguration : new Configuration();
+	public LocalEnvironment(Configuration config) {
+		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+			throw new InvalidProgramException(
+					"The LocalEnvironment cannot be instantiated when running in a pre-defined context " +
+							"(such as Command Line Client, Scala Shell, or TestEnvironment)");
+		}
+		this.configuration = config == null ? new Configuration() : config;
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/891db5e3/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 63f59d3..5a4aa1e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
@@ -32,7 +33,7 @@ import org.apache.flink.configuration.Configuration;
  * <p>Many programs executed via the remote environment depend on additional classes. Such classes
  * may be the classes of functions (transformation, aggregation, ...) or libraries. Those classes
  * must be attached to the remote environment as JAR files, to allow the environment to ship the
- * classes into the cluster for the distributed execution.</p>
+ * classes into the cluster for the distributed execution.
  */
 public class RemoteEnvironment extends ExecutionEnvironment {
 	
@@ -45,11 +46,11 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	/** The jar files that need to be attached to each job */
 	private final String[] jarFiles;
 
-	/** The remote executor lazily created upon first use */
-	private PlanExecutor executor;
-	
+	/** The configuration used by the client that connects to the cluster */
 	private Configuration clientConfiguration;
 	
+	/** The remote executor lazily created upon first use */
+	private PlanExecutor executor;
 	
 	/** Optional shutdown hook, used in session mode to eagerly terminate the last session */
 	private Thread shutdownHook;
@@ -58,7 +59,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
 	 * given host name and port.
 	 * 
-	 * <p>Each program execution will have all the given JAR files in its classpath.</p>
+	 * <p>Each program execution will have all the given JAR files in its classpath.
 	 * 
 	 * @param host The host name or address of the master (JobManager), where the program should be executed.
 	 * @param port The port of the master (JobManager), where the program should be executed.

@@ -67,6 +68,28 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	 *                 provided in the JAR files.
 	 */	
 	public RemoteEnvironment(String host, int port, String... jarFiles) {
+		this(host, port, null, jarFiles);
+	}
+
+	/**
+	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
+	 * given host name and port.
+	 * 
+	 * <p>Each program execution will have all the given JAR files in its classpath.
+	 *
+	 * @param host The host name or address of the master (JobManager), where the program should be executed.
+	 * @param port The port of the master (JobManager), where the program should be executed.
+	 * @param clientConfig The configuration used by the client that connects to the cluster.
+	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
+	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
+	 *                 provided in the JAR files.
+	 */
+	public RemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles) {
+		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+			throw new InvalidProgramException(
+					"The RemoteEnvironment cannot be instantiated when running in a pre-defined context " +
+							"(such as Command Line Client, Scala Shell, or TestEnvironment)");
+		}
 		if (host == null) {
 			throw new NullPointerException("Host must not be null.");
 		}
@@ -77,6 +100,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		this.host = host;
 		this.port = port;
 		this.jarFiles = jarFiles;
+		this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
 	}
 
 	// ------------------------------------------------------------------------
@@ -170,10 +194,6 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 				(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
 	}
 	
-	public void setClientConfiguration(Configuration clientConfiguration) {
-		this.clientConfiguration = clientConfiguration;
-	}
-	
 	// ------------------------------------------------------------------------
 	//  Shutdown hooks and reapers
 	// ------------------------------------------------------------------------


Mime
View raw message