flink-commits mailing list archives

From rmetz...@apache.org
Subject [4/6] flink git commit: [FLINK-1295][FLINK-883] Allow to deploy 'job only' YARN cluster. Add tests to YARN
Date Fri, 23 Jan 2015 17:43:57 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
new file mode 100644
index 0000000..b12952a
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * This base class allows tests to use the MiniYARNCluster.
+ * The cluster is re-used for all tests.
+ *
+ * This class is located in a different package, which is built after flink-dist. This way,
+ * we can use the YARN uberjar of Flink to start a Flink YARN session.
+ */
+public abstract class YarnTestBase {
+	private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class);
+
+	private final static PrintStream originalStdout = System.out;
+	private final static PrintStream originalStderr = System.err;
+
+
+	// Temp directory which is deleted after the unit test.
+	private static TemporaryFolder tmp = new TemporaryFolder();
+
+	protected static MiniYARNCluster yarnCluster = null;
+
+	protected static File flinkUberjar;
+
+	protected static final Configuration yarnConfiguration;
+	static {
+		yarnConfiguration = new YarnConfiguration();
+		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 MB is the available memory anyway
+		yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+		yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
+		yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+		yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
+		yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
+		yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+		yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster,
+		// so we have to use the number of cores for testing instead.
+	}
+
+	// This code is taken from: http://stackoverflow.com/a/7201825/568695
+	// It changes the environment variables of this JVM. Use only for testing purposes!
+	private static void setEnv(Map<String, String> newenv) {
+		try {
+			Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
+			Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
+			theEnvironmentField.setAccessible(true);
+			Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
+			env.putAll(newenv);
+			Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+			theCaseInsensitiveEnvironmentField.setAccessible(true);
+			Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
+			cienv.putAll(newenv);
+		} catch (NoSuchFieldException e) {
+			try {
+				Class[] classes = Collections.class.getDeclaredClasses();
+				Map<String, String> env = System.getenv();
+				for (Class cl : classes) {
+					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+						Field field = cl.getDeclaredField("m");
+						field.setAccessible(true);
+						Object obj = field.get(env);
+						Map<String, String> map = (Map<String, String>) obj;
+						map.clear();
+						map.putAll(newenv);
+					}
+				}
+			} catch (Exception e2) {
+				throw new RuntimeException(e2);
+			}
+		} catch (Exception e1) {
+			throw new RuntimeException(e1);
+		}
+	}
+
+	/**
+	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests)
+	 */
+	@After
+	public void sleep() {
+		try {
+			Thread.sleep(500);
+		} catch (InterruptedException e) {
+			Assert.fail("Should not happen");
+		}
+	}
+
+	@Before
+	public void checkClusterEmpty() throws IOException, YarnException {
+		YarnClient yarnClient = YarnClient.createYarnClient();
+		yarnClient.init(yarnConfiguration);
+		yarnClient.start();
+		List<ApplicationReport> apps = yarnClient.getApplications();
+		for(ApplicationReport app : apps) {
+			if(app.getYarnApplicationState() != YarnApplicationState.FINISHED) {
+				Assert.fail("There is at least one application on the cluster is not finished");
+			}
+		}
+	}
+
+	/**
+	 * Locate a file or directory.
+	 */
+	public static File findFile(String startAt, FilenameFilter fnf) {
+		File root = new File(startAt);
+		String[] files = root.list();
+		if(files == null) {
+			return null;
+		}
+		for(String file : files) {
+
+			File f = new File(startAt + File.separator + file);
+			if(f.isDirectory()) {
+				File r = findFile(f.getAbsolutePath(), fnf);
+				if(r != null) {
+					return r;
+				}
+			} else if (fnf.accept(f.getParentFile(), f.getName())) {
+				return f;
+			}
+
+		}
+		return null;
+	}
+
+	/**
+	 * Filter to find root dir of the flink-yarn dist.
+	 */
+	public static class RootDirFilenameFilter implements FilenameFilter {
+		@Override
+		public boolean accept(File dir, String name) {
+			return name.endsWith("yarn-uberjar.jar") && dir.toString().contains("/lib");
+		}
+	}
+	public static class ContainsName implements FilenameFilter {
+		private String name;
+		private String excludeInPath = null;
+
+		public ContainsName(String name) {
+			this.name = name;
+		}
+
+		public ContainsName(String name, String excludeInPath) {
+			this.name = name;
+			this.excludeInPath = excludeInPath;
+		}
+
+		@Override
+		public boolean accept(File dir, String name) {
+			if(excludeInPath == null) {
+				return name.contains(this.name);
+			} else {
+				return name.contains(this.name) && !dir.toString().contains(excludeInPath);
+			}
+		}
+	}
+
+	public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
+		tmp.create();
+		File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
+
+		FileWriter writer = new FileWriter(yarnSiteXML);
+		yarnConf.writeXml(writer);
+		writer.flush();
+		writer.close();
+		return yarnSiteXML;
+	}
+
+	public static void startYARNWithConfig(Configuration conf) {
+		flinkUberjar = findFile(".", new RootDirFilenameFilter());
+		Assert.assertNotNull(flinkUberjar);
+		String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
+
+		if (!flinkUberjar.exists()) {
+			Assert.fail("Unable to locate yarn-uberjar.jar");
+		}
+
+		try {
+			LOG.info("Starting up MiniYARN cluster");
+			if (yarnCluster == null) {
+				yarnCluster = new MiniYARNCluster(YARNSessionFIFOIT.class.getName(), 2, 1, 1);
+
+				yarnCluster.init(conf);
+				yarnCluster.start();
+			}
+
+			Map<String, String> map = new HashMap<String, String>(System.getenv());
+			File flinkConfFilePath = findFile(flinkDistRootDir, new ContainsName("flink-conf.yaml"));
+			Assert.assertNotNull(flinkConfFilePath);
+			map.put("FLINK_CONF_DIR", flinkConfFilePath.getParent());
+			File yarnConfFile = writeYarnSiteConfigXML(conf);
+			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
+			setEnv(map);
+
+			Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			LOG.error("setup failure", ex);
+			Assert.fail();
+		}
+	}
+
+	/**
+	 * Default @BeforeClass implementation. Override this to pass a different configuration.
+	 */
+	@BeforeClass
+	public static void setup() {
+		startYARNWithConfig(yarnConfiguration);
+	}
+
+	// -------------------------- Runner -------------------------- //
+
+	private static ByteArrayOutputStream outContent;
+	private static ByteArrayOutputStream errContent;
+	enum RunTypes {
+		YARN_SESSION, CLI_FRONTEND
+	}
+
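+	/**
+	 * Runs the given arguments in a separate thread (either through the FlinkYarnSessionCli or
+	 * the CliFrontend, depending on the run type) while capturing stdout and stderr. The captured
+	 * output is polled for the expected string; the test fails if the string does not show up
+	 * within the timeout or if the runner thread dies prematurely.
+	 */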
+	protected void runWithArgs(String[] args, String expect, RunTypes type) {
+		LOG.info("Running with args "+ Arrays.toString(args));
+
+		outContent = new ByteArrayOutputStream();
+		errContent = new ByteArrayOutputStream();
+		System.setOut(new PrintStream(outContent));
+		System.setErr(new PrintStream(errContent));
+
+
+		final int START_TIMEOUT_SECONDS = 60;
+
+		Runner runner = new Runner(args, type);
+		runner.start();
+
+		boolean expectedStringSeen = false;
+		for(int second = 0; second <  START_TIMEOUT_SECONDS; second++) {
+			try {
+				Thread.sleep(1000);
+			} catch (InterruptedException e) {
+				Assert.fail("Interruption not expected");
+			}
+			// check output for correct TaskManager startup.
+			if(outContent.toString().contains(expect)
+					|| errContent.toString().contains(expect) ) {
+				expectedStringSeen = true;
+				LOG.info("Found expected output in redirected streams");
+				// send "stop" command to command line interface
+				runner.sendStop();
+				// wait for the thread to stop
+				try {
+					runner.join(1000);
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted while stopping runner", e);
+				}
+				LOG.warn("stopped");
+				break;
+			}
+			// check if thread died
+			if(!runner.isAlive()) {
+				sendOutput();
+				Assert.fail("Runner thread died before the test was finished. Return value = "+runner.getReturnValue());
+			}
+		}
+
+		sendOutput();
+		Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
+				"expected string did not show up", expectedStringSeen);
+		LOG.info("Test was successful");
+	}
+
+	private static void sendOutput() {
+		System.setOut(originalStdout);
+		System.setErr(originalStderr);
+
+		LOG.info("Sending stdout content through logger: \n\n"+outContent.toString()+"\n\n");
+		LOG.info("Sending stderr content through logger: \n\n"+errContent.toString()+"\n\n");
+	}
+
+	public static class Runner extends Thread {
+		private final String[] args;
+		private int returnValue;
+		private RunTypes type;
+		private FlinkYarnSessionCli yCli;
+
+		public Runner(String[] args, RunTypes type) {
+			this.args = args;
+			this.type = type;
+		}
+
+		public int getReturnValue() {
+			return returnValue;
+		}
+
+		@Override
+		public void run() {
+			switch(type) {
+				case YARN_SESSION:
+					yCli = new FlinkYarnSessionCli("", "");
+					returnValue = yCli.run(args);
+					break;
+				case CLI_FRONTEND:
+					CliFrontend cli = new CliFrontend();
+					returnValue = cli.parseParameters(args);
+					break;
+				default:
+					throw new RuntimeException("Unknown type " + type);
+			}
+
+			if(returnValue != 0) {
+				Assert.fail("The YARN session returned with non-null value="+returnValue);
+			}
+		}
+
+		public void sendStop() {
+			if(yCli != null) {
+				yCli.stop();
+			}
+		}
+	}
+
+	// -------------------------- Tear down -------------------------- //
+
+	@AfterClass
+	public static void tearDown() {
+		//shutdown YARN cluster
+		if (yarnCluster != null) {
+			LOG.info("shutdown MiniYarn cluster");
+			yarnCluster.stop();
+			yarnCluster = null;
+		}
+	}
+}
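
For context, a concrete test built on this base would drive a session roughly as in the
following sketch (YARNSessionFIFOIT is the test class referenced above; the CLI flags and the
expected output string are illustrative assumptions, not part of this patch):

    package org.apache.flink.yarn;

    import org.junit.Test;

    public class YARNSessionFIFOIT extends YarnTestBase {
        // Start a Flink YARN session with one TaskManager and wait until the
        // expected startup message appears in the captured output streams.
        @Test
        public void testClientStartup() {
            runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
                            "-n", "1",
                            "-jm", "512",
                            "-tm", "512"},
                    "Number of connected TaskManagers changed to: 1", RunTypes.YARN_SESSION);
        }
    }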

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..26d6a12
--- /dev/null
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, file
+
+# Log all infos to the console (the appender is named 'file' but uses a ConsoleAppender)
+log4j.appender.file=org.apache.log4j.ConsoleAppender
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
new file mode 100644
index 0000000..1569f15
--- /dev/null
+++ b/flink-yarn/pom.xml
@@ -0,0 +1,228 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+	
+	<artifactId>flink-yarn</artifactId>
+	<name>flink-yarn</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>hadoop-core</artifactId>
+					<groupId>org.apache.hadoop</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-actor_2.10</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-remote_2.10</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-camel_2.10</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.camel</groupId>
+			<artifactId>camel-stream</artifactId>
+			<version>2.14.0</version>
+		</dependency>
+
+		<!--  guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-yarn-client</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-mapreduce-client-core</artifactId>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
new file mode 100644
index 0000000..c922963
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * All classes in this package contain code taken from
+ * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+ * and
+ * https://github.com/hortonworks/simple-yarn-app
+ * and
+ * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+ *
+ * The Flink jar is uploaded to HDFS by this client.
+ * The application master and all the TaskManager containers get the jar file downloaded
+ * by YARN into their local fs.
+ *
+ */
+public class FlinkYarnClient extends AbstractFlinkYarnClient {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
+
+	/**
+	 * Constants. All names starting with ENV_ are used as environment variables to pass values
+	 * from the Client to the Application Master.
+	 */
+	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
+	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
+	public final static String ENV_APP_ID = "_APP_ID";
+	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
+	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
+	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
+	public static final String ENV_SLOTS = "_SLOTS";
+	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
+
+	private static final String DEFAULT_QUEUE_NAME = "default";
+
+
+	/**
+	 * Minimum memory requirements, checked by the Client.
+	 */
+	private static final int MIN_JM_MEMORY = 128;
+	private static final int MIN_TM_MEMORY = 128;
+
+	private Configuration conf;
+	private YarnClient yarnClient;
+	private YarnClientApplication yarnApplication;
+
+
+	/**
+	 * Files (usually in a distributed file system) used for the YARN session of Flink.
+	 * Contains configuration files and jar files.
+	 */
+	private Path sessionFilesDir;
+
+	/**
+	 * If the user has specified a different number of slots, we store them here
+	 */
+	private int slots = -1;
+
+	private int jobManagerMemoryMb = 512;
+
+	private int taskManagerMemoryMb = 512;
+
+	private int taskManagerCount = 1;
+
+	private String yarnQueue = DEFAULT_QUEUE_NAME;
+
+	private String configurationDirectory;
+
+	private Path flinkConfigurationPath;
+
+	private Path flinkLoggingConfigurationPath; // optional
+
+	private Path flinkJarPath;
+
+	private String dynamicPropertiesEncoded;
+
+	private List<File> shipFiles = new ArrayList<File>();
+
+
+	public FlinkYarnClient() {
+		// Check if security is enabled
+		if(UserGroupInformation.isSecurityEnabled()) {
+			throw new RuntimeException("Flink YARN client does not have security support right now."
+					+ "File a bug, we will fix it asap");
+		}
+		conf = Utils.initializeYarnConfiguration();
+		if(this.yarnClient == null) {
+			// Create yarnClient
+			yarnClient = YarnClient.createYarnClient();
+			yarnClient.init(conf);
+			yarnClient.start();
+		}
+	}
+
+	@Override
+	public void setJobManagerMemory(int memoryMb) {
+		if(memoryMb < MIN_JM_MEMORY) {
+			throw new IllegalArgumentException("The JobManager memory is below the minimum required memory amount "
+					+ "of "+MIN_JM_MEMORY+" MB");
+		}
+		this.jobManagerMemoryMb = memoryMb;
+	}
+
+	@Override
+	public void setTaskManagerMemory(int memoryMb) {
+		if(memoryMb < MIN_TM_MEMORY) {
+			throw new IllegalArgumentException("The TaskManager memory is below the minimum required memory amount "
+					+ "of "+MIN_TM_MEMORY+" MB");
+		}
+		this.taskManagerMemoryMb = memoryMb;
+	}
+
+	@Override
+	public void setTaskManagerSlots(int slots) {
+		if(slots <= 0) {
+			throw new IllegalArgumentException("Number of TaskManager slots must be positive");
+		}
+		this.slots = slots;
+	}
+
+	@Override
+	public int getTaskManagerSlots() {
+		return this.slots;
+	}
+
+	@Override
+	public void setQueue(String queue) {
+		this.yarnQueue = queue;
+	}
+
+	@Override
+	public void setLocalJarPath(Path localJarPath) {
+		if(!localJarPath.toString().endsWith("jar")) {
+			throw new IllegalArgumentException("The passed jar path ('"+localJarPath+"') does not end with the 'jar' extension");
+		}
+		this.flinkJarPath = localJarPath;
+	}
+
+	@Override
+	public void setConfigurationFilePath(Path confPath) {
+		flinkConfigurationPath = confPath;
+	}
+
+	public void setConfigurationDirectory(String configurationDirectory) {
+		this.configurationDirectory = configurationDirectory;
+	}
+
+	@Override
+	public void setFlinkLoggingConfigurationPath(Path logConfPath) {
+		flinkLoggingConfigurationPath = logConfPath;
+	}
+
+	@Override
+	public Path getFlinkLoggingConfigurationPath() {
+		return flinkLoggingConfigurationPath;
+	}
+
+	@Override
+	public void setTaskManagerCount(int tmCount) {
+		if(tmCount < 1) {
+			throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
+		}
+		this.taskManagerCount = tmCount;
+	}
+
+	@Override
+	public int getTaskManagerCount() {
+		return this.taskManagerCount;
+	}
+
+	@Override
+	public void setShipFiles(List<File> shipFiles) {
+		this.shipFiles.addAll(shipFiles);
+	}
+
+	public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
+		this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
+	}
+
+	@Override
+	public String getDynamicPropertiesEncoded() {
+		return this.dynamicPropertiesEncoded;
+	}
+
+
+	public void isReadyForDeployment() throws YarnDeploymentException {
+		if(taskManagerCount <= 0) {
+			throw new YarnDeploymentException("Taskmanager count must be positive");
+		}
+		if(this.flinkJarPath == null) {
+			throw new YarnDeploymentException("The Flink jar path is null");
+		}
+		if(this.configurationDirectory == null) {
+			throw new YarnDeploymentException("Configuration directory not set");
+		}
+		if(this.flinkConfigurationPath == null) {
+			throw new YarnDeploymentException("Configuration path not set");
+		}
+	}
+
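+	/**
+	 * First-fit check: subtracts the requested amount of memory (MB) from the first entry in
+	 * nodeManagers with enough free capacity, and returns whether such a NodeManager was found.
+	 */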
+	public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
+		for(int i = 0; i < nodeManagers.length; i++) {
+			if(nodeManagers[i] >= toAllocate) {
+				nodeManagers[i] -= toAllocate;
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * This method will block until the ApplicationMaster/JobManager has been
+	 * deployed on YARN.
+	 */
+	@Override
+	public AbstractFlinkYarnCluster deploy(String clusterName) throws Exception {
+		isReadyForDeployment();
+
+		LOG.info("Using values:");
+		LOG.info("\tTaskManager count = " + taskManagerCount);
+		LOG.info("\tJobManager memory = " + jobManagerMemoryMb);
+		LOG.info("\tTaskManager memory = " + taskManagerMemoryMb);
+
+		// Create application via yarnClient
+		yarnApplication = yarnClient.createApplication();
+		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+
+		// ------------------ Check if the specified queue exists --------------
+
+		List<QueueInfo> queues = yarnClient.getAllQueues();
+		if(queues.size() > 0) { // check only if there are queues configured.
+			boolean queueFound = false;
+			for (QueueInfo queue : queues) {
+				if (queue.getQueueName().equals(this.yarnQueue)) {
+					queueFound = true;
+					break;
+				}
+			}
+			if (!queueFound) {
+				String queueNames = "";
+				for (QueueInfo queue : queues) {
+					queueNames += queue.getQueueName() + ", ";
+				}
+				throw new YarnDeploymentException("The specified queue '" + this.yarnQueue + "' does not exist. " +
+						"Available queues: " + queueNames);
+			}
+		} else {
+			LOG.debug("The YARN cluster does not have any queues configured");
+		}
+
+		// ------------------ Check if the YARN Cluster has the requested resources --------------
+
+		// the yarnMinAllocationMB specifies the smallest possible container allocation size.
+		// all allocations below this value are automatically set to this value.
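+		// Example: with yarn.scheduler.minimum-allocation-mb=1024, a request for 512 MB
+		// still occupies a full 1024 MB container in the scheduler's accounting.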
+		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+		if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
+			LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
+					+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '"+yarnMinAllocationMB+"'. Please increase the memory size." +
+					"YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
+					"you requested will start.");
+		}
+
+		// set the memory to minAllocationMB to do the next checks correctly
+		if(jobManagerMemoryMb < yarnMinAllocationMB) {
+			jobManagerMemoryMb =  yarnMinAllocationMB;
+		}
+		if(taskManagerMemoryMb < yarnMinAllocationMB) {
+			taskManagerMemoryMb =  yarnMinAllocationMB;
+		}
+
+		Resource maxRes = appResponse.getMaximumResourceCapability();
+		final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
+		if(jobManagerMemoryMb > maxRes.getMemory() ) {
+			failSessionDuringDeployment();
+			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
+					+ "Maximum Memory: "+maxRes.getMemory() + "MB Requested: "+jobManagerMemoryMb+"MB. " + NOTE);
+		}
+
+		if(taskManagerMemoryMb > maxRes.getMemory() ) {
+			failSessionDuringDeployment();
+			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
+					+ "Maximum Memory: " + maxRes.getMemory() + " Requested: "+taskManagerMemoryMb + "MB. " + NOTE);
+		}
+
+
+		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
+		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+			failSessionDuringDeployment();
+			throw new YarnDeploymentException("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+					+ "There are currently only " + freeClusterMem.totalFreeMemory+"MB available.");
+
+		}
+		if( taskManagerMemoryMb > freeClusterMem.containerLimit) {
+			failSessionDuringDeployment();
+			throw new YarnDeploymentException("The requested amount of memory for the TaskManagers ("+taskManagerMemoryMb+"MB) is more than "
+					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
+		}
+		if( jobManagerMemoryMb > freeClusterMem.containerLimit) {
+			failSessionDuringDeployment();
+			throw new YarnDeploymentException("The requested amount of memory for the JobManager ("+jobManagerMemoryMb+"MB) is more than "
+					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
+		}
+
+		// ----------------- check if the requested containers fit into the cluster.
+
+		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
+		// first, allocate the jobManager somewhere.
+		if(!allocateResource(nmFree, jobManagerMemoryMb)) {
+			failSessionDuringDeployment();
+			throw new YarnDeploymentException("Unable to find a NodeManager that can fit the JobManager/Application master. " +
+					"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: "+Arrays.toString(freeClusterMem.nodeManagersFree));
+		}
+		// allocate TaskManagers
+		for(int i = 0; i < taskManagerCount; i++) {
+			if(!allocateResource(nmFree, taskManagerMemoryMb)) {
+				failSessionDuringDeployment();
+				throw new YarnDeploymentException("There is not enough memory available in the YARN cluster. " +
+						"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
+						"NodeManagers available: "+Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
+						"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
+						"the following NodeManagers are available: " + Arrays.toString(nmFree) );
+			}
+		}
+
+		// ------------------ Prepare Application Master Container  ------------------------------
+
+		// respect custom JVM options in the YAML file
+		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+
+		String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
+		boolean hasLogback = new File(logbackFile).exists();
+		String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
+
+		boolean hasLog4j = new File(log4jFile).exists();
+		if(hasLogback) {
+			shipFiles.add(new File(logbackFile));
+		}
+		if(hasLog4j) {
+			shipFiles.add(new File(log4jFile));
+		}
+
+		// Set up the container launch context for the application master
+		ContainerLaunchContext amContainer = Records
+				.newRecord(ContainerLaunchContext.class);
+
+		String amCommand = "$JAVA_HOME/bin/java"
+					+ " -Xmx"+Utils.calculateHeapSize(jobManagerMemoryMb)+"M " +javaOpts;
+
+		if(hasLogback || hasLog4j) {
+			amCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-main.log\"";
+		}
+
+		if(hasLogback) {
+			amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
+		}
+		if(hasLog4j) {
+			amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
+		}
+
+		amCommand 	+= " "+ApplicationMaster.class.getName()+" "
+					+ " 1>"
+					+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
+					+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
+		amContainer.setCommands(Collections.singletonList(amCommand));
+
+		LOG.debug("Application Master start command: "+amCommand);
+
+		// initialize HDFS
+		// Copy the application master jar to the filesystem
+		// Create a local resource to point to the destination jar path
+		final FileSystem fs = FileSystem.get(conf);
+
+		// hard-coded check for the GoogleHDFS client because it does not override the getScheme() method.
+		if( !fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
+				fs.getScheme().startsWith("file")) {
+			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+					+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values. "
+					+ "The Flink YARN client needs to store its files in a distributed file system.");
+		}
+
+		// Set-up ApplicationSubmissionContext for the application
+		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
+		final ApplicationId appId = appContext.getApplicationId();
+
+		// Setup jar for ApplicationMaster
+		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+		Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
+		Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
+		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
+		localResources.put("flink.jar", appMasterJar);
+		localResources.put("flink-conf.yaml", flinkConf);
+
+
+		// setup security tokens (code from apache storm)
+		final Path[] paths = new Path[3 + shipFiles.size()];
+		StringBuffer envShipFileList = new StringBuffer();
+		// upload ship files
+		for (int i = 0; i < shipFiles.size(); i++) {
+			File shipFile = shipFiles.get(i);
+			LocalResource shipResources = Records.newRecord(LocalResource.class);
+			Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
+			paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
+					shipLocalPath, shipResources, fs.getHomeDirectory());
+			localResources.put(shipFile.getName(), shipResources);
+
+			envShipFileList.append(paths[3 + i]);
+			if(i+1 < shipFiles.size()) {
+				envShipFileList.append(',');
+			}
+		}
+
+		paths[0] = remotePathJar;
+		paths[1] = remotePathConf;
+		sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+		fs.setPermission(sessionFilesDir, permission); // set permission for path.
+		Utils.setTokensFor(amContainer, paths, this.conf);
+
+
+		amContainer.setLocalResources(localResources);
+		fs.close();
+
+		// Setup CLASSPATH for ApplicationMaster
+		Map<String, String> appMasterEnv = new HashMap<String, String>();
+		Utils.setupEnv(conf, appMasterEnv);
+		// set configuration values
+		appMasterEnv.put(FlinkYarnClient.ENV_TM_COUNT, String.valueOf(taskManagerCount));
+		appMasterEnv.put(FlinkYarnClient.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
+		appMasterEnv.put(FlinkYarnClient.FLINK_JAR_PATH, remotePathJar.toString() );
+		appMasterEnv.put(FlinkYarnClient.ENV_APP_ID, appId.toString());
+		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
+		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
+		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
+		appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, String.valueOf(slots));
+		if(dynamicPropertiesEncoded != null) {
+			appMasterEnv.put(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+		}
+
+		amContainer.setEnvironment(appMasterEnv);
+
+		// Set up resource type requirements for ApplicationMaster
+		Resource capability = Records.newRecord(Resource.class);
+		capability.setMemory(jobManagerMemoryMb);
+		capability.setVirtualCores(1);
+
+		if(clusterName == null) {
+			clusterName = "Flink session with "+taskManagerCount+" TaskManagers";
+		}
+
+		appContext.setApplicationName(clusterName); // application name
+		appContext.setApplicationType("Apache Flink");
+		appContext.setAMContainerSpec(amContainer);
+		appContext.setResource(capability);
+		appContext.setQueue(yarnQueue);
+
+
+		LOG.info("Submitting application master " + appId);
+		yarnClient.submitApplication(appContext);
+
+		LOG.info("Waiting for the cluster to be allocated");
+		int waittime = 0;
+		loop: while( true ) {
+			ApplicationReport report = yarnClient.getApplicationReport(appId);
+			YarnApplicationState appState = report.getYarnApplicationState();
+			switch(appState) {
+				case FAILED:
+				case FINISHED:
+				case KILLED:
+					throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
+							+ appState +" during deployment. \n" +
+							"Diagnostics from YARN: "+report.getDiagnostics() + "\n" +
+							"If log aggregation is enabled on your cluster, use this command to further invesitage the issue:\n" +
+							"yarn logs -applicationId "+appId);
+					//break ..
+				case RUNNING:
+					LOG.info("YARN application has been deployed successfully.");
+					break loop;
+				default:
+					LOG.info("Deploying cluster, current state "+appState);
+					if(waittime > 60000) {
+						LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
+					}
+
+			}
+			waittime += 1000;
+			Thread.sleep(1000);
+		}
+		// the Flink cluster is deployed in YARN. Return a representation of the cluster.
+		return new FlinkYarnCluster(yarnClient, appId, conf, sessionFilesDir);
+	}
+
+	/**
+	 * Kills YARN application and stops YARN client.
+	 *
+	 * Use this method to kill the App before it has been properly deployed
+	 */
+	private void failSessionDuringDeployment() {
+		LOG.info("Killing YARN application");
+
+		try {
+			yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId());
+		} catch (Exception e) {
+			// we only log a debug message here because the "killApplication" call is a best-effort
+		// call (we don't know if the application has been deployed when the error occurred).
+			LOG.debug("Error while killing YARN application", e);
+		}
+		yarnClient.stop();
+	}
+
+
+	private static class ClusterResourceDescription {
+		final public int totalFreeMemory;
+		final public int containerLimit;
+		final public int[] nodeManagersFree;
+
+		public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
+			this.totalFreeMemory = totalFreeMemory;
+			this.containerLimit = containerLimit;
+			this.nodeManagersFree = nodeManagersFree;
+		}
+	}
+
+	private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
+		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+
+		int totalFreeMemory = 0;
+		int containerLimit = 0;
+		int[] nodeManagersFree = new int[nodes.size()];
+
+		for(int i = 0; i < nodes.size(); i++) {
+			NodeReport rep = nodes.get(i);
+			int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
+			nodeManagersFree[i] = free;
+			totalFreeMemory += free;
+			if(free > containerLimit) {
+				containerLimit = free;
+			}
+		}
+		return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
+	}
+
+
+
+	public String getClusterDescription() throws Exception {
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		PrintStream ps = new PrintStream(baos);
+
+		YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
+
+		ps.append("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
+		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+		final String format = "|%-16s |%-16s %n";
+		ps.printf("|Property         |Value          %n");
+		ps.println("+---------------------------------------+");
+		int totalMemory = 0;
+		int totalCores = 0;
+		for(NodeReport rep : nodes) {
+			final Resource res = rep.getCapability();
+			totalMemory += res.getMemory();
+			totalCores += res.getVirtualCores();
+			ps.format(format, "NodeID", rep.getNodeId());
+			ps.format(format, "Memory", res.getMemory()+" MB");
+			ps.format(format, "vCores", res.getVirtualCores());
+			ps.format(format, "HealthReport", rep.getHealthReport());
+			ps.format(format, "Containers", rep.getNumContainers());
+			ps.println("+---------------------------------------+");
+		}
+		ps.println("Summary: totalMemory "+totalMemory+" totalCores "+totalCores);
+		List<QueueInfo> qInfo = yarnClient.getAllQueues();
+		for(QueueInfo q : qInfo) {
+			ps.println("Queue: "+q.getQueueName()+", Current Capacity: "+q.getCurrentCapacity()+" Max Capacity: "+q.getMaximumCapacity()+" Applications: "+q.getApplications().size());
+		}
+		yarnClient.stop();
+		return baos.toString();
+	}
+
+	public static class YarnDeploymentException extends RuntimeException {
+		public YarnDeploymentException() {
+		}
+
+		public YarnDeploymentException(String message) {
+			super(message);
+		}
+
+		public YarnDeploymentException(String message, Throwable cause) {
+			super(message, cause);
+		}
+	}
+
+}
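
For reference, deploying a session with this client follows roughly the pattern below (a
minimal sketch using only the setters shown above; all paths are placeholders):

    import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
    import org.apache.flink.yarn.FlinkYarnClient;
    import org.apache.hadoop.fs.Path;

    public class DeploySketch {
        public static void main(String[] args) throws Exception {
            FlinkYarnClient client = new FlinkYarnClient();
            client.setLocalJarPath(new Path("file:///path/to/flink-uberjar.jar")); // placeholder
            client.setConfigurationFilePath(new Path("file:///path/to/flink-conf.yaml")); // placeholder
            client.setConfigurationDirectory("/path/to/conf"); // placeholder
            client.setTaskManagerCount(2);
            client.setJobManagerMemory(768);
            client.setTaskManagerMemory(1024);

            // deploy() blocks until the ApplicationMaster/JobManager is running on YARN
            AbstractFlinkYarnCluster cluster = client.deploy("Flink session example");
            System.out.println("JobManager running at " + cluster.getJobManagerAddress());
            cluster.shutdown();
        }
    }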

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
new file mode 100644
index 0000000..98abd5e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+import static akka.pattern.Patterns.ask;
+
+import akka.actor.Props;
+import akka.util.Timeout;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.None$;
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Awaitable;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCluster.class);
+
+	private static final int POLLING_THREAD_INTERVAL_MS = 1000;
+
+	private YarnClient yarnClient;
+	private Thread actorRunner;
+	private Thread clientShutdownHook = new ClientShutdownHook();
+	private PollingThread pollingRunner;
+	private Configuration hadoopConfig;
+	// (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown.
+	private Path sessionFilesDir;
+	private InetSocketAddress jobManagerAddress;
+
+	//---------- Class internal fields -------------------
+
+	private ActorSystem actorSystem;
+	private ActorRef applicationClient;
+	private ApplicationReport initialAppReport;
+	private static FiniteDuration akkaDuration = Duration.apply(5, TimeUnit.SECONDS);
+	private static Timeout akkaTimeout = Timeout.durationToTimeout(akkaDuration);
+
+	public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId,
+							Configuration hadoopConfig, Path sessionFilesDir) throws IOException, YarnException {
+		this.yarnClient = yarnClient;
+		this.hadoopConfig = hadoopConfig;
+		this.sessionFilesDir = sessionFilesDir;
+
+		// get one application report manually
+		initialAppReport = yarnClient.getApplicationReport(appId);
+		String jobManagerHost = initialAppReport.getHost();
+		int jobManagerPort = initialAppReport.getRpcPort();
+		this.jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort);
+
+		// start actor system
+		LOG.info("Start actor system.");
+		InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
+		actorSystem = YarnUtils.createActorSystem(ownHostname.getCanonicalHostName(), 0, GlobalConfiguration.getConfiguration()); // set port automatically.
+
+		// start application client
+		LOG.info("Start application client.");
+
+		applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class));
+
+		// instruct ApplicationClient to start a periodical status polling
+		applicationClient.tell(new Messages.LocalRegisterClient(jobManagerHost + ":" + jobManagerPort), applicationClient);
+
+
+		// add hook to ensure proper shutdown
+		Runtime.getRuntime().addShutdownHook(clientShutdownHook);
+
+		actorRunner = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				// blocks until ApplicationMaster has been stopped
+				actorSystem.awaitTermination();
+
+				// get final application report
+				try {
+					ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+
+					LOG.info("Application " + appId + " finished with state " + appReport
+							.getYarnApplicationState() + " and final state " + appReport
+							.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
+
+					if(appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
+							== YarnApplicationState.KILLED	) {
+						LOG.warn("Application failed. Diagnostics "+appReport.getDiagnostics());
+						LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
+								+ "the full application log using this command:\n"
+								+ "\tyarn logs -applicationId "+appReport.getApplicationId()+"\n"
+								+ "(It sometimes takes a few seconds until the logs are aggregated)");
+					}
+				} catch(Exception e) {
+					LOG.warn("Error while getting final application report", e);
+				}
+			}
+		});
+		actorRunner.setDaemon(true);
+		actorRunner.start();
+
+		pollingRunner = new PollingThread(yarnClient, appId);
+		pollingRunner.setDaemon(true);
+		pollingRunner.start();
+	}
+
+	// -------------------------- Interaction with the cluster ------------------------
+
+	@Override
+	public InetSocketAddress getJobManagerAddress() {
+		return jobManagerAddress;
+	}
+
+	@Override
+	public String getWebInterfaceURL() {
+		return this.initialAppReport.getTrackingUrl();
+	}
+
+
+	@Override
+	public FlinkYarnClusterStatus getClusterStatus() {
+		if(hasBeenStopped()) {
+			throw new RuntimeException("The FlinkYarnCluster has alread been stopped");
+		}
+		Future<Object> clusterStatusOption = ask(applicationClient, Messages.LocalGetYarnClusterStatus$.MODULE$, akkaTimeout);
+		Object clusterStatus = awaitUtil(clusterStatusOption, "Unable to get Cluster status from Application Client");
+		if(clusterStatus instanceof None$) {
+			return null;
+		} else if(clusterStatus instanceof Some) {
+			return (FlinkYarnClusterStatus) (((Some) clusterStatus).get());
+		} else {
+			throw new RuntimeException("Unexpected type: "+clusterStatus.getClass().getCanonicalName());
+		}
+	}
+
+	@Override
+	public boolean hasFailed() {
+		if(pollingRunner == null) {
+			LOG.warn("FlinkYarnCluster.hasFailed() has been called on an uninitialized cluster." +
+					"The system might be in an erroneous state");
+		}
+		ApplicationReport lastReport = pollingRunner.getLastReport();
+		if(lastReport == null) {
+			LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster. that didn't receive a status so far." +
+					"The system might be in an erroneous state");
+			return false;
+		} else {
+			return (lastReport.getYarnApplicationState() == YarnApplicationState.FAILED ||
+					lastReport.getYarnApplicationState() == YarnApplicationState.KILLED);
+		}
+	}
+
+	@Override
+	public String getDiagnostics() {
+		if (!hasFailed()) {
+			LOG.warn("getDiagnostics() called for cluster which is not in failed state");
+		}
+		ApplicationReport lastReport = pollingRunner.getLastReport();
+		if (lastReport == null) {
+			LOG.warn("Last report is null");
+			return null;
+		} else {
+			return lastReport.getDiagnostics();
+		}
+	}
+
+	@Override
+	public List<String> getNewMessages() {
+		if(hasBeenStopped()) {
+			throw new RuntimeException("The FlinkYarnCluster has alread been stopped");
+		}
+		List<String> ret = new ArrayList<String>();
+		// get messages from ApplicationClient (locally)
+		while(true) {
+			Future<Object> messageOptionFuture = ask(applicationClient, Messages.LocalGetYarnMessage$.MODULE$, akkaTimeout);
+			Object messageOption = awaitUtil(messageOptionFuture, "Error getting new messages from the Application Client");
+			if(messageOption instanceof None$) {
+				break;
+			} else if(messageOption instanceof Messages.YarnMessage) {
+				Messages.YarnMessage msg = (Messages.YarnMessage) messageOption;
+				ret.add("[" + msg.date() + "] " + msg.message());
+			} else {
+				LOG.warn("LocalGetYarnMessage returned unexpected type: " + messageOption);
+			}
+		}
+		return ret;
+	}
+
+	private static <T> T awaitUtil(Awaitable<T> awaitable, String message) {
+		try {
+			return Await.result(awaitable, akkaDuration);
+		} catch (Exception e) {
+			throw new RuntimeException(message, e);
+		}
+	}
+
+	// -------------------------- Shutdown handling ------------------------
+
+	private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
+	@Override
+	public void shutdown() {
+		shutdownInternal(true);
+	}
+
+	private void shutdownInternal(boolean removeShutdownHook) {
+		if(hasBeenShutDown.getAndSet(true)) {
+			return;
+		}
+		// the session is being stopped explicitly.
+		if(removeShutdownHook) {
+			Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
+		}
+		if(actorSystem != null){
+			LOG.info("Sending shutdown request to the Application Master");
+			if(applicationClient != ActorRef.noSender()) {
+				Future<Object> future = ask(applicationClient, new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED), akkaTimeout);
+				awaitUtil(future, "Error while stopping YARN Application Client");
+			}
+
+			actorSystem.shutdown();
+			actorSystem.awaitTermination();
+
+			actorSystem = null;
+		}
+
+		LOG.info("Deleting files in "+sessionFilesDir );
+		try {
+			FileSystem shutFS = FileSystem.get(hadoopConfig);
+			shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
+			shutFS.close();
+		}catch(IOException e){
+			LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
+		}
+
+		try {
+			actorRunner.join(1000); // wait for 1 second
+		} catch (InterruptedException e) {
+			LOG.warn("Shutdown of the actor runner was interrupted", e);
+			Thread.currentThread().interrupt();
+		}
+		try {
+			pollingRunner.stopRunner();
+			pollingRunner.join(1000);
+		} catch(InterruptedException e) {
+			LOG.warn("Shutdown of the polling runner was interrupted", e);
+			Thread.currentThread().interrupt();
+		}
+
+		LOG.info("YARN Client is shutting down");
+		yarnClient.stop(); // actorRunner is using the yarnClient.
+		yarnClient = null; // set null to clearly see if somebody wants to access it afterwards.
+	}
+
+	@Override
+	public boolean hasBeenStopped() {
+		return hasBeenShutDown.get();
+	}
+
+
+	public class ClientShutdownHook extends Thread {
+		@Override
+		public void run() {
+			shutdownInternal(false);
+		}
+	}
+
+	// -------------------------- Polling ------------------------
+
+	public static class PollingThread extends Thread {
+
+		private final AtomicBoolean running = new AtomicBoolean(true);
+		private YarnClient yarnClient;
+		private ApplicationId appId;
+
+		// ------- status information stored in the polling thread
+		private final Object lock = new Object();
+		private ApplicationReport lastReport;
+
+
+		public PollingThread(YarnClient yarnClient, ApplicationId appId) {
+			this.yarnClient = yarnClient;
+			this.appId = appId;
+		}
+
+		public void stopRunner() {
+			if(!running.get()) {
+				LOG.warn("Polling thread was already stopped");
+			}
+			running.set(false);
+		}
+
+		public ApplicationReport getLastReport() {
+			synchronized (lock) {
+				return lastReport;
+			}
+		}
+
+		@Override
+		public void run() {
+			while (running.get() && yarnClient.isInState(Service.STATE.STARTED)) {
+				try {
+					ApplicationReport report = yarnClient.getApplicationReport(appId);
+					synchronized (lock) {
+						lastReport = report;
+					}
+				} catch (Exception e) {
+					LOG.warn("Error while getting application report", e);
+					// TODO: do more here.
+				}
+				try {
+					Thread.sleep(FlinkYarnCluster.POLLING_THREAD_INTERVAL_MS);
+				} catch (InterruptedException e) {
+					LOG.error("Polling thread got interrupted", e);
+					Thread.currentThread().interrupt(); // pass interrupt.
+				}
+			}
+			if(running.get() && !yarnClient.isInState(Service.STATE.STARTED)) {
+				// == if the polling thread is still running but the yarn client is stopped.
+				LOG.warn("YARN client is unexpected in state "+yarnClient.getServiceState());
+			}
+		}
+	}
+
+}
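The cluster client above talks to the ApplicationClient actor exclusively through Akka's ask pattern: send a message, block on the returned future, and unwrap the scala.Option reply on the Java side. A minimal sketch of that idiom, assuming a placeholder actor handle, request message, and class name that are not part of this commit:

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

public final class AskPatternSketch {
	// Ask 'client' for a status object; returns null if the actor answered with None.
	public static Object askForStatus(ActorRef client, Object statusRequest) throws Exception {
		Timeout timeout = new Timeout(Duration.create(10, "seconds"));
		Future<Object> future = Patterns.ask(client, statusRequest, timeout);
		Object reply = Await.result(future, timeout.duration());
		if (reply instanceof Some) {
			return ((Some<?>) reply).get(); // the actor had a value to report
		}
		return null; // scala.None$ - nothing reported yet
	}
}

The blocking Await.result mirrors what awaitUtil does above; awaitUtil additionally wraps any failure in a RuntimeException with a descriptive message.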

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
new file mode 100644
index 0000000..8bb2668
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class Utils {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+	private static final int DEFAULT_HEAP_LIMIT_CAP = 500;
+	private static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.8f;
+
+	/**
+	 * Calculate the heap size for the JVMs to start in the containers.
+	 * Since JVMs allocate more than just the heap space, and YARN is very
+	 * fast at killing processes that use memory beyond their limit, we have to come
+	 * up with a good heap size.
+	 * This code takes the configured cutoff ratio (by default 80%) of the given amount
+	 * of memory (in MB) as the heap. If the amount cut off this way exceeds the heap
+	 * limit cap (500 MB by default), only the cap is subtracted.
+	 * 
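+	 * Example, assuming the default values (ratio 0.8, cap 500 MB): for 1024 MB of
+	 * container memory, 205 MB are cut off (below the cap), giving a heap of 819 MB;
+	 * for 4096 MB, the ratio would cut off 820 MB, so only the 500 MB cap is
+	 * subtracted, giving a heap of 3596 MB.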
+	 */
+	public static int calculateHeapSize(int memory) {
+		float memoryCutoffRatio = GlobalConfiguration.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, DEFAULT_YARN_HEAP_CUTOFF_RATIO);
+		int heapLimitCap = GlobalConfiguration.getInteger(ConfigConstants.YARN_HEAP_LIMIT_CAP, DEFAULT_HEAP_LIMIT_CAP);
+
+		int heapLimit = (int)((float)memory * memoryCutoffRatio);
+		if( (memory - heapLimit) > heapLimitCap) {
+			heapLimit = memory-heapLimitCap;
+		}
+		return heapLimit;
+	}
+	
+	private static void addPathToConfig(Configuration conf, File path) {
+		// chain in a new classloader so that the given path becomes part of the configuration's classpath
+		URL fileUrl = null;
+		try {
+			fileUrl = path.toURI().toURL();
+		} catch (MalformedURLException e) {
+			throw new RuntimeException("Erroneous config file path", e);
+		}
+		URL[] urls = {fileUrl};
+		ClassLoader cl = new URLClassLoader(urls, conf.getClassLoader());
+		conf.setClassLoader(cl);
+	}
+	
+	private static void setDefaultConfValues(Configuration conf) {
+		if(conf.get("fs.hdfs.impl",null) == null) {
+			conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+		}
+		if(conf.get("fs.file.impl",null) == null) {
+			conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+		}
+	}
+	
+	public static Configuration initializeYarnConfiguration() {
+		Configuration conf = new YarnConfiguration();
+		String configuredHadoopConfig = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+		if(configuredHadoopConfig != null) {
+			LOG.info("Using hadoop configuration path from " + ConfigConstants.PATH_HADOOP_CONFIG + " setting.");
+			addPathToConfig(conf, new File(configuredHadoopConfig));
+			setDefaultConfValues(conf);
+			return conf;
+		}
+		String[] envs = { "YARN_CONF_DIR", "HADOOP_CONF_DIR", "HADOOP_CONF_PATH" };
+		for(int i = 0; i < envs.length; ++i) {
+			String confPath = System.getenv(envs[i]);
+			if (confPath != null) {
+				LOG.info("Found "+envs[i]+", adding it to configuration");
+				addPathToConfig(conf, new File(confPath));
+				setDefaultConfValues(conf);
+				return conf;
+			}
+		}
+		LOG.info("Could not find HADOOP_CONF_PATH, using HADOOP_HOME.");
+		String hadoopHome = null;
+		try {
+			hadoopHome = Shell.getHadoopHome();
+		} catch (IOException e) {
+			throw new RuntimeException("Unable to get hadoop home. Please set HADOOP_HOME variable!", e);
+		}
+		File tryConf = new File(hadoopHome+"/etc/hadoop");
+		if(tryConf.exists()) {
+			LOG.info("Found configuration using hadoop home.");
+			addPathToConfig(conf, tryConf);
+		} else {
+			tryConf = new File(hadoopHome+"/conf");
+			if(tryConf.exists()) {
+				addPathToConfig(conf, tryConf);
+			}
+		}
+		setDefaultConfValues(conf);
+		return conf;
+	}
+	
+	public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv) {
+		addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*");
+		for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+			addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
+		}
+	}
+	
+	/**
+	 * Copies the given local file to the remote file system (usually HDFS) and
+	 * registers it as a local resource, filling in the resource's metadata.
+	 *
+	 * @return Path to the remote file (usually on HDFS)
+	 * @throws IOException
+	 */
+	public static Path setupLocalResource(Configuration conf, FileSystem fs, String appId, Path localRsrcPath, LocalResource appMasterJar, Path homedir)
+			throws IOException {
+		// copy to HDFS
+		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
+		
+		Path dst = new Path(homedir, suffix);
+		
+		LOG.info("Copying from "+localRsrcPath+" to "+dst );
+		fs.copyFromLocalFile(localRsrcPath, dst);
+		registerLocalResource(fs, dst, appMasterJar);
+		return dst;
+	}
+	
+	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
+		FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
+		localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
+		localResource.setSize(jarStat.getLen());
+		localResource.setTimestamp(jarStat.getModificationTime());
+		localResource.setType(LocalResourceType.FILE);
+		localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+	}
+
+	public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
+		Credentials credentials = new Credentials();
+		// for HDFS
+		TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
+		// for user
+		UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
+		
+		Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
+		for(Token<? extends TokenIdentifier> token : usrTok) {
+			final Text id = new Text(token.getIdentifier());
+			LOG.info("Adding user token "+id+" with "+token);
+			credentials.addToken(id, token);
+		}
+		DataOutputBuffer dob = new DataOutputBuffer();
+		credentials.writeTokenStorageToStream(dob);
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
+		}
+
+		ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+		amContainer.setTokens(securityTokens);
+	}
+	
+	public static void logFilesInCurrentDirectory(final Logger logger) {
+		// the FilenameFilter is used only for its side effect: it logs every entry it is offered
+		new File(".").list(new FilenameFilter() {
+			
+			@Override
+			public boolean accept(File dir, String name) {
+				logger.info(dir.getAbsolutePath()+"/"+name);
+				return true;
+			}
+		});
+	}
+	
+	/**
+	 * Copied method from org.apache.hadoop.yarn.util.Apps
+	 * It was broken by YARN-1824 (2.4.0) and fixed for 2.4.1
+	 * by https://issues.apache.org/jira/browse/YARN-1931
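+	 *
+	 * Example (hypothetical values): if the environment already maps CLASSPATH
+	 * to "a.jar", then addToEnvironment(env, "CLASSPATH", "b.jar") leaves it
+	 * mapped to "a.jar" + File.pathSeparator + "b.jar".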
+	 */
+	public static void addToEnvironment(Map<String, String> environment,
+			String variable, String value) {
+		String val = environment.get(variable);
+		if (val == null) {
+			val = value;
+		} else {
+			val = val + File.pathSeparator + value;
+		}
+		environment.put(StringInterner.weakIntern(variable),
+				StringInterner.weakIntern(val));
+	}
+}
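setupLocalResource and registerLocalResource are meant to be used together when staging a file for a YARN container: copy the file to the distributed file system, then fill in the LocalResource metadata that YARN ships to the container. A minimal sketch of that flow, with a hypothetical local jar path; Records.newRecord is the standard YARN factory for the metadata object:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.Records;

public final class StagingSketch {
	public static LocalResource stageJar(Configuration conf, String appId) throws Exception {
		FileSystem fs = FileSystem.get(conf);
		LocalResource jarResource = Records.newRecord(LocalResource.class);
		Path localJar = new Path("file:///tmp/flink-uberjar.jar"); // assumed local path
		// copies the jar to <home>/.flink/<appId>/ and fills in size, timestamp, type
		Utils.setupLocalResource(conf, fs, appId, localJar, jarResource, fs.getHomeDirectory());
		return jarResource;
	}
}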

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
new file mode 100644
index 0000000..3f1cc23
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.appMaster;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.yarn.YarnUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.yarn.FlinkYarnClient;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import scala.Tuple2;
+
+
+public class YarnTaskManagerRunner {
+
+	private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunner.class);
+
+	public static void main(final String[] args) throws IOException {
+		Map<String, String> envs = System.getenv();
+		final String yarnClientUsername = envs.get(FlinkYarnClient.ENV_CLIENT_USERNAME);
+		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+
+		// configure local directory
+		final String[] newArgs = Arrays.copyOf(args, args.length + 2);
+		newArgs[newArgs.length-2] = "--tempDir";
+		newArgs[newArgs.length-1] = localDirs;
+		LOG.info("Setting log path "+localDirs);
+		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
+				+ " user to execute Flink TaskManager to '"+yarnClientUsername+"'");
+		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
+			ugi.addToken(toks);
+		}
+		ugi.doAs(new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				try {
+					Tuple2<ActorSystem, ActorRef> tuple = YarnUtils
+							.startActorSystemAndTaskManager(newArgs);
+
+					tuple._1().awaitTermination();
+				} catch (Exception e) {
+					LOG.error("Error while running the TaskManager", e);
+				}
+				return null;
+			}
+		});
+	}
+}
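The runner above and the ApplicationMaster below share the same impersonation pattern: create a remote UGI for the submitting user, copy over the delegation tokens of the current (YARN) user, and run the actual work inside doAs. A distilled sketch of just that pattern, with the user name and the action as assumed placeholders:

import java.security.PrivilegedAction;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

public final class RunAsSketch {
	public static void runAs(String userName, final Runnable action) throws Exception {
		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(userName);
		// hand the current (YARN) user's delegation tokens to the impersonated user
		for (Token<? extends TokenIdentifier> token :
				UserGroupInformation.getCurrentUser().getTokens()) {
			ugi.addToken(token);
		}
		ugi.doAs(new PrivilegedAction<Void>() {
			@Override
			public Void run() {
				action.run();
				return null;
			}
		});
	}
}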

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
new file mode 100644
index 0000000..22f4c02
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration}
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
+import org.apache.flink.yarn.Messages._
+import scala.collection.mutable
+import scala.concurrent.duration._
+
+class ApplicationClient
+  extends Actor with ActorLogMessages with ActorLogging {
+  import context._
+
+  val INITIAL_POLLING_DELAY = 0 seconds
+  val WAIT_FOR_YARN_INTERVAL = 2 seconds
+  val POLLING_INTERVAL = 3 seconds
+
+  var yarnJobManager: Option[ActorRef] = None
+  var pollingTimer: Option[Cancellable] = None
+  implicit var timeout: FiniteDuration = 0 seconds
+  var running = false
+  var messagesQueue : mutable.Queue[YarnMessage] = mutable.Queue[YarnMessage]()
+  var latestClusterStatus : Option[FlinkYarnClusterStatus] = None
+  var stopMessageReceiver : Option[ActorRef] = None
+
+  override def preStart(): Unit = {
+    super.preStart()
+
+    timeout = new FiniteDuration(GlobalConfiguration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+  }
+
+  override def postStop(): Unit = {
+    log.info("Stopped Application client.")
+    pollingTimer foreach {
+      _.cancel()
+    }
+
+    pollingTimer = None
+  }
+
+  override def receiveWithLogMessages: Receive = {
+    // ----------------------------- Registration -> Status updates -> shutdown ----------------
+    case LocalRegisterClient(address: String) => {
+      val jmAkkaUrl = JobManager.getRemoteAkkaURL(address)
+
+      yarnJobManager = Some(AkkaUtils.getReference(jmAkkaUrl)(system, timeout))
+      yarnJobManager match {
+        case Some(jm) => {
+          // The message came from the FlinkYarnCluster. We send the message to the
+          // JobManager ourselves. It is important not to forward the message, because
+          // the JobManager stores the sender as the ApplicationClient (this actor);
+          // see the tell-vs-forward sketch after this file.
+          jm ! RegisterClient
+
+          // schedule a periodic status report from the JobManager
+          // request the number of task managers and slots from the job manager
+          pollingTimer = Some(context.system.scheduler.schedule(INITIAL_POLLING_DELAY,
+            WAIT_FOR_YARN_INTERVAL, yarnJobManager.get, PollYarnClusterStatus))
+        }
+        case None => throw new RuntimeException("Registration at JobManager/ApplicationMaster " +
+          "failed. Job Manager RPC connection has not properly been initialized");
+      }
+    }
+    case msg: StopYarnSession => {
+      log.info("Stop yarn session.")
+      stopMessageReceiver = Some(sender())
+      yarnJobManager foreach {
+        _ forward msg
+      }
+    }
+    case JobManagerStopped => {
+      log.info("Remote JobManager has been stopped successfully. " +
+        "Stopping local application client")
+      stopMessageReceiver foreach {
+        _ ! JobManagerStopped
+      }
+      // stop ourselves
+      context.system.shutdown()
+    }
+
+    // handle the responses from the PollYarnClusterStatus messages to the yarn job mgr
+    case status: FlinkYarnClusterStatus => {
+      latestClusterStatus = Some(status)
+    }
+
+    // locally get cluster status
+    case LocalGetYarnClusterStatus => {
+      sender() ! latestClusterStatus
+    }
+
+    // -----------------  handle messages from the cluster -------------------
+    // receive remote messages
+    case msg: YarnMessage => {
+      messagesQueue.enqueue(msg)
+    }
+    // locally forward messages
+    case LocalGetYarnMessage => {
+      sender() ! (if (messagesQueue.isEmpty) None else messagesQueue.dequeue())
+    }
+    case _ =>
+  }
+
+}
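The registration case above hinges on the difference between tell and forward: with tell, the JobManager sees the ApplicationClient itself as the sender, so the scheduled status reports come back to this actor; forward would have preserved the original sender. A minimal sketch of the distinction, with assumed actor names, written in Java like the other examples:

import akka.actor.ActorRef;
import akka.actor.UntypedActor;

public class RelaySketch extends UntypedActor {
	private final ActorRef jobManager; // assumed to be known at construction time

	public RelaySketch(ActorRef jobManager) {
		this.jobManager = jobManager;
	}

	@Override
	public void onReceive(Object msg) {
		// tell: the JobManager's getSender() is this relay actor,
		// so replies (e.g. status reports) come back here.
		jobManager.tell(msg, getSelf());

		// forward would make the JobManager's getSender() the original
		// sender of 'msg' - exactly what the registration above avoids:
		// jobManager.forward(msg, getContext());
	}
}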

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
new file mode 100644
index 0000000..fd67b01
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import java.io.{PrintWriter, FileWriter, BufferedWriter}
+import java.security.PrivilegedAction
+
+import akka.actor._
+import org.apache.flink.client.CliFrontend
+import org.apache.flink.configuration.{GlobalConfiguration, ConfigConstants}
+import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager}
+import org.apache.flink.yarn.Messages.StartYarnSession
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.slf4j.LoggerFactory
+
+import scala.io.Source
+
+object ApplicationMaster {
+  import scala.collection.JavaConversions._
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  val CONF_FILE = "flink-conf.yaml"
+  val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
+
+  def main(args: Array[String]): Unit = {
+    val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME)
+    LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName}" +
+      s" setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}")
+
+    val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
+
+    for(token <- UserGroupInformation.getCurrentUser.getTokens){
+      ugi.addToken(token)
+    }
+
+    ugi.doAs(new PrivilegedAction[Object] {
+      override def run(): Object = {
+        var actorSystem: ActorSystem = null
+        var jobManager: ActorRef = ActorRef.noSender
+
+        try {
+          val conf = Utils.initializeYarnConfiguration()
+
+          val env = System.getenv()
+
+          if(LOG.isDebugEnabled) {
+            LOG.debug("All environment variables: " + env.toString)
+          }
+
+          val currDir = env.get(Environment.PWD.key())
+          require(currDir != null, "Current directory unknown.")
+
+          val logDirs = env.get(Environment.LOG_DIRS.key())
+
+          // Note that we use the "ownHostname" given by YARN here, to make sure
+          // we use the hostnames given by YARN consistently throughout akka;
+          // for akka, "localhost" and "localhost.localdomain" are different actors.
+          val ownHostname = env.get(Environment.NM_HOST.key())
+          require(ownHostname != null, s"Own hostname not set.")
+
+          val taskManagerCount = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt
+          val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt
+          val dynamicPropertiesEncodedString = env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES)
+
+          val jobManagerWebPort = 0 // automatic assignment.
+
+          val (system, actor) = startJobManager(currDir, ownHostname, dynamicPropertiesEncodedString)
+
+          actorSystem = system
+          jobManager = actor
+          val extActor = system.asInstanceOf[ExtendedActorSystem]
+          val jobManagerPort = extActor.provider.getDefaultAddress.port.get
+
+          // generate configuration file for TaskManagers
+          generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, ownHostname,
+            jobManagerPort, jobManagerWebPort, logDirs, slots, taskManagerCount,
+            dynamicPropertiesEncodedString)
+
+
+          // send "start yarn session" message to YarnJobManager.
+          LOG.info("Start yarn session on job manager.")
+          jobManager ! StartYarnSession(conf, jobManagerPort)
+
+          LOG.info("Application Master properly initiated. Await termination of actor system.")
+          actorSystem.awaitTermination()
+        } catch {
+          case t: Throwable =>
+            LOG.error("Error while running the application master.", t)
+
+            if(actorSystem != null){
+              actorSystem.shutdown()
+              actorSystem.awaitTermination()
+
+              actorSystem = null
+            }
+        }
+
+        null
+      }
+    })
+
+  }
+
+  def generateConfigurationFile(fileName: String, currDir: String, ownHostname: String,
+                               jobManagerPort: Int,
+                               jobManagerWebPort: Int, logDirs: String, slots: Int,
+                               taskManagerCount: Int, dynamicPropertiesEncodedString: String)
+  : Unit = {
+    LOG.info("Generate configuration file for application master.")
+    val output = new PrintWriter(new BufferedWriter(new FileWriter(fileName)))
+
+    for (line <- Source.fromFile(s"$currDir/$CONF_FILE").getLines() if !(line.contains
+      (ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY))) {
+      output.println(line)
+    }
+
+    output.println(s"${ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY}: $ownHostname")
+    output.println(s"${ConfigConstants.JOB_MANAGER_IPC_PORT_KEY}: $jobManagerPort")
+
+    output.println(s"${ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY}: $logDirs")
+    output.println(s"${ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}: $jobManagerWebPort")
+
+
+    if(slots != -1){
+      output.println(s"${ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS}: $slots")
+      output.println(
+        s"${ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY}: ${slots*taskManagerCount}")
+    }
+
+    // add dynamic properties
+    val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
+
+    import scala.collection.JavaConverters._
+
+    for(property <- dynamicProperties.asScala){
+      output.println(s"${property.f0}: ${property.f1}")
+    }
+
+    output.close()
+  }
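+
+  // For illustration (all values assumed, key names as defined in ConfigConstants),
+  // the generated flink-conf-modified.yaml could end with lines such as:
+  //   jobmanager.rpc.address: node-1
+  //   jobmanager.rpc.port: 51234
+  //   taskmanager.numberOfTaskSlots: 4
+  //   parallelization.degree.default: 8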
+
+  def startJobManager(currDir: String, hostname: String, dynamicPropertiesEncodedString: String):
+    (ActorSystem, ActorRef) = {
+    LOG.info("Start job manager for yarn")
+    val args = Array[String]("--configDir", currDir)
+
+    LOG.info(s"Config path: ${currDir}.")
+    val (_, _, configuration, _) = JobManager.parseArgs(args)
+
+    // add dynamic properties to JobManager configuration.
+    val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
+    import scala.collection.JavaConverters._
+    for(property <- dynamicProperties.asScala){
+      configuration.setString(property.f0, property.f1)
+    }
+    GlobalConfiguration.getConfiguration.addAll(configuration) // make part of globalConf.
+
+    // set port to 0 to let Akka automatically determine the port.
+    implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port = 0, configuration)
+
+    LOG.info("Start job manager actor.")
+    (jobManagerSystem, JobManager.startActor(Props(new JobManager(configuration) with
+      WithWebServer with YarnJobManager)))
+  }
+}

