flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/3] flink git commit: [FLINK-1631] [client] Overhaul of the client.
Date Wed, 04 Mar 2015 17:39:14 GMT
[FLINK-1631] [client] Overhaul of the client.

 - Fix bugs with non-serializable messages
 - Separate parser and action logic
 - Clean up tests
 - Vastly improve logging in CLI client
 - Additional tests for parsing / config setup in the command line client


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5385e48d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5385e48d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5385e48d

Branch: refs/heads/master
Commit: 5385e48d94a2df81c8fd6102a889cf42dd93fe2f
Parents: 0333109
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Mar 3 21:49:37 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Mar 4 18:20:36 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 1211 ++++++++----------
 .../apache/flink/client/cli/CancelOptions.java  |   37 +
 .../flink/client/cli/CliArgsException.java      |   30 +
 .../flink/client/cli/CliFrontendParser.java     |  284 ++++
 .../flink/client/cli/CommandLineOptions.java    |   57 +
 .../apache/flink/client/cli/InfoOptions.java    |   30 +
 .../apache/flink/client/cli/ListOptions.java    |   46 +
 .../apache/flink/client/cli/ProgramOptions.java |   97 ++
 .../org/apache/flink/client/cli/RunOptions.java |   30 +
 .../org/apache/flink/client/program/Client.java |   19 +-
 .../CliFrontendAddressConfigurationTest.java    |  180 +++
 .../flink/client/CliFrontendInfoTest.java       |   39 +-
 .../CliFrontendJobManagerConnectionTest.java    |  166 ---
 .../flink/client/CliFrontendListCancelTest.java |   61 +-
 .../client/CliFrontendPackageProgramTest.java   |  223 ++--
 .../apache/flink/client/CliFrontendRunTest.java |   14 +-
 .../flink/client/CliFrontendTestUtils.java      |   15 +-
 .../ExecutionPlanAfterExecutionTest.java        |    7 +
 .../main/flink-bin/conf/log4j-cli.properties    |    1 +
 .../flink/runtime/client/JobStatusMessage.java  |   59 +
 .../flink/runtime/jobmanager/JobManager.scala   |   29 +-
 .../jobmanager/JobManagerCLIConfiguration.scala |    2 +-
 .../runtime/messages/JobManagerMessages.scala   |   33 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |    9 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |   12 +-
 25 files changed, 1627 insertions(+), 1064 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index e438de0..1d9d956 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -24,6 +24,8 @@ import java.io.FileNotFoundException;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -38,19 +40,20 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.MissingOptionException;
-import org.apache.commons.cli.MissingArgumentException;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.cli.UnrecognizedOptionException;
+import org.apache.flink.client.cli.CancelOptions;
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontendParser;
+
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.InfoOptions;
+import org.apache.flink.client.cli.ListOptions;
+import org.apache.flink.client.cli.ProgramOptions;
+import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -58,18 +61,23 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -80,201 +88,133 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class CliFrontend {
 
-	// run job by deploying Flink into a YARN cluster, if this string is specified as the jobmanager address
-	public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
-
-	// command line interface of the YARN session, with a special initialization here to prefix all options with y/yarn.
-	private static FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn");
-
-	//actions
+	// actions
 	private static final String ACTION_RUN = "run";
 	private static final String ACTION_INFO = "info";
 	private static final String ACTION_LIST = "list";
 	private static final String ACTION_CANCEL = "cancel";
 
-	// general options
-	private static final Option HELP_OPTION = new Option("h", "help", false, "Show the help message for the CLI Frontend or the action.");
-
-	// program (jar file) specific options
-	private static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
-	private static final Option CLASS_OPTION = new Option("c", "class", true, "Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the JAR file does not specify the class in its manifest.");
-	private static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true, "The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.");
-	private static final Option ARGS_OPTION = new Option("a", "arguments", true, "Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
-
-	private static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, "Address of the JobManager (master) to which to connect. Specify '"+YARN_DEPLOY_JOBMANAGER+"' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a different JobManager than the one specified in the configuration.");
-
-	// info specific options
-
-	// list specific options
-	private static final Option RUNNING_OPTION = new Option("r", "running", false, "Show only running programs and their JobIDs");
-	private static final Option SCHEDULED_OPTION = new Option("s", "scheduled", false, "Show only scheduled prorgrams and their JobIDs");
-
-	// canceling options
-
-	static {
-		initOptions();
-	}
-	
-	// action options all include the general options
-	private static final Options RUN_OPTIONS = getRunOptions(createGeneralOptions());
-	private static final Options INFO_OPTIONS = getInfoOptions(createGeneralOptions());
-	private static final Options LIST_OPTIONS = getListOptions(createGeneralOptions());
-	private static final Options CANCEL_OPTIONS = getCancelOptions(createGeneralOptions());
-	
 	// config dir parameters
 	private static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR";
 	private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
 	private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
 	
-	/**
-	 * YARN-session related constants
-	 */
+	// YARN-session related constants
 	public static final String YARN_PROPERTIES_FILE = ".yarn-properties";
 	public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager";
 	public static final String YARN_PROPERTIES_DOP = "degreeOfParallelism";
 	public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
-	// this has to be a regex for String.split()
-	public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
-	
 
-	private CommandLineParser parser;
-	
-	private boolean printHelp;
-	
-	private boolean globalConfigurationLoaded;
+	public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
 
-	private boolean yarnPropertiesLoaded = false;
+	/**
+	 * A special host name used to run a job by deploying Flink into a YARN cluster,
+	 * if this string is specified as the JobManager address
+ 	 */
+	public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
 	
-	private Properties yarnProperties;
 
-	// this flag indicates if the given Job is executed using a YARN cluster,
-	// started for this purpose.
-	private boolean runInYarnCluster = false;
+	// --------------------------------------------------------------------------------------------
+	// --------------------------------------------------------------------------------------------
 
-	private AbstractFlinkYarnCluster yarnCluster = null;
+	private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
 
-	protected String configurationDirectory = null;
+	private final File configDirectory;
 
+	private final Configuration config;
 
-	/**
-	 * Initializes the class
-	 */
-	public CliFrontend() {
-		parser = new PosixParser();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Setup of options
-	// --------------------------------------------------------------------------------------------
+	private final FiniteDuration askTimeout;
+
+	private final FiniteDuration lookupTimeout;
+
+	private InetSocketAddress jobManagerAddress;
+
+	private ActorSystem actorSystem;
+
+	private AbstractFlinkYarnCluster yarnCluster;
 
-	private static void initOptions() {
-		HELP_OPTION.setRequired(false);
 
-		JAR_OPTION.setRequired(false);
-		JAR_OPTION.setArgName("jarfile");
-		
-		CLASS_OPTION.setRequired(false);
-		CLASS_OPTION.setArgName("classname");
-		
-		ADDRESS_OPTION.setRequired(false);
-		ADDRESS_OPTION.setArgName("host:port");
-		
-		PARALLELISM_OPTION.setRequired(false);
-		PARALLELISM_OPTION.setArgName("parallelism");
-		
-		ARGS_OPTION.setRequired(false);
-		ARGS_OPTION.setArgName("programArgs");
-		ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);
 
-		RUNNING_OPTION.setRequired(false);
-		SCHEDULED_OPTION.setRequired(false);
-	}
-	
-	static Options createGeneralOptions() {
-		Options options = new Options();
-		options.addOption(HELP_OPTION);
-		// backwards compatibility: ignore verbose flag (-v)
-		options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
-		return options;
-	}
-	
-	// gets the program options with the old flags for jar file and arguments
-	static Options getProgramSpecificOptions(Options options) {
-		options.addOption(JAR_OPTION);
-		options.addOption(CLASS_OPTION);
-		options.addOption(PARALLELISM_OPTION);
-		options.addOption(ARGS_OPTION);
-
-		// also add the YARN options so that the parser can parse them
-		yarnSessionCLi.getYARNSessionCLIOptions(options);
-		return options;
-	}
-	
-	// gets the program options without the old flags for jar file and arguments
-	static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
-		options.addOption(CLASS_OPTION);
-		options.addOption(PARALLELISM_OPTION);
-		return options;
-	}
-	
-	/**
-	 * Builds command line options for the run action.
-	 * 
-	 * @return Command line options for the run action.
-	 */
-	static Options getRunOptions(Options options) {
-		Options o = getProgramSpecificOptions(options);
-		return getJobManagerAddressOption(o);
-	}
-	
-	static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
-		Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
-		return getJobManagerAddressOption(o);
-	}
-	
-	static Options getJobManagerAddressOption(Options options) {
-		options.addOption(ADDRESS_OPTION);
-		return options;
-	}
-	
-	/**
-	 * Builds command line options for the info action.
-	 * 
-	 * @return Command line options for the info action.
-	 */
-	static Options getInfoOptions(Options options) {
-		options = getProgramSpecificOptions(options);
-		options = getJobManagerAddressOption(options);
-		return options;
-	}
-	
-	static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
-		options = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
-		options = getJobManagerAddressOption(options);
-		return options;
-	}
-	
 	/**
-	 * Builds command line options for the list action.
-	 * 
-	 * @return Command line options for the list action.
+	 *
+	 * @throws Exception Thrown if the configuration directory was not found, the configuration could not
+	 *                   be loaded, or the YARN properties could not be parsed.
 	 */
