flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [4/4] flink git commit: [FLINK-2097][core] implement a job session management
Date Tue, 22 Sep 2015 19:57:20 GMT
[FLINK-2097][core] implement a job session management

Sessions make sure that the JobManager does not immediately discard a
JobGraph after execution, but keeps it around for further operations to
be attached to the graph. That is the basis for interactive sessions.

This pull request implements a rudimentary session management. Together
with the backtracking #640, this will enable users to submit jobs to the
cluster and access intermediate results. Session handling ensures that
the results are cleared eventually.

ExecutionGraphs are kept as long as
  - no timeout occurred and
  - the session has not been explicitly ended

The following changes have also been made in this pull request:

- The Job ID is created through the ExecutionEnvironment and passed through
  the Plan to the JobGraph

- Sessions can be terminated by the ExecutionEnvironment or directly
  through the executor

- The environments use reapers (local) and shutdown hooks (remote) to
  ensure session termination when the environment runs out of scope

- The Client manages only connections to the JobManager, it is not job
  specific

This closes #858.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71bf2f57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71bf2f57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71bf2f57

Branch: refs/heads/master
Commit: 71bf2f570861daae53b24bfcf1d06aedb85311b9
Parents: 7984acc
Author: Maximilian Michels <mxm@apache.org>
Authored: Fri Sep 4 17:34:44 2015 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Tue Sep 22 19:55:46 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 182 +++---
 .../org/apache/flink/client/LocalExecutor.java  | 222 ++++---
 .../org/apache/flink/client/RemoteExecutor.java | 188 ++++--
 .../org/apache/flink/client/program/Client.java | 653 +++++++++----------
 .../client/program/ContextEnvironment.java      |  40 +-
 .../flink/client/program/JobWithJars.java       |   5 +-
 .../program/OptimizerPlanEnvironment.java       | 132 ++++
 .../flink/client/program/PackagedProgram.java   |  51 +-
 .../client/program/PreviewPlanEnvironment.java  |  80 +++
 .../flink/client/web/JobSubmissionServlet.java  |  30 +-
 .../flink/client/CliFrontendInfoTest.java       |  81 +--
 .../client/CliFrontendPackageProgramTest.java   |  10 +-
 .../apache/flink/client/CliFrontendRunTest.java |  30 +-
 .../RemoteExecutorHostnameResolutionTest.java   |   6 +-
 .../client/program/ClientConnectionTest.java    |   8 +-
 .../apache/flink/client/program/ClientTest.java | 142 ++--
 .../ExecutionPlanAfterExecutionTest.java        |  15 +-
 .../program/ExecutionPlanCreationTest.java      |   9 +-
 .../client/program/PackagedProgramTest.java     |   1 -
 .../stormcompatibility/api/FlinkClient.java     |  22 +-
 .../flink/api/common/JobExecutionResult.java    |  17 +-
 .../java/org/apache/flink/api/common/JobID.java |  46 +-
 .../flink/api/common/JobSubmissionResult.java   |   5 +-
 .../java/org/apache/flink/api/common/Plan.java  |  71 +-
 .../apache/flink/api/common/PlanExecutor.java   |  85 ++-
 .../flink/api/java/CollectionEnvironment.java   |   4 +
 .../flink/api/java/ExecutionEnvironment.java    |  97 ++-
 .../apache/flink/api/java/LocalEnvironment.java | 180 ++++-
 .../flink/api/java/RemoteEnvironment.java       | 153 ++++-
 .../flink/optimizer/plan/OptimizedPlan.java     |  15 +-
 .../plantranslate/JobGraphGenerator.java        |  24 +-
 .../optimizer/postpass/JavaApiPostPass.java     |   2 +-
 .../apache/flink/runtime/client/JobClient.java  |  11 +-
 .../runtime/client/JobExecutionException.java   |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +
 .../apache/flink/runtime/jobgraph/JobGraph.java |  58 +-
 .../runtime/taskmanager/TaskExecutionState.java |   2 +-
 .../flink/runtime/jobmanager/JobInfo.scala      |  18 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  98 ++-
 .../runtime/jobmanager/MemoryArchivist.scala    |  11 +-
 .../runtime/messages/JobManagerMessages.scala   |   6 +
 .../runtime/minicluster/FlinkMiniCluster.scala  |   9 +-
 .../PartialConsumePipelinedResultTest.java      |   3 +-
 .../TaskManagerProcessReapingTest.java          |   2 +-
 .../runtime/jobmanager/JobManagerITCase.scala   | 126 +++-
 .../flink/api/scala/ExecutionEnvironment.scala  |  31 +-
 .../api/avro/AvroExternalJarProgramITCase.java  |  38 +-
 .../environment/RemoteStreamEnvironment.java    |  24 +-
 .../environment/StreamContextEnvironment.java   |  28 +-
 .../environment/StreamExecutionEnvironment.java |  11 +-
 .../api/environment/StreamPlanEnvironment.java  |   7 +-
 .../flink/tez/client/LocalTezEnvironment.java   |   5 +
 .../flink/tez/client/RemoteTezEnvironment.java  |   5 +
 .../apache/flink/tez/client/TezExecutor.java    |  21 +
 .../apache/flink/test/util/TestEnvironment.java |   4 +
 .../clients/examples/LocalExecutorITCase.java   |   3 +-
 .../RemoteEnvironmentITCase.java                |   2 +-
 .../jsonplan/DumpCompiledPlanTest.java          |   6 +-
 .../jsonplan/JsonJobGraphGenerationTest.java    |   4 +
 .../jobmanager/JobManagerFailsITCase.scala      |   1 -
 .../org/apache/flink/yarn/YarnTestBase.java     |   2 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java |   2 +
 62 files changed, 2114 insertions(+), 1040 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index fc4d98a..f0e6c4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -57,10 +57,13 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.FlinkPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -297,44 +300,51 @@ public class CliFrontend {
 			int userParallelism = options.getParallelism();
 			LOG.debug("User parallelism is set to {}", userParallelism);
 
-			Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+			Client client = getClient(options, program.getMainClassName(), userParallelism);
 			client.setPrintStatusDuringExecution(options.getStdoutLogging());
 			LOG.debug("Client slots is set to {}", client.getMaxSlots());
-			if(client.getMaxSlots() != -1 && userParallelism == -1) {
-				logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
-						"To use another parallelism, set it at the ./bin/flink client.");
-				userParallelism = client.getMaxSlots();
-			}
 
-			// check if detached per job yarn cluster is used to start flink
-			if(yarnCluster != null && yarnCluster.isDetached()) {
-				logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
-						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-						"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
-						"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
-				exitCode = executeProgram(program, client, userParallelism, false);
-			} else {
-				// regular (blocking) execution.
-				exitCode = executeProgram(program, client, userParallelism, true);
-			}
+			try {
+				if (client.getMaxSlots() != -1 && userParallelism == -1) {
+					logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
+							"To use another parallelism, set it at the ./bin/flink client.");
+					userParallelism = client.getMaxSlots();
+				}
 
