flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [5/6] flink git commit: [FLINK-1295][FLINK-883] Allow to deploy 'job only' YARN cluster. Add tests to YARN
Date Fri, 23 Jan 2015 17:43:58 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
deleted file mode 100644
index 1652705..0000000
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.yarn.Messages.StopYarnSession
-
-trait YarnTaskManager extends ActorLogMessages {
-  that: TaskManager =>
-
-  abstract override def receiveWithLogMessages: Receive = {
-    receiveYarnMessages orElse super.receiveWithLogMessages
-  }
-
-  def receiveYarnMessages: Receive = {
-    case StopYarnSession(status) => {
-      context.system.shutdown()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
deleted file mode 100644
index 245651d..0000000
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import akka.actor.{Props, ActorRef, ActorSystem}
-import com.typesafe.config.ConfigFactory
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.taskmanager.TaskManager
-
-object YarnUtils {
-  def createActorSystem(hostname: String, port: Int, configuration: Configuration): ActorSystem = {
-    val akkaConfig = ConfigFactory.parseString(AkkaUtils.getConfigString(hostname, port,
-      configuration) + getConfigString)
-
-    AkkaUtils.createActorSystem(akkaConfig)
-  }
-
-  def createActorSystem(): ActorSystem = {
-    val akkaConfig = ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString +
-      getConfigString)
-
-    AkkaUtils.createActorSystem(akkaConfig)
-  }
-
-  def getConfigString: String = {
-    """
-    |akka{
-    |  loglevel = "DEBUG"
-    |  stdout-loglevel = "DEBUG"
-    |  log-dead-letters-during-shutdown = off
-    |  log-dead-letters = off
-    |
-    |  actor {
-    |    provider = "akka.remote.RemoteActorRefProvider"
-    |  }
-    |
-    |  remote{
-    |    log-remote-lifecycle-events = off
-    |
-    |    netty{
-    |      tcp{
-    |        transport-class = "akka.remote.transport.netty.NettyTransport"
-    |        tcp-nodelay = on
-    |        maximum-frame-size = 1MB
-    |        execution-pool-size = 4
-    |      }
-    |    }
-    |  }
-    |}""".stripMargin
-  }
-
-  def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, ActorRef) = {
-    val (hostname, port, config) = TaskManager.parseArgs(args)
-
-    val actorSystem = createActorSystem(hostname, port, config)
-
-    val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) =
-      TaskManager.parseConfiguration(hostname, config, false)
-
-    (actorSystem, TaskManager.startActor(Props(new TaskManager(connectionInfo, jobManagerURL,
-      taskManagerConfig, networkConnectionConfiguration) with YarnTaskManager))(actorSystem))
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/pom.xml b/flink-addons/pom.xml
index a20a375..ea45cdb 100644
--- a/flink-addons/pom.xml
+++ b/flink-addons/pom.xml
@@ -59,19 +59,6 @@ under the License.
 				<module>flink-tachyon</module>
 			</modules>
 		</profile>
-
-		<profile>
-			<id>include-yarn</id>
-			<activation>
-				<property>
-					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
-					<!--hadoop2--><name>!hadoop.profile</name>
-				</property>
-			</activation>
-			<modules>
-				<module>flink-yarn</module>
-			</modules>
-		</profile>
 	</profiles>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 85506b2..1a96c9c 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -37,11 +37,6 @@ under the License.
 
 	<dependencies>
 		<dependency>
-			<groupId>commons-cli</groupId>
-			<artifactId>commons-cli</artifactId>
-		</dependency>
-
-		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
@@ -109,6 +104,11 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>commons-cli</groupId>
+			<artifactId>commons-cli</artifactId>
+		</dependency>
+
+		<dependency>
 			<groupId>com.typesafe.akka</groupId>
 			<artifactId>akka-testkit_2.10</artifactId>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 8092513..358783a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -56,6 +56,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -63,7 +64,12 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.flink.util.StringUtils;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.PatternLayout;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -71,34 +77,40 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class CliFrontend {
 
+	// run the job on a Flink cluster deployed into YARN when this string is given as the JobManager address (-m)
+	public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
+
+	// command line interface of the YARN session; all its options are prefixed with y/yarn to avoid clashes (e.g. -j vs -yj).
+	private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn");
+
 	//actions
 	private static final String ACTION_RUN = "run";
 	private static final String ACTION_INFO = "info";
 	private static final String ACTION_LIST = "list";
 	private static final String ACTION_CANCEL = "cancel";
-	
+
 	// general options
 	private static final Option HELP_OPTION = new Option("h", "help", false, "Show the help for the CLI Frontend.");
 	private static final Option VERBOSE_OPTION = new Option("v", "verbose", false, "Print more detailed error messages.");
-	
+
 	// program (jar file) specific options
 	private static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
 	private static final Option CLASS_OPTION = new Option("c", "class", true, "Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the JAR file does not specify the class in its manifest.");
 	private static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true, "The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.");
 	private static final Option ARGS_OPTION = new Option("a", "arguments", true, "Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
-	
-	private static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, "Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.");
-	
+
+	private static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, "Address of the JobManager (master) to which to connect. Specify '"+YARN_DEPLOY_JOBMANAGER+"' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a different JobManager than the one specified in the configuration.");
+
 	// info specific options
 	private static final Option PLAN_OPTION = new Option("e", "executionplan", false, "Show optimized execution plan of the program (JSON)");
-	
+
 	// list specific options
 	private static final Option RUNNING_OPTION = new Option("r", "running", false, "Show running programs and their JobIDs");
 	private static final Option SCHEDULED_OPTION = new Option("s", "scheduled", false, "Show scheduled prorgrams and their JobIDs");
-	
+
 	// canceling
 	private static final Option ID_OPTION = new Option("i", "jobid", true, "JobID of program to cancel");
-	
+
 	static {
 		initOptions();
 	}
@@ -126,6 +138,8 @@ public class CliFrontend {
 	public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
 	// this has to be a regex for String.split()
 	public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
+	private static final String DEFAULT_LOG4J_PATTERN_LAYOUT = "%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n";
+
 	
 
 	private CommandLineParser parser;
@@ -139,6 +153,15 @@ public class CliFrontend {
 	
 	private Properties yarnProperties;
 
+	// this flag indicates if the given Job is executed using a YARN cluster,
+	// started for this purpose.
+	private boolean runInYarnCluster = false;
+
+	private AbstractFlinkYarnCluster yarnCluster = null;
+
+	protected String configurationDirectory = null;
+
+
 	/**
 	 * Initializes the class
 	 */
@@ -193,6 +216,9 @@ public class CliFrontend {
 		options.addOption(CLASS_OPTION);
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(ARGS_OPTION);
+
+		// also add the YARN options so that the parser can parse them
+		yarnSessionCLi.getYARNSessionCLIOptions(options);
 		return options;
 	}
 	
@@ -309,7 +335,7 @@ public class CliFrontend {
 				return 1;
 			}
 			
-			Client client = getClient(line, program.getUserCodeClassLoader());
+			Client client = getClient(line, program.getUserCodeClassLoader(), program.getMainClassName());
 			if (client == null) {
 				printHelpForRun();
 				return 1;
@@ -332,8 +358,25 @@ public class CliFrontend {
 					return 1;
 				}
 			}
-		
-			return executeProgram(program, client, parallelism);
+			int programResult;
+			try {
+				programResult = executeProgram(program, client, parallelism);
+			} finally { // a "job only" YARN cluster is reported on and torn down even if the program failed
+				if(runInYarnCluster) {
+					List<String> msgs = yarnCluster.getNewMessages();
+					if(msgs != null && !msgs.isEmpty()) { // report every message, including a single one
+						System.out.println("The following messages were created by the YARN cluster while running the Job:");
+						for(String msg : msgs) { System.out.println(msg); }
+					}
+					if(yarnCluster.hasFailed()) {
+						System.out.println("YARN cluster is in failed state!");
+						System.out.println("YARN Diagnostics: " + yarnCluster.getDiagnostics());
+					}
+					System.out.println("Shutting down YARN cluster");
+					yarnCluster.shutdown();
+				}
+			}
+			return programResult;
 		}
 		catch (Throwable t) {
 			return handleError(t);
@@ -443,7 +486,7 @@ public class CliFrontend {
 		try {
 			// check for json plan request
 			if (plan) {
-				Client client = getClient(line, program.getUserCodeClassLoader());
+				Client client = getClient(line, program.getUserCodeClassLoader(), program.getMainClassName());
 				String jsonPlan = client.getOptimizedPlanAsJson(program, parallelism);
 				
 				if (jsonPlan != null) {
@@ -693,14 +736,13 @@ public class CliFrontend {
 		}
 	}
 	
-	protected InetSocketAddress getJobManagerAddress(CommandLine line) throws IOException {
+	protected String getJobManagerAddressString(CommandLine line) throws IOException {
 		Configuration configuration = getGlobalConfiguration();
 		
 		// first, check if the address comes from the command line option
 		if (line.hasOption(ADDRESS_OPTION.getOpt())) {
 			try {
-				String address = line.getOptionValue(ADDRESS_OPTION.getOpt());
-				return RemoteExecutor.getInetFromHostport(address);
+				return line.getOptionValue(ADDRESS_OPTION.getOpt());
 			}
 			catch (Exception e) {
 				System.out.println("Error: The JobManager address has an invalid format. " + e.getMessage());
@@ -714,9 +756,9 @@ public class CliFrontend {
 					String address = yarnProps.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
 					System.out.println("Found a yarn properties file (" + YARN_PROPERTIES_FILE + ") file, "
 							+ "using \""+address+"\" to connect to the JobManager");
-					return RemoteExecutor.getInetFromHostport(address);
+					return address;
 				} catch (Exception e) {
-					System.out.println("Found a yarn properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager address from the file. " 
+					System.out.println("Found a yarn properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager address from the file. "
 								+ e.getMessage());
 					return null;
 				}
@@ -726,7 +768,7 @@ public class CliFrontend {
 				
 				// verify that there is a jobmanager address and port in the configuration
 				if (jobManagerAddress == null) {
-					System.out.println("Error: Found no configuration in the config directory '" + 
+					System.out.println("Error: Found no configuration in the config directory '" +
 							getConfigurationDirectory() + "' that specifies the JobManager address.");
 					return null;
 				}
@@ -741,29 +783,37 @@ public class CliFrontend {
 				}
 				
 				if (jobManagerPort == -1) {
-					System.out.println("Error: Found no configuration in the config directory '" + 
+					System.out.println("Error: Found no configuration in the config directory '" +
 							getConfigurationDirectory() + "' that specifies the JobManager port.");
 					return null;
 				}
 				
-				return new InetSocketAddress(jobManagerAddress, jobManagerPort);
+				return jobManagerAddress + ":" + jobManagerPort;
 			}
 		}
 	}
 	
 	protected ActorRef getJobManager(CommandLine line) throws IOException {
-		InetSocketAddress jobManagerAddress = getJobManagerAddress(line);
-		if (jobManagerAddress == null) {
+		// TODO: in YARN mode the address is "yarn-cluster", not host:port; get the ActorRef from the YarnCluster. NOTE(review): the ActorSystem created below is never shut down here.
+		String jobManagerAddressStr = getJobManagerAddressString(line);
+		if (jobManagerAddressStr == null) {
 			return null;
 		}

-		return JobManager.getJobManager(jobManagerAddress,
+		return JobManager.getJobManager(RemoteExecutor.getInetFromHostport(jobManagerAddressStr),
 				ActorSystem.create("CliFrontendActorSystem", AkkaUtils
 						.getDefaultActorSystemConfig()),getAkkaTimeout());
 	}
 	
-	
-	protected String getConfigurationDirectory() {
+
+	public String getConfigurationDirectory() { // resolved from the environment on first use, then reused
+		if (this.configurationDirectory != null) {
+			return this.configurationDirectory;
+		}
+		return this.configurationDirectory = getConfigurationDirectoryFromEnv();
+	}
+
+	public static String getConfigurationDirectoryFromEnv() {
 		String location = null;
 		if (System.getenv(ENV_CONFIG_DIRECTORY) != null) {
 			location = System.getenv(ENV_CONFIG_DIRECTORY);
@@ -860,8 +910,50 @@ public class CliFrontend {
 		return yarnProperties;
 	}
 	
-	protected Client getClient(CommandLine line, ClassLoader classLoader) throws IOException {
-		return new Client(getJobManagerAddress(line), getGlobalConfiguration(), classLoader);
+	protected Client getClient(CommandLine line, ClassLoader classLoader, String programName) throws IOException {
+		String jmAddrString = getJobManagerAddressString(line);
+		InetSocketAddress jobManagerAddress = null; // stays null when no JobManager address could be determined
+		if(YARN_DEPLOY_JOBMANAGER.equals(jmAddrString)) { // null-safe: the address lookup returns null on errors
+			System.out.println("YARN cluster mode detected. Switching Log4j output to console");
+			LogManager.getRootLogger().addAppender(new ConsoleAppender(new PatternLayout(DEFAULT_LOG4J_PATTERN_LAYOUT)));
+			// user wants to run the job on a dedicated Flink cluster that is deployed into YARN for this purpose.
+			AbstractFlinkYarnClient flinkYarnClient = yarnSessionCLi.createFlinkYarnClient(line);
+			if(flinkYarnClient == null) {
+				throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
+			}
+			try {
+				yarnCluster = flinkYarnClient.deploy("Flink Application: "+programName);
+			} catch(Exception e) {
+				throw new RuntimeException("Error deploying the YARN cluster", e);
+			}
+			this.runInYarnCluster = true; // set only once a deployed cluster exists that must be shut down later
+			jobManagerAddress = yarnCluster.getJobManagerAddress();
+			System.out.println("YARN cluster started");
+			System.out.println("JobManager web interface address "+yarnCluster.getWebInterfaceURL());
+			System.out.println("Waiting until all TaskManagers have connected");
+			while(true) {
+				FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
+				if(status != null) {
+					if (status.getNumberOfTaskManagers() < flinkYarnClient.getTaskManagerCount()) {
+						System.out.println("TaskManager status  (" + status.getNumberOfTaskManagers()+"/"+flinkYarnClient.getTaskManagerCount()+")");
+					} else {
+						System.out.println("Enough TaskManagers are connected");
+						break;
+					}
+				} else {
+					System.out.println("No status updates from YARN cluster received so far. Waiting ...");
+				}
+				try {
+					Thread.sleep(500);
+				} catch (InterruptedException e) { // retrying would spin: sleep() throws again while the flag is set
+					Thread.currentThread().interrupt(); yarnCluster.shutdown();
+					throw new RuntimeException("Interrupted while waiting for the TaskManagers to connect", e);
+				}
+			}
+		} else if(jmAddrString != null) {
+			jobManagerAddress = RemoteExecutor.getInetFromHostport(jmAddrString);
+		}
+		return new Client(jobManagerAddress, getGlobalConfiguration(), classLoader);
 	}
 
 	/**
@@ -891,6 +983,10 @@ public class CliFrontend {
 		System.out.println("\n  Syntax: run [OPTIONS] <jar-file> <arguments>");
 		formatter.setSyntaxPrefix("  \"run\" action arguments:");
 		formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
+		formatter.setSyntaxPrefix("  additional arguments if -m "+YARN_DEPLOY_JOBMANAGER+" is set:");
+		Options yarnOpts = new Options();
+		yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
+		formatter.printHelp(" ", yarnOpts);
 	}
 	
 	private void printHelpForInfo() {
@@ -990,14 +1086,15 @@ public class CliFrontend {
 		}
 	}
 
-	
 
 	/**
 	 * Submits the job based on the arguments
 	 */
 	public static void main(String[] args) throws ParseException {
+
 		CliFrontend cli = new CliFrontend();
 		int retCode = cli.parseParameters(args);
 		System.exit(retCode);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
new file mode 100644
index 0000000..6546ef0
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Class handling the command line interface to the YARN session.
+ */
+public class FlinkYarnSessionCli {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
+
+	//------------------------------------ Constants   -------------------------
+
+	private static final String CONFIG_FILE_NAME = "flink-conf.yaml"; // must exist in the configuration directory
+	public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml"; // shipped to the cluster if present in the conf dir
+	public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties"; // shipped to the cluster if present in the conf dir
+
+
+	private static final int CLIENT_POLLING_INTERVALL = 3; // seconds between two status polls of the interactive CLI
+
+
+	//------------------------------------ Command Line argument options -------------------------
+	// Instance fields so the constructor can prefix the option names (CliFrontend uses "y"/"yarn" to avoid clashing with its own options).
+	private final Option QUERY;
+	// --- or --- (NOTE(review): presumably QUERY is used instead of the deployment options below — confirm)
+	private final Option QUEUE;
+	private final Option SHIP_PATH;
+	private final Option FLINK_JAR;
+	private final Option JM_MEMORY;
+	private final Option TM_MEMORY;
+	private final Option CONTAINER; // the only option createFlinkYarnClient() requires
+	private final Option SLOTS;
+
+	/**
+	 * Dynamic properties allow the user to specify additional configuration values with -D (-yD when prefixed by CliFrontend), such as
+	 *  -Dfs.overwrite-files=true  -Dtaskmanager.network.numberOfBuffers=16368
+	 */
+	private final Option DYNAMIC_PROPERTIES;
+
+	private AbstractFlinkYarnCluster yarnCluster = null; // NOTE(review): not used in the visible part of this class — confirm it is needed
+	/** Creates the YARN session options, prefixing short option names with {@code shortOptPrefix} and long names with {@code longOptPrefix}. */
+	public FlinkYarnSessionCli(String shortOptPrefix, String longOptPrefix) {
+		QUERY = new Option(shortOptPrefix + "q", longOptPrefix + "query", false, "Display available YARN resources (memory, cores)");
+		QUEUE = new Option(shortOptPrefix + "qu", longOptPrefix + "queue", true, "Specify YARN queue.");
+		SHIP_PATH = new Option(shortOptPrefix + "t", longOptPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
+		FLINK_JAR = new Option(shortOptPrefix + "j", longOptPrefix + "jar", true, "Path to Flink jar file");
+		JM_MEMORY = new Option(shortOptPrefix + "jm", longOptPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+		TM_MEMORY = new Option(shortOptPrefix + "tm", longOptPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
+		CONTAINER = new Option(shortOptPrefix + "n", longOptPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
+		SLOTS = new Option(shortOptPrefix + "s", longOptPrefix + "slots", true, "Number of slots per TaskManager");
+		DYNAMIC_PROPERTIES = new Option(shortOptPrefix + "D", true, "Dynamic properties");
+	}
+	/** Configures a Flink YARN client from the parsed command line; returns null (after reporting why) if that is not possible. */
+	public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
+
+		AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
+		if(flinkYarnClient == null) {
+			return null;
+		}
+
+		if(!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
+			LOG.error("Missing required argument " + CONTAINER.getOpt());
+			printUsage();
+			return null;
+		}
+		flinkYarnClient.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt())));
+
+		// Jar Path
+		Path localJarPath;
+		if(cmd.hasOption(FLINK_JAR.getOpt())) {
+			String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
+			if(!userPath.startsWith("file://")) {
+				userPath = "file://" + userPath;
+			}
+			localJarPath = new Path(userPath);
+		} else {
+			LOG.info("No path for the flink jar passed. Using the location of "+flinkYarnClient.getClass()+" to locate the jar");
+			localJarPath = new Path("file://"+flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
+			if(!localJarPath.toString().contains("uberjar")) {
+				// we need to have a proper uberjar because otherwise we don't have the required classes available on the cluster.
+				// most likely the user did try to start yarn in a regular hadoop2 flink build (not a yarn package) (using ./bin/flink -m yarn-cluster)
+				LOG.error("The detected jar file '"+localJarPath+"' is not a uberjar.");
+				return null;
+			}
+		}
+
+		flinkYarnClient.setLocalJarPath(localJarPath);
+
+		// Conf Path
+		String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
+		GlobalConfiguration.loadConfiguration(confDirPath);
+		flinkYarnClient.setConfigurationDirectory(confDirPath);
+		File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME);
+		if(!confFile.exists()) {
+			LOG.error("Unable to locate configuration file in "+confFile);
+			return null;
+		}
+		Path confPath = new Path(confFile.getAbsolutePath());
+
+		flinkYarnClient.setConfigurationFilePath(confPath);
+
+		List<File> shipFiles = new ArrayList<File>();
+		// path to directory to ship
+		if(cmd.hasOption(SHIP_PATH.getOpt())) {
+			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
+			File shipDir = new File(shipPath);
+			if(shipDir.isDirectory()) {
+				File[] shipDirContents = shipDir.listFiles(); // never contains "." or ".."; null on an I/O error
+				if(shipDirContents == null) {
+					LOG.warn("Unable to list the contents of the ship directory '" + shipDir + "'. Ignoring it.");
+				} else {
+					shipFiles.addAll(Arrays.asList(shipDirContents));
+				}
+			} else {
+				LOG.warn("Ship directory is not a directory. Ignoring it.");
+			}
+		}
+
+		// ship a logback or log4j file from the conf dir (File.separator joins path elements; pathSeparator separates path lists)
+		if(confDirPath.length() > 0) {
+			File logback = new File(confDirPath + File.separator + CONFIG_FILE_LOGBACK_NAME);
+			if(logback.exists()) {
+				shipFiles.add(logback);
+				flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI()));
+			}
+			File log4j = new File(confDirPath + File.separator + CONFIG_FILE_LOG4J_NAME);
+			if(log4j.exists()) {
+				shipFiles.add(log4j);
+				if(flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
+					// this means there is already a logback configuration file --> fail
+					LOG.error("The configuration directory ('"+confDirPath+"') contains both LOG4J and Logback configuration files. " +
+							"Please delete or rename one of them.");
+					return null;
+				} // else
+				flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI()));
+			}
+		}
+
+		flinkYarnClient.setShipFiles(shipFiles);
+
+		// queue
+		if(cmd.hasOption(QUEUE.getOpt())) {
+			flinkYarnClient.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
+		}
+
+		// JobManager Memory
+		if(cmd.hasOption(JM_MEMORY.getOpt())) {
+			int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
+			flinkYarnClient.setJobManagerMemory(jmMemory);
+		}
+
+		// Task Managers memory
+		if(cmd.hasOption(TM_MEMORY.getOpt())) {
+			int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
+			flinkYarnClient.setTaskManagerMemory(tmMemory);
+		}
+
+		if(cmd.hasOption(SLOTS.getOpt())) {
+			int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
+			flinkYarnClient.setTaskManagerSlots(slots);
+		}
+
+		String[] dynamicProperties = null;
+		if(cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
+			dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
+		}
+		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+
+		flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
+
+		return flinkYarnClient;
+	}
+
+	/** Prints the required and the optional YARN session options to stdout. */
+	private void printUsage() {
+		System.out.println("Usage:");
+		HelpFormatter helpFormatter = new HelpFormatter();
+		helpFormatter.setWidth(200);
+		helpFormatter.setLeftPadding(5);
+
+		// the container count is the only option createFlinkYarnClient() insists on
+		Options requiredOptions = new Options().addOption(CONTAINER);
+		helpFormatter.setSyntaxPrefix("   Required");
+		helpFormatter.printHelp(" ", requiredOptions);
+
+		// NOTE(review): SHIP_PATH and FLINK_JAR are accepted as well but are not listed here
+		Options optionalOptions = new Options()
+				.addOption(JM_MEMORY).addOption(TM_MEMORY)
+				.addOption(QUERY).addOption(QUEUE)
+				.addOption(SLOTS)
+				.addOption(DYNAMIC_PROPERTIES);
+		helpFormatter.setSyntaxPrefix("   Optional");
+		helpFormatter.printHelp(" ", optionalOptions);
+	}
+
+	public static AbstractFlinkYarnClient getFlinkYarnClient() {
+		Class<AbstractFlinkYarnClient> clientClass;
+		try { // loaded reflectively: the YARN client is only available in Flink builds with Hadoop2/YARN support
+			clientClass = (Class<AbstractFlinkYarnClient>) Class.forName("org.apache.flink.yarn.FlinkYarnClient");
+		} catch (ClassNotFoundException e) {
+			System.err.println("Unable to locate the Flink YARN Client. Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: "+e.getMessage());
+			e.printStackTrace(System.err);
+			return null; // tells the caller that YARN support is unavailable
+		}
+		// instantiate reflectively, passing the expected supertype
+		return InstantiationUtil.instantiate(clientClass, AbstractFlinkYarnClient.class);
+	}
+
+	/**
+	 * Stores the given properties in the given file and makes the file readable for all users.
+	 *
+	 * @param properties the properties to write
+	 * @param propertiesFile the file to (over)write
+	 * @throws RuntimeException if the file cannot be written
+	 */
+	private static void writeYarnProperties(Properties properties, File propertiesFile) {
+		try {
+			OutputStream out = new FileOutputStream(propertiesFile);
+			try {
+				properties.store(out, "Generated YARN properties file");
+			} finally {
+				// close the stream even if store() fails, so the file handle is not leaked
+				out.close();
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Error writing the properties file", e);
+		}
+		propertiesFile.setReadable(true, false); // readable for all.
+	}
+
+	/**
+	 * Runs the interactive session shell until the user enters "stop"/"quit", standard input ends,
+	 * the cluster reports that it has been stopped, or an exception occurs.
+	 *
+	 * <p>Each round reports changes in the number of connected TaskManagers, new cluster messages and a
+	 * failed cluster state on stderr. It then waits up to {@code CLIENT_POLLING_INTERVALL} seconds for
+	 * input and handles at most one command. The cluster is NOT shut down here; that is left to the caller.
+	 *
+	 * @param yarnCluster the deployed cluster to monitor
+	 */
+	public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {
+		final String HELP = "Available commands:\n" +
+				"help - show these commands\n" +
+				"stop - stop the YARN session";
+		int numTaskmanagers = 0;
+		try {
+			BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+			while (true) {
+				// ------------------ check if there are updates by the cluster -----------
+
+				FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
+				if(status != null && numTaskmanagers != status.getNumberOfTaskManagers()) {
+					System.err.println("Number of connected TaskManagers changed to "+status.getNumberOfTaskManagers()+". "
+							+ "Slots available: "+status.getNumberOfSlots());
+					numTaskmanagers = status.getNumberOfTaskManagers();
+				}
+
+				List<String> messages = yarnCluster.getNewMessages();
+				if(messages != null && !messages.isEmpty()) {
+					System.err.println("New messages from the YARN cluster: ");
+					for(String msg : messages) {
+						System.err.println(msg);
+					}
+				}
+
+				if(yarnCluster.hasFailed()) {
+					System.err.println("The YARN cluster has failed");
+				}
+
+				// wait until CLIENT_POLLING_INTERVALL (seconds) is over or the user entered something.
+				long startTime = System.currentTimeMillis();
+				while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
+						&& !in.ready()) {
+					Thread.sleep(200);
+				}
+				//------------- handle interactive command by user. ----------------------
+
+				if (in.ready()) {
+					String command = in.readLine();
+					if (command == null) {
+						// end of standard input: no further commands can arrive, so behave like "stop"
+						LOG.info("Standard input has been closed, stopping the interactive command line interface.");
+						break;
+					}
+					// tolerate surrounding whitespace such as "stop "
+					command = command.trim();
+					if(command.equals("quit") || command.equals("stop")) {
+						break; // leave loop, cli will stop cluster.
+					} else if(command.equals("help"))  {
+						System.err.println(HELP);
+					} else {
+						System.err.println("Unknown command '"+command+"'. Showing help: \n"+HELP);
+					}
+				}
+				if(yarnCluster.hasBeenStopped()) {
+					LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
+					break;
+				}
+			}
+		} catch(Exception e) {
+			LOG.warn("Exception while running the interactive command line interface", e);
+		}
+	}
+
+	public static void main(String[] args) {
+		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
+		System.exit(cli.run(args));
+	}
+
+	/**
+	 * Registers every command line option understood by the YARN session CLI.
+	 *
+	 * @param options the option set the YARN session options are added to
+	 */
+	public void getYARNSessionCLIOptions(Options options) {
+		options.addOption(FLINK_JAR)
+				.addOption(JM_MEMORY)
+				.addOption(TM_MEMORY)
+				.addOption(CONTAINER)
+				.addOption(QUEUE)
+				.addOption(QUERY)
+				.addOption(SHIP_PATH)
+				.addOption(SLOTS)
+				.addOption(DYNAMIC_PROPERTIES);
+	}
+
+	public int run(String[] args) {
+
+		//
+		//	Command Line Options
+		//
+		Options options = new Options();
+		getYARNSessionCLIOptions(options);
+
+		CommandLineParser parser = new PosixParser();
+		CommandLine cmd = null;
+		try {
+			cmd = parser.parse(options, args);
+		} catch(Exception e) {
+			System.out.println(e.getMessage());
+			printUsage();
+			return 1;
+		}
+
+		// Query cluster for metrics
+		if(cmd.hasOption(QUERY.getOpt())) {
+			AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
+			String description = null;
+			try {
+				description = flinkYarnClient.getClusterDescription();
+			} catch (Exception e) {
+				System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage());
+				e.printStackTrace(System.err);
+				return 1;
+			}
+			System.out.println(description);
+			return 0;
+		} else {
+			AbstractFlinkYarnClient flinkYarnClient = createFlinkYarnClient(cmd);
+
+			if(flinkYarnClient == null) {
+				System.err.println("Error while starting the YARN Client. Please check log output!");
+				return 1;
+			}
+
+
+			try {
+				yarnCluster = flinkYarnClient.deploy(null);
+			} catch (Exception e) {
+				System.err.println("Error while deploying YARN cluster: "+e.getMessage());
+				e.printStackTrace(System.err);
+				return 1;
+			}
+			//------------------ Cluster deployed, handle connection details
+			String jobManagerAddress = yarnCluster.getJobManagerAddress().getHostName() + ":" +yarnCluster.getJobManagerAddress().getPort();
+			System.err.println("Flink JobManager is now running on " + jobManagerAddress);
+			System.err.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
+			// file that we write into the conf/ dir containing the jobManager address and the dop.
+			String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
+			File yarnPropertiesFile = new File(confDirPath + File.separator + CliFrontend.YARN_PROPERTIES_FILE);
+
+			Properties yarnProps = new Properties();
+			yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
+			if(flinkYarnClient.getTaskManagerSlots() != -1) {
+				yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount()) );
+			}
+			// add dynamic properties
+			if(flinkYarnClient.getDynamicPropertiesEncoded() != null) {
+				yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, flinkYarnClient.getDynamicPropertiesEncoded());
+			}
+			writeYarnProperties(yarnProps, yarnPropertiesFile);
+
+			//------------------ Cluster running, let user control it ------------
+
+			runInteractiveCli(yarnCluster);
+
+			LOG.info("Command Line Interface requested session shutdown");
+			yarnCluster.shutdown();
+
+			try {
+				yarnPropertiesFile.delete();
+			} catch (Exception e) {
+				LOG.warn("Exception while deleting the JobManager address file", e);
+			}
+		}
+		return 0;
+	}
+
+	/**
+	 * Shuts down the YARN cluster held by this CLI (assigned by {@link #run(String[])} after a
+	 * successful deployment). Does nothing if no cluster has been set.
+	 * Utility method for tests.
+	 */
+	public void stop() {
+		if(yarnCluster == null) {
+			return;
+		}
+		LOG.info("Command line interface is shutting down the yarnCluster");
+		yarnCluster.shutdown();
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 00fba95..d8f1bf7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -317,7 +317,6 @@ public class Client {
 		}
 
 		try {
-
 			if (wait) {
 				return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client, timeout);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
index b6d4542..1bc533f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
@@ -113,7 +113,7 @@ public class CliFrontendInfoTest {
 		}
 
 		@Override
-		protected Client getClient(CommandLine line, ClassLoader loader) throws IOException {
+		protected Client getClient(CommandLine line, ClassLoader loader, String programName) throws IOException {
 			try {
 				return new TestClient(expectedDop);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
index 6a59019..ef7dff6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
@@ -29,7 +29,6 @@ import java.net.InetSocketAddress;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
-import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.CliFrontendTestUtils.TestingCliFrontend;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -55,7 +54,7 @@ public class CliFrontendJobManagerConnectionTest {
 				
 			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
 			
-			assertTrue(frontend.getJobManagerAddress(line) == null);
+			assertTrue(frontend.getJobManagerAddressString(line) == null);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -72,7 +71,7 @@ public class CliFrontendJobManagerConnectionTest {
 				
 			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDir());
 			
-			InetSocketAddress address = frontend.getJobManagerAddress(line);
+			InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
 			
 			assertNotNull(address);
 			assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress());
@@ -93,7 +92,7 @@ public class CliFrontendJobManagerConnectionTest {
 				
 			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile());
 			
-			InetSocketAddress address = frontend.getJobManagerAddress(line);
+			InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
 			
 			assertNotNull(address);
 			assertEquals(CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress());
@@ -114,7 +113,7 @@ public class CliFrontendJobManagerConnectionTest {
 				
 			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithInvalidYarnFile());
 			
-			assertTrue(frontend.getJobManagerAddress(line) == null);
+			assertTrue(frontend.getJobManagerAddressString(line) == null);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -131,7 +130,7 @@ public class CliFrontendJobManagerConnectionTest {
 				
 			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDir());
 			
-			InetSocketAddress address = frontend.getJobManagerAddress(line);
+			InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
 			
 			assertNotNull(address);
 			assertEquals("10.221.130.22", address.getAddress().getHostAddress());
@@ -152,7 +151,7 @@ public class CliFrontendJobManagerConnectionTest {
 				
 			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile());
 			
-			InetSocketAddress address = frontend.getJobManagerAddress(line);
+			InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
 			
 			assertNotNull(address);
 			assertEquals("10.221.130.22", address.getAddress().getHostAddress());

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index b9af927..0cd7104 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -62,7 +62,7 @@ public class CliFrontendListCancelTest {
 			// test unrecognized option
 			{
 				String[] parameters = {"-v", "-l"};
-				CliFrontend testFrontend = new CliFrontend();
+				CliFrontend testFrontend = new CliFrontendTestUtils.TestingCliFrontend();
 				int retCode = testFrontend.cancel(parameters);
 				assertTrue(retCode == 2);
 			}
@@ -70,7 +70,7 @@ public class CliFrontendListCancelTest {
 			// test missing job id
 			{
 				String[] parameters = {};
-				CliFrontend testFrontend = new CliFrontend();
+				CliFrontend testFrontend = new CliFrontendTestUtils.TestingCliFrontend();
 				int retCode = testFrontend.cancel(parameters);
 				assertTrue(retCode != 0);
 			}
@@ -104,7 +104,7 @@ public class CliFrontendListCancelTest {
 			// test unrecognized option
 			{
 				String[] parameters = {"-v", "-k"};
-				CliFrontend testFrontend = new CliFrontend();
+				CliFrontend testFrontend = new CliFrontendTestUtils.TestingCliFrontend();
 				int retCode = testFrontend.list(parameters);
 				assertTrue(retCode == 2);
 			}
@@ -112,7 +112,7 @@ public class CliFrontendListCancelTest {
 			// test missing flags
 			{
 				String[] parameters = {};
-				CliFrontend testFrontend = new CliFrontend();
+				CliFrontend testFrontend = new CliFrontendTestUtils.TestingCliFrontend();
 				int retCode = testFrontend.list(parameters);
 				assertTrue(retCode != 0);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 9d4c6ae..95f6cb8 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -113,19 +113,13 @@ public class CliFrontendTestUtils {
 	
 	public static class TestingCliFrontend extends CliFrontend {
 		
-		public final String configDir;
-		
+
 		public TestingCliFrontend() {
 			this(getConfigDir());
 		}
 		
 		public TestingCliFrontend(String configDir) {
-			this.configDir = configDir;
-		}
-		
-		@Override
-		protected String getConfigurationDirectory() {
-			return this.configDir;
+			this.configurationDirectory = configDir;
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d482e3c..969329e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -193,6 +193,20 @@ public final class ConfigConstants {
 	 */
 	public static final String JOBCLIENT_POLLING_INTERVAL_KEY = "jobclient.polling.interval";
 
+	// ------------------------ YARN Configuration ------------------------
+
+	/**
+	 * Percentage of heap space to remove from containers started by YARN.
+	 */
+	public static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio";
+
+	/**
+	 * Upper bound for heap cutoff on YARN.
+	 * The "yarn.heap-cutoff-ratio" removes a certain ratio from the heap;
+	 * this value limits that cutoff to an absolute value.
+	 */
+	public static final String YARN_HEAP_LIMIT_CAP = "yarn.heap-limit-cap";
+
 	// ------------------------ Hadoop Configuration ------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index fb45466..c3c7ae8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -242,7 +242,6 @@ public abstract class FileSystem {
 					// by now we know that the HadoopFileSystem wrapper can wrap the file system.
 					fs = instantiateHadoopFileSystemWrapper(wrapperClass);
 					fs.initialize(uri);
-					System.out.println("Initializing new instance of wrapper for "+wrapperClass);
 					CACHE.put(wrappedKey, fs);
 
 				} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index e753a05..91359c2 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -197,10 +197,12 @@ under the License.
 												implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
 											<resource>reference.conf</resource>
 										</transformer>
+										<!-- The service transformer is needed to merge META-INF/services files -->
+										<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
 										<transformer
 												implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
 											<manifestEntries>
-												<Main-Class>org.apache.flink.yarn.Client</Main-Class>
+												<Main-Class>org.apache.flink.yarn.FlinkYarnClient</Main-Class>
 											</manifestEntries>
 										</transformer>
 									</transformers>

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-dist/src/main/flink-bin/bin/flink
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink b/flink-dist/src/main/flink-bin/bin/flink
index e5dd3c6..12dd6b7 100755
--- a/flink-dist/src/main/flink-bin/bin/flink
+++ b/flink-dist/src/main/flink-bin/bin/flink
@@ -50,4 +50,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
 
 export FLINK_CONF_DIR
 
-$JAVA_RUN $JVM_ARGS $log_setting -classpath $CC_CLASSPATH org.apache.flink.client.CliFrontend $*
+# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
+$JAVA_RUN $JVM_ARGS $log_setting -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH org.apache.flink.client.CliFrontend $*

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index 21da505..bf0775f 100644
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
 
 export FLINK_CONF_DIR
 
-$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH $log_setting org.apache.flink.yarn.Client -ship $bin/../ship/ -confDir $FLINK_CONF_DIR -j $FLINK_LIB_DIR/*yarn-uberjar.jar $*
+$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH $log_setting org.apache.flink.client.FlinkYarnSessionCli -ship $bin/../ship/ -j $FLINK_LIB_DIR/*yarn-uberjar.jar $*
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index 9e0a55b..026758d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -34,7 +34,6 @@ import javax.servlet.http.HttpServletResponse;
 
 import akka.actor.ActorRef;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.Instance;
 
@@ -60,13 +59,13 @@ public class SetupInfoServlet extends HttpServlet {
 	private static final Logger LOG = LoggerFactory.getLogger(SetupInfoServlet.class);
 	
 	
-	final private Configuration globalC;
+	final private Configuration configuration;
 	final private ActorRef jobmanager;
 	final private FiniteDuration timeout;
 	
 	
-	public SetupInfoServlet(ActorRef jm, FiniteDuration timeout) {
-		globalC = GlobalConfiguration.getConfiguration();
+	public SetupInfoServlet(Configuration conf, ActorRef jm, FiniteDuration timeout) {
+		configuration = conf;
 		this.jobmanager = jm;
 		this.timeout = timeout;
 	}
@@ -74,7 +73,6 @@ public class SetupInfoServlet extends HttpServlet {
 	@Override
 	protected void doGet(HttpServletRequest req, HttpServletResponse resp)
 			throws ServletException, IOException {
-		
 		resp.setStatus(HttpServletResponse.SC_OK);
 		resp.setContentType("application/json");
 		
@@ -86,15 +84,15 @@ public class SetupInfoServlet extends HttpServlet {
 	}
 	
 	private void writeGlobalConfiguration(HttpServletResponse resp) throws IOException {
-		
-		Set<String> keys = globalC.keySet();
+		Set<String> keys = configuration.keySet();
 		List<String> list = new ArrayList<String>(keys);
 		Collections.sort(list);
 		
 		JSONObject obj = new JSONObject();
 		for (String k : list) {
 			try {
-				obj.put(k, globalC.getString(k, ""));
+
+				obj.put(k, configuration.getString(k, ""));
 			} catch (JSONException e) {
 				LOG.warn("Json object creation failed", e);
 			}
@@ -151,7 +149,7 @@ public class SetupInfoServlet extends HttpServlet {
 	private static final Comparator<Instance> INSTANCE_SORTER = new Comparator<Instance>() {
 		@Override
 		public int compare(Instance o1, Instance o2) {
-			return o1.getInstanceConnectionInfo().compareTo(o2.getInstanceConnectionInfo());
+		return o1.getInstanceConnectionInfo().compareTo(o2.getInstanceConnectionInfo());
 		}
 	};
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 24dbaf7..2b92f9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.eclipse.jetty.http.security.Constraint;
 import org.eclipse.jetty.security.ConstraintMapping;
 import org.eclipse.jetty.security.ConstraintSecurityHandler;
@@ -88,7 +87,7 @@ public class WebInfoServer {
 		
-		// if no explicit configuration is given, use the global configuration
+		// an explicit configuration is required; fail fast if none is given
 		if (config == null) {
-			config = GlobalConfiguration.getConfiguration();
+			throw new IllegalArgumentException("No Configuration has been passed to the web server");
 		}
 		
 		this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
@@ -133,7 +132,7 @@ public class WebInfoServer {
 		servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager,
 				archive, timeout)), "/jobsInfo");
 		servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo");
-		servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager, timeout)),
+		servletContext.addServlet(new ServletHolder(new SetupInfoServlet(config, jobmanager, timeout)),
 				"/setupInfo");
 		servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu");
 
@@ -206,4 +205,8 @@ public class WebInfoServer {
 		server.stop();
 	}
 
+	public Server getServer() {
+		return server;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index ec2633c..5a5f515 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -76,7 +76,7 @@ public class NetUtils {
 						case ADDRESS:
 							if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
 								if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
-									LOG.info("Determined " + i + " as the TaskTracker's own IP address");
+									LOG.info("Determined " + i + " as the machine's own IP address");
 									return i;
 								}
 							}
@@ -86,7 +86,7 @@ public class NetUtils {
 						case SLOW_CONNECT:
 							boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
 							if (correct) {
-								LOG.info("Determined " + i + " as the TaskTracker's own IP address");
+								LOG.info("Determined " + i + " as the machine's own IP address");
 								return i;
 							}
 							break;

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
new file mode 100644
index 0000000..7f2b14e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.yarn;
+
+import org.apache.hadoop.fs.Path;
+import java.io.File;
+import java.util.List;
+
+/**
+ * Client-side handle for configuring and deploying a Flink cluster on YARN, and for querying YARN.
+ */
+public abstract class AbstractFlinkYarnClient {
+
+	// ---- Configuration of the YARN cluster ----- //
+
+	/** @param memoryMB memory for the JobManager, in megabytes */
+	public abstract void setJobManagerMemory(int memoryMB);
+
+	/** @param memoryMB memory for the TaskManagers, in megabytes */
+	public abstract void setTaskManagerMemory(int memoryMB);
+
+	/** @param tmCount number of TaskManagers */
+	public abstract void setTaskManagerCount(int tmCount);
+
+	/** @return the configured number of TaskManagers */
+	public abstract int getTaskManagerCount();
+
+	/** @param slots number of task slots per TaskManager */
+	public abstract void setTaskManagerSlots(int slots);
+
+	/**
+	 * @return the configured slots per TaskManager; presumably -1 when not set
+	 *         (NOTE(review): confirm this contract with the implementation)
+	 */
+	public abstract int getTaskManagerSlots();
+
+	/** @param queue the YARN queue to use */
+	public abstract void setQueue(String queue);
+
+	/** @param localJarPath local path of the Flink jar */
+	public abstract void setLocalJarPath(Path localJarPath);
+
+	/** @param confPath path of the Flink configuration file */
+	public abstract void setConfigurationFilePath(Path confPath);
+
+	/** @param confDirPath path of the Flink configuration directory */
+	public abstract void setConfigurationDirectory(String confDirPath);
+
+	/** @param logConfPath path of the logging configuration */
+	public abstract void setFlinkLoggingConfigurationPath(Path logConfPath);
+
+	/** @return the configured logging configuration path */
+	public abstract Path getFlinkLoggingConfigurationPath();
+
+	/** @param shipFiles list of files to transfer to the YARN containers */
+	public abstract void setShipFiles(List<File> shipFiles);
+
+	/** @param dynamicPropertiesEncoded dynamic properties encoded into a single string */
+	public abstract void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded);
+
+	/** @return the encoded dynamic properties, possibly {@code null} */
+	public abstract String getDynamicPropertiesEncoded();
+
+	// ---- Operations on the YARN cluster ----- //
+
+	/**
+	 * @return a human-readable description of the YARN cluster
+	 * @throws Exception if YARN cannot be queried
+	 */
+	public abstract String getClusterDescription() throws Exception;
+
+	/**
+	 * Deploys a Flink cluster on YARN with the configuration set on this client.
+	 *
+	 * @param clusterName name for the cluster; may be {@code null} (TODO confirm the default used then)
+	 * @return a handle to the deployed cluster
+	 * @throws Exception if the deployment fails
+	 */
+	public abstract AbstractFlinkYarnCluster deploy(String clusterName) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
new file mode 100644
index 0000000..58eaf1d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.yarn;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * Handle to a Flink cluster running on YARN.
+ */
+public abstract class AbstractFlinkYarnCluster {
+
+	// ---- Connection details ----- //
+
+	/** @return the address of the cluster's JobManager */
+	public abstract InetSocketAddress getJobManagerAddress();
+
+	/** @return the URL of the JobManager web interface */
+	public abstract String getWebInterfaceURL();
+
+	// ---- Status ----- //
+
+	/** @return the current cluster status; may be {@code null} (the session CLI checks for it) */
+	public abstract FlinkYarnClusterStatus getClusterStatus();
+
+	/**
+	 * @return messages from the cluster, possibly {@code null}; presumably only messages not returned
+	 *         by an earlier call (TODO confirm)
+	 */
+	public abstract List<String> getNewMessages();
+
+	/** @return whether the cluster is in a failed state */
+	public abstract boolean hasFailed();
+
+	/**
+	 * @return Diagnostics if the Cluster is in "failed" state.
+	 */
+	public abstract String getDiagnostics();
+
+	// ---- Lifecycle ----- //
+
+	/** Shuts the cluster down. */
+	public abstract void shutdown();
+
+	/** @return whether the cluster has been stopped */
+	public abstract boolean hasBeenStopped();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java
new file mode 100644
index 0000000..2aaaaa0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.yarn;
+
+import java.io.Serializable;
+
+
+/**
+ * Number of TaskManagers and task slots of a Flink cluster running on YARN, as returned by
+ * {@link AbstractFlinkYarnCluster#getClusterStatus()}.
+ *
+ * <p>Value semantics: two instances are equal when both counts are equal.
+ */
+public class FlinkYarnClusterStatus implements Serializable {
+
+	// Explicit UID: without it the serialized form depends on the compiler-generated default,
+	// so sender and receiver built by different compilers could fail to exchange instances.
+	private static final long serialVersionUID = 1L;
+
+	private int numberOfTaskManagers;
+	private int numberOfSlots;
+
+	/** Creates a status with zero TaskManagers and zero slots. */
+	public FlinkYarnClusterStatus() {
+	}
+
+	/**
+	 * @param numberOfTaskManagers number of TaskManagers in the cluster
+	 * @param numberOfSlots number of task slots in the cluster
+	 */
+	public FlinkYarnClusterStatus(int numberOfTaskManagers, int numberOfSlots) {
+		this.numberOfTaskManagers = numberOfTaskManagers;
+		this.numberOfSlots = numberOfSlots;
+	}
+
+	public int getNumberOfTaskManagers() {
+		return numberOfTaskManagers;
+	}
+
+	public void setNumberOfTaskManagers(int numberOfTaskManagers) {
+		this.numberOfTaskManagers = numberOfTaskManagers;
+	}
+
+	public int getNumberOfSlots() {
+		return numberOfSlots;
+	}
+
+	public void setNumberOfSlots(int numberOfSlots) {
+		this.numberOfSlots = numberOfSlots;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		// exact class match (not instanceof) keeps equals symmetric with potential subclasses
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		FlinkYarnClusterStatus that = (FlinkYarnClusterStatus) o;
+		return numberOfTaskManagers == that.numberOfTaskManagers
+				&& numberOfSlots == that.numberOfSlots;
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * numberOfTaskManagers + numberOfSlots;
+	}
+
+	@Override
+	public String toString() {
+		return "FlinkYarnClusterStatus{" +
+				"numberOfTaskManagers=" + numberOfTaskManagers +
+				", numberOfSlots=" + numberOfSlots +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index cf678b0..1f2791c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -53,6 +53,9 @@ object AkkaUtils {
   }
 
   def createActorSystem(akkaConfig: Config): ActorSystem = {
+    if (LOG.isDebugEnabled) { // only render the config string when debug logging is on
+      LOG.debug("Using akka config to create actor system: " + akkaConfig)
+    }
     ActorSystem.create("flink", akkaConfig)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 6a4beed..195a0b6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -76,7 +76,8 @@ class JobClientListener(client: ActorRef) extends Actor with ActorLogMessages wi
       client ! Failure(new JobExecutionException(msg, false))
       self ! PoisonPill
     case msg =>
-      println(msg.toString)
+      // we have to use System.out.println here to avoid erroneous behavior for output redirection
+      System.out.println(msg.toString)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cd1119d..37a41a5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -437,6 +437,13 @@ class JobManager(val configuration: Configuration)
     }
   }
 
+  /**
+   * Fails on any message that no receive case matched. NOTE(review): the exception escapes
+   * message processing, so the supervisor decides the reaction — confirm that is intended.
+   */
+  override def unhandled(message: Any): Unit =
+    throw new RuntimeException("Received unknown message " + message)
+
   private def removeJob(jobID: JobID): Unit = {
     currentJobs.remove(jobID) match {
       case Some((eg, _)) => archive ! ArchiveExecutionGraph(jobID, eg)

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
new file mode 100644
index 0000000..0cccf3a
--- /dev/null
+++ b/flink-yarn-tests/pom.xml
@@ -0,0 +1,121 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<!--
+	This module (flink-yarn-tests) is kept separate because it expects the "flink-dist" module
+	to be built before.
+	We need the YARN fat jar built by flink-dist for the tests.
+	-->
+	
+	<artifactId>flink-yarn-tests</artifactId>
+	<name>flink-yarn-tests</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>hadoop-core</artifactId>
+					<groupId>org.apache.hadoop</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-yarn</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-yarn-client</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-yarn-common</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-yarn-server-tests</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minicluster</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-mapreduce-client-core</artifactId>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Set the root directory for all tests to the project root.
+			We need this to be able to locate the final build (in flink-dist)
+			-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-failsafe-plugin</artifactId>
+				<configuration>
+					<!-- Enforce single threaded execution due to port conflicts with the mini yarn cluster -->
+					<forkCount>1</forkCount>
+					<workingDirectory>../</workingDirectory>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<workingDirectory>../</workingDirectory>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
new file mode 100644
index 0000000..9fd2541
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Verifies that the Flink uber-jar built by flink-dist can be located from the working directory.
+ */
+public class UtilsTest {
+
+	@Test
+	public void testUberjarLocator() {
+		File uberjar = YarnTestBase.findFile(".", new YarnTestBase.RootDirFilenameFilter());
+		Assert.assertNotNull(uberjar);
+
+		// the uber-jar sits in <dist-root>/lib, so two levels up is the distribution root
+		File distRoot = uberjar.getParentFile().getParentFile();
+		Assert.assertTrue(distRoot.exists());
+		Assert.assertTrue(distRoot.isDirectory());
+		Assert.assertTrue(distRoot.toString().contains("flink-dist"));
+
+		List<String> entries = Arrays.asList(distRoot.list());
+		for (String expectedEntry : new String[] {"lib", "bin", "conf"}) {
+			Assert.assertTrue(entries.contains(expectedEntry));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java
new file mode 100644
index 0000000..25e1aa2
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This test starts a MiniYARNCluster with a CapacityScheduler.
+ * It has, by default, a queue called "default". The configuration here adds another queue: "qa-team".
+ */
+public class YARNSessionCapacitySchedulerIT extends YarnTestBase {
+	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerIT.class);
+
+	/**
+	 * Configures the CapacityScheduler with the queues "default" (40% capacity) and
+	 * "qa-team" (60% capacity) and starts the YARN mini cluster with that configuration.
+	 */
+	@BeforeClass
+	public static void setup() {
+		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
+		yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
+		yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
+		yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
+		startYARNWithConfig(yarnConfiguration);
+	}
+
+	/**
+	 * Test regular operation, including command line parameter parsing,
+	 * with the session deployed to the existing "qa-team" queue.
+	 */
+	@Test
+	public void testClientStartup() {
+		LOG.info("Starting testClientStartup()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+						"-n", "1",
+						"-jm", "512",
+						"-tm", "1024", "-qu", "qa-team"},
+				"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+		LOG.info("Finished testClientStartup()");
+	}
+
+
+	/**
+	 * Test deployment to non-existing queue. (user-reported error)
+	 * Unlike the FIFO scheduler, the CapacityScheduler has queues, so the client must reject
+	 * the deployment and list the queues that are available.
+	 */
+	@Test
+	public void testNonexistingQueue() {
+		LOG.info("Starting testNonexistingQueue()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "1",
+				"-jm", "512",
+				"-tm", "1024",
+				"-qu", "doesntExist"}, "Error while deploying YARN cluster: The specified queue 'doesntExist' does not exist. Available queues: default, qa-team, ", RunTypes.YARN_SESSION);
+		LOG.info("Finished testNonexistingQueue()");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java
new file mode 100644
index 0000000..5f8ae87
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+
+/**
+ * This test starts a MiniYARNCluster with a FIFO scheduler.
+ * There are no queues for that scheduler.
+ */
+public class YARNSessionFIFOIT extends YarnTestBase {
+	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOIT.class);
+
+	/**
+	 * Override init with FIFO scheduler.
+	 */
+	@BeforeClass
+	public static void setup() {
+		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
+		startYARNWithConfig(yarnConfiguration);
+	}
+
+	/**
+	 * Test regular operation, including command line parameter parsing.
+	 */
+	@Test
+	public void testClientStartup() {
+		LOG.info("Starting testClientStartup()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+						"-n", "1",
+						"-jm", "512",
+						"-tm", "1024"},
+				"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+		LOG.info("Finished testClientStartup()");
+	}
+
+	/**
+	 * Test querying the YARN cluster.
+	 *
+	 * The expected summary reports 8192 MB of memory and 1332 (= 666 * 2) cores in the "cluster".
+	 */
+	@Test
+	public void testQueryCluster() {
+		LOG.info("Starting testQueryCluster()");
+		runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
+		LOG.info("Finished testQueryCluster()");
+	}
+
+	/**
+	 * Test deployment to non-existing queue. (user-reported error)
+	 * Deployment to the queue is possible because there are no queues, so we don't check.
+	 */
+	@Test
+	public void testNonexistingQueue() {
+		LOG.info("Starting testNonexistingQueue()");
+		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "1",
+				"-jm", "512",
+				"-tm", "1024",
+				"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+		LOG.info("Finished testNonexistingQueue()");
+	}
+
+	/**
+	 * Test requesting more resources than available.
+	 * 1 JobManager with 512 MB plus 10 TaskManagers with 1024 MB = 10752 MB > 8192 MB.
+	 */
+	@Test
+	public void testMoreNodesThanAvailable() {
+		LOG.info("Starting testMoreNodesThanAvailable()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "10",
+				"-jm", "512",
+				"-tm", "1024"}, "Error while deploying YARN cluster: This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available.", RunTypes.YARN_SESSION);
+		LOG.info("Finished testMoreNodesThanAvailable()");
+	}
+
+	/**
+	 * The test cluster has the following resources:
+	 * - 2 Nodes with 4096 MB each.
+	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+	 *
+	 * We allocate:
+	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
+	 * 5 TaskManagers with 1585 MB
+	 *
+	 * user sees a total request of: 8181 MB (fits)
+	 * system sees a total request of: 8437 (doesn't fit due to min alloc mb)
+	 */
+	@Test
+	public void testResourceComputation() {
+		LOG.info("Starting testResourceComputation()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "5",
+				"-jm", "256",
+				"-tm", "1585"}, "Error while deploying YARN cluster: This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.", RunTypes.YARN_SESSION);
+		LOG.info("Finished testResourceComputation()");
+	}
+
+	/**
+	 * The test cluster has the following resources:
+	 * - 2 Nodes with 4096 MB each.
+	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+	 *
+	 * We allocate:
+	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
+	 * 2 TaskManagers with 3840 MB
+	 *
+	 * the user sees a total request of: 7936 MB (fits)
+	 * the system sees a request of: 8192 MB (fits)
+	 * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit.
+	 *
+	 * --> check if the system properly rejects allocating this session.
+	 */
+	@Test
+	public void testfullAlloc() {
+		LOG.info("Starting testfullAlloc()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "2",
+				"-jm", "256",
+				"-tm", "3840"}, "Error while deploying YARN cluster: There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
+				"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]", RunTypes.YARN_SESSION);
+		LOG.info("Finished testfullAlloc()");
+	}
+
+	/**
+	 * Test per-job yarn cluster
+	 *
+	 * This also tests the prefixed CliFrontend options for the YARN case
+	 */
+	@Test
+	public void perJobYarnCluster() {
+		LOG.info("Starting perJobYarnCluster()");
+		File exampleJarLocation = YarnTestBase.findFile(".", new ContainsName("-WordCount.jar", "streaming")); // exclude streaming wordcount here.
+		// fail with a clear message instead of a NullPointerException below
+		Assert.assertNotNull("Unable to locate the WordCount example jar", exampleJarLocation);
+		runWithArgs(new String[] {"run", "-m", "yarn-cluster",
+				"-yj", flinkUberjar.getAbsolutePath(),
+				"-yn", "1",
+				"-yjm", "512",
+				"-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND);
+		LOG.info("Finished perJobYarnCluster()");
+	}
+
+	/**
+	 * Test the YARN Java API: deploy a session with one TaskManager, wait until it reports
+	 * 1 TaskManager / 1 slot, check the connection information and shut it down again.
+	 */
+	@Test
+	public void testJavaAPI() {
+		final int WAIT_TIME = 15;
+		LOG.info("Starting testJavaAPI()");
+
+		AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
+		flinkYarnClient.setTaskManagerCount(1);
+		flinkYarnClient.setJobManagerMemory(512);
+		flinkYarnClient.setTaskManagerMemory(512);
+		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+		String confDirPath = System.getenv("FLINK_CONF_DIR");
+		flinkYarnClient.setConfigurationDirectory(confDirPath);
+		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
+
+		// deploy
+		AbstractFlinkYarnCluster yarnCluster = null;
+		try {
+			yarnCluster = flinkYarnClient.deploy(null);
+		} catch (Exception e) {
+			System.err.println("Error while deploying YARN cluster: "+e.getMessage());
+			e.printStackTrace(System.err);
+			Assert.fail();
+		}
+		Assert.assertNotNull("deploy() returned no cluster", yarnCluster);
+
+		// Always release the YARN resources, even if an assertion below fails: the mini cluster
+		// is shared by all tests of this class, and several of them assert on the free memory.
+		try {
+			FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1);
+			boolean started = false;
+			// poll the cluster status once per second for up to WAIT_TIME seconds
+			for (int second = 0; second <= WAIT_TIME && !started; second++) {
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted", e);
+					// restore the interrupt flag and stop waiting instead of polling on
+					Thread.currentThread().interrupt();
+					Assert.fail("Interrupted while waiting for the cluster to start");
+				}
+				FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
+				if (status != null && status.equals(expectedStatus)) {
+					LOG.info("Cluster reached status " + status);
+					started = true; // all good, cluster started
+				}
+			}
+			Assert.assertTrue("The cluster didn't start after " + WAIT_TIME + " seconds", started);
+
+			// use the cluster
+			Assert.assertNotNull(yarnCluster.getJobManagerAddress());
+			Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
+
+			LOG.info("Shutting down cluster. All tests passed");
+		} finally {
+			// shutdown cluster
+			yarnCluster.shutdown();
+		}
+		LOG.info("Finished testJavaAPI()");
+	}
+}


Mime
View raw message