-	static Options getListOptions(Options options) {
-		options.addOption(RUNNING_OPTION);
-		options.addOption(SCHEDULED_OPTION);
-		options = getJobManagerAddressOption(options);
-		return options;
+	public CliFrontend() throws Exception {
+		this(getConfigurationDirectoryFromEnv());
 	}
-	
-	/**
-	 * Builds command line options for the cancel action.
-	 * 
-	 * @return Command line options for the cancel action.
-	 */
-	static Options getCancelOptions(Options options) {
-		options = getJobManagerAddressOption(options);
-		return options;
+
+	public CliFrontend(String configDir) throws Exception {
+
+		// configure the config directory
+		this.configDirectory = new File(configDir);
+		LOG.info("Using configuration directory " + this.configDirectory.getAbsolutePath());
+
+		// load the configuration
+		LOG.info("Trying to load configuration file");
+		GlobalConfiguration.loadConfiguration(this.configDirectory.getAbsolutePath());
+		this.config = GlobalConfiguration.getConfiguration();
+
+		// load the YARN properties
+		File propertiesFile = new File(configDirectory, YARN_PROPERTIES_FILE);
+		if (propertiesFile.exists()) {
+
+			logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
+
+			Properties yarnProperties = new Properties();
+			try {
+				InputStream is = new FileInputStream(propertiesFile);
+				try {
+					yarnProperties.load(is);
+				}
+				finally {
+					is.close();
+				}
+			}
+			catch (IOException e) {
+				throw new Exception("Cannot read the YARN properties file", e);
+			}
+
+			// configure the default degree of parallelism from YARN
+			String propDegree = yarnProperties.getProperty(YARN_PROPERTIES_DOP);
+			if (propDegree != null) { // maybe the property is not set
+				try {
+					int paraDegree = Integer.parseInt(propDegree);
+					this.config.setInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, paraDegree);
+
+					logAndSysout("YARN properties set default parallelism to " + paraDegree);
+				}
+				catch (NumberFormatException e) {
+					throw new Exception("Error while parsing the YARN properties: " +
+							"Property " + YARN_PROPERTIES_DOP + " is not an integer.");
+				}
+			}
+
+			// get the JobManager address from the YARN properties
+			String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+			if (address != null) {
+				try {
+					jobManagerAddress = parseJobManagerAddress(address);
+				}
+				catch (Exception e) {
+					throw new Exception("YARN properties contain an invalid entry for JobManager address.", e);
+				}
+
+				logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
+			}
+
+			// handle the YARN client's dynamic properties
+			String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
+			List<Tuple2<String, String>> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
+			for (Tuple2<String, String> dynamicProperty : dynamicProperties) {
+				this.config.setString(dynamicProperty.f0, dynamicProperty.f1);
+			}
+		}
+
+		this.askTimeout = AkkaUtils.getTimeout(config);
+		this.lookupTimeout = AkkaUtils.getLookupTimeout(config);
 	}
+
 	
 	// --------------------------------------------------------------------------------------------
 	//  Execute Actions
@@ -286,182 +226,137 @@ public class CliFrontend {
 	 * @param args Command line arguments for the run action.
 	 */
 	protected int run(String[] args) {
-		// Parse command line options
-		CommandLine line;
+		LOG.info("Running 'run' command.");
+
+		RunOptions options;
 		try {
-			line = parser.parse(RUN_OPTIONS, args, true);
-			evaluateGeneralOptions(line);
+			options = CliFrontendParser.parseRunCommand(args);
 		}
-		catch (MissingOptionException e) {
+		catch (CliArgsException e) {
 			return handleArgException(e);
 		}
-		catch (MissingArgumentException e) {
-			return handleArgException(e);
-		}
-		catch (UnrecognizedOptionException e) {
-			return handleArgException(e);
-		}
-		catch (Exception e) {
-			return handleError(e);
+		catch (Throwable t) {
+			return handleError(t);
 		}
-		
-		// ------------ check for help first --------------
-		
-		if (printHelp) {
-			printHelpForRun();
+
+		// evaluate help flag
+		if (options.isPrintHelp()) {
+			CliFrontendParser.printHelpForRun();
 			return 0;
 		}
 
+		if (options.getJarFilePath() == null) {
+			return handleArgException(new CliArgsException("The program JAR file was not specified."));
+		}
+
 		PackagedProgram program;
-		Client client;
 		try {
-			program = buildProgram(line);
-			client = getClient(line, program.getUserCodeClassLoader(), program.getMainClassName());
-		} catch (FileNotFoundException e) {
+			LOG.info("Building program from JAR file");
+			program = buildProgram(options);
+		}
+		catch (FileNotFoundException e) {
 			return handleArgException(e);
-		} catch (ProgramInvocationException e) {
+		}
+		catch (ProgramInvocationException e) {
 			return handleError(e);
-		} catch (Throwable t) {
+		}
+		catch (Throwable t) {
 			return handleError(t);
 		}
 
-		int parallelism = -1;
-		if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
-			String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
-			try {
-				parallelism = Integer.parseInt(parString);
-			} catch (NumberFormatException e) {
-				System.out.println("The value " + parString + " is invalid for the degree of parallelism.");
-				return 1;
-			}
+		try {
+			Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
 
-			if (parallelism <= 0) {
-				System.out.println("Invalid value for the degree-of-parallelism. Parallelism must be greater than zero.");
-				return 1;
-			}
-		}
+			int parallelism = options.getParallelism();
+			int exitCode = executeProgram(program, client, parallelism);
 
-		int exitCode = executeProgram(program, client, parallelism);
+			if (yarnCluster != null) {
+				List<String> msgs = yarnCluster.getNewMessages();
+				if (msgs != null && msgs.size() > 1) {
 
-		if(runInYarnCluster) {
-			List<String> msgs = yarnCluster.getNewMessages();
-			if(msgs != null && msgs.size() > 1) {
-				System.out.println("The following messages were created by the YARN cluster while running the Job:");
-				for(String msg : msgs) {
-					System.out.println(msg);
+					logAndSysout("The following messages were created by the YARN cluster while running the Job:");
+					for (String msg : msgs) {
+						logAndSysout(msg);
+					}
+				}
+				if (yarnCluster.hasFailed()) {
+					logAndSysout("YARN cluster is in failed state!");
+					logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
 				}
 			}
-			if(yarnCluster.hasFailed()) {
-				System.out.println("YARN cluster is in failed state!");
-				System.out.println("YARN Diagnostics: " + yarnCluster.getDiagnostics());
-			}
-			System.out.println("Shutting down YARN cluster");
-			yarnCluster.shutdown();
-		}
-
-		return exitCode;
-	}
 
-	// --------------------------------------------------------------------------------------------
-
-	protected int executeProgram(PackagedProgram program, Client client, int parallelism) {
-		JobExecutionResult execResult;
-		try {
-			client.setPrintStatusDuringExecution(true);
-			execResult = client.run(program, parallelism, true);
+			return exitCode;
 		}
-		catch (ProgramInvocationException e) {
-			return handleError(e);
+		catch (Throwable t) {
+			return handleError(t);
 		}
 		finally {
-			program.deleteExtractedLibraries();
-		}
-		
-		// we come here after the job has finished
-		if (execResult != null) {
-			System.out.println("Job Runtime: " + execResult.getNetRuntime());
-			Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
-			if (accumulatorsResult.size() > 0) {
-				System.out.println("Accumulator Results: ");
-				System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+			if (yarnCluster != null) {
+				logAndSysout("Shutting down YARN cluster");
+				yarnCluster.shutdown();
+			}
+			if (program != null) {
+				program.deleteExtractedLibraries();
 			}
 		}
-		return 0;
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Executes the info action.
 	 * 
 	 * @param args Command line arguments for the info action. 
 	 */
 	protected int info(String[] args) {
+		LOG.info("Running 'info' command.");
+
 		// Parse command line options
-		CommandLine line;
+		InfoOptions options;
 		try {
-			line = parser.parse(INFO_OPTIONS, args, false);
-			evaluateGeneralOptions(line);
-		}
-		catch (MissingOptionException e) {
-			return handleArgException(e);
+			options = CliFrontendParser.parseInfoCommand(args);
 		}
-		catch (MissingArgumentException e) {
+		catch (CliArgsException e) {
 			return handleArgException(e);
 		}
-		catch (UnrecognizedOptionException e) {
-			return handleArgException(e);
-		}
-		catch (Exception e) {
-			return handleError(e);
+		catch (Throwable t) {
+			return handleError(t);
 		}
 
-		if (printHelp) {
-			printHelpForInfo();
+		// evaluate help flag
+		if (options.isPrintHelp()) {
+			CliFrontendParser.printHelpForInfo();
 			return 0;
 		}
 
+		if (options.getJarFilePath() == null) {
+			return handleArgException(new CliArgsException("The program JAR file was not specified."));
+		}
+
 		// -------- build the packaged program -------------
 		
 		PackagedProgram program;
 		try {
-			program = buildProgram(line);
-		} catch (FileNotFoundException e) {
-			return handleError(e);
-		} catch (ProgramInvocationException e) {
-			return handleError(e);
-		} catch (Throwable t) {
-			return handleError(t);
+			LOG.info("Building program from JAR file");
+			program = buildProgram(options);
 		}
-		
-		int parallelism = -1;
-		if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
-			String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
-			try {
-				parallelism = Integer.parseInt(parString);
-			} catch (NumberFormatException e) {
-				System.out.println("The value " + parString + " is invalid for the degree of parallelism.");
-				return 1;
-			}
-			
-			if (parallelism <= 0) {
-				System.out.println("Invalid value for the degree-of-parallelism. Parallelism must be greater than zero.");
-				return 1;
-			}
+		catch (Throwable t) {
+			return handleError(t);
 		}
 		
 		try {
-			Client client = getClient(line, program.getUserCodeClassLoader(), program.getMainClassName());
+			int parallelism = options.getParallelism();
+
+			LOG.info("Creating program plan dump");
+			Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
 			String jsonPlan = client.getOptimizedPlanAsJson(program, parallelism);
 
 			if (jsonPlan != null) {
 				System.out.println("----------------------- Execution Plan -----------------------");
 				System.out.println(jsonPlan);
 				System.out.println("--------------------------------------------------------------");
-			} else {
-				System.out.println("JSON plan could not be compiled.");
 			}
-			
+			else {
+				System.out.println("JSON plan could not be generated.");
+			}
 			return 0;
 		}
 		catch (Throwable t) {
@@ -478,33 +373,27 @@ public class CliFrontend {
 	 * @param args Command line arguments for the list action.
 	 */
 	protected int list(String[] args) {
-		// Parse command line options
-		CommandLine line;
+		LOG.info("Running 'list' command.");
+
+		ListOptions options;
 		try {
-			line = parser.parse(LIST_OPTIONS, args, false);
-			evaluateGeneralOptions(line);
-		}
-		catch (MissingOptionException e) {
-			return handleArgException(e);
+			options = CliFrontendParser.parseListCommand(args);
 		}
-		catch (MissingArgumentException e) {
+		catch (CliArgsException e) {
 			return handleArgException(e);
 		}
-		catch (UnrecognizedOptionException e) {
-			return handleArgException(e);
-		}
-		catch (Exception e) {
-			return handleError(e);
+		catch (Throwable t) {
+			return handleError(t);
 		}
-		
-		if (printHelp) {
-			printHelpForList();
+
+		// evaluate help flag
+		if (options.isPrintHelp()) {
+			CliFrontendParser.printHelpForList();
 			return 0;
 		}
-		
-		// get list options
-		boolean running = line.hasOption(RUNNING_OPTION.getOpt());
-		boolean scheduled = line.hasOption(SCHEDULED_OPTION.getOpt());
+
+		boolean running = options.getRunning();
+		boolean scheduled = options.getScheduled();
 		
 		// print running and scheduled jobs if not option supplied
 		if (!running && !scheduled) {
@@ -513,87 +402,87 @@ public class CliFrontend {
 		}
 		
 		try {
-			ActorRef jobManager = getJobManager(line, getGlobalConfiguration());
-			if (jobManager == null) {
-				return 1;
-			}
+			ActorRef jobManager = getJobManager(options);
 
-			final Future<Object> response = Patterns.ask(jobManager,
-					JobManagerMessages.getRequestRunningJobs(), new Timeout(getAkkaTimeout()));
+			LOG.info("Connecting to JobManager to retrieve list of jobs");
+			Future<Object> response = Patterns.ask(jobManager,
+					JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
 
 			Object result;
 			try {
-				result = Await.result(response, getAkkaTimeout());
-			} catch (Exception exception) {
-				throw new IOException("Could not retrieve running jobs from job manager.",
-						exception);
+				result = Await.result(response, askTimeout);
+			}
+			catch (Exception e) {
+				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
 			}
 
-			if (!(result instanceof RunningJobs)) {
-				throw new RuntimeException("ReqeustRunningJobs requires a response of type " +
-						"RunningJobs. Instead the response is of type " + result.getClass() + ".");
-			} else {
-				Iterable<ExecutionGraph> jobs = ((RunningJobs) result).asJavaIterable();
+			if (result instanceof RunningJobsStatus) {
+				LOG.info("Successfully retrieved list of jobs");
+
+				List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
 
-				ArrayList<ExecutionGraph> runningJobs = null;
-				ArrayList<ExecutionGraph> scheduledJobs = null;
+				ArrayList<JobStatusMessage> runningJobs = null;
+				ArrayList<JobStatusMessage> scheduledJobs = null;
 				if (running) {
-					runningJobs = new ArrayList<ExecutionGraph>();
+					runningJobs = new ArrayList<JobStatusMessage>();
 				}
 				if (scheduled) {
-					scheduledJobs = new ArrayList<ExecutionGraph>();
+					scheduledJobs = new ArrayList<JobStatusMessage>();
 				}
 
-				for (ExecutionGraph rj : jobs) {
-
-					if (running && rj.getState().equals(JobStatus.RUNNING)) {
+				for (JobStatusMessage rj : jobs) {
+					if (running && rj.getJobState().equals(JobStatus.RUNNING)) {
 						runningJobs.add(rj);
 					}
-					if (scheduled && rj.getState().equals(JobStatus.CREATED)) {
+					if (scheduled && rj.getJobState().equals(JobStatus.CREATED)) {
 						scheduledJobs.add(rj);
 					}
 				}
 
 				SimpleDateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
-				Comparator<ExecutionGraph> njec = new Comparator<ExecutionGraph>(){
-
+				Comparator<JobStatusMessage> njec = new Comparator<JobStatusMessage>(){
 					@Override
-					public int compare(ExecutionGraph o1, ExecutionGraph o2) {
-						return (int)(o1.getStatusTimestamp(o1.getState())-o2.getStatusTimestamp(o2
-								.getState()));
+					public int compare(JobStatusMessage o1, JobStatusMessage o2) {
+						return (int)(o1.getStartTime()-o2.getStartTime());
 					}
 				};
 
 				if (running) {
 					if(runningJobs.size() == 0) {
 						System.out.println("No running jobs.");
-					} else {
+					}
+					else {
 						Collections.sort(runningJobs, njec);
 
 						System.out.println("------------------------ Running Jobs ------------------------");
-						for(ExecutionGraph rj : runningJobs) {
-							System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState())))
-									+" : "+rj.getJobID().toString()+" : "+rj.getJobName());
+						for (JobStatusMessage rj : runningJobs) {
+							System.out.println(df.format(new Date(rj.getStartTime()))
+									+ " : " + rj.getJobId() + " : " + rj.getJobName());
 						}
 						System.out.println("--------------------------------------------------------------");
 					}
 				}
 				if (scheduled) {
-					if(scheduledJobs.size() == 0) {
+					if (scheduledJobs.size() == 0) {
 						System.out.println("No scheduled jobs.");
-					} else {
+					}
+					else {
 						Collections.sort(scheduledJobs, njec);
 
 						System.out.println("----------------------- Scheduled Jobs -----------------------");
-						for(ExecutionGraph rj : scheduledJobs) {
-							System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState())))
-									+" : "+rj.getJobID().toString()+" : "+rj.getJobName());
+						for(JobStatusMessage rj : scheduledJobs) {
+							System.out.println(df.format(new Date(rj.getStartTime()))
+									+ " : " + rj.getJobId() + " : " + rj.getJobName());
 						}
 						System.out.println("--------------------------------------------------------------");
 					}
 				}
 				return 0;
 			}
+			else {
+				throw new Exception("ReqeustRunningJobs requires a response of type " +
+						"RunningJobs. Instead the response is of type " + result.getClass() + ".");
+			}
 		}
 		catch (Throwable t) {
 			return handleError(t);
@@ -601,452 +490,299 @@ public class CliFrontend {
 	}
 	
 	/**
-	 * Executes the cancel action.
+	 * Executes the CANCEL action.
 	 * 
 	 * @param args Command line arguments for the cancel action.
 	 */
 	protected int cancel(String[] args) {
-		// Parse command line options
-		CommandLine line;
+		LOG.info("Running 'cancel' command.");
+
+		CancelOptions options;
 		try {
-			line = parser.parse(CANCEL_OPTIONS, args, false);
-			evaluateGeneralOptions(line);
-		}
-		catch (MissingOptionException e) {
-			return handleArgException(e);
+			options = CliFrontendParser.parseCancelCommand(args);
 		}
-		catch (MissingArgumentException e) {
+		catch (CliArgsException e) {
 			return handleArgException(e);
 		}
-		catch (UnrecognizedOptionException e) {
-			return handleArgException(e);
-		}
-		catch (Exception e) {
-			return handleError(e);
+		catch (Throwable t) {
+			return handleError(t);
 		}
-		
-		if (printHelp) {
-			printHelpForCancel();
+
+		// evaluate help flag
+		if (options.isPrintHelp()) {
+			CliFrontendParser.printHelpForCancel();
 			return 0;
 		}
 		
-		String[] cleanedArgs = line.getArgs();
+		String[] cleanedArgs = options.getArgs();
 		JobID jobId;
 
 		if (cleanedArgs.length > 0) {
 			String jobIdString = cleanedArgs[0];
 			try {
 				jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
+				LOG.error("Error: The value for the Job ID is not a valid ID.");
 				System.out.println("Error: The value for the Job ID is not a valid ID.");
 				return 1;
 			}
-		} else {
+		}
+		else {
+			LOG.error("Missing JobID in the command line arguments.");
 			System.out.println("Error: Specify a Job ID to cancel a job.");
 			return 1;
 		}
 		
 		try {
-			ActorRef jobManager = getJobManager(line, getGlobalConfiguration());
-
-			if (jobManager == null) {
-				return 1;
-			}
-
-			final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId),
-					new Timeout(getAkkaTimeout()));
+			ActorRef jobManager = getJobManager(options);
+			Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
 
 			try {
-				Await.ready(response, getAkkaTimeout());
-			} catch (Exception exception) {
-				throw new IOException("Canceling the job with job ID " + jobId + " failed.",
-						exception);
+				Await.result(response, askTimeout);
+				return 0;
+			}
+			catch (Exception e) {
+				throw new Exception("Canceling the job with ID " + jobId + " failed.", e);
 			}
-
-			return 0;
 		}
 		catch (Throwable t) {
 			return handleError(t);
 		}
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Interaction with programs and JobManager
+	// --------------------------------------------------------------------------------------------
+
+	protected int executeProgram(PackagedProgram program, Client client, int parallelism) {
+		LOG.info("Starting execution or program");
+		JobExecutionResult execResult;
+		try {
+			client.setPrintStatusDuringExecution(true);
+			execResult = client.run(program, parallelism, true);
+		}
+		catch (ProgramInvocationException e) {
+			return handleError(e);
+		}
+		finally {
+			program.deleteExtractedLibraries();
+		}
+
+		LOG.info("Program execution finished");
+
+		// we come here after the job has finished
+		if (execResult != null) {
+			System.out.println("Job Runtime: " + execResult.getNetRuntime());
+			Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
+			if (accumulatorsResult.size() > 0) {
+				System.out.println("Accumulator Results: ");
+				System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+			}
+		}
+		return 0;
+	}
+
 	/**
-	 * @param line
-	 * 
+	 * Creates a Packaged program from the given command line options.
+	 *
 	 * @return A PackagedProgram (upon success)
 	 * @throws java.io.FileNotFoundException, org.apache.flink.client.program.ProgramInvocationException, java.lang.Throwable
 	 */
-	protected PackagedProgram buildProgram(CommandLine line) throws FileNotFoundException, ProgramInvocationException {
-		String[] programArgs = line.hasOption(ARGS_OPTION.getOpt()) ?
-				line.getOptionValues(ARGS_OPTION.getOpt()) :
-				line.getArgs();
-	
-		// take the jar file from the option, or as the first trailing parameter (if available)
-		String jarFilePath;
-		if (line.hasOption(JAR_OPTION.getOpt())) {
-			jarFilePath = line.getOptionValue(JAR_OPTION.getOpt());
-		}
-		else if (programArgs.length > 0) {
-			jarFilePath = programArgs[0];
-			programArgs = Arrays.copyOfRange(programArgs, 1, programArgs.length);
-		}
-		else {
-			throw new FileNotFoundException("Error: Jar file was not specified.");
+	protected PackagedProgram buildProgram(ProgramOptions options)
+			throws FileNotFoundException, ProgramInvocationException
+	{
+		String[] programArgs = options.getProgramArgs();
+		String jarFilePath = options.getJarFilePath();
+
+		if (jarFilePath == null) {
+			throw new IllegalArgumentException("The program JAR file was not specified.");
 		}
-		
+
 		File jarFile = new File(jarFilePath);
 		
 		// Check if JAR file exists
 		if (!jarFile.exists()) {
-			throw new FileNotFoundException("Error: Jar file does not exist: " + jarFile);
+			throw new FileNotFoundException("JAR file does not exist: " + jarFile);
 		}
 		else if (!jarFile.isFile()) {
-			throw new FileNotFoundException("Error: Jar file is not a file: " + jarFile);
+			throw new FileNotFoundException("JAR file is not a file: " + jarFile);
 		}
 		
 		// Get assembler class
-		String entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ?
-				line.getOptionValue(CLASS_OPTION.getOpt()) :
-				null;
+		String entryPointClass = options.getEntryPointClassName();
 
 		return entryPointClass == null ?
 				new PackagedProgram(jarFile, programArgs) :
 				new PackagedProgram(jarFile, entryPointClass, programArgs);
 	}
-	
-	protected String getJobManagerAddressString(CommandLine line) throws IOException {
-		Configuration configuration = getGlobalConfiguration();
-		
-		// first, check if the address comes from the command line option
-		if (line.hasOption(ADDRESS_OPTION.getOpt())) {
+
+
+	protected InetSocketAddress getJobManagerAddress(CommandLineOptions options) throws Exception {
+
+		// first, check if the address is specified as an option
+		if (options.getJobManagerAddress() != null) {
+			return parseJobManagerAddress(options.getJobManagerAddress());
+		}
+
+		// second, check whether the address was already parsed, or configured through the YARN properties
+		if (jobManagerAddress == null) {
+			// config file must have the address
+			String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+
+			// verify that there is a jobmanager address and port in the configuration
+			if (jobManagerHost == null) {
+				throw new Exception("Found no configuration in the config directory '" + configDirectory
+						+ "' that specifies the JobManager address.");
+			}
+
+			int jobManagerPort;
 			try {
-				return line.getOptionValue(ADDRESS_OPTION.getOpt());
+				jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
 			}
-			catch (Exception e) {
-				System.out.println("Error: The JobManager address has an invalid format. " + e.getMessage());
-				return null;
+			catch (NumberFormatException e) {
+				throw new Exception("Invalid value for the JobManager port (" +
+						ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + ") in the configuration.");
 			}
-		}
-		else {
-			Properties yarnProps = getYarnProperties();
-			if(yarnProps != null) {
-				try {
-					String address = yarnProps.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
-					System.out.println("Found a yarn properties file (" + YARN_PROPERTIES_FILE + ") file, "
-							+ "using \""+address+"\" to connect to the JobManager");
-					return address;
-				} catch (Exception e) {
-					System.out.println("Found a yarn properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager address from the file. "
-								+ e.getMessage());
-					return null;
-				}
-			} else {
-				// regular config file gives the address
-				String jobManagerAddress = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-				
-				// verify that there is a jobmanager address and port in the configuration
-				if (jobManagerAddress == null) {
-					System.out.println("Error: Found no configuration in the config directory '" +
-							getConfigurationDirectory() + "' that specifies the JobManager address.");
-					return null;
-				}
-				
-				int jobManagerPort;
-				try {
-					jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-				} catch (NumberFormatException e) {
-					System.out.println("Invalid value for the JobManager IPC port (" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
-							") in the configuration.");
-					return null;
-				}
-				
-				if (jobManagerPort == -1) {
-					System.out.println("Error: Found no configuration in the config directory '" +
-							getConfigurationDirectory() + "' that specifies the JobManager port.");
-					return null;
-				}
-				
-				return jobManagerAddress + ":" + jobManagerPort;
+
+			if (jobManagerPort == -1) {
+				throw new Exception("Found no configuration in the config directory '" + configDirectory
+						+ "' that specifies the JobManager port.");
 			}
-		}
-	}
-	
-	protected ActorRef getJobManager(CommandLine line, Configuration config) throws IOException {
-		//TODO: Get ActorRef from YarnCluster if we are in YARN mode.
-		String jobManagerAddressStr = getJobManagerAddressString(line);
-		if (jobManagerAddressStr == null) {
-			return null;
-		}
 
-		final ActorSystem actorSystem;
-		try {
-			scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
-			actorSystem = AkkaUtils.createActorSystem(config, new Some<scala.Tuple2<String, Object>>(systemEndpoint));
-		}
-		catch (Exception e) {
-			throw new IOException("Could not start actor system to communicate with JobManager", e);
+			jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort);
 		}
 
-		try {
-			InetSocketAddress address = RemoteExecutor.getInetFromHostport(jobManagerAddressStr);
-			return JobManager.getJobManagerRemoteReference(address, actorSystem, config);
-		}
-		finally {
-			actorSystem.shutdown();
-		}
+		return jobManagerAddress;
 	}
 	
+	protected ActorRef getJobManager(CommandLineOptions options) throws Exception {
+		//TODO: Get ActorRef from YarnCluster if we are in YARN mode.
 
-	public String getConfigurationDirectory() {
-		if (configurationDirectory == null) {
-			configurationDirectory = getConfigurationDirectoryFromEnv();
-		}
-		return configurationDirectory;
-	}
+		InetSocketAddress address = getJobManagerAddress(options);
 
-	/**
-	 * Reads configuration settings. The default path can be overridden
-	 * by setting the ENV variable "FLINK_CONF_DIR".
-	 *
-	 * @return Flink's global configuration
-	 */
-	protected Configuration getGlobalConfiguration() {
-		if (!globalConfigurationLoaded) {
-			String location = getConfigurationDirectory();
-			GlobalConfiguration.loadConfiguration(location);
-			// set default parallelization degree
-			Properties yarnProps;
+		// start an actor system if needed
+		if (this.actorSystem == null) {
+			LOG.info("Starting actor system to communicate with JobManager");
 			try {
-				yarnProps = getYarnProperties();
-				if(yarnProps != null) {
-					String propDegree = yarnProps.getProperty(YARN_PROPERTIES_DOP);
-					int paraDegree = -1;
-					if(propDegree != null) { // maybe the property is not set
-						paraDegree = Integer.valueOf(propDegree);
-					}
-					Configuration c = GlobalConfiguration.getConfiguration();
-					if(paraDegree != -1) {
-						c.setInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, paraDegree);
-					}
-					// handle the YARN client's dynamic properties
-					String dynamicPropertiesEncoded = yarnProps.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
-					List<Tuple2<String, String>> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
-					for(Tuple2<String, String> dynamicProperty : dynamicProperties) {
-						c.setString(dynamicProperty.f0, dynamicProperty.f1);
-					}
-					GlobalConfiguration.includeConfiguration(c); // update config
-				}
-			} catch (IOException e) {
-				e.printStackTrace();
-				System.err.println("Error while loading YARN properties: " + e.getMessage());
+				scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
+				this.actorSystem = AkkaUtils.createActorSystem(config,
+						new Some<scala.Tuple2<String, Object>>(systemEndpoint));
+			}
+			catch (Exception e) {
+				throw new IOException("Could not start actor system to communicate with JobManager", e);
 			}
 
-			globalConfigurationLoaded = true;
+			LOG.info("Actor system successfully started");
 		}
-		return GlobalConfiguration.getConfiguration();
-	}
-	public static String getConfigurationDirectoryFromEnv() {
-		String location;
-		if (System.getenv(ENV_CONFIG_DIRECTORY) != null) {
-			location = System.getenv(ENV_CONFIG_DIRECTORY);
-		} else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
-			location = CONFIG_DIRECTORY_FALLBACK_1;
-		} else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
-			location = CONFIG_DIRECTORY_FALLBACK_2;
-		} else {
-			throw new RuntimeException("The configuration directory was not found. Please configure the '" +
-					ENV_CONFIG_DIRECTORY + "' environment variable properly.");
-		}
-		return location;
-	}
 
-	protected FiniteDuration getAkkaTimeout(){
-		Configuration config = getGlobalConfiguration();
-		return AkkaUtils.getTimeout(config);
+		LOG.info("Trying to lookup JobManager");
+		ActorRef jmActor = JobManager.getJobManagerRemoteReference(address, actorSystem, lookupTimeout);
+		LOG.info("JobManager is at " + jmActor.path());
+		return jmActor;
 	}
 	
-	public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) {
-		List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>();
-		if(dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
-			String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-			for(String propLine : propertyLines) {
-				if(propLine == null) {
-					continue;
-				}
-				String[] kv = propLine.split("=");
-				if(kv != null && kv[0] != null && kv[1] != null && kv[0].length() > 0) {
-					ret.add(new Tuple2<String, String>(kv[0], kv[1]));
-				}
-			}
-		}
-		return ret;
-	}
-	
-	protected Properties getYarnProperties() throws IOException {
-		if(!yarnPropertiesLoaded) {
-			String loc = getConfigurationDirectory();
-			File propertiesFile = new File(loc + '/' + YARN_PROPERTIES_FILE);
-			if (propertiesFile.exists()) {
-				Properties props = new Properties();
-				InputStream is = new FileInputStream( propertiesFile );
-				props.load(is);
-				yarnProperties = props;
-				is.close();
-			} else {
-				yarnProperties = null;
-			}
-			yarnPropertiesLoaded = true;
-		}
-		return yarnProperties;
-	}
+
 	
-	protected Client getClient(CommandLine line, ClassLoader classLoader, String programName) throws IOException {
-		String jmAddrString = getJobManagerAddressString(line);
-		InetSocketAddress jobManagerAddress = null;
-		if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) {
-			System.out.println("YARN cluster mode detected. Switching Log4j output to console");
+	protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName) throws Exception {
+
+		InetSocketAddress jobManagerAddress;
+
+		if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
+			logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
 
-			this.runInYarnCluster = true;
 			// user wants to run Flink in YARN cluster.
-			AbstractFlinkYarnClient flinkYarnClient = yarnSessionCLi.createFlinkYarnClient(line);
-			if(flinkYarnClient == null) {
+			CommandLine commandLine = options.getCommandLine();
+			AbstractFlinkYarnClient flinkYarnClient =
+					CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);
+
+			if (flinkYarnClient == null) {
 				throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
 			}
 			try {
-				yarnCluster = flinkYarnClient.deploy("Flink Application: "+programName);
-			} catch(Exception e) {
+				yarnCluster = flinkYarnClient.deploy("Flink Application: " + programName);
+			}
+			catch(Exception e) {
 				throw new RuntimeException("Error deploying the YARN cluster", e);
 			}
+
 			jobManagerAddress = yarnCluster.getJobManagerAddress();
-			System.out.println("YARN cluster started");
-			System.out.println("JobManager web interface address "+yarnCluster.getWebInterfaceURL());
-			System.out.println("Waiting until all TaskManagers have connected");
+
+			logAndSysout("YARN cluster started");
+			logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
+			logAndSysout("Waiting until all TaskManagers have connected");
+
 			while(true) {
 				FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
-				if(status != null) {
+				if (status != null) {
 					if (status.getNumberOfTaskManagers() < flinkYarnClient.getTaskManagerCount()) {
-						System.out.println("TaskManager status  (" + status.getNumberOfTaskManagers()+"/"+flinkYarnClient.getTaskManagerCount()+")");
+						logAndSysout("TaskManager status (" + status.getNumberOfTaskManagers() + "/" + flinkYarnClient.getTaskManagerCount() + ")");
 					} else {
-						System.out.println("Enough TaskManagers are connected");
+						logAndSysout("All TaskManagers are connected");
 						break;
 					}
 				} else {
-					System.out.println("No status updates from YARN cluster received so far. Waiting ...");
+					logAndSysout("No status updates from YARN cluster received so far. Waiting ...");
 				}
+
 				try {
 					Thread.sleep(500);
-				} catch (InterruptedException e) {
-					System.err.println("Thread as interrupted"); Thread.currentThread().interrupt();
+				}
+				catch (InterruptedException e) {
+					LOG.error("Interrupted while waiting for TaskManagers");
+					System.err.println("Thread is interrupted");
+					Thread.currentThread().interrupt();
 				}
 			}
-		} else {
-			jobManagerAddress = RemoteExecutor.getInetFromHostport(jmAddrString);
 		}
-		return new Client(jobManagerAddress, getGlobalConfiguration(), classLoader);
+		else {
+			jobManagerAddress = getJobManagerAddress(options);
+		}
+		return new Client(jobManagerAddress, config, classLoader);
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Logging and Exception Handling
+	// --------------------------------------------------------------------------------------------
+
 	/**
-	 * Prints the help for the client.
+	 * Displays an exception message for incorrect command line arguments.
+	 *
+	 * @param e The exception to display.
+	 * @return The return code for the process.
 	 */
-	private void printHelp() {
-		System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
-		System.out.println();
-		System.out.println("The following actions are available:");
-
-		/* The only general option is -h and the help messages are always printed on errors.
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.setWidth(80);
-		formatter.setLeftPadding(5);
-		formatter.setSyntaxPrefix("  general options:");
-		formatter.printHelp(" ", GENERAL_OPTIONS);
-		*/
-		
-		printHelpForRun();
-		printHelpForInfo();
-		printHelpForList();
-		printHelpForCancel();
-
-		System.out.println();
-	}
-	
-	private void printHelpForRun() {
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.setLeftPadding(5);
-		formatter.setWidth(80);
-
-		System.out.println("\nAction \"run\" compiles and runs a program.");
-		System.out.println("\n  Syntax: run [OPTIONS] <jar-file> <arguments>");
-		formatter.setSyntaxPrefix("  \"run\" action options:");
-		formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
-		formatter.setSyntaxPrefix("  Additional arguments if -m "+YARN_DEPLOY_JOBMANAGER+" is set:");
-		Options yarnOpts = new Options();
-		yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
-		formatter.printHelp(" ", yarnOpts);
-		System.out.println();
-	}
-	
-	private void printHelpForInfo() {
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.setLeftPadding(5);
-		formatter.setWidth(80);
-		
-		System.out.println("\nAction \"info\" shows the optimized execution plan of the program (JSON).");
-		System.out.println("\n  Syntax: info [OPTIONS] <jar-file> <arguments>");
-		formatter.setSyntaxPrefix("  \"info\" action options:");
-		formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));
-		System.out.println();
-	}
-	
-	private void printHelpForList() {
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.setLeftPadding(5);
-		formatter.setWidth(80);
-		
-		System.out.println("\nAction \"list\" lists running and scheduled programs.");
-		System.out.println("\n  Syntax: list [OPTIONS]");
-		formatter.setSyntaxPrefix("  \"list\" action options:");
-		formatter.printHelp(" ", getListOptions(new Options()));
-		System.out.println();
-	}
-	
-	private void printHelpForCancel() {
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.setLeftPadding(5);
-		formatter.setWidth(80);
-		
-		System.out.println("\nAction \"cancel\" cancels a running program.");
-		System.out.println("\n  Syntax: cancel [OPTIONS] <Job ID>");
-		formatter.setSyntaxPrefix("  \"cancel\" action options:");
-		formatter.printHelp(" ", getCancelOptions(new Options()));
-		System.out.println();
-	}
-	
 	private int handleArgException(Exception e) {
+		LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage()));
+
 		System.out.println(e.getMessage());
 		System.out.println();
-		System.out.println("Specify the help option (-h or --help) to get help on the command.");
+		System.out.println("Use the help option (-h or --help) to get help on the command.");
 		return 1;
 	}
 	/**
-	 * Displays exceptions.
+	 * Displays an exception message.
 	 * 
 	 * @param t The exception to display.
+	 * @return The return code for the process.
 	 */
 	private int handleError(Throwable t) {
+		LOG.error("Error while running the command.", t);
+
 		t.printStackTrace();
 		System.err.println();
 		System.err.println("The exception above occurred while trying to run your command.");
 		return 1;
 	}
 
+	private void logAndSysout(String message) {
+		LOG.info(message);
+		System.out.println(message);
+	}
 
-	
+	// --------------------------------------------------------------------------------------------
+	//  Entry point for executable
+	// --------------------------------------------------------------------------------------------
 
-	private void evaluateGeneralOptions(CommandLine line) {
-		// check help flag
-		this.printHelp = line.hasOption(HELP_OPTION.getOpt());
-	}
-	
 	/**
 	 * Parses the command line arguments and starts the requested action.
 	 * 
@@ -1057,7 +793,7 @@ public class CliFrontend {
 		
 		// check for action
 		if (args.length < 1) {
-			printHelp();
+			CliFrontendParser.printHelp();
 			System.out.println("Please specify an action.");
 			return 1;
 		}
@@ -1071,8 +807,11 @@ public class CliFrontend {
 		// do action
 		if (action.equals(ACTION_RUN)) {
 			// run() needs to run in a secured environment for the optimizer.
-			if(SecurityUtils.isSecurityEnabled()) {
-				System.out.println("Secure Hadoop setup detected.");
+			if (SecurityUtils.isSecurityEnabled()) {
+				String message = "Secure Hadoop environment setup detected. Running in secure context.";
+				LOG.info(message);
+				System.out.println(message);
+
 				try {
 					return SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() {
 						@Override
@@ -1085,16 +824,21 @@ public class CliFrontend {
 				}
 			}
 			return run(params);
-		} else if (action.equals(ACTION_LIST)) {
+		}
+		else if (action.equals(ACTION_LIST)) {
 			return list(params);
-		} else if (action.equals(ACTION_INFO)) {
+		}
+		else if (action.equals(ACTION_INFO)) {
 			return info(params);
-		} else if (action.equals(ACTION_CANCEL)) {
+		}
+		else if (action.equals(ACTION_CANCEL)) {
 			return cancel(params);
-		} else if (action.equals("-h") || action.equals("--help")) {
-			printHelp();
+		}
+		else if (action.equals("-h") || action.equals("--help")) {
+			CliFrontendParser.printHelp();
 			return 0;
-		} else {
+		}
+		else {
 			System.out.printf("\"%s\" is not a valid action.\n", action);
 			System.out.println();
 			System.out.println("Valid actions are \"run\", \"list\", \"info\", or \"cancel\".");
@@ -1104,15 +848,92 @@ public class CliFrontend {
 		}
 	}
 
+	public void shutdown() {
+		ActorSystem sys = this.actorSystem;
+		if (sys != null) {
+			this.actorSystem = null;
+			sys.shutdown();
+		}
+	}
 
 	/**
 	 * Submits the job based on the arguments
 	 */
-	public static void main(String[] args) throws ParseException {
+	public static void main(String[] args) {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
+		EnvironmentInformation.checkJavaVersion();
+
+		try {
+			CliFrontend cli = new CliFrontend();
+			int retCode = cli.parseParameters(args);
+			System.exit(retCode);
+		}
+		catch (Throwable t) {
+			LOG.error("Fatal error while running command line interface.", t);
+			t.printStackTrace();
+			System.exit(31);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Miscellaneous Utilities
+	// --------------------------------------------------------------------------------------------
+
+	private static InetSocketAddress parseJobManagerAddress(String hostAndPort) {
+		URI uri;
+		try {
+			uri = new URI("my://" + hostAndPort);
+		} catch (URISyntaxException e) {
+			throw new RuntimeException("Malformed address " + hostAndPort, e);
+		}
+		String host = uri.getHost();
+		int port = uri.getPort();
+		if (host == null || port == -1) {
+			throw new RuntimeException("Address is missing hostname or port " + hostAndPort);
+		}
+		return new InetSocketAddress(host, port);
+	}
 
-		CliFrontend cli = new CliFrontend();
-		int retCode = cli.parseParameters(args);
-		System.exit(retCode);
+	public static String getConfigurationDirectoryFromEnv() {
+		String location = System.getenv(ENV_CONFIG_DIRECTORY);
+
+		if (location != null) {
+			if (new File(location).exists()) {
+				return location;
+			}
+			else {
+				throw new RuntimeException("The config directory '" + location + "', specified in the '" +
+						ENV_CONFIG_DIRECTORY + "' environment variable, does not exist.");
+			}
+		}
+		else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
+			location = CONFIG_DIRECTORY_FALLBACK_1;
+		}
+		else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
+			location = CONFIG_DIRECTORY_FALLBACK_2;
+		}
+		else {
+			throw new RuntimeException("The configuration directory was not specified. " +
+					"Please specify the directory containing the configuration file through the '" +
+					ENV_CONFIG_DIRECTORY + "' environment variable.");
+		}
+		return location;
 	}
 
+	public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) {
+		List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>();
+		if(dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
+			String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+			for(String propLine : propertyLines) {
+				if(propLine == null) {
+					continue;
+				}
+				String[] kv = propLine.split("=");
+				if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) {
+					ret.add(new Tuple2<String, String>(kv[0], kv[1]));
+				}
+			}
+		}
+		return ret;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
new file mode 100644
index 0000000..22e9ece
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the CANCEL command
+ */
+public class CancelOptions extends CommandLineOptions {
+
+	private final String[] args;
+
+	public CancelOptions(CommandLine line) {
+		super(line);
+		this.args = line.getArgs();
+	}
+
+	public String[] getArgs() {
+		return args == null ? new String[0] : args;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
new file mode 100644
index 0000000..932c66d
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+/**
+ * Special exception that is thrown when the command line parsing fails.
+ */
+public class CliArgsException extends Exception {
+
+	private static final long serialVersionUID = 1L;
+
+	public CliArgsException(String message) {
+		super(message);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
new file mode 100644
index 0000000..0f6ad24
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+
+/**
+ * A simple command line parser (based on Apache Commons CLI) that extracts command
+ * line options.
+ */
+public class CliFrontendParser {
+
+	/** command line interface of the YARN session, with a special initialization here
+	 *  to prefix all options with y/yarn. */
+	private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn");
+
+
+	static final Option HELP_OPTION = new Option("h", "help", false,
+												"Show the help message for the CLI Frontend or the action.");
+
+	static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
+
+	static final Option CLASS_OPTION = new Option("c", "class", true,
+			"Class with the program entry point (\"main\" method or \"getPlan()\" method). Only needed if the " +
+					"JAR file does not specify the class in its manifest.");
+
+	static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
+			"The parallelism with which to run the program. Optional flag to override the default value " +
+					"specified in the configuration.");
+
+	static final Option ARGS_OPTION = new Option("a", "arguments", true,
+			"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
+
+	static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
+			"Address of the JobManager (master) to which to connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER
+					+ "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
+					"different JobManager than the one specified in the configuration.");
+
+	// list specific options
+	static final Option RUNNING_OPTION = new Option("r", "running", false,
+			"Show only running programs and their JobIDs");
+
+	static final Option SCHEDULED_OPTION = new Option("s", "scheduled", false,
+			"Show only scheduled programs and their JobIDs");
+
+	static {
+		HELP_OPTION.setRequired(false);
+
+		JAR_OPTION.setRequired(false);
+		JAR_OPTION.setArgName("jarfile");
+
+		CLASS_OPTION.setRequired(false);
+		CLASS_OPTION.setArgName("classname");
+
+		ADDRESS_OPTION.setRequired(false);
+		ADDRESS_OPTION.setArgName("host:port");
+
+		PARALLELISM_OPTION.setRequired(false);
+		PARALLELISM_OPTION.setArgName("parallelism");
+
+		ARGS_OPTION.setRequired(false);
+		ARGS_OPTION.setArgName("programArgs");
+		ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);
+
+		RUNNING_OPTION.setRequired(false);
+		SCHEDULED_OPTION.setRequired(false);
+	}
+
+	private static final Options RUN_OPTIONS = getRunOptions(buildGeneralOptions(new Options()));
+	private static final Options INFO_OPTIONS = getInfoOptions(buildGeneralOptions(new Options()));
+	private static final Options LIST_OPTIONS = getListOptions(buildGeneralOptions(new Options()));
+	private static final Options CANCEL_OPTIONS = getCancelOptions(buildGeneralOptions(new Options()));
+
+
+	private static Options buildGeneralOptions(Options options) {
+		options.addOption(HELP_OPTION);
+		// backwards compatibility: ignore verbose flag (-v)
+		options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
+		return options;
+	}
+
+	public static Options getProgramSpecificOptions(Options options) {
+		options.addOption(JAR_OPTION);
+		options.addOption(CLASS_OPTION);
+		options.addOption(PARALLELISM_OPTION);
+		options.addOption(ARGS_OPTION);
+
+		// also add the YARN options so that the parser can parse them
+		yarnSessionCLi.getYARNSessionCLIOptions(options);
+		return options;
+	}
+
+	private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
+		options.addOption(CLASS_OPTION);
+		options.addOption(PARALLELISM_OPTION);
+		return options;
+	}
+
+	private static Options getRunOptions(Options options) {
+		Options o = getProgramSpecificOptions(options);
+		return getJobManagerAddressOption(o);
+	}
+
+	private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
+		Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
+		return getJobManagerAddressOption(o);
+	}
+
+	private static Options getJobManagerAddressOption(Options options) {
+		options.addOption(ADDRESS_OPTION);
+		return options;
+	}
+
+	private static Options getInfoOptions(Options options) {
+		options = getProgramSpecificOptions(options);
+		options = getJobManagerAddressOption(options);
+		return options;
+	}
+
+	private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
+		options = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
+		options = getJobManagerAddressOption(options);
+		return options;
+	}
+
+	private static Options getListOptions(Options options) {
+		options.addOption(RUNNING_OPTION);
+		options.addOption(SCHEDULED_OPTION);
+		options = getJobManagerAddressOption(options);
+		return options;
+	}
+
+	private static Options getCancelOptions(Options options) {
+		options = getJobManagerAddressOption(options);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Help
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Prints the help for the client.
+	 */
+	public static void printHelp() {
+		System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
+		System.out.println();
+		System.out.println("The following actions are available:");
+
+		printHelpForRun();
+		printHelpForInfo();
+		printHelpForList();
+		printHelpForCancel();
+
+		System.out.println();
+	}
+
+	public static void printHelpForRun() {
+		HelpFormatter formatter = new HelpFormatter();
+		formatter.setLeftPadding(5);
+		formatter.setWidth(80);
+
+		System.out.println("\nAction \"run\" compiles and runs a program.");
+		System.out.println("\n  Syntax: run [OPTIONS] <jar-file> <arguments>");
+		formatter.setSyntaxPrefix("  \"run\" action options:");
+		formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
+		formatter.setSyntaxPrefix("  Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
+		Options yarnOpts = new Options();
+		yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
+		formatter.printHelp(" ", yarnOpts);
+		System.out.println();
+	}
+
+	public static void printHelpForInfo() {
+		HelpFormatter formatter = new HelpFormatter();
+		formatter.setLeftPadding(5);
+		formatter.setWidth(80);
+
+		System.out.println("\nAction \"info\" shows the optimized execution plan of the program (JSON).");
+		System.out.println("\n  Syntax: info [OPTIONS] <jar-file> <arguments>");
+		formatter.setSyntaxPrefix("  \"info\" action options:");
+		formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));
+		System.out.println();
+	}
+
+	public static void printHelpForList() {
+		HelpFormatter formatter = new HelpFormatter();
+		formatter.setLeftPadding(5);
+		formatter.setWidth(80);
+
+		System.out.println("\nAction \"list\" lists running and scheduled programs.");
+		System.out.println("\n  Syntax: list [OPTIONS]");
+		formatter.setSyntaxPrefix("  \"list\" action options:");
+		formatter.printHelp(" ", getListOptions(new Options()));
+		System.out.println();
+	}
+
+	public static void printHelpForCancel() {
+		HelpFormatter formatter = new HelpFormatter();
+		formatter.setLeftPadding(5);
+		formatter.setWidth(80);
+
+		System.out.println("\nAction \"cancel\" cancels a running program.");
+		System.out.println("\n  Syntax: cancel [OPTIONS] <Job ID>");
+		formatter.setSyntaxPrefix("  \"cancel\" action options:");
+		formatter.printHelp(" ", getCancelOptions(new Options()));
+		System.out.println();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Line Parsing
+	// --------------------------------------------------------------------------------------------
+
+	public static RunOptions parseRunCommand(String[] args) throws CliArgsException {
+		try {
+			PosixParser parser = new PosixParser();
+			CommandLine line = parser.parse(RUN_OPTIONS, args, true);
+			return new RunOptions(line);
+		}
+		catch (ParseException e) {
+			throw new CliArgsException(e.getMessage());
+		}
+	}
+
+	public static ListOptions parseListCommand(String[] args) throws CliArgsException {
+		try {
+			PosixParser parser = new PosixParser();
+			CommandLine line = parser.parse(LIST_OPTIONS, args, false);
+			return new ListOptions(line);
+		}
+		catch (ParseException e) {
+			throw new CliArgsException(e.getMessage());
+		}
+	}
+
+	public static CancelOptions parseCancelCommand(String[] args) throws CliArgsException {
+		try {
+			PosixParser parser = new PosixParser();
+			CommandLine line = parser.parse(CANCEL_OPTIONS, args, false);
+			return new CancelOptions(line);
+		}
+		catch (ParseException e) {
+			throw new CliArgsException(e.getMessage());
+		}
+	}
+
+	public static InfoOptions parseInfoCommand(String[] args) throws CliArgsException {
+		try {
+			// stop at the first non-option (the JAR file), like "run": program arguments may look like options
+			CommandLine line = new PosixParser().parse(INFO_OPTIONS, args, true);
+			return new InfoOptions(line);
+		}
+		catch (ParseException e) {
+			throw new CliArgsException(e.getMessage());
+		}
+	}
+
+	public static FlinkYarnSessionCli getFlinkYarnSessionCli() {
+		return yarnSessionCLi;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
new file mode 100644
index 0000000..f6f6319
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
+
+/**
+ * Base class for all options parsed from the command line.
+ * Contains options for printing help and the JobManager address.
+ */
+public abstract class CommandLineOptions {
+
+	private final CommandLine commandLine;
+
+	private final String jobManagerAddress;
+
+	private final boolean printHelp;
+
+
+	protected CommandLineOptions(CommandLine line) {
+		this.commandLine = line;
+		this.printHelp = line.hasOption(HELP_OPTION.getOpt());
+		this.jobManagerAddress = line.hasOption(ADDRESS_OPTION.getOpt()) ?
+				line.getOptionValue(ADDRESS_OPTION.getOpt()) : null;
+	}
+
+	public CommandLine getCommandLine() {
+		return commandLine;
+	}
+
+	public boolean isPrintHelp() {
+		return printHelp;
+	}
+
+	public String getJobManagerAddress() {
+		return jobManagerAddress;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
new file mode 100644
index 0000000..83f5c38
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the INFO command.
+ */
+public class InfoOptions extends ProgramOptions {
+
+	public InfoOptions(CommandLine line) throws CliArgsException {
+		super(line);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
new file mode 100644
index 0000000..45f39a4
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+import static org.apache.flink.client.cli.CliFrontendParser.RUNNING_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SCHEDULED_OPTION;
+
+/**
+ * Command line options for the LIST command.
+ */
+public class ListOptions extends CommandLineOptions {
+
+	private final boolean running;
+	private final boolean scheduled;
+
+	public ListOptions(CommandLine line) {
+		super(line);
+		this.running = line.hasOption(RUNNING_OPTION.getOpt());
+		this.scheduled = line.hasOption(SCHEDULED_OPTION.getOpt());
+	}
+
+	public boolean getRunning() {
+		return running;
+	}
+
+	public boolean getScheduled() {
+		return scheduled;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
new file mode 100644
index 0000000..5b24a41
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+import java.util.Arrays;
+
+import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
+
+/**
+ * Base class for command line options that refer to a JAR file program.
+ */
+public abstract class ProgramOptions extends CommandLineOptions {
+
+	private final String jarFilePath;
+
+	private final String entryPointClass;
+
+	private final String[] programArgs;
+
+	private final int parallelism;
+
+	protected ProgramOptions(CommandLine line) throws CliArgsException {
+		super(line);
+
+		String[] args = line.hasOption(ARGS_OPTION.getOpt()) ?
+				line.getOptionValues(ARGS_OPTION.getOpt()) :
+				line.getArgs();
+
+		if (line.hasOption(JAR_OPTION.getOpt())) {
+			this.jarFilePath = line.getOptionValue(JAR_OPTION.getOpt());
+		}
+		else if (args.length > 0) {
+			jarFilePath = args[0];
+			args = Arrays.copyOfRange(args, 1, args.length);
+		}
+		else {
+			jarFilePath = null;
+		}
+
+		this.programArgs = args;
+
+		this.entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ?
+				line.getOptionValue(CLASS_OPTION.getOpt()) : null;
+
+		if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
+			String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
+			try {
+				parallelism = Integer.parseInt(parString);
+				if (parallelism <= 0) {
+					throw new NumberFormatException();
+				}
+			}
+			catch (NumberFormatException e) {
+				throw new CliArgsException("The parallelism must be a positive number: " + parString);
+			}
+		}
+		else {
+			parallelism = -1;
+		}
+	}
+
+	public String getJarFilePath() {
+		return jarFilePath;
+	}
+
+	public String getEntryPointClassName() {
+		return entryPointClass;
+	}
+
+	public String[] getProgramArgs() {
+		return programArgs;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
new file mode 100644
index 0000000..2e4eb31
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the RUN command.
+ */
+public class RunOptions extends ProgramOptions {
+
+	public RunOptions(CommandLine line) throws CliArgsException {
+		super(line);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 5a032a0..f4a2dc9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -25,7 +25,6 @@ import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.List;
 
-import akka.remote.AssociationErrorEvent;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
@@ -219,8 +218,7 @@ public class Client {
 	}
 	
 	private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
-		JobGraph job = null;
-
+		JobGraph job;
 		if (optPlan instanceof StreamingPlan) {
 			job = ((StreamingPlan) optPlan).getJobGraph();
 		} else {
@@ -356,21 +354,6 @@ public class Client {
 		return new JobExecutionResult(-1, null);
 	}
 
-	private Throwable getAssociationError(List<AssociationErrorEvent> eventLog) {
-		int len = eventLog.size();
-		if (len > 0) {
-			AssociationErrorEvent e = eventLog.get(len - 1);
-			Throwable cause = e.getCause();
-			if (cause instanceof akka.remote.InvalidAssociation) {
-				return cause.getCause();
-			} else {
-				return cause;
-			}
-		} else {
-			return null;
-		}
-	}
-
 	// --------------------------------------------------------------------------------------------
 	
 	public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {


Mime
View raw message