-			// show YARN cluster status if its not a detached YARN cluster.
-			if (yarnCluster != null && !yarnCluster.isDetached()) {
-				List<String> msgs = yarnCluster.getNewMessages();
-				if (msgs != null && msgs.size() > 1) {
+				// check if detached per job yarn cluster is used to start flink
+				if (yarnCluster != null && yarnCluster.isDetached()) {
+					logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+							"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+							"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
+							"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+					exitCode = executeProgramDetached(program, client, userParallelism);
+				}
+				else {
+					// regular (blocking) execution.
+					exitCode = executeProgramBlocking(program, client, userParallelism);
+				}
+
+				// show YARN cluster status if its not a detached YARN cluster.
+				if (yarnCluster != null && !yarnCluster.isDetached()) {
+					List<String> msgs = yarnCluster.getNewMessages();
+					if (msgs != null && msgs.size() > 1) {
 
-					logAndSysout("The following messages were created by the YARN cluster while running the Job:");
-					for (String msg : msgs) {
-						logAndSysout(msg);
+						logAndSysout("The following messages were created by the YARN cluster while running the Job:");
+						for (String msg : msgs) {
+							logAndSysout(msg);
+						}
+					}
+					if (yarnCluster.hasFailed()) {
+						logAndSysout("YARN cluster is in failed state!");
+						logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
 					}
 				}
-				if (yarnCluster.hasFailed()) {
-					logAndSysout("YARN cluster is in failed state!");
-					logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
-				}
-			}
 
-			return exitCode;
+				return exitCode;
+			}
+			finally {
+				client.shutdown();
+			}
 		}
 		catch (Throwable t) {
 			return handleError(t);
@@ -395,8 +405,10 @@ public class CliFrontend {
 			int parallelism = options.getParallelism();
 
 			LOG.info("Creating program plan dump");
-			Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), parallelism);
-			FlinkPlan flinkPlan = client.getOptimizedPlan(program, parallelism);
+
+			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+
+			FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism);
 
 			if (webFrontend) {
 				this.optimizedPlan = flinkPlan;
@@ -425,6 +437,8 @@ public class CliFrontend {
 				}
 			}
 			return 0;
