flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [06/10] flink git commit: [FLINK-3667] refactor client communication classes
Date Fri, 17 Jun 2016 08:45:20 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
deleted file mode 100644
index dcf542a..0000000
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ /dev/null
@@ -1,624 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.program;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
-import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorSystem;
-
-/**
- * Encapsulates the functionality necessary to submit a program to a remote cluster.
- */
-public class Client {
-
-	private static final Logger LOG = LoggerFactory.getLogger(Client.class);
-
-	/** The optimizer used in the optimization of batch programs */
-	final Optimizer compiler;
-
-	/** The actor system used to communicate with the JobManager */
-	private final ActorSystem actorSystem;
-
-	/** Configuration of the client */
-	private final Configuration config;
-
-	/** Timeout for futures */
-	private final FiniteDuration timeout;
-
-	/** Lookup timeout for the job manager retrieval service */
-	private final FiniteDuration lookupTimeout;
-
-	/**
-	 * If != -1, this field specifies the total number of available slots on the cluster
-	 * connected to the client.
-	 */
-	private final int maxSlots;
-
-	/** Flag indicating whether to print execution updates to System.out */
-	private boolean printStatusDuringExecution = true;
-
-	/**
-	 * For interactive invocations, the Job ID is only available after the ContextEnvironment has
-	 * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
-	 * which lets us access the last JobID here.
-	 */
-	private JobID lastJobID;
-
-	// ------------------------------------------------------------------------
-	//                            Construction
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates an instance that submits the programs to the JobManager defined in the
-	 * configuration. This method will try to resolve the JobManager hostname and throw an exception
-	 * if that is not possible.
-	 *
-	 * @param config The config used to obtain the job-manager's address, and used to configure the optimizer.
-	 *
-	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
-	 * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
-	 */
-	public Client(Configuration config) throws IOException {
-		this(config, -1);
-	}
-
-	/**
-	 * Creates a new instance of the class that submits the jobs to a job-manager
-	 * at the given address using the default port.
-	 *
-	 * @param config The configuration for the client-side processes, like the optimizer.
-	 * @param maxSlots The total number of available slots on the cluster, if != -1.
-	 *
-	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
-	 * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
-	 */
-	public Client(Configuration config, int maxSlots) throws IOException {
-		this.config = Preconditions.checkNotNull(config);
-		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
-		this.maxSlots = maxSlots;
-
-		LOG.info("Starting client actor system");
-
-		try {
-			this.actorSystem = JobClient.startJobClientActorSystem(config);
-		} catch (Exception e) {
-			throw new IOException("Could start client actor system.", e);
-		}
-
-		timeout = AkkaUtils.getClientTimeout(config);
-		lookupTimeout = AkkaUtils.getLookupTimeout(config);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Startup & Shutdown
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Shuts down the client. This stops the internal actor system and actors.
-	 */
-	public void shutdown() {
-		if (!this.actorSystem.isTerminated()) {
-			this.actorSystem.shutdown();
-			this.actorSystem.awaitTermination();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Configuration
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Configures whether the client should print progress updates during the execution to {@code System.out}.
-	 * All updates are logged via the SLF4J loggers regardless of this setting.
-	 *
-	 * @param print True to print updates to standard out during execution, false to not print them.
-	 */
-	public void setPrintStatusDuringExecution(boolean print) {
-		this.printStatusDuringExecution = print;
-	}
-
-	/**
-	 * @return whether the client will print progress updates during the execution to {@code System.out}
-	 */
-	public boolean getPrintStatusDuringExecution() {
-		return this.printStatusDuringExecution;
-	}
-
-	/**
-	 * @return The maximum number of available processing slots at the Flink cluster connected to
-	 * this client, or -1 if unknown.
-	 */
-	public int getMaxSlots() {
-		return this.maxSlots;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Access to the Program's Plan
-	// ------------------------------------------------------------------------
-
-	public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
-			throws CompilerException, ProgramInvocationException
-	{
-		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-		return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
-	}
-
-	public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
-			throws CompilerException, ProgramInvocationException
-	{
-		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-		if (prog.isUsingProgramEntryPoint()) {
-			return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
-		} else if (prog.isUsingInteractiveMode()) {
-			// temporary hack to support the optimizer plan preview
-			OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
-			if (parallelism > 0) {
-				env.setParallelism(parallelism);
-			}
-
-			return env.getOptimizedPlan(prog);
-		} else {
-			throw new RuntimeException("Couldn't determine program mode.");
-		}
-	}
-
-	public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
-		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
-			LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
-			p.setDefaultParallelism(parallelism);
-		}
-		LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
-
-		return compiler.compile(p);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Program submission / execution
-	// ------------------------------------------------------------------------
-
-	public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
-		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-		if (prog.isUsingProgramEntryPoint()) {
-			return runBlocking(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
-		}
-		else if (prog.isUsingInteractiveMode()) {
-			LOG.info("Starting program in interactive mode");
-			ContextEnvironment.setAsContext(new ContextEnvironmentFactory(this, prog.getAllLibraries(),
-					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true,
-					prog.getSavepointPath()));
-
-			// invoke here
-			try {
-				prog.invokeInteractiveModeForExecution();
-			}
-			finally {
-				ContextEnvironment.unsetContext();
-			}
-
-			return new JobSubmissionResult(lastJobID);
-		}
-		else {
-			throw new RuntimeException();
-		}
-	}
-
-	public JobSubmissionResult runDetached(PackagedProgram prog, int parallelism)
-			throws ProgramInvocationException
-	{
-		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-		if (prog.isUsingProgramEntryPoint()) {
-			return runDetached(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
-		}
-		else if (prog.isUsingInteractiveMode()) {
-			LOG.info("Starting program in interactive mode");
-			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
-					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false,
-					prog.getSavepointPath());
-			ContextEnvironment.setAsContext(factory);
-
-			// invoke here
-			try {
-				prog.invokeInteractiveModeForExecution();
-				return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
-			}
-			finally {
-				ContextEnvironment.unsetContext();
-			}
-		}
-		else {
-			throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
-		}
-	}
-
-	public JobExecutionResult runBlocking(JobWithJars program, int parallelism) throws ProgramInvocationException {
-		return runBlocking(program, parallelism, null);
-	}
-
-	/**
-	 * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
-	 * execution is complete, and returns afterwards.
-	 *
-	 * @param program The program to be executed.
-	 * @param parallelism The default parallelism to use when running the program. The default parallelism is used
-	 *                    when the program does not set a parallelism by itself.
-	 *
-	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
-	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
-	 *                                    or if the submission failed. That might be either due to an I/O problem,
-	 *                                    i.e. the job-manager is unreachable, or due to the fact that the
-	 *                                    parallel execution failed.
-	 */
-	public JobExecutionResult runBlocking(JobWithJars program, int parallelism, String savepointPath)
-			throws CompilerException, ProgramInvocationException {
-		ClassLoader classLoader = program.getUserCodeClassLoader();
-		if (classLoader == null) {
-			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
-		}
-
-		OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
-		return runBlocking(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
-	}
-
-	public JobSubmissionResult runDetached(JobWithJars program, int parallelism) throws ProgramInvocationException {
-		return runDetached(program, parallelism, null);
-	}
-
-	/**
-	 * Submits a program to the Flink cluster to which this client is connected. The call returns after the
-	 * program was submitted and does not wait for the program to complete.
-	 *
-	 * @param program The program to be executed.
-	 * @param parallelism The default parallelism to use when running the program. The default parallelism is used
-	 *                    when the program does not set a parallelism by itself.
-	 *
-	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
-	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
-	 *                                    or if the submission failed. That might be either due to an I/O problem,
-	 *                                    i.e. the job-manager is unreachable.
-	 */
-	public JobSubmissionResult runDetached(JobWithJars program, int parallelism, String savepointPath)
-			throws CompilerException, ProgramInvocationException {
-		ClassLoader classLoader = program.getUserCodeClassLoader();
-		if (classLoader == null) {
-			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
-		}
-
-		OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism);
-		return runDetached(optimizedPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
-	}
-
-	public JobExecutionResult runBlocking(
-			FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
-		return runBlocking(compiledPlan, libraries, classpaths, classLoader, null);
-	}
-
-	public JobExecutionResult runBlocking(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
-			ClassLoader classLoader, String savepointPath) throws ProgramInvocationException
-	{
-		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
-		return runBlocking(job, classLoader);
-	}
-
-	public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
-		return runDetached(compiledPlan, libraries, classpaths, classLoader, null);
-	}
-
-	public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
-			ClassLoader classLoader, String savepointPath) throws ProgramInvocationException
-	{
-		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
-		return runDetached(job, classLoader);
-	}
-
-	public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-		LeaderRetrievalService leaderRetrievalService;
-		try {
-			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
-		} catch (Exception e) {
-			throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
-		}
-
-		try {
-			this.lastJobID = jobGraph.getJobID();
-			return JobClient.submitJobAndWait(actorSystem, leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
-		} catch (JobExecutionException e) {
-			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
-		}
-	}
-
-	public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-		ActorGateway jobManagerGateway;
-
-		try {
-			jobManagerGateway = getJobManagerGateway();
-		} catch (Exception e) {
-			throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
-		}
-
-		LOG.info("Checking and uploading JAR files");
-		try {
-			JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
-		}
-		catch (IOException e) {
-			throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
-		}
-		try {
-			this.lastJobID = jobGraph.getJobID();
-			JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, classLoader);
-			return new JobSubmissionResult(jobGraph.getJobID());
-		} catch (JobExecutionException e) {
-				throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
-		}
-	}
-
-	/**
-	 * Cancels a job identified by the job id.
-	 * @param jobId the job id
-	 * @throws Exception In case an error occurred.
-	 */
-	public void cancel(JobID jobId) throws Exception {
-		final ActorGateway jobManagerGateway = getJobManagerGateway();
-
-		final Future<Object> response;
-		try {
-			response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
-		} catch (final Exception e) {
-			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
-		}
-
-		final Object result = Await.result(response, timeout);
-
-		if (result instanceof JobManagerMessages.CancellationSuccess) {
-			LOG.info("Job cancellation with ID " + jobId + " succeeded.");
-		} else if (result instanceof JobManagerMessages.CancellationFailure) {
-			final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
-			LOG.info("Job cancellation with ID " + jobId + " failed.", t);
-			throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
-		} else {
-			throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
-		}
-	}
-
-	/**
-	 * Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
-	 * Stopping works only for streaming programs. Be aware that the program might continue to run for
-	 * a while after the stop command is sent, because once the sources stop emitting data, all operators
-	 * still need to finish processing.
-	 * 
-	 * @param jobId
-	 *            the job ID of the streaming program to stop
-	 * @throws Exception
-	 *             If the job ID is invalid (i.e., is unknown or refers to a batch job) or if sending the stop signal
-	 *             failed. That might be due to an I/O problem, i.e., the job-manager is unreachable.
-	 */
-	public void stop(final JobID jobId) throws Exception {
-		final ActorGateway jobManagerGateway = getJobManagerGateway();
-
-		final Future<Object> response;
-		try {
-			response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout);
-		} catch (final Exception e) {
-			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
-		}
-
-		final Object result = Await.result(response, timeout);
-
-		if (result instanceof JobManagerMessages.StoppingSuccess) {
-			LOG.info("Job stopping with ID " + jobId + " succeeded.");
-		} else if (result instanceof JobManagerMessages.StoppingFailure) {
-			final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
-			LOG.info("Job stopping with ID " + jobId + " failed.", t);
-			throw new Exception("Failed to stop the job because of \n" + t.getMessage());
-		} else {
-			throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
-		}
-	}
-
-	/**
-	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
-	 * requested while a job is running or after it has finished. The default class loader is used
-	 * to deserialize the incoming accumulator results.
-	 * @param jobID The job identifier of a job.
-	 * @return A Map containing the accumulator's name and its value.
-	 */
-	public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
-		return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
-	}
-
-	/**
-	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
-	 * requested while a job is running or after it has finished.
-	 * @param jobID The job identifier of a job.
-	 * @param loader The class loader for deserializing the accumulator results.
-	 * @return A Map containing the accumulator's name and its value.
-	 */
-	public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
-		ActorGateway jobManagerGateway = getJobManagerGateway();
-
-		Future<Object> response;
-		try {
-			response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
-		} catch (Exception e) {
-			throw new Exception("Failed to query the job manager gateway for accumulators.", e);
-		}
-
-		Object result = Await.result(response, timeout);
-
-		if (result instanceof AccumulatorResultsFound) {
-			Map<String, SerializedValue<Object>> serializedAccumulators =
-					((AccumulatorResultsFound) result).result();
-
-			return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
-
-		} else if (result instanceof AccumulatorResultsErroneous) {
-			throw ((AccumulatorResultsErroneous) result).cause();
-		} else {
-			throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
-		}
-	}
-
-
-	// ------------------------------------------------------------------------
-	//  Sessions
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Tells the JobManager to finish the session (job) defined by the given ID.
-	 * 
-	 * @param jobId The ID that identifies the session.
-	 */
-	public void endSession(JobID jobId) throws Exception {
-		if (jobId == null) {
-			throw new IllegalArgumentException("The JobID must not be null.");
-		}
-		endSessions(Collections.singletonList(jobId));
-	}
-
-	/**
-	 * Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
-	 *
-	 * @param jobIds The IDs that identify the sessions.
-	 */
-	public void endSessions(List<JobID> jobIds) throws Exception {
-		if (jobIds == null) {
-			throw new IllegalArgumentException("The JobIDs must not be null");
-		}
-
-		ActorGateway jobManagerGateway = getJobManagerGateway();
-		
-		for (JobID jid : jobIds) {
-			if (jid != null) {
-				LOG.info("Telling job manager to end the session {}.", jid);
-				jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid));
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Internal translation methods
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates the optimized plan for a given program, using this client's compiler.
-	 *
-	 * @param prog The program to be compiled.
-	 * @return The compiled and optimized plan, as returned by the compiler.
-	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
-	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
-	 */
-	private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
-			throws CompilerException, ProgramInvocationException {
-		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
-	}
-
-	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
-		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null);
-	}
-
-	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException {
-		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath);
-	}
-
-	private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, String savepointPath) {
-		JobGraph job;
-		if (optPlan instanceof StreamingPlan) {
-			job = ((StreamingPlan) optPlan).getJobGraph();
-			job.setSavepointPath(savepointPath);
-		} else {
-			JobGraphGenerator gen = new JobGraphGenerator(this.config);
-			job = gen.compileJobGraph((OptimizedPlan) optPlan);
-		}
-
-		for (URL jar : jarFiles) {
-			try {
-				job.addJar(new Path(jar.toURI()));
-			} catch (URISyntaxException e) {
-				throw new RuntimeException("URL is invalid. This should not happen.", e);
-			}
-		}
- 
-		job.setClasspaths(classpaths);
-
-		return job;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Helper methods
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns the {@link ActorGateway} of the current job manager leader using
-	 * the {@link LeaderRetrievalService}.
-	 *
-	 * @return ActorGateway of the current job manager leader
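-	 * @throws Exception Thrown, if the leader retrieval service cannot be created or the leader gateway cannot be retrieved.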
-	 */
-	private ActorGateway getJobManagerGateway() throws Exception {
-		LOG.info("Looking up JobManager");
-		LeaderRetrievalService leaderRetrievalService;
-
-		leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
-
-		return LeaderRetrievalUtils.retrieveLeaderGateway(
-			leaderRetrievalService,
-			actorSystem,
-			lookupTimeout);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
new file mode 100644
index 0000000..b56428d
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import akka.actor.ActorRef;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.StreamingPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorSystem;
+
+
+/**
+ * Encapsulates the functionality necessary to submit a program to a remote cluster.
+ */
+public abstract class ClusterClient {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ClusterClient.class);
+
+	/** The optimizer used in the optimization of batch programs */
+	final Optimizer compiler;
+
+	/** The actor system used to communicate with the JobManager */
+	protected final ActorSystem actorSystem;
+
+	/** Configuration of the client */
+	protected final Configuration flinkConfig;
+
+	/** Timeout for futures */
+	protected final FiniteDuration timeout;
+
+	/** Lookup timeout for the job manager retrieval service */
+	private final FiniteDuration lookupTimeout;
+
+	/** Flag indicating whether to sysout print execution updates */
+	private boolean printStatusDuringExecution = true;
+
+	/**
+	 * For interactive invocations, the Job ID is only available after the ContextEnvironment has
+	 * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
+	 * which lets us access the last JobID here.
+	 */
+	private JobID lastJobID;
+
+	/** Switch for blocking/detached job submission of the client */
+	private boolean detachedJobSubmission = false;
+
+	// ------------------------------------------------------------------------
+	//                            Construction
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates an instance that submits the programs to the JobManager defined in the
+	 * configuration. This method will try to resolve the JobManager hostname and throw an exception
+	 * if that is not possible.
+	 *
+	 * @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
+	 *
+	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
+	 */
+	public ClusterClient(Configuration flinkConfig) throws IOException {
+
+		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
+
+		this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
+		this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
+
+		this.actorSystem = createActorSystem();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Startup & Shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Method to create the ActorSystem of the Client. May be overridden in subclasses.
+	 * @return ActorSystem
+	 * @throws IOException
+	 */
+	protected ActorSystem createActorSystem() throws IOException {
+
+		if (actorSystem != null) {
+			throw new RuntimeException("This method may only be called once.");
+		}
+
+		// start actor system
+		LOG.info("Starting client actor system.");
+
+		String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+		int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+		if (hostName == null || port == -1) {
+			throw new IOException("The initial JobManager address has not been set correctly.");
+		}
+		InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
+
+		// find name of own public interface, able to connect to the JM
+		// try to find address for 2 seconds. log after 400 ms.
+		InetAddress ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
+		return AkkaUtils.createActorSystem(flinkConfig,
+			new Some<>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
+	}
+
+	/**
+	 * Shuts down the client. This stops the internal actor system and actors.
+	 */
+	public void shutdown() {
+		try {
+			finalizeCluster();
+		} finally {
+			if (!this.actorSystem.isTerminated()) {
+				this.actorSystem.shutdown();
+				this.actorSystem.awaitTermination();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Configuration
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Configures whether the client should print progress updates during the execution to {@code System.out}.
+	 * All updates are logged via the SLF4J loggers regardless of this setting.
+	 *
+	 * @param print True to print updates to standard out during execution, false to not print them.
+	 */
+	public void setPrintStatusDuringExecution(boolean print) {
+		this.printStatusDuringExecution = print;
+	}
+
+	/**
+	 * @return whether the client will print progress updates during the execution to {@code System.out}
+	 */
+	public boolean getPrintStatusDuringExecution() {
+		return this.printStatusDuringExecution;
+	}
+
+	/**
+	 * Gets the current JobManager address from the Flink configuration (may change in case of a HA setup).
+	 * @return The address (host and port) of the leading JobManager
+	 */
+	public InetSocketAddress getJobManagerAddressFromConfig() {
+		try {
+		String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+		int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+		return new InetSocketAddress(hostName, port);
+		} catch (Exception e) {
+			throw new RuntimeException("Failed to retrieve JobManager address", e);
+		}
+	}
+
+	/**
+	 * Gets the current JobManager address (may change in case of a HA setup).
+	 * @return The address (host and port) of the leading JobManager
+	 */
+	public InetSocketAddress getJobManagerAddress() {
+		try {
+			final ActorRef jmActor = getJobManagerGateway().actor();
+			return AkkaUtils.getInetSockeAddressFromAkkaURL(jmActor.path().toSerializationFormat());
+		} catch (Exception e) {
+			throw new RuntimeException("Failed to retrieve JobManager address", e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Access to the Program's Plan
+	// ------------------------------------------------------------------------
+
+	public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
+			throws CompilerException, ProgramInvocationException
+	{
+		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
+		return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
+	}
+
+	public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
+			throws CompilerException, ProgramInvocationException
+	{
+		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+		if (prog.isUsingProgramEntryPoint()) {
+			return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
+		} else if (prog.isUsingInteractiveMode()) {
+			// temporary hack to support the optimizer plan preview
+			OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
+			if (parallelism > 0) {
+				env.setParallelism(parallelism);
+			}
+
+			return env.getOptimizedPlan(prog);
+		} else {
+			throw new RuntimeException("Couldn't determine program mode.");
+		}
+	}
+
+	public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
+		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
+			LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
+			p.setDefaultParallelism(parallelism);
+		}
+		LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
+
+		return compiler.compile(p);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Program submission / execution
+	// ------------------------------------------------------------------------
+
+	/**
+	 * General purpose method to run a user jar from the CliFrontend in either blocking or detached mode, depending
+	 * on whether {@code setDetached(true)} or {@code setDetached(false)} was called.
+	 * @param prog the packaged program
+	 * @param parallelism the parallelism to execute the contained Flink job
+	 * @return The result of the execution
+	 * @throws ProgramInvocationException
+	 */
+	public JobSubmissionResult run(PackagedProgram prog, int parallelism)
+			throws ProgramInvocationException
+	{
+		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+		if (prog.isUsingProgramEntryPoint()) {
+			return run(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
+		}
+		else if (prog.isUsingInteractiveMode()) {
+			LOG.info("Starting program in interactive mode");
+			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
+					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
+					prog.getSavepointPath());
+			ContextEnvironment.setAsContext(factory);
+
+			try {
+				// invoke main method
+				prog.invokeInteractiveModeForExecution();
+				if (isDetached()) {
+					// in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
+					return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
+				}
+				else {
+					// in blocking mode, we execute all Flink jobs contained in the user code and then return here
+					return new JobSubmissionResult(lastJobID);
+				}
+			}
+			finally {
+				ContextEnvironment.unsetContext();
+			}
+		}
+		else {
+			throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
+		}
+	}
+
+	public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
+		return run(program, parallelism, null);
+	}
+
+	/**
+	 * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
+	 * execution is complete, and returns afterwards.
+	 *
+	 * @param program The program to be executed.
+	 * @param parallelism The default parallelism to use when running the program. The default parallelism is used
+	 *                    when the program does not set a parallelism by itself.
+	 *
+	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
+	 *                                    or if the submission failed. That might be either due to an I/O problem,
+	 *                                    i.e. the job-manager is unreachable, or due to the fact that the
+	 *                                    parallel execution failed.
+	 */
+	public JobSubmissionResult run(JobWithJars program, int parallelism, String savepointPath)
+			throws CompilerException, ProgramInvocationException {
+		ClassLoader classLoader = program.getUserCodeClassLoader();
+		if (classLoader == null) {
+			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
+		}
+
+		OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
+		return run(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
+	}
+
+	public JobSubmissionResult run(
+			FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
+		return run(compiledPlan, libraries, classpaths, classLoader, null);
+	}
+
+	public JobSubmissionResult run(FlinkPlan compiledPlan,
+			List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, String savepointPath)
+		throws ProgramInvocationException
+	{
+		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
+		return submitJob(job, classLoader);
+	}
+
+	/**
+	 * Submits a JobGraph blocking.
+	 * @param jobGraph The JobGraph
+	 * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
+	 * @return JobExecutionResult
+	 * @throws ProgramInvocationException
+	 */
+	public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+		LeaderRetrievalService leaderRetrievalService;
+		try {
+			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+		} catch (Exception e) {
+			throw new ProgramInvocationException("Could not create the leader retrieval service", e);
+		}
+
+		try {
+			this.lastJobID = jobGraph.getJobID();
+			return JobClient.submitJobAndWait(actorSystem,
+				leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
+		} catch (JobExecutionException e) {
+			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
+		}
+	}
+
+	/**
+	 * Submits a JobGraph detached.
+	 * @param jobGraph The JobGraph
+	 * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
+	 * @return JobSubmissionResult
+	 * @throws ProgramInvocationException
+	 */
+	public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+		ActorGateway jobManagerGateway;
+
+		try {
+			jobManagerGateway = getJobManagerGateway();
+		} catch (Exception e) {
+			throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
+		}
+
+		try {
+			JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, classLoader);
+			return new JobSubmissionResult(jobGraph.getJobID());
+		} catch (JobExecutionException e) {
+			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
+		}
+	}
+
+	/**
+	 * Cancels a job identified by the job id.
+	 * @param jobId the job id
+	 * @throws Exception In case an error occurred.
+	 */
+	public void cancel(JobID jobId) throws Exception {
+		final ActorGateway jobManagerGateway = getJobManagerGateway();
+
+		final Future<Object> response;
+		try {
+			response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
+		} catch (final Exception e) {
+			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
+		}
+
+		final Object result = Await.result(response, timeout);
+
+		if (result instanceof JobManagerMessages.CancellationSuccess) {
+			LOG.info("Job cancellation with ID " + jobId + " succeeded.");
+		} else if (result instanceof JobManagerMessages.CancellationFailure) {
+			final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
+			LOG.info("Job cancellation with ID " + jobId + " failed.", t);
+			throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
+		} else {
+			throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
+		}
+	}
+
+	/**
+	 * Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
+	 * Stopping works only for streaming programs. Be aware that the program might continue to run for
+	 * a while after the stop command is sent, because after the sources stop emitting data all operators
+	 * still need to finish processing.
+	 * 
+	 * @param jobId
+	 *            the job ID of the streaming program to stop
+	 * @throws Exception
+	 *             If the job ID is invalid (i.e., is unknown or refers to a batch job) or if sending the stop signal
+	 *             failed. That might be due to an I/O problem, i.e., the job-manager is unreachable.
+	 */
+	public void stop(final JobID jobId) throws Exception {
+		final ActorGateway jobManagerGateway = getJobManagerGateway();
+
+		final Future<Object> response;
+		try {
+			response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout);
+		} catch (final Exception e) {
+			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
+		}
+
+		final Object result = Await.result(response, timeout);
+
+		if (result instanceof JobManagerMessages.StoppingSuccess) {
+			LOG.info("Job stopping with ID " + jobId + " succeeded.");
+		} else if (result instanceof JobManagerMessages.StoppingFailure) {
+			final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
+			LOG.info("Job stopping with ID " + jobId + " failed.", t);
+			throw new Exception("Failed to stop the job because of \n" + t.getMessage());
+		} else {
+			throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
+		}
+	}
+
+	/**
+	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
+	 * requested while a job is running or after it has finished. The default class loader is used
+	 * to deserialize the incoming accumulator results.
+	 * @param jobID The job identifier of a job.
+	 * @return A Map containing the accumulator's name and its value.
+	 */
+	public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
+		return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
+	}
+
+	/**
+	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
+	 * requested while a job is running or after it has finished.
+	 * @param jobID The job identifier of a job.
+	 * @param loader The class loader for deserializing the accumulator results.
+	 * @return A Map containing the accumulator's name and its value.
+	 */
+	public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
+		ActorGateway jobManagerGateway = getJobManagerGateway();
+
+		Future<Object> response;
+		try {
+			response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
+		} catch (Exception e) {
+			throw new Exception("Failed to query the job manager gateway for accumulators.", e);
+		}
+
+		Object result = Await.result(response, timeout);
+
+		if (result instanceof AccumulatorResultsFound) {
+			Map<String, SerializedValue<Object>> serializedAccumulators =
+					((AccumulatorResultsFound) result).result();
+
+			return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
+
+		} else if (result instanceof AccumulatorResultsErroneous) {
+			throw ((AccumulatorResultsErroneous) result).cause();
+		} else {
+			throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
+		}
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Sessions
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Tells the JobManager to finish the session (job) defined by the given ID.
+	 * 
+	 * @param jobId The ID that identifies the session.
+	 */
+	public void endSession(JobID jobId) throws Exception {
+		if (jobId == null) {
+			throw new IllegalArgumentException("The JobID must not be null.");
+		}
+		endSessions(Collections.singletonList(jobId));
+	}
+
+	/**
+	 * Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
+	 *
+	 * @param jobIds The IDs that identify the sessions.
+	 */
+	public void endSessions(List<JobID> jobIds) throws Exception {
+		if (jobIds == null) {
+			throw new IllegalArgumentException("The JobIDs must not be null");
+		}
+
+		ActorGateway jobManagerGateway = getJobManagerGateway();
+		
+		for (JobID jid : jobIds) {
+			if (jid != null) {
+				LOG.info("Telling job manager to end the session {}.", jid);
+				jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid));
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Internal translation methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the optimized plan for a given program, using this client's compiler.
+	 *
+	 * @param prog The program to be compiled.
+	 * @return The compiled and optimized plan, as returned by the compiler.
+	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
+	 */
+	private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
+			throws CompilerException, ProgramInvocationException {
+		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
+	}
+
+	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
+		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null);
+	}
+
+	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException {
+		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath);
+	}
+
+	private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, String savepointPath) {
+		JobGraph job;
+		if (optPlan instanceof StreamingPlan) {
+			job = ((StreamingPlan) optPlan).getJobGraph();
+			job.setSavepointPath(savepointPath);
+		} else {
+			JobGraphGenerator gen = new JobGraphGenerator(this.flinkConfig);
+			job = gen.compileJobGraph((OptimizedPlan) optPlan);
+		}
+
+		for (URL jar : jarFiles) {
+			try {
+				job.addJar(new Path(jar.toURI()));
+			} catch (URISyntaxException e) {
+				throw new RuntimeException("URL is invalid. This should not happen.", e);
+			}
+		}
+ 
+		job.setClasspaths(classpaths);
+
+		return job;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Helper methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the {@link ActorGateway} of the current job manager leader using
+	 * the {@link LeaderRetrievalService}.
+	 *
+	 * @return ActorGateway of the current job manager leader
+	 * @throws Exception
+	 */
+	protected ActorGateway getJobManagerGateway() throws Exception {
+		LOG.info("Looking up JobManager");
+
+		return LeaderRetrievalUtils.retrieveLeaderGateway(
+			LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig),
+			actorSystem,
+			lookupTimeout);
+	}
+
+	/**
+	 * Logs and prints to sysout if printing to stdout is enabled.
+	 * @param message The message to log/print
+	 */
+	protected void logAndSysout(String message) {
+		LOG.info(message);
+		if (printStatusDuringExecution) {
+			System.out.println(message);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Abstract methods to be implemented by the cluster specific Client
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns a URL (as a string) to the JobManager web interface
+	 */
+	public abstract String getWebInterfaceURL();
+
+	/**
+	 * Returns the latest cluster status, with number of Taskmanagers and slots
+	 */
+	public abstract GetClusterStatusResponse getClusterStatus();
+
+	/**
+	 * May return new messages from the cluster.
+	 * Messages can be for example about failed containers or container launch requests.
+	 */
+	protected abstract List<String> getNewMessages();
+
+	/**
+	 * Returns a string representation of the cluster.
+	 */
+	protected abstract String getClusterIdentifier();
+
+	/**
+	 * Request the cluster to shut down or disconnect.
+	 */
+	protected abstract void finalizeCluster();
+
+	/**
+	 * Set the mode of this client (detached or blocking job execution).
+	 * @param isDetached If true, the client will submit programs detached via the {@code run} method
+	 */
+	public void setDetached(boolean isDetached) {
+		this.detachedJobSubmission = isDetached;
+	}
+
+	/**
+	 * A flag to indicate whether this client submits jobs detached.
+	 * @return True if the Client submits detached, false otherwise
+	 */
+	public boolean isDetached() {
+		return detachedJobSubmission;
+	}
+
+	/**
+	 * Return the Flink configuration object
+	 * @return The Flink configuration object
+	 */
+	public Configuration getFlinkConfiguration() {
+		return flinkConfig.clone();
+	}
+
+	/**
+	 * The client may define an upper limit on the number of slots to use
+	 * @return -1 if unknown
+	 */
+	public abstract int getMaxSlots();
+
+	/**
+	 * Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform
+	 * some custom job submission logic.
+	 * @param jobGraph The JobGraph to be submitted
+	 * @return JobSubmissionResult
+	 */
+	protected abstract JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
+		throws ProgramInvocationException;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index dfb5f2e..fe2d7e0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -34,7 +34,7 @@ import java.util.List;
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
-	protected final Client client;
+	protected final ClusterClient client;
 
 	protected final List<URL> jarFilesToAttach;
 
@@ -44,8 +44,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 
 	protected final String savepointPath;
 	
-	public ContextEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths,
-			ClassLoader userCodeClassLoader, String savepointPath) {
+	public ContextEnvironment(ClusterClient remoteConnection, List<URL> jarFiles, List<URL> classpaths,
+				ClassLoader userCodeClassLoader, String savepointPath) {
 		this.client = remoteConnection;
 		this.jarFilesToAttach = jarFiles;
 		this.classpathsToAttach = classpaths;
@@ -58,7 +58,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		Plan p = createProgramPlan(jobName);
 		JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
 				this.userCodeClassLoader);
-		this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism(), savepointPath);
+		this.lastJobExecutionResult = client.run(toRun, getParallelism(), savepointPath).getJobExecutionResult();
 		return this.lastJobExecutionResult;
 	}
 
@@ -66,7 +66,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	public String getExecutionPlan() throws Exception {
 		Plan plan = createProgramPlan("unnamed job");
 
-		OptimizedPlan op = Client.getOptimizedPlan(client.compiler, plan, getParallelism());
+		OptimizedPlan op = ClusterClient.getOptimizedPlan(client.compiler, plan, getParallelism());
 		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 		return gen.getOptimizerPlanAsJSON(op);
 	}
@@ -83,7 +83,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
 				+ ") : " + getIdString();
 	}
 	
-	public Client getClient() {
+	public ClusterClient getClient() {
 		return this.client;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index e820bad..f9b1fc2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -32,7 +32,7 @@ import java.util.List;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-	private final Client client;
+	private final ClusterClient client;
 
 	private final List<URL> jarFilesToAttach;
 
@@ -42,34 +42,34 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
 	private final int defaultParallelism;
 
-	private final boolean wait;
+	private final boolean isDetached;
 
 	private ExecutionEnvironment lastEnvCreated;
 
 	private String savepointPath;
 
-	public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach,
+	public ContextEnvironmentFactory(ClusterClient client, List<URL> jarFilesToAttach,
 			List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
-			boolean wait, String savepointPath)
+			boolean isDetached, String savepointPath)
 	{
 		this.client = client;
 		this.jarFilesToAttach = jarFilesToAttach;
 		this.classpathsToAttach = classpathsToAttach;
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.defaultParallelism = defaultParallelism;
-		this.wait = wait;
+		this.isDetached = isDetached;
 		this.savepointPath = savepointPath;
 	}
 
 	@Override
 	public ExecutionEnvironment createExecutionEnvironment() {
-		if (!wait && lastEnvCreated != null) {
+		if (isDetached && lastEnvCreated != null) {
 			throw new InvalidProgramException("Multiple enviornments cannot be created in detached mode");
 		}
 
-		lastEnvCreated = wait ?
-				new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath) :
-				new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
+		lastEnvCreated = isDetached ?
+				new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath):
+				new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
 		if (defaultParallelism > 0) {
 			lastEnvCreated.setParallelism(defaultParallelism);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
index 037c36b..8298933 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
@@ -42,7 +42,7 @@ public class DetachedEnvironment extends ContextEnvironment {
 	private static final Logger LOG = LoggerFactory.getLogger(DetachedEnvironment.class);
 
 	public DetachedEnvironment(
-			Client remoteConnection,
+			ClusterClient remoteConnection,
 			List<URL> jarFiles,
 			List<URL> classpaths,
 			ClassLoader userCodeClassLoader,
@@ -53,7 +53,7 @@ public class DetachedEnvironment extends ContextEnvironment {
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		Plan p = createProgramPlan(jobName);
-		setDetachedPlan(Client.getOptimizedPlan(client.compiler, p, getParallelism()));
+		setDetachedPlan(ClusterClient.getOptimizedPlan(client.compiler, p, getParallelism()));
 		LOG.warn("Job was executed in detached mode, the results will be available on completion.");
 		this.lastJobExecutionResult = DetachedJobExecutionResult.INSTANCE;
 		return this.lastJobExecutionResult;
@@ -72,7 +72,7 @@ public class DetachedEnvironment extends ContextEnvironment {
 	 * Finishes this Context Environment's execution by explicitly running the plan constructed.
 	 */
 	JobSubmissionResult finalizeExecute() throws ProgramInvocationException {
-		return client.runDetached(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
+		return client.run(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
 	}
 
 	public static final class DetachedJobExecutionResult extends JobExecutionResult {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
new file mode 100644
index 0000000..82f350a
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Cluster client for communication with a standalone (on-premise) cluster or an existing cluster that has been
+ * brought up independently of a specific job.
+ */
+public class StandaloneClusterClient extends ClusterClient {
+
+	public StandaloneClusterClient(Configuration config) throws IOException {
+		super(config);
+	}
+
+
+	@Override
+	public String getWebInterfaceURL() {
+		String host = this.getJobManagerAddress().getHostName();
+		int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+			ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+		return "http://" +  host + ":" + port;
+	}
+
+	@Override
+	public GetClusterStatusResponse getClusterStatus() {
+		ActorGateway jmGateway;
+		try {
+			jmGateway = getJobManagerGateway();
+			Future<Object> future = jmGateway.ask(GetClusterStatus.getInstance(), timeout);
+			Object result = Await.result(future, timeout);
+			if (result instanceof GetClusterStatusResponse) {
+				return (GetClusterStatusResponse) result;
+			} else {
+				throw new RuntimeException("Received the wrong reply " + result + " from cluster.");
+			}
+		} catch (Exception e) {
+			throw new RuntimeException("Couldn't retrieve the Cluster status.", e);
+		}
+	}
+
+	@Override
+	public List<String> getNewMessages() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public String getClusterIdentifier() {
+		return "Standalone cluster with JobManager running at " + this.getJobManagerAddress();
+	}
+
+	@Override
+	public int getMaxSlots() {
+		return -1;
+	}
+
+	@Override
+	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
+			throws ProgramInvocationException {
+		if (isDetached()) {
+			return super.runDetached(jobGraph, classLoader);
+		} else {
+			return super.run(jobGraph, classLoader);
+		}
+	}
+
+	@Override
+	protected void finalizeCluster() {}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index 9d0b691..de85ca8 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -18,17 +18,13 @@
 
 package org.apache.flink.client;
 
-import static org.junit.Assert.assertEquals;
+import static org.apache.flink.client.CliFrontendTestUtils.checkJobManagerAddress;
 import static org.junit.Assert.fail;
 
 import static org.mockito.Mockito.*;
 
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
 import org.apache.flink.client.cli.CommandLineOptions;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -46,57 +42,18 @@ public class CliFrontendAddressConfigurationTest {
 
 	@Rule
 	public TemporaryFolder folder = new TemporaryFolder();
-	
+
 	@BeforeClass
 	public static void init() {
 		CliFrontendTestUtils.pipeSystemOutToNull();
 	}
-	
+
 	@Before
 	public void clearConfig() {
 		CliFrontendTestUtils.clearGlobalConfiguration();
 	}
 
 	@Test
-	public void testInvalidConfigAndNoOption() {
-		try {
-			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
-			CommandLineOptions options = mock(CommandLineOptions.class);
-
-			frontend.updateConfig(options);
-			Configuration config = frontend.getConfiguration();
-
-			checkJobManagerAddress(config, null, -1);
-
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testInvalidConfigAndOption() {
-		try {
-			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
-
-			CommandLineOptions options = mock(CommandLineOptions.class);
-			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
-
-			frontend.updateConfig(options);
-			Configuration config = frontend.getConfiguration();
-
-			InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
-
-			checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
 	public void testValidConfig() {
 		try {
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
@@ -112,83 +69,38 @@ public class CliFrontendAddressConfigurationTest {
 					CliFrontendTestUtils.TEST_JOB_MANAGER_PORT);
 		}
 		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+				e.printStackTrace();
+				fail(e.getMessage());
+			}
 	}
 
-	/**
-	 * Test that the CliFrontent is able to pick up the .yarn-properties file from a specified location.
-	 */
 	@Test
-	public void testYarnConfig() {
+	public void testInvalidConfigAndNoOption() {
 		try {
-			File tmpFolder = folder.newFolder();
-			String currentUser = System.getProperty("user.name");
-
-			// copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
-			File confFile = new File(CliFrontendRunTest.class.getResource("/testconfigwithyarn/flink-conf.yaml").getFile());
-			File testConfFile = new File(tmpFolder, "flink-conf.yaml");
-			org.apache.commons.io.FileUtils.copyFile(confFile, testConfFile);
-			String toAppend = "\nyarn.properties-file.location: " + tmpFolder;
-			// append to flink-conf.yaml
-			Files.write(testConfFile.toPath(), toAppend.getBytes(), StandardOpenOption.APPEND);
-			// copy .yarn-properties-<username>
-			File propertiesFile = new File(CliFrontendRunTest.class.getResource("/testconfigwithyarn/.yarn-properties").getFile());
-			File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser);
-			org.apache.commons.io.FileUtils.copyFile(propertiesFile, testPropertiesFile);
-
-			// start CLI Frontend
-			CliFrontend frontend = new CliFrontend(tmpFolder.getAbsolutePath());
-
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
 			CommandLineOptions options = mock(CommandLineOptions.class);
 
 			frontend.updateConfig(options);
 			Configuration config = frontend.getConfiguration();
 
-			checkJobManagerAddress(
-					config,
-					CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_ADDRESS,
-					CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_PORT);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testInvalidYarnConfig() {
-		try {
-			CliFrontend cli = new CliFrontend(CliFrontendTestUtils.getConfigDirWithInvalidYarnFile());
-
-			CommandLineOptions options = mock(CommandLineOptions.class);
-
-			cli.updateConfig(options);
-
-			Configuration config = cli.getConfiguration();
+			checkJobManagerAddress(config, null, -1);
 
-			checkJobManagerAddress(
-				config,
-				CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS,
-				CliFrontendTestUtils.TEST_JOB_MANAGER_PORT);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
-	public void testManualOptionsOverridesConfig() {
+	public void testInvalidConfigAndOption() {
 		try {
-			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
 
 			CommandLineOptions options = mock(CommandLineOptions.class);
 			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
 
 			frontend.updateConfig(options);
-
 			Configuration config = frontend.getConfiguration();
 
 			InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
@@ -200,11 +112,11 @@ public class CliFrontendAddressConfigurationTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
-	public void testManualOptionsOverridesYarn() {
+	public void testManualOptionsOverridesConfig() {
 		try {
-			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile());
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 
 			CommandLineOptions options = mock(CommandLineOptions.class);
 			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
@@ -223,11 +135,4 @@ public class CliFrontendAddressConfigurationTest {
 		}
 	}
 
-	public void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
-		String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-		int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-
-		assertEquals(expectedAddress, jobManagerAddress);
-		assertEquals(expectedPort, jobManagerPort);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index 5439742..f47ca69 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -30,11 +30,10 @@ import static org.mockito.Mockito.*;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.client.cli.RunOptions;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
 import org.apache.flink.optimizer.DataStatistics;
@@ -328,7 +327,7 @@ public class CliFrontendPackageProgramTest {
 			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c);
 
 			// we expect this to fail with a "ClassNotFoundException"
-			Client.getOptimizedPlanAsJson(compiler, prog, 666);
+			ClusterClient.getOptimizedPlanAsJson(compiler, prog, 666);
 			fail("Should have failed with a ClassNotFoundException");
 		}
 		catch (ProgramInvocationException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index 56173bd..ceba6cb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -25,11 +25,10 @@ import static org.junit.Assert.*;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.RunOptions;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 
 public class CliFrontendRunTest {
@@ -75,7 +74,7 @@ public class CliFrontendRunTest {
 			// test detached mode
 			{
 				String[] parameters = {"-p", "2", "-d", getTestJarPath()};
-				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, true);
+				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, true, true);
 				assertEquals(0, testFrontend.run(parameters));
 			}
 
@@ -96,9 +95,6 @@ public class CliFrontendRunTest {
 			// test configure savepoint path
 			{
 				String[] parameters = {"-s", "expectedSavepointPath", getTestJarPath()};
-				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(1, false, false);
-				assertEquals(0, testFrontend.run(parameters));
-
 				RunOptions options = CliFrontendParser.parseRunCommand(parameters);
 				assertEquals("expectedSavepointPath", options.getSavepointPath());
 			}
@@ -125,22 +121,16 @@ public class CliFrontendRunTest {
 		}
 
 		@Override
-		protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
-			assertTrue(isDetached);
-			assertEquals(this.expectedParallelism, parallelism);
-			assertEquals(this.sysoutLogging, client.getPrintStatusDuringExecution());
-			return 0;
-		}
-
-		@Override
-		protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
-			assertTrue(!isDetached);
+		protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) {
+			assertEquals(isDetached, client.isDetached());
+			assertEquals(sysoutLogging, client.getPrintStatusDuringExecution());
+			assertEquals(expectedParallelism, parallelism);
 			return 0;
 		}
 
 		@Override
-		protected Client getClient(CommandLineOptions options, String programName, int userParallelism, boolean detached) throws Exception {
-			return Mockito.mock(Client.class);
+		protected ClusterClient getClient(CommandLineOptions options, String programName) throws Exception {
+			return TestingClusterClientWithoutActorSystem.create();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 7d01ab6..1872133 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -25,9 +25,12 @@ import java.io.PrintStream;
 import java.lang.reflect.Field;
 import java.net.MalformedURLException;
 import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class CliFrontendTestUtils {
@@ -35,16 +38,11 @@ public class CliFrontendTestUtils {
 	public static final String TEST_JAR_MAIN_CLASS = "org.apache.flink.client.testjar.WordCount";
 	
 	public static final String TEST_JAR_CLASSLOADERTEST_CLASS = "org.apache.flink.client.testjar.JobWithExternalDependency";
-	
-	
+
 	public static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";
-	
+
 	public static final int TEST_JOB_MANAGER_PORT = 55443;
 	
-	public static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
-	
-	public static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
-	
 	
 	public static String getTestJarPath() throws FileNotFoundException, MalformedURLException {
 		File f = new File("target/maven-test-jar.jar");
@@ -68,17 +66,7 @@ public class CliFrontendTestUtils {
 		String confFile = CliFrontendRunTest.class.getResource("/invalidtestconfig/flink-conf.yaml").getFile();
 		return new File(confFile).getAbsoluteFile().getParent();
 	}
-	
-	public static String getConfigDirWithYarnFile() {
-		String confFile = CliFrontendRunTest.class.getResource("/testconfigwithyarn/flink-conf.yaml").getFile();
-		return new File(confFile).getAbsoluteFile().getParent();
-	}
-	
-	public static String getConfigDirWithInvalidYarnFile() {
-		String confFile = CliFrontendRunTest.class.getResource("/testconfigwithinvalidyarn/flink-conf.yaml").getFile();
-		return new File(confFile).getAbsoluteFile().getParent();
-	}
-	
+
 	public static void pipeSystemOutToNull() {
 		System.setOut(new PrintStream(new BlackholeOutputSteam()));
 		System.setErr(new PrintStream(new BlackholeOutputSteam()));
@@ -114,6 +102,14 @@ public class CliFrontendTestUtils {
 		@Override
 		public void write(int b){}
 	}
+
+	public static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
+		String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+		int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+		assertEquals(expectedAddress, jobManagerAddress);
+		assertEquals(expectedPort, jobManagerPort);
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java b/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
new file mode 100644
index 0000000..ab608cb
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+/**
+ * A client to use in tests which does not instantiate an ActorSystem.
+ */
+public class TestingClusterClientWithoutActorSystem extends StandaloneClusterClient {
+
+	private TestingClusterClientWithoutActorSystem() throws IOException {
+		super(new Configuration());
+	}
+
+	/**
+	 * Do not instantiate the Actor System to save resources.
+	 * @return Mocked ActorSystem
+	 * @throws IOException never thrown by this override, which only returns a mock
+	 */
+	@Override
+	protected ActorSystem createActorSystem() throws IOException {
+		return Mockito.mock(ActorSystem.class);
+	}
+
+	public static ClusterClient create() {
+		try {
+			return new TestingClusterClientWithoutActorSystem();
+		} catch (IOException e) {
+			throw new RuntimeException("Could not create TestingClientWithoutActorSystem.", e);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 14a1fff..4eb5269 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -98,7 +98,7 @@ public class ClientConnectionTest {
 				@Override
 				public void run() {
 					try {
-						new Client(config);
+						new StandaloneClusterClient(config);
 						fail("This should fail with an exception since the JobManager is unreachable.");
 					}
 					catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 4f9b367..96785f4 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -66,7 +66,7 @@ import static org.mockito.Mockito.when;
 
 
 /**
- * Simple and maybe stupid test to check the {@link Client} class.
+ * Simple and maybe stupid test to check the {@link ClusterClient} class.
  */
 public class ClientTest {
 
@@ -127,11 +127,12 @@ public class ClientTest {
 	@Test
 	public void testDetachedMode() throws Exception{
 		jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
-		Client out = new Client(config);
+		ClusterClient out = new StandaloneClusterClient(config);
+		out.setDetached(true);
 
 		try {
 			PackagedProgram prg = new PackagedProgram(TestExecuteTwice.class);
-			out.runDetached(prg, 1);
+			out.run(prg, 1);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -141,7 +142,7 @@ public class ClientTest {
 
 		try {
 			PackagedProgram prg = new PackagedProgram(TestEager.class);
-			out.runDetached(prg, 1);
+			out.run(prg, 1);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -151,7 +152,7 @@ public class ClientTest {
 
 		try {
 			PackagedProgram prg = new PackagedProgram(TestGetRuntime.class);
-			out.runDetached(prg, 1);
+			out.run(prg, 1);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -161,7 +162,7 @@ public class ClientTest {
 
 		try {
 			PackagedProgram prg = new PackagedProgram(TestGetJobID.class);
-			out.runDetached(prg, 1);
+			out.run(prg, 1);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -171,7 +172,7 @@ public class ClientTest {
 
 		try {
 			PackagedProgram prg = new PackagedProgram(TestGetAccumulator.class);
-			out.runDetached(prg, 1);
+			out.run(prg, 1);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -181,7 +182,7 @@ public class ClientTest {
 
 		try {
 			PackagedProgram prg = new PackagedProgram(TestGetAllAccumulator.class);
-			out.runDetached(prg, 1);
+			out.run(prg, 1);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -198,8 +199,9 @@ public class ClientTest {
 		try {
 			jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
 
-			Client out = new Client(config);
-			JobSubmissionResult result = out.runDetached(program.getPlanWithJars(), 1);
+			ClusterClient out = new StandaloneClusterClient(config);
+			out.setDetached(true);
+			JobSubmissionResult result = out.run(program.getPlanWithJars(), 1);
 
 			assertNotNull(result);
 
@@ -219,10 +221,11 @@ public class ClientTest {
 		try {
 			jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
 
-			Client out = new Client(config);
+			ClusterClient out = new StandaloneClusterClient(config);
+			out.setDetached(true);
 
 			try {
-				out.runDetached(program.getPlanWithJars(), 1);
+				out.run(program.getPlanWithJars(), 1);
 				fail("This should fail with an exception");
 			}
 			catch (ProgramInvocationException e) {
@@ -258,7 +261,9 @@ public class ClientTest {
 			}).when(packagedProgramMock).invokeInteractiveModeForExecution();
 
 			try {
-				new Client(config).runBlocking(packagedProgramMock, 1);
+				ClusterClient client = new StandaloneClusterClient(config);
+				client.setDetached(true);
+				client.run(packagedProgramMock, 1);
 				fail("Creating the local execution environment should not be possible");
 			}
 			catch (InvalidProgramException e) {
@@ -280,7 +285,7 @@ public class ClientTest {
 			assertNotNull(prg.getPreviewPlan());
 			
 			Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
-			OptimizedPlan op = (OptimizedPlan) Client.getOptimizedPlan(optimizer, prg, 1);
+			OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, 1);
 			assertNotNull(op);
 
 			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();


Mime
View raw message