+
+
 		}
 		catch (Throwable t) {
 			return handleError(t);
@@ -623,52 +637,65 @@ public class CliFrontend {
 	//  Interaction with programs and JobManager
 	// --------------------------------------------------------------------------------------------
 
-	protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
-		LOG.info("Starting execution of program");
-		JobSubmissionResult execResult;
+	protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+		JobSubmissionResult result;
 		try {
-			execResult = client.run(program, parallelism, wait);
-		}
-		catch (ProgramInvocationException e) {
+			result = client.runDetached(program, parallelism);
+		} catch (ProgramInvocationException e) {
 			return handleError(e);
-		}
-		finally {
+		} finally {
 			program.deleteExtractedLibraries();
 		}
 
-		if(wait) {
-			LOG.info("Program execution finished");
-		}
-
-		// we come here after the job has finished (or the job has been submitted)
-		if (execResult != null) {
+		if (result != null) {
 			// if the job has been submitted to a detached YARN cluster, there won't be any
 			// exec results, but the object will be set (for the job id)
 			if (yarnCluster != null && yarnCluster.isDetached()) {
-				if(execResult.getJobID() == null) {
-					throw new RuntimeException("Error while starting job. No Job ID set.");
-				}
-				yarnCluster.stopAfterJob(execResult.getJobID());
+
+				yarnCluster.stopAfterJob(result.getJobID());
 				yarnCluster.disconnect();
-				if(!webFrontend) {
-					System.out.println("The Job has been submitted with JobID "+execResult.getJobID());
+				if (!webFrontend) {
+					System.out.println("The Job has been submitted with JobID " + result.getJobID());
 				}
 				return 0;
-			}
-			if (execResult instanceof JobExecutionResult) {
-				JobExecutionResult result = (JobExecutionResult) execResult;
-				if(!webFrontend) {
-					System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
-				}
-				Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
-				if (accumulatorsResult.size() > 0 && !webFrontend) {
-					System.out.println("Accumulator Results: ");
-					System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
-				}
 			} else {
-				LOG.info("The Job did not return an execution result");
+				throw new RuntimeException("Error while starting job. No Job ID set.");
+			}
+		}
+
+		return 0;
+	}
+
+	protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
+		LOG.info("Starting execution of program");
+
+		JobExecutionResult result;
+		try {
+			client.setPrintStatusDuringExecution(true);
+			result = client.runBlocking(program, parallelism);
+		}
+		catch (ProgramInvocationException e) {
+			return handleError(e);
+		}
+		finally {
+			program.deleteExtractedLibraries();
+		}
+
+		LOG.info("Program execution finished");
+
+		if (result != null) {
+			if (!webFrontend) {
+				System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
+			}
+			Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
+			if (accumulatorsResult.size() > 0 && !webFrontend) {
+				System.out.println("Accumulator Results: ");
+				System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
 			}
+		} else {
+			LOG.info("The Job did not return an execution result");
 		}
+
 		return 0;
 	}
 
@@ -767,7 +794,6 @@ public class CliFrontend {
 	 * Retrieves a {@link Client} object from the given command line options and other parameters.
 	 *
 	 * @param options Command line options which contain JobManager address
-	 * @param classLoader Class loader to use by the Client
 	 * @param programName Program name
 	 * @param userParallelism Given user parallelism
 	 * @return
@@ -775,12 +801,10 @@ public class CliFrontend {
 	 */
 	protected Client getClient(
 			CommandLineOptions options,
-			ClassLoader classLoader,
 			String programName,
 			int userParallelism)
 		throws Exception {
-		InetSocketAddress jobManagerAddress = null;
-
+		InetSocketAddress jobManagerAddress;
 		int maxSlots = -1;
 
 		if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
@@ -796,14 +820,16 @@ public class CliFrontend {
 
 			// the number of slots available from YARN:
 			int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
-			if(yarnTmSlots == -1) {
+			if (yarnTmSlots == -1) {
 				yarnTmSlots = 1;
 			}
 			maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
-			if(userParallelism != -1) {
+			if (userParallelism != -1) {
 				int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
-				logAndSysout("The YARN cluster has "+maxSlots+" slots available, but the user requested a parallelism of "+userParallelism+" on YARN. " +
-						"Each of the "+flinkYarnClient.getTaskManagerCount()+" TaskManagers will get "+slotsPerTM+" slots.");
+				logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
+						"but the user requested a parallelism of " + userParallelism + " on YARN. " +
+						"Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
+						"will get "+slotsPerTM+" slots.");
 				flinkYarnClient.setTaskManagerSlots(slotsPerTM);
 			}
 
@@ -811,11 +837,12 @@ public class CliFrontend {
 				yarnCluster = flinkYarnClient.deploy();
 				yarnCluster.connectToCluster();
 			}
-			catch(Exception e) {
+			catch (Exception e) {
 				throw new RuntimeException("Error deploying the YARN cluster", e);
 			}
 
 			jobManagerAddress = yarnCluster.getJobManagerAddress();
+			writeJobManagerAddressToConfig(jobManagerAddress);
 
 			logAndSysout("YARN cluster started");
 			logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
@@ -847,14 +874,11 @@ public class CliFrontend {
 		else {
 			if(options.getJobManagerAddress() != null) {
 				jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
+				writeJobManagerAddressToConfig(jobManagerAddress);
 			}
 		}
 
-		if(jobManagerAddress != null) {
-			writeJobManagerAddressToConfig(jobManagerAddress);
-		}
-
-		return new Client(config, classLoader, maxSlots);
+		return new Client(config, maxSlots);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index cf08e0a..7928e53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -22,11 +22,14 @@ import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.DataStatistics;
@@ -35,52 +38,66 @@ import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 /**
- * A class for executing a {@link Plan} on a local embedded Flink runtime instance.
+ * A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance.
+ *
+ * <p>By simply calling the {@link #executePlan(org.apache.flink.api.common.Plan)} method,
+ * this executor still start up and shut down again immediately after the program finished.</p>
+ *
+ * <p>To use this executor to execute many dataflow programs that constitute one job together,
+ * then this executor needs to be explicitly started, to keep running across several executions.</p>
  */
 public class LocalExecutor extends PlanExecutor {
 	
-	private static boolean DEFAULT_OVERWRITE = false;
+	private static final boolean DEFAULT_OVERWRITE = false;
 
 	private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
 
-	private final Object lock = new Object();	// we lock to ensure singleton execution
-	
+	/** we lock to ensure singleton execution */
+	private final Object lock = new Object();
+
+	/** The mini cluster on which to execute the local programs */
 	private LocalFlinkMiniCluster flink;
 
+	/** Custom user configuration for the execution */
 	private Configuration configuration;
 
-	// ---------------------------------- config options ------------------------------------------
-	
+	/** Config value for how many slots to provide in the local cluster */
 	private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 
+	/** Config flag whether to overwrite existing files by default */
 	private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
-	
-	// --------------------------------------------------------------------------------------------
-	
+
+	// ------------------------------------------------------------------------
+
 	public LocalExecutor() {
-		if (!ExecutionEnvironment.localExecutionIsAllowed()) {
-			throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client.");
-		}
+		this(null);
 	}
 
 	public LocalExecutor(Configuration conf) {
-		this();
-		this.configuration = conf;
+		if (!ExecutionEnvironment.localExecutionIsAllowed()) {
+			throw new InvalidProgramException(
+					"The LocalEnvironment cannot be used when submitting a program through a client.");
+		}
+
+		this.configuration = conf != null ? conf : new Configuration();
 	}
 
+	// ------------------------------------------------------------------------
+	//  Configuration
+	// ------------------------------------------------------------------------
 
-	
 	public boolean isDefaultOverwriteFiles() {
 		return defaultOverwriteFiles;
 	}
-	
+
 	public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) {
 		this.defaultOverwriteFiles = defaultOverwriteFiles;
 	}
-	
+
 	public void setTaskManagerNumSlots(int taskManagerNumSlots) {
 		this.taskManagerNumSlots = taskManagerNumSlots; 
 	}
@@ -88,51 +105,48 @@ public class LocalExecutor extends PlanExecutor {
 	public int getTaskManagerNumSlots() {
 		return this.taskManagerNumSlots;
 	}
-	
-	// --------------------------------------------------------------------------------------------
 
-	public static Configuration createConfiguration(LocalExecutor le) {
-		Configuration configuration = new Configuration();
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, le.getTaskManagerNumSlots());
-		configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, le.isDefaultOverwriteFiles());
-		return configuration;
-	}
+	// --------------------------------------------------------------------------------------------
 
+	@Override
 	public void start() throws Exception {
-		synchronized (this.lock) {
-			if (this.flink == null) {
-				
+		synchronized (lock) {
+			if (flink == null) {
 				// create the embedded runtime
-				Configuration configuration = createConfiguration(this);
-				if(this.configuration != null) {
+				Configuration configuration = createConfiguration();
+				if (this.configuration != null) {
 					configuration.addAll(this.configuration);
 				}
 				// start it up
-				this.flink = new LocalFlinkMiniCluster(configuration, true);
+				flink = new LocalFlinkMiniCluster(configuration, true);
 				this.flink.start();
 			} else {
 				throw new IllegalStateException("The local executor was already started.");
 			}
 		}
 	}
-
-	/**
-	 * Stop the local executor instance. You should not call executePlan after this.
-	 */
+	
+	@Override
 	public void stop() throws Exception {
-		synchronized (this.lock) {
-			if (this.flink != null) {
-				this.flink.stop();
-				this.flink = null;
-			} else {
-				throw new IllegalStateException("The local executor was not started.");
+		synchronized (lock) {
+			if (flink != null) {
+				flink.stop();
+				flink = null;
 			}
 		}
 	}
 
+	@Override
+	public boolean isRunning() {
+		return flink != null;
+	}
+
 	/**
-	 * Execute the given plan on the local Nephele instance, wait for the job to
-	 * finish and return the runtime in milliseconds.
+	 * Executes the given program on a local runtime and waits for the job to finish.
+	 * 
+	 * <p>If the executor has not been started before, this starts the executor and shuts it down
+	 * after the job finished. If the job runs in session mode, the executor is kept alive until
+	 * no more references to the executor exist.</p>
 	 * 
 	 * @param plan The plan of the program to execute.
 	 * @return The net runtime of the program, in milliseconds.
@@ -145,15 +159,15 @@ public class LocalExecutor extends PlanExecutor {
 		if (plan == null) {
 			throw new IllegalArgumentException("The plan may not be null.");
 		}
-		
+
 		synchronized (this.lock) {
-			
+
 			// check if we start a session dedicated for this execution
 			final boolean shutDownAtEnd;
-			if (this.flink == null) {
-				// we start a session just for us now
+
+			if (flink == null) {
 				shutDownAtEnd = true;
-				
+
 				// configure the number of local slots equal to the parallelism of the local plan
 				if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
 					int maxParallelism = plan.getMaximumParallelism();
@@ -161,9 +175,11 @@ public class LocalExecutor extends PlanExecutor {
 						this.taskManagerNumSlots = maxParallelism;
 					}
 				}
-				
+
+				// start the cluster for us
 				start();
-			} else {
+			}
+			else {
 				// we use the existing session
 				shutDownAtEnd = false;
 			}
@@ -173,10 +189,10 @@ public class LocalExecutor extends PlanExecutor {
 
 				Optimizer pc = new Optimizer(new DataStatistics(), configuration);
 				OptimizedPlan op = pc.compile(plan);
-				
+
 				JobGraphGenerator jgg = new JobGraphGenerator(configuration);
-				JobGraph jobGraph = jgg.compileJobGraph(op);
-				
+				JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
+
 				boolean sysoutPrint = isPrintingStatusDuringExecution();
 				return flink.submitJobAndWait(jobGraph, sysoutPrint);
 			}
@@ -189,32 +205,50 @@ public class LocalExecutor extends PlanExecutor {
 	}
 
 	/**
-	 * Returns a JSON dump of the optimized plan.
-	 * 
-	 * @param plan
-	 *            The program's plan.
-	 * @return JSON dump of the optimized plan.
-	 * @throws Exception
+	 * Creates a JSON representation of the given dataflow's execution plan.
+	 *
+	 * @param plan The dataflow plan.
+	 * @return The dataflow's execution plan, as a JSON string.
+	 * @throws Exception Thrown, if the optimization process that creates the execution plan failed.
 	 */
 	@Override
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-		Optimizer pc = new Optimizer(new DataStatistics(), createConfiguration(this));
+		final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
+
+		Optimizer pc = new Optimizer(new DataStatistics(), this.configuration);
+		pc.setDefaultParallelism(parallelism);
 		OptimizedPlan op = pc.compile(plan);
-		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
-	
-		return gen.getOptimizerPlanAsJSON(op);
+
+		return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
 	}
-	
+
+	@Override
+	public void endSession(JobID jobID) throws Exception {
+		LocalFlinkMiniCluster flink = this.flink;
+		if (flink != null) {
+			ActorGateway leaderGateway = flink.getLeaderGateway(AkkaUtils.getDefaultTimeout());
+			leaderGateway.tell(new JobManagerMessages.RemoveCachedJob(jobID));
+		}
+	}
+
+	private Configuration createConfiguration() {
+		Configuration configuration = new Configuration();
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
+		configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
+		return configuration;
+	}
+
+
 	// --------------------------------------------------------------------------------------------
 	//  Static variants that internally bring up an instance and shut it down after the execution
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 * Executes the program described by the given plan assembler.
+	 * Executes the given program.
 	 * 
-	 * @param pa The program's plan assembler. 
+	 * @param pa The program.
 	 * @param args The parameters.
-	 * @return The net runtime of the program, in milliseconds.
+	 * @return The execution result of the program.
 	 * 
 	 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
 	 *                   caused an exception.
@@ -222,57 +256,45 @@ public class LocalExecutor extends PlanExecutor {
 	public static JobExecutionResult execute(Program pa, String... args) throws Exception {
 		return execute(pa.getPlan(args));
 	}
-	
+
 	/**
-	 * Executes the program represented by the given Pact plan.
+	 * Executes the given dataflow plan.
 	 * 
-	 * @param plan The program's plan. 
-	 * @return The net runtime of the program, in milliseconds.
+	 * @param plan The dataflow plan. 
+	 * @return The execution result.
 	 * 
 	 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
 	 *                   caused an exception.
 	 */
 	public static JobExecutionResult execute(Plan plan) throws Exception {
-		LocalExecutor exec = new LocalExecutor();
-		try {
-			exec.start();
-			return exec.executePlan(plan);
-		} finally {
-			exec.stop();
-		}
+		return new LocalExecutor().executePlan(plan);
 	}
 
 	/**
-	 * Returns a JSON dump of the optimized plan.
+	 * Creates a JSON representation of the given dataflow's execution plan.
 	 * 
-	 * @param plan
-	 *            The program's plan.
-	 * @return JSON dump of the optimized plan.
-	 * @throws Exception
+	 * @param plan The dataflow plan.
+	 * @return The dataflow's execution plan, as a JSON string.
+	 * @throws Exception Thrown, if the optimization process that creates the execution plan failed.
 	 */
 	public static String optimizerPlanAsJSON(Plan plan) throws Exception {
-		LocalExecutor exec = new LocalExecutor();
-		try {
-			exec.start();
-			Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.configuration());
-			OptimizedPlan op = pc.compile(plan);
-			PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
-
-			return gen.getOptimizerPlanAsJSON(op);
-		} finally {
-			exec.stop();
-		}
+		final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
+
+		Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
+		pc.setDefaultParallelism(parallelism);
+		OptimizedPlan op = pc.compile(plan);
+
+		return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
 	}
 
 	/**
-	 * Return unoptimized plan as JSON.
+	 * Creates a JSON representation of the given dataflow plan.
 	 * 
-	 * @param plan The program plan.
-	 * @return The plan as a JSON object.
+	 * @param plan The dataflow plan.
+	 * @return The dataflow plan (prior to optimization) as a JSON string.
 	 */
 	public static String getPlanAsJSON(Plan plan) {
-		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 		List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan);
-		return gen.getPactPlanAsJSON(sinks);
+		return new PlanJSONDumpGenerator().getPactPlanAsJSON(sinks);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 20169f6..e8e9ade 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client;
 
-import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -26,36 +25,41 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program
  * and ships it to a remote Flink cluster for execution.
  * 
- * The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
- * set of libraries that need to be shipped together with the program.
+ * <p>The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
+ * set of libraries that need to be shipped together with the program.</p>
  * 
- * The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
- * remotely execute program parts.
+ * <p>The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
+ * remotely execute program parts.</p>
  */
 public class RemoteExecutor extends PlanExecutor {
+		
+	private final Object lock = new Object();
 	
-	private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);
-
 	private final List<String> jarFiles;
 
 	private final Configuration clientConfiguration;
+
+	private Client client;
+	
+	private int defaultParallelism = 1;
+	
 	
 	public RemoteExecutor(String hostname, int port) {
 		this(hostname, port, Collections.<String>emptyList(), new Configuration());
@@ -97,51 +101,148 @@ public class RemoteExecutor extends PlanExecutor {
 		clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
 	}
 
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Sets the parallelism that will be used when the program does not define
+	 * any parallelism at all.
+	 *
+	 * @param defaultParallelism The default parallelism for the executor.
+	 */
+	public void setDefaultParallelism(int defaultParallelism) {
+		if (defaultParallelism < 1) {
+			throw new IllegalArgumentException("The default parallelism must be at least one");
+		}
+		this.defaultParallelism = defaultParallelism;
+	}
+
+	/**
+	 * Gets the parallelism that will be used when the program does not define
+	 * any parallelism at all.
+	 * 
+	 * @return The default parallelism for the executor.
+	 */
+	public int getDefaultParallelism() {
+		return defaultParallelism;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Startup & Shutdown
+	// ------------------------------------------------------------------------
+
+
+	@Override
+	public void start() throws Exception {
+		synchronized (lock) {
+			if (client == null) {
+				client = new Client(clientConfiguration);
+				client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
+			}
+			else {
+				throw new IllegalStateException("The remote executor was already started.");
+			}
+		}
+	}
+
+	@Override
+	public void stop() throws Exception {
+		synchronized (lock) {
+			if (client != null) {
+				client.shutdown();
+				client = null;
+			}
+		}
+	}
+
+	@Override
+	public boolean isRunning() {
+		return client != null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Executing programs
+	// ------------------------------------------------------------------------
+
 	@Override
 	public JobExecutionResult executePlan(Plan plan) throws Exception {
+		if (plan == null) {
+			throw new IllegalArgumentException("The plan may not be null.");
+		}
+
 		JobWithJars p = new JobWithJars(plan, this.jarFiles);
 		return executePlanWithJars(p);
 	}
-	
-	public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
-		Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
-		c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
-		
-		JobSubmissionResult result = c.run(p, -1, true);
-		if (result instanceof JobExecutionResult) {
-			return (JobExecutionResult) result;
-		} else {
-			LOG.warn("The Client didn't return a JobExecutionResult");
-			return new JobExecutionResult(result.getJobID(), -1, null);
+
+	public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
+		if (program == null) {
+			throw new IllegalArgumentException("The job may not be null.");
 		}
-	}
 
-	public JobExecutionResult executeJar(String jarPath, String assemblerClass, String... args) throws Exception {
-		File jarFile = new File(jarPath);
-		PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
-		
-		Client c = new Client(clientConfiguration, program.getUserCodeClassLoader(), -1);
-		c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
-		
-		JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
-		if(result instanceof JobExecutionResult) {
-			return (JobExecutionResult) result;
-		} else {
-			LOG.warn("The Client didn't return a JobExecutionResult");
-			return new JobExecutionResult(result.getJobID(), -1, null);
+		synchronized (this.lock) {
+			// check if we start a session dedicated for this execution
+			final boolean shutDownAtEnd;
+
+			if (client == null) {
+				shutDownAtEnd = true;
+				// start the executor for us
+				start();
+			}
+			else {
+				// we use the existing session
+				shutDownAtEnd = false;
+			}
+
+			try {
+				return client.runBlocking(program, defaultParallelism);
+			}
+			finally {
+				if (shutDownAtEnd) {
+					stop();
+				}
+			}
 		}
 	}
 
 	@Override
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-		JobWithJars p = new JobWithJars(plan, this.jarFiles);
-		Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
-		
-		OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
-		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-		return jsonGen.getOptimizerPlanAsJSON(op);
+		Optimizer opt = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
+		OptimizedPlan optPlan = opt.compile(plan);
+		return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optPlan);
 	}
-	
+
+	@Override
+	public void endSession(JobID jobID) throws Exception {
+		if (jobID == null) {
+			throw new NullPointerException("The supplied jobID must not be null.");
+		}
+
+		synchronized (this.lock) {
+			// check if we start a session dedicated for this execution
+			final boolean shutDownAtEnd;
+
+			if (client == null) {
+				shutDownAtEnd = true;
+				// start the executor for us
+				start();
+			}
+			else {
+				// we use the existing session
+				shutDownAtEnd = false;
+			}
+
+			try {
+				client.endSession(jobID);
+			}
+			finally {
+				if (shutDownAtEnd) {
+					stop();
+				}
+			}
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//   Utilities
 	// --------------------------------------------------------------------------------------------
@@ -168,5 +269,4 @@ public class RemoteExecutor extends PlanExecutor {
 		}
 		return new InetSocketAddress(host, port);
 	}
-	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index e7464c8..6c886fe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -18,10 +18,9 @@
 
 package org.apache.flink.client.program;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.PrintStream;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -32,8 +31,6 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -65,7 +62,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 import akka.actor.ActorSystem;
 
-import com.google.common.base.Preconditions;
 
 /**
  * Encapsulates the functionality necessary to submit a program to a remote cluster.
@@ -78,62 +74,139 @@ public class Client {
 	 * The configuration to use for the client (optimizer, timeouts, ...) and to connect to the
 	 * JobManager.
 	 */
-	private final Configuration configuration;
-
 	/** The optimizer used in the optimization of batch programs */
-	private final Optimizer compiler;
+	final Optimizer compiler;
+	
+	/** The actor system used to communicate with the JobManager */
+	private final ActorSystem actorSystem;
 
-	/** The class loader to use for classes from the user program (e.g., functions and data types) */
-	private final ClassLoader userCodeClassLoader;
+	/** The actor reference to the JobManager */
+	private final ActorGateway jobManagerGateway;
+
+	/** The timeout for communication between the client and the JobManager */
+	private final FiniteDuration timeout;
+	
+	/**
+	 * If != -1, this field specifies the total number of available slots on the cluster
+	 * connected to the client.
+	 */
+	private final int maxSlots;
 
 	/** Flag indicating whether to sysout print execution updates */
 	private boolean printStatusDuringExecution = true;
 
 	/**
-	 * If != -1, this field specifies the total number of available slots on the cluster
-	 * connected to the client.
+	 *  For interactive invocations, the Job ID is only available after the ContextEnvironment has
+	 *  been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
+	 *  which lets us access the last JobID here.
 	 */
-	private int maxSlots;
+	private JobID lastJobID;
 
-	/** ID of the last job submitted with this client. */
-	private JobID lastJobId = null;
-	
-	
 	// ------------------------------------------------------------------------
 	//                            Construction
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates a instance that submits the programs to the JobManager defined in the
-	 * configuration. It sets the maximum number of slots to unknown (= -1).
+	 * configuration. This method will try to resolve the JobManager hostname and throw an exception
+	 * if that is not possible.
 	 *
-	 * @param config The config used to obtain the JobManager's address.
-	 * @param userCodeClassLoader The class loader to use for loading user code classes.
+	 * @param config The config used to obtain the job-manager's address, and used to configure the optimizer.
+	 * 
+	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
+	 * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
 	 */
-	public Client(Configuration config, ClassLoader userCodeClassLoader) {
-		this(config, userCodeClassLoader, -1);
+	public Client(Configuration config) throws IOException {
+		this(config, -1);
 	}
 
 	/**
-	 * Creates a instance that submits the programs to the JobManager defined in the
-	 * configuration.
+	 * Creates a new instance of the class that submits the jobs to a job-manager
+	 * at the given address using the default port.
 	 * 
-	 * @param config The config used to obtain the JobManager's address.
-	 * @param userCodeClassLoader The class loader to use for loading user code classes.
-	 * @param maxSlots The number of maxSlots on the cluster if != -1
+	 * @param config The configuration for the client-side processes, like the optimizer.
+	 * @param maxSlots The number of maxSlots on the cluster if != -1.
+	 *                    
+	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
+	 * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.   
 	 */
-	public Client(Configuration config, ClassLoader userCodeClassLoader, int maxSlots) {
-		Preconditions.checkNotNull(config, "Configuration is null");
-		Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
-		
-		this.configuration = config;
-		this.userCodeClassLoader = userCodeClassLoader;
+	public Client(Configuration config, int maxSlots) throws IOException {
 
-		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
+		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
 		this.maxSlots = maxSlots;
+
+		LOG.info("Starting client actor system");
+
+		try {
+			this.actorSystem = JobClient.startJobClientActorSystem(config);
+		} catch (Exception e) {
+			throw new IOException("Could start client actor system.", e);
+		}
+
+		// from here on, we need to make sure the actor system is shut down on error
+		boolean success = false;
+
+		try {
+
+			FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(config);
+			this.timeout = AkkaUtils.getTimeout(config);
+
+			LOG.info("Looking up JobManager");
+			LeaderRetrievalService leaderRetrievalService;
+
+			try {
+				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+			} catch (Exception e) {
+				throw new IOException("Could not create the leader retrieval service.", e);
+			}
+
+			try {
+				this.jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+						leaderRetrievalService,
+						actorSystem,
+						lookupTimeout);
+			} catch (LeaderRetrievalException e) {
+				throw new IOException("Failed to retrieve JobManager gateway", e);
+			}
+
+			LOG.info("Leading JobManager actor system address is " + this.jobManagerGateway.path());
+
+			LOG.info("JobManager runs at " + this.jobManagerGateway.path());
+
+			LOG.info("Communication between client and JobManager will have a timeout of " + this.timeout);
+			success = true;
+		} finally {
+			if (!success) {
+				try {
+					this.actorSystem.shutdown();
+
+					// wait at most for 30 seconds, to work around an occasional akka problem
+					actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS));
+				} catch (Throwable t) {
+					LOG.error("Shutting down actor system after error caused another error", t);
+				}
+			}
+		}
 	}
+	// ------------------------------------------------------------------------
+	//  Startup & Shutdown
+	// ------------------------------------------------------------------------
 
 	/**
+	 * Shuts down the client. This stops the internal actor system and actors.
+	 */
+	public void shutdown() {
+		if (!this.actorSystem.isTerminated()) {
+			this.actorSystem.shutdown();
+			this.actorSystem.awaitTermination();
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Configuration
+	// ------------------------------------------------------------------------
+	
+	/**
 	 * Configures whether the client should print progress updates during the execution to {@code System.out}.
 	 * All updates are logged via the SLF4J loggers regardless of this setting.
 	 * 
@@ -159,118 +232,84 @@ public class Client {
 	}
 	
 	// ------------------------------------------------------------------------
-	//                      Compilation and Submission
+	//  Access to the Program's Plan
 	// ------------------------------------------------------------------------
 	
-	public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
+	public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
+			throws CompilerException, ProgramInvocationException
+	{
 		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-		return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(prog, parallelism));
+		return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
 	}
-	
-	public FlinkPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
+
+	public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
+			throws CompilerException, ProgramInvocationException
+	{
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
-			return getOptimizedPlan(prog.getPlanWithJars(), parallelism);
-		}
-		else if (prog.isUsingInteractiveMode()) {
+			return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
+		} else if (prog.isUsingInteractiveMode()) {
 			// temporary hack to support the optimizer plan preview
-			OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(this.compiler);
+			OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
 			if (parallelism > 0) {
 				env.setParallelism(parallelism);
 			}
-			env.setAsContext();
-			
-			// temporarily write syserr and sysout to a byte array.
-			PrintStream originalOut = System.out;
-			PrintStream originalErr = System.err;
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			System.setOut(new PrintStream(baos));
-			ByteArrayOutputStream baes = new ByteArrayOutputStream();
-			System.setErr(new PrintStream(baes));
-			try {
-				ContextEnvironment.enableLocalExecution(false);
-				prog.invokeInteractiveModeForExecution();
-			}
-			catch (ProgramInvocationException e) {
-				throw e;
-			}
-			catch (Throwable t) {
-				// the invocation gets aborted with the preview plan
-				if (env.optimizerPlan != null) {
-					return env.optimizerPlan;
-				} else {
-					throw new ProgramInvocationException("The program caused an error: ", t);
-				}
-			}
-			finally {
-				ContextEnvironment.enableLocalExecution(true);
-				System.setOut(originalOut);
-				System.setErr(originalErr);
-				System.err.println(baes);
-				System.out.println(baos);
-			}
-			
-			throw new ProgramInvocationException(
-					"The program plan could not be fetched - the program aborted pre-maturely.\n"
-					+ "System.err: " + baes.toString() + '\n'
-					+ "System.out: " + baos.toString() + '\n');
-		}
-		else {
-			throw new RuntimeException();
+
+			return env.getOptimizedPlan(prog);
+		} else {
+			throw new RuntimeException("Couldn't determine program mode.");
 		}
 	}
-	
-	public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
+
+	public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
 		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
-			LOG.debug("Changing plan default parallelism from {} to {}",p.getDefaultParallelism(), parallelism);
+			LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
 			p.setDefaultParallelism(parallelism);
 		}
 		LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
 
-		return this.compiler.compile(p);
-	}
-	
-	
-	/**
-	 * Creates the optimized plan for a given program, using this client's compiler.
-	 *  
-	 * @param prog The program to be compiled.
-	 * @return The compiled and optimized plan, as returned by the compiler.
-	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
-	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
-	 */
-	public FlinkPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
-		return getOptimizedPlan(prog.getPlan(), parallelism);
-	}
-	
-	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
-		return getJobGraph(optPlan, prog.getAllLibraries());
+		return compiler.compile(p);
 	}
-	
-	private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
-		JobGraph job;
-		if (optPlan instanceof StreamingPlan) {
-			job = ((StreamingPlan) optPlan).getJobGraph();
-		} else {
-			JobGraphGenerator gen = new JobGraphGenerator(this.configuration);
-			job = gen.compileJobGraph((OptimizedPlan) optPlan);
-		}
 
-		for (File jar : jarFiles) {
-			job.addJar(new Path(jar.getAbsolutePath()));
+	// ------------------------------------------------------------------------
+	//  Program submission / execution
+	// ------------------------------------------------------------------------
+
+	public JobExecutionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
+		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+		if (prog.isUsingProgramEntryPoint()) {
+			return runBlocking(prog.getPlanWithJars(), parallelism);
 		}
+		else if (prog.isUsingInteractiveMode()) {
+			LOG.info("Starting program in interactive mode");
+			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, true);
+			ContextEnvironment.enableLocalExecution(false);
 
-		return job;
+			// invoke here
+			try {
+				prog.invokeInteractiveModeForExecution();
+			}
+			finally {
+				ContextEnvironment.enableLocalExecution(true);
+			}
+
+			return JobExecutionResult.fromJobSubmissionResult(new JobSubmissionResult(lastJobID));
+		}
+		else {
+			throw new RuntimeException();
+		}
 	}
 
-	public JobSubmissionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
+	public JobSubmissionResult runDetached(PackagedProgram prog, int parallelism)
+			throws ProgramInvocationException
+	{
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
-			return run(prog.getPlanWithJars(), parallelism, wait);
+			return runDetached(prog.getPlanWithJars(), parallelism);
 		}
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
-			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, wait);
+			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, false);
 			ContextEnvironment.enableLocalExecution(false);
 
 			// invoke here
@@ -281,113 +320,108 @@ public class Client {
 				ContextEnvironment.enableLocalExecution(true);
 			}
 
-			// Job id has been set in the Client passed to the ContextEnvironment
-			return new JobSubmissionResult(lastJobId);
+			return new JobSubmissionResult(lastJobID);
 		}
 		else {
-			throw new RuntimeException();
+			throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
 		}
 	}
-	
-	public JobSubmissionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
-		return run(optimizedPlan, prog.getAllLibraries(), wait);
 
-	}
-	
 	/**
-	 * Runs a program on Flink cluster whose job-manager is configured in this client's configuration.
-	 * This method involves all steps, from compiling, job-graph generation to submission.
-	 * 
-	 * @param prog The program to be executed.
+	 * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
+	 * execution is complete, and returns afterwards.
+	 *
+	 * @param program The program to be executed.
 	 * @param parallelism The default parallelism to use when running the program. The default parallelism is used
 	 *                    when the program does not set a parallelism by itself.
-	 * @param wait A flag that indicates whether this function call should block until the program execution is done.
+	 *
 	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
 	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
 	 *                                    or if the submission failed. That might be either due to an I/O problem,
 	 *                                    i.e. the job-manager is unreachable, or due to the fact that the
 	 *                                    parallel execution failed.
 	 */
-	public JobSubmissionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
-		return run((OptimizedPlan) getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait);
+	public JobExecutionResult runBlocking(JobWithJars program, int parallelism) 
+			throws CompilerException, ProgramInvocationException
+	{
+		ClassLoader classLoader = program.getUserCodeClassLoader();
+		if (classLoader == null) {
+			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
+		}
+
+		OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
+		return runBlocking(optPlan, program.getJarFiles(), classLoader);
+	}
+
+	/**
+	 * Submits a program to the Flink cluster to which this client is connected. The call returns after the
+	 * program was submitted and does not wait for the program to complete.
+	 *
+	 * @param program The program to be executed.
+	 * @param parallelism The default parallelism to use when running the program. The default parallelism is used
+	 *                    when the program does not set a parallelism by itself.
+	 *
+	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
+	 *                                    or if the submission failed. That might be due to an I/O problem,
+	 *                                    i.e. the job-manager is unreachable.
+	 */
+	public JobSubmissionResult runDetached(JobWithJars program, int parallelism)
+			throws CompilerException, ProgramInvocationException
+	{
+		ClassLoader classLoader = program.getUserCodeClassLoader();
+		if (classLoader == null) {
+			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
+		}
+
+		OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism);
+		return runDetached(optimizedPlan, program.getJarFiles(), classLoader);
 	}
 	
 
-	public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
+	public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
+			throws ProgramInvocationException
+	{
 		JobGraph job = getJobGraph(compiledPlan, libraries);
-		this.lastJobId = job.getJobID();
-		return run(job, wait);
+		return runBlocking(job, classLoader);
 	}
 
-	public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
-		this.lastJobId = jobGraph.getJobID();
-		
-		LOG.info("Starting client actor system");
-		final ActorSystem actorSystem;
+	public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
+			throws ProgramInvocationException
+	{
+		JobGraph job = getJobGraph(compiledPlan, libraries);
+		return runDetached(job, classLoader);
+	}
+
+	public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+		LOG.info("Checking and uploading JAR files");
 		try {
-			actorSystem = JobClient.startJobClientActorSystem(configuration);
+			JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
+		} catch (IOException e) {
+			throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
 		}
-		catch (Exception e) {
-			throw new ProgramInvocationException("Could start client actor system.", e);
+		try {
+			this.lastJobID = jobGraph.getJobID();
+			return JobClient.submitJobAndWait(actorSystem, jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, classLoader);
+		} catch (JobExecutionException e) {
+			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
 		}
+	}
 
+	public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+		LOG.info("Checking and uploading JAR files");
 		try {
-			FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-			FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
-
-			LOG.info("Looking up JobManager");
-			ActorGateway jobManagerGateway;
-
-			LeaderRetrievalService leaderRetrievalService;
-
-			try {
-				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-			} catch (Exception e) {
-				throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
-			}
-
-			try {
-				jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-						leaderRetrievalService,
-						actorSystem,
-						lookupTimeout);
-			} catch (LeaderRetrievalException e) {
-				throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
-			}
-
-			LOG.info("Leading JobManager actor system address is " + jobManagerGateway.path());
-
-			LOG.info("JobManager runs at " + jobManagerGateway.path());
-
-			LOG.info("Communication between client and JobManager will have a timeout of " + timeout);
-
-			LOG.info("Checking and uploading JAR files");
-			try {
-				JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
-			} catch (IOException e) {
-				throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
-			}
-
-			try {
-				if (wait) {
-					return JobClient.submitJobAndWait(actorSystem,
-						jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, userCodeClassLoader);
-				} else {
-					JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, userCodeClassLoader);
-					// return a dummy execution result with the JobId
-					return new JobSubmissionResult(jobGraph.getJobID());
-				}
-			} catch (JobExecutionException e) {
+			JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
+		}
+		catch (IOException e) {
+			throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
+		}
+		try {
+			this.lastJobID = jobGraph.getJobID();
+			JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, classLoader);
+			return new JobSubmissionResult(jobGraph.getJobID());
+		} catch (JobExecutionException e) {
 				throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
-			} catch (Exception e) {
-				throw new ProgramInvocationException("Exception during program execution.", e);
-			}
-		} finally {
-			// shut down started actor system
-			actorSystem.shutdown();
-			
-			// wait at most for 30 seconds, to work around an occasional akka problem
-			actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS));
 		}
 	}
 
@@ -397,62 +431,26 @@ public class Client {
 	 * @throws Exception In case an error occurred.
 	 */
 	public void cancel(JobID jobId) throws Exception {
-		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
-		final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-
-		ActorSystem actorSystem;
+		Future<Object> response;
 		try {
-			actorSystem = JobClient.startJobClientActorSystem(configuration);
+			response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
 		} catch (Exception e) {
-			throw new ProgramInvocationException("Could start client actor system.", e);
+			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
 		}
 
-		try {
-			ActorGateway jobManagerGateway;
-
-			LeaderRetrievalService leaderRetrievalService;
+		Object result = Await.result(response, timeout);
 
-			try {
-				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-			} catch (Exception e) {
-				throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
-			}
-
-			try {
-				jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-						leaderRetrievalService,
-						actorSystem,
-						lookupTimeout);
-			} catch (LeaderRetrievalException e) {
-				throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
-			}
-
-			Future<Object> response;
-			try {
-				response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
-			} catch (Exception e) {
-				throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
-			}
-			
-			Object result = Await.result(response, timeout);
-
-			if (result instanceof JobManagerMessages.CancellationSuccess) {
-				LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
-			} else if (result instanceof JobManagerMessages.CancellationFailure) {
-				Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
-				LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
-				throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
-			} else {
-				throw new Exception("Unknown message received while cancelling.");
-			}
-		} finally {
-			// shut down started actor system
-			actorSystem.shutdown();
-			actorSystem.awaitTermination();
+		if (result instanceof JobManagerMessages.CancellationSuccess) {
+			LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
+		} else if (result instanceof JobManagerMessages.CancellationFailure) {
+			Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
+			LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
+			throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
+		} else {
+			throw new Exception("Unknown message received while cancelling.");
 		}
 	}
 
-
 	/**
 	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
 	 * requested while a is running or after it has finished. The default class loader is used
@@ -473,117 +471,98 @@ public class Client {
 	 */
 	public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
 
-		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
-		final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-
-		ActorSystem actorSystem;
+		Future<Object> response;
 		try {
-			actorSystem = JobClient.startJobClientActorSystem(configuration);
+			response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
 		} catch (Exception e) {
-			throw new Exception("Could start client actor system.", e);
+			throw new Exception("Failed to query the job manager gateway for accumulators.", e);
 		}
 
-		try {
-			ActorGateway jobManagerGateway;
-
-			LeaderRetrievalService leaderRetrievalService;
-
-			try {
-				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-			} catch (Exception e) {
-				throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
-			}
-
-			try {
-				jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-						leaderRetrievalService,
-						actorSystem,
-						lookupTimeout);
-			} catch (LeaderRetrievalException e) {
-				throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
-			}
-
-			Future<Object> response;
-			try {
-				response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
-			} catch (Exception e) {
-				throw new Exception("Failed to query the job manager gateway for accumulators.", e);
-			}
-
-			Object result = Await.result(response, timeout);
+		Object result = Await.result(response, timeout);
 
-			if (result instanceof AccumulatorResultsFound) {
-				Map<String, SerializedValue<Object>> serializedAccumulators =
-						((AccumulatorResultsFound) result).result();
+		if (result instanceof AccumulatorResultsFound) {
+			Map<String, SerializedValue<Object>> serializedAccumulators =
+					((AccumulatorResultsFound) result).result();
 
-				return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
+			return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
 
-			} else if (result instanceof AccumulatorResultsErroneous) {
-				throw ((AccumulatorResultsErroneous) result).cause();
-			} else {
-				throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
-			}
-		} finally {
-			actorSystem.shutdown();
-			actorSystem.awaitTermination();
+		} else if (result instanceof AccumulatorResultsErroneous) {
+			throw ((AccumulatorResultsErroneous) result).cause();
+		} else {
+			throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
 		}
 	}
 
 
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  Sessions
+	// ------------------------------------------------------------------------
 	
-	public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {
-		
-		private final Optimizer compiler;
-		
-		private FlinkPlan optimizerPlan;
-		
-		
-		private OptimizerPlanEnvironment(Optimizer compiler) {
-			this.compiler = compiler;
-		}
-		
-		@Override
-		public JobExecutionResult execute(String jobName) throws Exception {
-			Plan plan = createProgramPlan(jobName);
-			this.optimizerPlan = compiler.compile(plan);
-			
-			// do not go on with anything now!
-			throw new ProgramAbortException();
+	/**
+	 * Tells the JobManager to finish the session (job) defined by the given ID.
+	 * 
+	 * @param jobId The ID that identifies the session.
+	 */
+	public void endSession(JobID jobId) throws Exception {
+		if (jobId == null) {
+			throw new IllegalArgumentException("The JobID must not be null.");
 		}
+		endSessions(Collections.singletonList(jobId));
+	}
 
-		@Override
-		public String getExecutionPlan() throws Exception {
-			Plan plan = createProgramPlan(null, false);
-			this.optimizerPlan = compiler.compile(plan);
-			
-			// do not go on with anything now!
-			throw new ProgramAbortException();
-		}
-		
-		private void setAsContext() {
-			ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
-				
-				@Override
-				public ExecutionEnvironment createExecutionEnvironment() {
-					return OptimizerPlanEnvironment.this;
-				}
-			};
-			initializeContextEnvironment(factory);
+	/**
+	 * Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
+	 *
+	 * @param jobIds The IDs that identify the sessions.
+	 */
+	public void endSessions(List<JobID> jobIds) throws Exception {
+		if (jobIds == null) {
+			throw new IllegalArgumentException("The JobIDs must not be null");
 		}
 		
-		public void setPlan(FlinkPlan plan){
-			this.optimizerPlan = plan;
+		for (JobID jid : jobIds) {
+			if (jid != null) {
+				LOG.info("Telling job manager to end the session {}.", jid);
+				jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid));
+			}
 		}
 	}
 
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  Internal translation methods
+	// ------------------------------------------------------------------------
 
 	/**
-	 * A special exception used to abort programs when the caller is only interested in the
-	 * program plan, rather than in the full execution.
+	 * Creates the optimized plan for a given program, using the given compiler.
+	 *
+	 * @param prog The program to be compiled.
+	 * @return The compiled and optimized plan, as returned by the compiler.
+	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
 	 */
-	public static final class ProgramAbortException extends Error {
-		private static final long serialVersionUID = 1L;
+	private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism) throws CompilerException,
+																					ProgramInvocationException {
+		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
 	}
+
+	public static JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
+		return getJobGraph(optPlan, prog.getAllLibraries());
+	}
+
+	private static JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
+		JobGraph job;
+		if (optPlan instanceof StreamingPlan) {
+			job = ((StreamingPlan) optPlan).getJobGraph();
+		} else {
+			JobGraphGenerator gen = new JobGraphGenerator();
+			job = gen.compileJobGraph((OptimizedPlan) optPlan);
+		}
+
+		for (File jar : jarFiles) {
+			job.addJar(new Path(jar.getAbsolutePath()));
+		}
+
+		return job;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 9287017..ad14a06 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -39,15 +40,14 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironment.class);
 
 	private final Client client;
-	
+
 	private final List<File> jarFilesToAttach;
-	
+
 	private final ClassLoader userCodeClassLoader;
 
 	private final boolean wait;
-	
-	
-	
+
+
 	public ContextEnvironment(Client remoteConnection, List<File> jarFiles, ClassLoader userCodeClassLoader, boolean wait) {
 		this.client = remoteConnection;
 		this.jarFilesToAttach = jarFiles;
@@ -60,27 +60,33 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		Plan p = createProgramPlan(jobName);
 		JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.userCodeClassLoader);
 
-		JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait);
-		if(result instanceof JobExecutionResult) {
-			this.lastJobExecutionResult = (JobExecutionResult) result;
-			return (JobExecutionResult) result;
-		} else {
-			LOG.warn("The Client didn't return a JobExecutionResult");
-			this.lastJobExecutionResult = new JobExecutionResult(result.getJobID(), -1, null);
+		if (wait) {
+			this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
+			return this.lastJobExecutionResult;
+		}
+		else {
+			JobSubmissionResult result = client.runDetached(toRun, getParallelism());
+			LOG.warn("Job was executed in detached mode, the results will be available on completion.");
+			this.lastJobExecutionResult = JobExecutionResult.fromJobSubmissionResult(result);
 			return this.lastJobExecutionResult;
 		}
 	}
 
 	@Override
 	public String getExecutionPlan() throws Exception {
-		Plan p = createProgramPlan("unnamed job");
-		
-		OptimizedPlan op = (OptimizedPlan) this.client.getOptimizedPlan(p, getParallelism());
+		Plan plan = createProgramPlan("unnamed job");
 
+		OptimizedPlan op = Client.getOptimizedPlan(client.compiler, plan, getParallelism());
 		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 		return gen.getOptimizerPlanAsJSON(op);
 	}
 
+	@Override
+	public void startNewSession() throws Exception {
+		client.endSession(jobID);
+		jobID = JobID.generate();
+	}
+
 	public boolean isWait() {
 		return wait;
 	}
@@ -104,7 +110,9 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	static void setAsContext(Client client, List<File> jarFilesToAttach, 
 				ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
 	{
-		initializeContextEnvironment(new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait));
+		ContextEnvironmentFactory factory =
+				new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait);
+		initializeContextEnvironment(factory);
 	}
 	
 	protected static void enableLocalExecution(boolean enabled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index b86487f..9e84e2d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client.program;
 
 import java.io.File;
@@ -30,6 +29,10 @@ import java.util.List;
 
 import org.apache.flink.api.common.Plan;
 
+/**
+ * A JobWithJars is a Flink dataflow plan, together with a bunch of JAR files that contain
+ * the classes of the functions and libraries necessary for the execution.
+ */
 public class JobWithJars {
 	
 	private Plan plan;

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
new file mode 100644
index 0000000..c9c3b45
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+public class OptimizerPlanEnvironment extends ExecutionEnvironment {
+
+	private final Optimizer compiler;
+
+	private FlinkPlan optimizerPlan;
+
+	public OptimizerPlanEnvironment(Optimizer compiler) {
+		this.compiler = compiler;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Execution Environment methods
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		Plan plan = createProgramPlan(jobName);
+		this.optimizerPlan = compiler.compile(plan);
+
+		// do not go on with anything now!
+		throw new ProgramAbortException();
+	}
+
+	@Override
+	public String getExecutionPlan() throws Exception {
+		Plan plan = createProgramPlan(null, false);
+		this.optimizerPlan = compiler.compile(plan);
+
+		// do not go on with anything now!
+		throw new ProgramAbortException();
+	}
+
+	@Override
+	public void startNewSession() {
+		// do nothing
+	}
+
+	public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
+		setAsContext();
+
+		// temporarily redirect System.out and System.err into separate in-memory byte buffers.
+		PrintStream originalOut = System.out;
+		PrintStream originalErr = System.err;
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		System.setOut(new PrintStream(baos));
+		ByteArrayOutputStream baes = new ByteArrayOutputStream();
+		System.setErr(new PrintStream(baes));
+		try {
+			ContextEnvironment.enableLocalExecution(false);
+			prog.invokeInteractiveModeForExecution();
+		}
+		catch (ProgramInvocationException e) {
+			throw e;
+		}
+		catch (Throwable t) {
+			// the invocation gets aborted with the preview plan
+			if (optimizerPlan != null) {
+				return optimizerPlan;
+			} else {
+				throw new ProgramInvocationException("The program caused an error: ", t);
+			}
+		}
+		finally {
+			ContextEnvironment.enableLocalExecution(true);
+			System.setOut(originalOut);
+			System.setErr(originalErr);
+			System.err.println(baes);
+			System.out.println(baos);
+		}
+
+		throw new ProgramInvocationException(
+				"The program plan could not be fetched - the program aborted pre-maturely.\n"
+						+ "System.err: " + baes.toString() + '\n'
+						+ "System.out: " + baos.toString() + '\n');
+	}
+	// ------------------------------------------------------------------------
+
+	private void setAsContext() {
+		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+
+			@Override
+			public ExecutionEnvironment createExecutionEnvironment() {
+				return OptimizerPlanEnvironment.this;
+			}
+		};
+		initializeContextEnvironment(factory);
+	}
+
+	// ------------------------------------------------------------------------
+	
+	public void setPlan(FlinkPlan plan){
+		this.optimizerPlan = plan;
+	}
+
+	/**
+	 * A special exception used to abort programs when the caller is only interested in the
+	 * program plan, rather than in the full execution.
+	 */
+	public static final class ProgramAbortException extends Error {
+		private static final long serialVersionUID = 1L;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 10096da..091a959 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -40,12 +40,9 @@ import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
@@ -166,7 +163,7 @@ public class PackagedProgram {
 		}
 	}
 	
-	public PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException {
+	PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException {
 		this.jarFile = null;
 		this.args = args == null ? new String[0] : args;
 		
@@ -685,51 +682,5 @@ public class PackagedProgram {
 			throw new ProgramInvocationException("Cannot access jar file" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static final class PreviewPlanEnvironment extends ExecutionEnvironment {
-
-		private List<DataSinkNode> previewPlan;
-		private Plan plan;
-		
-		private String preview = null;
-		
-		@Override
-		public JobExecutionResult execute(String jobName) throws Exception {
-			this.plan = createProgramPlan(jobName);
-			this.previewPlan = Optimizer.createPreOptimizedPlan((Plan) plan);
-			
-			// do not go on with anything now!
-			throw new Client.ProgramAbortException();
-		}
 
-		@Override
-		public String getExecutionPlan() throws Exception {
-			Plan plan = createProgramPlan("unused");
-			this.previewPlan = Optimizer.createPreOptimizedPlan(plan);
-			
-			// do not go on with anything now!
-			throw new Client.ProgramAbortException();
-		}
-		
-		public void setAsContext() {
-			ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
-				@Override
-				public ExecutionEnvironment createExecutionEnvironment() {
-					return PreviewPlanEnvironment.this;
-				}
-			};
-			initializeContextEnvironment(factory);
-		}
-
-		public Plan getPlan() {
-			return this.plan;
-		}
-		
-		public void setPreview(String preview) {
-			this.preview = preview;
-		}
-
-	}
 }


Mime
View raw message