flink-commits mailing list archives

From: rmetzger@apache.org
Subject: [6/6] flink git commit: [FLINK-1295][FLINK-883] Allow to deploy 'job only' YARN cluster. Add tests to YARN
Date: Fri, 23 Jan 2015 17:43:59 GMT
[FLINK-1295][FLINK-883] Allow to deploy 'job only' YARN cluster. Add tests to YARN

- Users can now also deploy Flink on YARN for executing a single job (see the command-line sketch after this list).
- The flink-yarn project has been moved out of the flink-addons module.
- The MiniYARNCluster is used for testing Flink on YARN.
- There is now an (undocumented) Java interface to Flink's YARN client, allowing users to manually control the YARN session (a hypothetical usage sketch follows the change summary below).
- All ports used by Flink when running on YARN are now determined automatically. In the past, users reported problems with blocked ports (YARN tells the client the RPC address of the ApplicationMaster, so no fixed port needs to be configured).
- The checks performed before deployment have been improved to give better error messages if the user requests too many resources for a YARN session.
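
For reference, a minimal command-line sketch of the two deployment modes, based on the documentation updated in this commit; the memory values and the examples-jar version placeholder are illustrative, not prescribed defaults:

~~~bash
# Long-running YARN session: allocate 2 TaskManager containers.
# Jobs are then submitted against this session with ./bin/flink run.
./bin/yarn-session.sh -n 2 -jm 1024 -tm 4096

# Per-job YARN cluster: start Flink on YARN only for this single job.
# YARN session options are passed through with a 'y' prefix (e.g. -yn for -n).
./bin/flink run -m yarn-cluster -yn 2 \
    ./examples/flink-java-examples-<version>-WordCount.jar \
    file:///home/user/hamlet.txt file:///home/user/wordcount_out
~~~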


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2af65867
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2af65867
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2af65867

Branch: refs/heads/master
Commit: 2af658673f877a7e0fd73fdd2907f88824d793a5
Parents: 5fc77d6
Author: Robert Metzger <rmetzger@apache.org>
Authored: Mon Dec 1 18:59:49 2014 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Fri Jan 23 18:39:10 2015 +0100

----------------------------------------------------------------------
 docs/cli.md                                     |   8 +-
 docs/config.md                                  |   3 +-
 docs/yarn_setup.md                              |  32 +-
 flink-addons/flink-yarn/pom.xml                 | 228 ------
 .../main/java/org/apache/flink/yarn/Client.java | 713 -------------------
 .../main/java/org/apache/flink/yarn/Utils.java  | 288 --------
 .../yarn/appMaster/YarnTaskManagerRunner.java   |  74 --
 .../apache/flink/yarn/ApplicationClient.scala   | 158 ----
 .../apache/flink/yarn/ApplicationMaster.scala   | 170 -----
 .../scala/org/apache/flink/yarn/Messages.scala  |  37 -
 .../org/apache/flink/yarn/YarnJobManager.scala  | 307 --------
 .../org/apache/flink/yarn/YarnTaskManager.scala |  37 -
 .../scala/org/apache/flink/yarn/YarnUtils.scala |  80 ---
 flink-addons/pom.xml                            |  13 -
 flink-clients/pom.xml                           |  10 +-
 .../org/apache/flink/client/CliFrontend.java    | 153 +++-
 .../flink/client/FlinkYarnSessionCli.java       | 429 +++++++++++
 .../org/apache/flink/client/program/Client.java |   1 -
 .../flink/client/CliFrontendInfoTest.java       |   2 +-
 .../CliFrontendJobManagerConnectionTest.java    |  13 +-
 .../flink/client/CliFrontendListCancelTest.java |   8 +-
 .../flink/client/CliFrontendTestUtils.java      |  10 +-
 .../flink/configuration/ConfigConstants.java    |  14 +
 .../org/apache/flink/core/fs/FileSystem.java    |   1 -
 flink-dist/pom.xml                              |   4 +-
 flink-dist/src/main/flink-bin/bin/flink         |   3 +-
 .../src/main/flink-bin/yarn-bin/yarn-session.sh |   2 +-
 .../jobmanager/web/SetupInfoServlet.java        |  16 +-
 .../runtime/jobmanager/web/WebInfoServer.java   |   9 +-
 .../org/apache/flink/runtime/net/NetUtils.java  |   4 +-
 .../runtime/yarn/AbstractFlinkYarnClient.java   |  50 ++
 .../runtime/yarn/AbstractFlinkYarnCluster.java  |  44 ++
 .../runtime/yarn/FlinkYarnClusterStatus.java    |  86 +++
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   3 +
 .../apache/flink/runtime/client/JobClient.scala |   3 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   7 +
 flink-yarn-tests/pom.xml                        | 121 ++++
 .../java/org/apache/flink/yarn/UtilsTest.java   |  42 ++
 .../yarn/YARNSessionCapacitySchedulerIT.java    |  72 ++
 .../apache/flink/yarn/YARNSessionFIFOIT.java    | 225 ++++++
 .../org/apache/flink/yarn/YarnTestBase.java     | 379 ++++++++++
 .../src/test/resources/log4j-test.properties    |  25 +
 flink-yarn/pom.xml                              | 228 ++++++
 .../org/apache/flink/yarn/FlinkYarnClient.java  | 653 +++++++++++++++++
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 363 ++++++++++
 .../main/java/org/apache/flink/yarn/Utils.java  | 230 ++++++
 .../yarn/appMaster/YarnTaskManagerRunner.java   |  74 ++
 .../apache/flink/yarn/ApplicationClient.scala   | 127 ++++
 .../apache/flink/yarn/ApplicationMaster.scala   | 185 +++++
 .../scala/org/apache/flink/yarn/Messages.scala  |  44 ++
 .../org/apache/flink/yarn/YarnJobManager.scala  | 312 ++++++++
 .../org/apache/flink/yarn/YarnTaskManager.scala |  38 +
 .../scala/org/apache/flink/yarn/YarnUtils.scala |  84 +++
 .../java/org/apache/flink/yarn/UtilsTests.java  |  33 +
 pom.xml                                         |  33 +-
 55 files changed, 4098 insertions(+), 2190 deletions(-)
----------------------------------------------------------------------
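
A note on the programmatic interface mentioned above: the new entry points are AbstractFlinkYarnClient and AbstractFlinkYarnCluster in flink-runtime (see the file summary), with FlinkYarnClient as the concrete implementation in flink-yarn. The sketch below only illustrates roughly how a caller might drive a YARN session from Java; every method name in it is a hypothetical guess, since the interface is undocumented and its signatures are not shown in this excerpt:

~~~java
// Hypothetical sketch only: the setter/deploy/shutdown method names are
// illustrative guesses at the undocumented interface introduced by this
// commit, not confirmed signatures.
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.yarn.FlinkYarnClient;

public class ProgrammaticYarnSession {
	public static void main(String[] args) throws Exception {
		AbstractFlinkYarnClient client = new FlinkYarnClient(); // assumed constructor
		client.setTaskManagerCount(2);     // analogous to -n 2
		client.setJobManagerMemory(1024);  // analogous to -jm 1024 (MB)
		client.setTaskManagerMemory(4096); // analogous to -tm 4096 (MB)

		// Deploy the session; the returned handle allows monitoring and shutdown.
		AbstractFlinkYarnCluster cluster = client.deploy();
		System.out.println("JobManager at " + cluster.getJobManagerAddress());

		cluster.shutdown(); // tear the YARN session down again
	}
}
~~~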


http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/docs/cli.md
----------------------------------------------------------------------
diff --git a/docs/cli.md b/docs/cli.md
index ceab6e4..8d6048a 100644
--- a/docs/cli.md
+++ b/docs/cli.md
@@ -33,7 +33,8 @@ started from the same installation directory.
 
 A prerequisite to using the command line interface is that the Flink
 master (JobManager) has been started (via `<flink-home>/bin/start-
-local.sh` or `<flink-home>/bin/start-cluster.sh`).
+local.sh` or `<flink-home>/bin/start-cluster.sh`) or that a YARN
+environment is available.
 
 The command line can be used to
 
@@ -64,6 +65,11 @@ The command line can be used to
                                ./examples/flink-java-examples-{{ site.FLINK_VERSION_SHORT }}-WordCount.jar \
                                -file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
+-   Run example program using a [per-job YARN cluster](yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
+
+        ./bin/flink run -m yarn-cluster -yn 2 \
+                               ./examples/flink-java-examples-{{ site.FLINK_VERSION_STABLE }}-WordCount.jar \
+                               -file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   Display the expected arguments for the WordCount example program:
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/docs/config.md
----------------------------------------------------------------------
diff --git a/docs/config.md b/docs/config.md
index dab6c2b..edaefdf 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -304,8 +304,7 @@ to avoid duplicate port allocations when running multiple YARN sessions in paral
 
 So if `yarn.am.rpc.port` is configured to `10245` and the session's application ID is `application_1406629969999_0002`, then the actual port being used is 10245 + 2 = 10247
 
-- `yarn.am.rpc.port`: The port that is being opened by the Application Master (AM) to 
-let the YARN client connect for an RPC serice. (DEFAULT: Port 10245)
+- `yarn.heap-cutoff-ratio`: Percentage of heap space to remove from containers started by YARN.
 
 
 ## Background

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/docs/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/yarn_setup.md b/docs/yarn_setup.md
index 3374500..af036af 100644
--- a/docs/yarn_setup.md
+++ b/docs/yarn_setup.md
@@ -36,14 +36,14 @@ cd flink-yarn-{{ site.FLINK_VERSION_SHORT }}/
 
 Specify the `-s` flag for the number of processing slots per Task Manager. We recommend to set the number of slots to the number of processors per machine.
 
-## Introducing YARN
+## Apache Flink on Hadoop YARN using a YARN Session
 
 Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management framework. It allows to run various distributed applications on top of a cluster. Flink runs on YARN next to other applications. Users do not have to setup or install anything if there is already a YARN setup.
 
 **Requirements**
 
 - Apache Hadoop 2.2
-- HDFS (Hadoop Distributed File System)
+- HDFS (Hadoop Distributed File System) (or another distributed file system supported by Hadoop)
 
 If you have troubles using the Flink YARN client, have a look in the [FAQ section](faq.html).
 
@@ -80,16 +80,14 @@ This command will show you the following overview:
 ~~~bash
 Usage:
    Required
-     -n,--container <arg>   Number of Yarn container to allocate (=Number of Task Managers)
+     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
    Optional
-     -D <arg>                       Dynamic Properties
+     -D <arg>                        Dynamic properties
      -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]
      -q,--query                      Display available YARN resources (memory, cores)
      -qu,--queue <arg>               Specify YARN queue.
      -s,--slots <arg>                Number of slots per TaskManager
      -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]
-     -tmc,--taskManagerCores <arg>   Virtual CPU cores per TaskManager
-     -v,--verbose                    Verbose debug mode
 ~~~
 
 Please note that the Client requires the `HADOOP_HOME` (or `YARN_CONF_DIR` or `HADOOP_CONF_DIR`) environment variable to be set to read the YARN and HDFS configuration.
@@ -118,7 +116,7 @@ The client has to remain open to keep the deployment running. We suggest to use
 4. Use `screen -r` to resume again.
 
 
-## Submit Job to Flink
+### Submit Job to Flink
 
 Use the following command to submit a Flink program to the YARN cluster:
 
@@ -173,6 +171,24 @@ You can check the number of TaskManagers in the JobManager web interface. The ad
 If the TaskManagers do not show up after a minute, you should investigate the issue using the log files.
 
 
+## Run a single Flink job on Hadoop YARN
+
+The documentation above describes how to start a Flink cluster within a Hadoop YARN environment.
+It is also possible to launch Flink within YARN only for executing a single job.
+
+To deploy a job to a per-job YARN cluster, set the master name to `yarn-cluster`.
+Please note that the client then expects the `-n` value to be set (number of TaskManagers).
+
+***Example:***
+
+~~~bash
+./bin/flink run -m yarn-cluster -yn 2 ./examples/flink-java-examples-{{site.FLINK_VERSION_STABLE }}-WordCount.jar 
+~~~
+
+The command line options of the YARN session are also available with the `./bin/flink` tool. They are prefixed with a `y` or `yarn` (for the long argument options).
+
+
+
 ## Debugging a failed YARN session
 
 There are many reasons why a Flink YARN session deployment can fail. A misconfigured Hadoop setup (HDFS permissions, YARN configuration), version incompatibilities (running Flink with vanilla Hadoop dependencies on Cloudera Hadoop) or other errors.
@@ -203,7 +219,7 @@ It allows to access log files for running YARN applications and shows diagnostic
 Users using Hadoop distributions from companies like Hortonworks, Cloudera or MapR might have to build Flink against their specific versions of Hadoop (HDFS) and YARN. Please read the [build instructions](building.html) for more details.
 
 
-## Background
+## Background / Internals
 
 This section briefly describes how Flink and YARN interact. 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/pom.xml b/flink-addons/flink-yarn/pom.xml
deleted file mode 100644
index 42167ee..0000000
--- a/flink-addons/flink-yarn/pom.xml
+++ /dev/null
@@ -1,228 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-addons</artifactId>
-		<version>0.9-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-	
-	<artifactId>flink-yarn</artifactId>
-	<name>flink-yarn</name>
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime</artifactId>
-			<version>${project.version}</version>
-			<exclusions>
-				<exclusion>
-					<artifactId>hadoop-core</artifactId>
-					<groupId>org.apache.hadoop</groupId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-actor_2.10</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-remote_2.10</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-camel_2.10</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.camel</groupId>
-			<artifactId>camel-stream</artifactId>
-			<version>2.14.0</version>
-		</dependency>
-
-		<!--  guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-yarn-client</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-common</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-hdfs</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-mapreduce-client-core</artifactId>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.1.4</version>
-				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-						scala classes can be resolved later in the (Java) compile phase -->
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
-
-					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-						 scala classes can be resolved later in the (Java) test-compile phase -->
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<jvmArgs>
-						<jvmArg>-Xms128m</jvmArg>
-						<jvmArg>-Xmx512m</jvmArg>
-					</jvmArgs>
-				</configuration>
-			</plugin>
-
-			<!-- Eclipse Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-
-			<!-- Adding scala source directories to build path -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<plugin>
-				<groupId>org.scalastyle</groupId>
-				<artifactId>scalastyle-maven-plugin</artifactId>
-				<version>0.5.0</version>
-				<executions>
-					<execution>
-						<goals>
-							<goal>check</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<verbose>false</verbose>
-					<failOnViolation>true</failOnViolation>
-					<includeTestSourceDirectory>true</includeTestSourceDirectory>
-					<failOnWarning>false</failOnWarning>
-					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
-					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
-					<outputEncoding>UTF-8</outputEncoding>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
deleted file mode 100644
index 1de61a8..0000000
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ /dev/null
@@ -1,713 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.jar.JarFile;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.MissingOptionException;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Records;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * All classes in this package contain code taken from
- * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
- * and
- * https://github.com/hortonworks/simple-yarn-app
- * and
- * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
- *
- * The Flink jar is uploaded to HDFS by this client.
- * The application master and all the TaskManager containers get the jar file downloaded
- * by YARN into their local fs.
- *
- */
-public class Client {
-	private static final Logger LOG = LoggerFactory.getLogger(Client.class);
-
-	/**
-	 * Command Line argument options
-	 */
-	private static final Option QUERY = new Option("q","query",false, "Display available YARN resources (memory, cores)");
-	// --- or ---
-	private static final Option VERBOSE = new Option("v","verbose",false, "Verbose debug mode");
-	private static final Option GEN_CONF = new Option("g","generateConf",false, "Place default configuration file in current directory");
-	private static final Option QUEUE = new Option("qu","queue",true, "Specify YARN queue.");
-	private static final Option SHIP_PATH = new Option("t","ship",true, "Ship files in the specified directory (t for transfer)");
-	private static final Option FLINK_CONF_DIR = new Option("c","confDir",true, "Path to Flink configuration directory");
-	private static final Option FLINK_JAR = new Option("j","jar",true, "Path to Flink jar file");
-	private static final Option JM_MEMORY = new Option("jm","jobManagerMemory",true, "Memory for JobManager Container [in MB]");
-	private static final Option TM_MEMORY = new Option("tm","taskManagerMemory",true, "Memory per TaskManager Container [in MB]");
-	private static final Option TM_CORES = new Option("tmc","taskManagerCores",true, "Virtual CPU cores per TaskManager");
-	private static final Option CONTAINER = new Option("n","container",true, "Number of Yarn container to allocate (=Number of"
-			+ " Task Managers)");
-	private static final Option SLOTS = new Option("s","slots",true, "Number of slots per TaskManager");
-	/**
-	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 *  -Dfs.overwrite-files=true  -Dtaskmanager.network.numberOfBuffers=16368
-	 */
-	private static final Option DYNAMIC_PROPERTIES = new Option("D", true, "Dynamic properties");
-
-	/**
-	 * Constants,
-	 * all starting with ENV_ are used as environment variables to pass values from the Client
-	 * to the Application Master.
-	 */
-	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
-	public final static String ENV_TM_CORES = "_CLIENT_TM_CORES";
-	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
-	public final static String ENV_APP_ID = "_APP_ID";
-	public final static String ENV_APP_NUMBER = "_APP_NUMBER";
-	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
-	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
-	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
-	public static final String ENV_SLOTS = "_SLOTS";
-	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
-
-	private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
-	
-	/**
-	 * Minimum memory requirements, checked by the Client.
-	 */
-	private static final int MIN_JM_MEMORY = 128;
-	private static final int MIN_TM_MEMORY = 128;
-
-	private Configuration conf;
-	private YarnClient yarnClient;
-
-	private ActorSystem actorSystem;
-
-	private ActorRef applicationClient = ActorRef.noSender();
-
-	private File yarnPropertiesFile;
-
-	/**
-	 * Files (usually in a distributed file system) used for the YARN session of Flink.
-	 * Contains configuration files and jar files.
-	 */
-	private Path sessionFilesDir;
-
-	/**
-	 * If the user has specified a different number of slots, we store them here
-	 */
-	private int slots = -1;
-	
-	public void run(String[] args) throws Exception {
-
-		if(UserGroupInformation.isSecurityEnabled()) {
-			throw new RuntimeException("Flink YARN client does not have security support right now."
-					+ "File a bug, we will fix it asap");
-		}
-		//Utils.logFilesInCurrentDirectory(LOG);
-		//
-		//	Command Line Options
-		//
-		Options options = new Options();
-		options.addOption(VERBOSE);
-		options.addOption(FLINK_CONF_DIR);
-		options.addOption(FLINK_JAR);
-		options.addOption(JM_MEMORY);
-		options.addOption(TM_MEMORY);
-		options.addOption(TM_CORES);
-		options.addOption(CONTAINER);
-		options.addOption(GEN_CONF);
-		options.addOption(QUEUE);
-		options.addOption(QUERY);
-		options.addOption(SHIP_PATH);
-		options.addOption(SLOTS);
-		options.addOption(DYNAMIC_PROPERTIES);
-
-		CommandLineParser parser = new PosixParser();
-		CommandLine cmd = null;
-		try {
-			cmd = parser.parse( options, args);
-		} catch(MissingOptionException moe) {
-			System.out.println(moe.getMessage());
-			printUsage();
-			System.exit(1);
-		}
-
-		// Jar Path
-		Path localJarPath;
-		if(cmd.hasOption(FLINK_JAR.getOpt())) {
-			String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
-			if(!userPath.startsWith("file://")) {
-				userPath = "file://" + userPath;
-			}
-			localJarPath = new Path(userPath);
-		} else {
-			localJarPath = new Path("file://"+Client.class.getProtectionDomain().getCodeSource().getLocation().getPath());
-		}
-
-		if(cmd.hasOption(GEN_CONF.getOpt())) {
-			LOG.info("Placing default configuration in current directory");
-			File outFile = generateDefaultConf(localJarPath);
-			LOG.info("File written to "+outFile.getAbsolutePath());
-			System.exit(0);
-		}
-
-		// Conf Path
-		Path confPath = null;
-		String confDirPath = "";
-		if(cmd.hasOption(FLINK_CONF_DIR.getOpt())) {
-			confDirPath = cmd.getOptionValue(FLINK_CONF_DIR.getOpt())+"/";
-			File confFile = new File(confDirPath+CONFIG_FILE_NAME);
-			if(!confFile.exists()) {
-				LOG.error("Unable to locate configuration file in "+confFile);
-				System.exit(1);
-			}
-			confPath = new Path(confFile.getAbsolutePath());
-		} else {
-			System.out.println("No configuration file has been specified");
-
-			// no configuration path given.
-			// -> see if there is one in the current directory
-			File currDir = new File(".");
-			File[] candidates = currDir.listFiles(new FilenameFilter() {
-				@Override
-				public boolean accept(final File dir, final String name) {
-					return name != null && name.endsWith(".yaml");
-				}
-			});
-			if(candidates == null || candidates.length == 0) {
-				System.out.println("No configuration file has been found in current directory.\n"
-						+ "Copying default.");
-				File outFile = generateDefaultConf(localJarPath);
-				confPath = new Path(outFile.toURI());
-			} else {
-				if(candidates.length > 1) {
-					System.out.println("Multiple .yaml configuration files were found in the current directory\n"
-							+ "Please specify one explicitly");
-					System.exit(1);
-				} else if(candidates.length == 1) {
-					confPath = new Path(candidates[0].toURI());
-				}
-			}
-		}
-		List<File> shipFiles = new ArrayList<File>();
-		// path to directory to ship
-		if(cmd.hasOption(SHIP_PATH.getOpt())) {
-			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
-			File shipDir = new File(shipPath);
-			if(shipDir.isDirectory()) {
-				shipFiles = new ArrayList<File>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
-					@Override
-					public boolean accept(File dir, String name) {
-						return !(name.equals(".") || name.equals("..") );
-					}
-				})));
-			} else {
-				LOG.warn("Ship directory is not a directory!");
-			}
-		}
-		boolean hasLogback = false;
-		boolean hasLog4j = false;
-		//check if there is a logback or log4j file
-		if(confDirPath.length() > 0) {
-			File logback = new File(confDirPath+"/logback.xml");
-			if(logback.exists()) {
-				shipFiles.add(logback);
-				hasLogback = true;
-			}
-			File log4j = new File(confDirPath+"/log4j.properties");
-			if(log4j.exists()) {
-				shipFiles.add(log4j);
-				hasLog4j = true;
-			}
-		}
-
-		// queue
-		String queue = "default";
-		if(cmd.hasOption(QUEUE.getOpt())) {
-			queue = cmd.getOptionValue(QUEUE.getOpt());
-		}
-
-		// JobManager Memory
-		int jmMemory = 512;
-		if(cmd.hasOption(JM_MEMORY.getOpt())) {
-			jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
-		}
-		if(jmMemory < MIN_JM_MEMORY) {
-			System.out.println("The JobManager memory is below the minimum required memory amount "
-					+ "of "+MIN_JM_MEMORY+" MB");
-			System.exit(1);
-		}
-		// Task Managers memory
-		int tmMemory = 1024;
-		if(cmd.hasOption(TM_MEMORY.getOpt())) {
-			tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
-		}
-		if(tmMemory < MIN_TM_MEMORY) {
-			System.out.println("The TaskManager memory is below the minimum required memory amount "
-					+ "of "+MIN_TM_MEMORY+" MB");
-			System.exit(1);
-		}
-
-		if(cmd.hasOption(SLOTS.getOpt())) {
-			slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
-		}
-
-		String[] dynamicProperties = null;
-		if(cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
-			dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
-		}
-		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-
-		// Task Managers vcores
-		int tmCores = 1;
-		if(cmd.hasOption(TM_CORES.getOpt())) {
-			tmCores = Integer.valueOf(cmd.getOptionValue(TM_CORES.getOpt()));
-		}
-		Utils.getFlinkConfiguration(confPath.toUri().getPath());
-		int jmPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0);
-		if(jmPort == 0) {
-			LOG.warn("Unable to find job manager port in configuration!");
-			jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
-		}
-		FiniteDuration timeout = new FiniteDuration(GlobalConfiguration.getInteger
-				(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT),
-				TimeUnit.SECONDS);
-
-		conf = Utils.initializeYarnConfiguration();
-
-		// intialize HDFS
-		LOG.info("Copy App Master jar from local filesystem and add to local environment");
-		// Copy the application master jar to the filesystem
-		// Create a local resource to point to the destination jar path
-		final FileSystem fs = FileSystem.get(conf);
-
-		// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
-		if( !fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
-				fs.getScheme().startsWith("file")) {
-			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
-					+ "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
-					+ "The Flink YARN client needs to store its files in a distributed file system");
-		}
-
-		// Create yarnClient
-		yarnClient = YarnClient.createYarnClient();
-		yarnClient.init(conf);
-		yarnClient.start();
-
-		// Query cluster for metrics
-		if(cmd.hasOption(QUERY.getOpt())) {
-			showClusterMetrics(yarnClient);
-		}
-		if(!cmd.hasOption(CONTAINER.getOpt())) {
-			LOG.error("Missing required argument "+CONTAINER.getOpt());
-			printUsage();
-			yarnClient.stop();
-			System.exit(1);
-		}
-
-		// TM Count
-		final int taskManagerCount = Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()));
-
-		System.out.println("Using values:");
-		System.out.println("\tContainer Count = "+taskManagerCount);
-		System.out.println("\tJar Path = "+localJarPath.toUri().getPath());
-		System.out.println("\tConfiguration file = "+confPath.toUri().getPath());
-		System.out.println("\tJobManager memory = "+jmMemory);
-		System.out.println("\tTaskManager memory = "+tmMemory);
-		System.out.println("\tTaskManager cores = "+tmCores);
-
-		// Create application via yarnClient
-		YarnClientApplication app = yarnClient.createApplication();
-		GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
-		Resource maxRes = appResponse.getMaximumResourceCapability();
-		if(tmMemory > maxRes.getMemory() || tmCores > maxRes.getVirtualCores()) {
-			LOG.error("The cluster does not have the requested resources for the TaskManagers available!\n"
-					+ "Maximum Memory: "+maxRes.getMemory() +", Maximum Cores: "+tmCores);
-			yarnClient.stop();
-			System.exit(1);
-		}
-		if(jmMemory > maxRes.getMemory() ) {
-			LOG.error("The cluster does not have the requested resources for the JobManager available!\n"
-					+ "Maximum Memory: "+maxRes.getMemory());
-			yarnClient.stop();
-			System.exit(1);
-		}
-		int totalMemoryRequired = jmMemory + tmMemory * taskManagerCount;
-		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
-		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
-			LOG.error("This YARN session requires "+totalMemoryRequired+"MB of memory in the cluster. "
-					+ "There are currently only "+freeClusterMem.totalFreeMemory+"MB available.");
-			yarnClient.stop();
-			System.exit(1);
-		}
-		if( tmMemory > freeClusterMem.containerLimit) {
-			LOG.error("The requested amount of memory for the TaskManagers ("+tmMemory+"MB) is more than "
-					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
-			yarnClient.stop();
-			System.exit(1);
-		}
-		if( jmMemory > freeClusterMem.containerLimit) {
-			LOG.error("The requested amount of memory for the JobManager ("+jmMemory+"MB) is more than "
-					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
-			yarnClient.stop();
-			System.exit(1);
-		}
-
-		// respect custom JVM options in the YAML file
-		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-
-		// Set up the container launch context for the application master
-		ContainerLaunchContext amContainer = Records
-				.newRecord(ContainerLaunchContext.class);
-
-		String amCommand = "$JAVA_HOME/bin/java"
-					+ " -Xmx"+Utils.calculateHeapSize(jmMemory)+"M " +javaOpts;
-		if(hasLogback || hasLog4j) {
-			amCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-main.log\"";
-		}
-		if(hasLogback) {
-			amCommand += " -Dlogback.configurationFile=file:logback.xml";
-		}
-		if(hasLog4j) {
-			amCommand += " -Dlog4j.configuration=file:log4j.properties";
-		}
-
-		amCommand 	+= " "+ApplicationMaster.class.getName()+" "
-					+ " 1>"
-					+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
-					+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
-		amContainer.setCommands(Collections.singletonList(amCommand));
-
-		System.err.println("amCommand="+amCommand);
-
-		// Set-up ApplicationSubmissionContext for the application
-		ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
-		final ApplicationId appId = appContext.getApplicationId();
-		/**
-		 * All network ports are offsetted by the application number
-		 * to avoid version port clashes when running multiple Flink sessions
-		 * in parallel
-		 */
-		int appNumber = appId.getId();
-
-		jmPort = Utils.offsetPort(jmPort, appNumber);
-
-		// Setup jar for ApplicationMaster
-		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-		Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), localJarPath, appMasterJar, fs.getHomeDirectory());
-		Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), confPath, flinkConf, fs.getHomeDirectory());
-		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
-		localResources.put("flink.jar", appMasterJar);
-		localResources.put("flink-conf.yaml", flinkConf);
-
-
-		// setup security tokens (code from apache storm)
-		final Path[] paths = new Path[3 + shipFiles.size()];
-		StringBuffer envShipFileList = new StringBuffer();
-		// upload ship files
-		for (int i = 0; i < shipFiles.size(); i++) {
-			File shipFile = shipFiles.get(i);
-			LocalResource shipResources = Records.newRecord(LocalResource.class);
-			Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
-			paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
-					shipLocalPath, shipResources, fs.getHomeDirectory());
-			localResources.put(shipFile.getName(), shipResources);
-
-			envShipFileList.append(paths[3 + i]);
-			if(i+1 < shipFiles.size()) {
-				envShipFileList.append(',');
-			}
-		}
-
-		paths[0] = remotePathJar;
-		paths[1] = remotePathConf;
-		sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
-		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-		fs.setPermission(sessionFilesDir, permission); // set permission for path.
-		Utils.setTokensFor(amContainer, paths, this.conf);
-
-
-		amContainer.setLocalResources(localResources);
-		fs.close();
-
-		// Setup CLASSPATH for ApplicationMaster
-		Map<String, String> appMasterEnv = new HashMap<String, String>();
-		Utils.setupEnv(conf, appMasterEnv);
-		// set configuration values
-		appMasterEnv.put(Client.ENV_TM_COUNT, String.valueOf(taskManagerCount));
-		appMasterEnv.put(Client.ENV_TM_CORES, String.valueOf(tmCores));
-		appMasterEnv.put(Client.ENV_TM_MEMORY, String.valueOf(tmMemory));
-		appMasterEnv.put(Client.FLINK_JAR_PATH, remotePathJar.toString() );
-		appMasterEnv.put(Client.ENV_APP_ID, appId.toString());
-		appMasterEnv.put(Client.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
-		appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
-		appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
-		appMasterEnv.put(Client.ENV_SLOTS, String.valueOf(slots));
-		appMasterEnv.put(Client.ENV_APP_NUMBER, String.valueOf(appNumber));
-		if(dynamicPropertiesEncoded != null) {
-			appMasterEnv.put(Client.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
-		}
-
-		amContainer.setEnvironment(appMasterEnv);
-
-		// Set up resource type requirements for ApplicationMaster
-		Resource capability = Records.newRecord(Resource.class);
-		capability.setMemory(jmMemory);
-		capability.setVirtualCores(1);
-
-		appContext.setApplicationName("Flink"); // application name
-		appContext.setAMContainerSpec(amContainer);
-		appContext.setResource(capability);
-		appContext.setQueue(queue);
-
-		// file that we write into the conf/ dir containing the jobManager address and the dop.
-		yarnPropertiesFile = new File(confDirPath + CliFrontend.YARN_PROPERTIES_FILE);
-
-		LOG.info("Submitting application master " + appId);
-		yarnClient.submitApplication(appContext);
-
-		Runtime.getRuntime().addShutdownHook(new ClientShutdownHook());
-
-		// start actor system
-		LOG.info("Start actor system.");
-		actorSystem = YarnUtils.createActorSystem();
-
-		// start application client
-		LOG.info("Start application client.");
-		applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, appId, jmPort,
-				yarnClient, confDirPath, slots, taskManagerCount, dynamicPropertiesEncoded,
-				timeout));
-
-		actorSystem.awaitTermination();
-
-		actorSystem = null;
-
-		ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-
-		LOG.info("Application " + appId + " finished with state " + appReport
-				.getYarnApplicationState() + " and final state " + appReport
-				.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
-		if(appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
-				== YarnApplicationState.KILLED	) {
-			LOG.warn("Application failed. Diagnostics "+appReport.getDiagnostics());
-			LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retreive "
-					+ "the full application log using this command:\n"
-					+ "\tyarn logs -applicationId "+appReport.getApplicationId()+"\n"
-					+ "(It sometimes takes a few seconds until the logs are aggregated)");
-		}
-	}
-
-	private void stopSession() {
-		if(actorSystem != null){
-			LOG.info("Sending shutdown request to the Application Master");
-			if(applicationClient != ActorRef.noSender()) {
-				applicationClient.tell(new Messages.StopYarnSession(FinalApplicationStatus.KILLED),
-						ActorRef.noSender());
-				applicationClient = ActorRef.noSender();
-			}
-
-			actorSystem.shutdown();
-			actorSystem.awaitTermination();
-
-			actorSystem = null;
-		}
-
-		try {
-			FileSystem shutFS = FileSystem.get(conf);
-			shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
-			shutFS.close();
-		}catch(IOException e){
-			LOG.error("Could not delete the conf and jar files.", e);
-		}
-
-		try {
-			yarnPropertiesFile.delete();
-		} catch (Exception e) {
-			LOG.warn("Exception while deleting the JobManager address file", e);
-		}
-		LOG.info("YARN Client is shutting down");
-		yarnClient.stop();
-
-		LOG.info("Deleting files in "+sessionFilesDir );
-	}
-
-	public class ClientShutdownHook extends Thread {
-		@Override
-		public void run() {
-			stopSession();
-		}
-	}
-
-	private static class ClusterResourceDescription {
-		public int totalFreeMemory;
-		public int containerLimit;
-	}
-
-	private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
-		ClusterResourceDescription crd = new ClusterResourceDescription();
-		crd.totalFreeMemory = 0;
-		crd.containerLimit = 0;
-		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-		for(NodeReport rep : nodes) {
-			int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
-			crd.totalFreeMemory += free;
-			if(free > crd.containerLimit) {
-				crd.containerLimit = free;
-			}
-		}
-		return crd;
-	}
-
-	private void printUsage() {
-		System.out.println("Usage:");
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.setWidth(200);
-		formatter.setLeftPadding(5);
-		formatter.setSyntaxPrefix("   Required");
-		Options req = new Options();
-		req.addOption(CONTAINER);
-		formatter.printHelp(" ", req);
-
-		formatter.setSyntaxPrefix("   Optional");
-		Options opt = new Options();
-		opt.addOption(VERBOSE);
-		opt.addOption(JM_MEMORY);
-		opt.addOption(TM_MEMORY);
-		opt.addOption(TM_CORES);
-		opt.addOption(QUERY);
-		opt.addOption(QUEUE);
-		opt.addOption(SLOTS);
-		opt.addOption(DYNAMIC_PROPERTIES);
-		formatter.printHelp(" ", opt);
-	}
-
-	private void showClusterMetrics(YarnClient yarnClient)
-			throws YarnException, IOException {
-		YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
-		System.out.println("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
-		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-		final String format = "|%-16s |%-16s %n";
-		System.out.printf("|Property         |Value          %n");
-		System.out.println("+---------------------------------------+");
-		int totalMemory = 0;
-		int totalCores = 0;
-		for(NodeReport rep : nodes) {
-			final Resource res = rep.getCapability();
-			totalMemory += res.getMemory();
-			totalCores += res.getVirtualCores();
-			System.out.format(format, "NodeID", rep.getNodeId());
-			System.out.format(format, "Memory", res.getMemory()+" MB");
-			System.out.format(format, "vCores", res.getVirtualCores());
-			System.out.format(format, "HealthReport", rep.getHealthReport());
-			System.out.format(format, "Containers", rep.getNumContainers());
-			System.out.println("+---------------------------------------+");
-		}
-		System.out.println("Summary: totalMemory "+totalMemory+" totalCores "+totalCores);
-		List<QueueInfo> qInfo = yarnClient.getAllQueues();
-		for(QueueInfo q : qInfo) {
-			System.out.println("Queue: "+q.getQueueName()+", Current Capacity: "+q.getCurrentCapacity()+" Max Capacity: "+q.getMaximumCapacity()+" Applications: "+q.getApplications().size());
-		}
-		yarnClient.stop();
-		System.exit(0);
-	}
-
-	private File generateDefaultConf(Path localJarPath) throws IOException,
-			FileNotFoundException {
-		JarFile jar = null;
-		try {
-			jar = new JarFile(localJarPath.toUri().getPath());
-		} catch(FileNotFoundException fne) {
-			LOG.error("Unable to access jar file. Specify jar file or configuration file.", fne);
-			System.exit(1);
-		}
-		InputStream confStream = jar.getInputStream(jar.getEntry("flink-conf.yaml"));
-
-		if(confStream == null) {
-			LOG.warn("Given jar file does not contain yaml conf.");
-			confStream = this.getClass().getResourceAsStream("flink-conf.yaml");
-			if(confStream == null) {
-				throw new RuntimeException("Unable to find flink-conf in jar file");
-			}
-		}
-		File outFile = new File("flink-conf.yaml");
-		if(outFile.exists()) {
-			throw new RuntimeException("File unexpectedly exists");
-		}
-		FileOutputStream outputStream = new FileOutputStream(outFile);
-		int read = 0;
-		byte[] bytes = new byte[1024];
-		while ((read = confStream.read(bytes)) != -1) {
-			outputStream.write(bytes, 0, read);
-		}
-		confStream.close(); outputStream.close(); jar.close();
-		return outFile;
-	}
-
-	public static void main(String[] args) throws Exception {
-		Client c = new Client();
-		c.run(args);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
deleted file mode 100644
index bd5659a..0000000
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.yarn;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringInterner;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-
-public class Utils {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
-	private static final int HEAP_LIMIT_CAP = 500;
-	
-
-	public static void copyJarContents(String prefix, String pathToJar) throws IOException {
-		LOG.info("Copying jar (location: "+pathToJar+") to prefix "+prefix);
-		
-		JarFile jar = null;
-		jar = new JarFile(pathToJar);
-		Enumeration<JarEntry> enumr = jar.entries();
-		byte[] bytes = new byte[1024];
-		while(enumr.hasMoreElements()) {
-			JarEntry entry = enumr.nextElement();
-			if(entry.getName().startsWith(prefix)) {
-				if(entry.isDirectory()) {
-					File cr = new File(entry.getName());
-					cr.mkdirs();
-					continue;
-				}
-				InputStream inStream = jar.getInputStream(entry);
-				File outFile = new File(entry.getName());
-				if(outFile.exists()) {
-					throw new RuntimeException("File unexpectedly exists");
-				}
-				FileOutputStream outputStream = new FileOutputStream(outFile);
-				int read = 0;
-				while ((read = inStream.read(bytes)) != -1) {
-					outputStream.write(bytes, 0, read);
-				}
-				inStream.close(); outputStream.close(); 
-			}
-		}
-		jar.close();
-	}
-	
-	/**
-	 * Calculate the heap size for the JVMs to start in the containers.
-	 * Since JVMs are allocating more than just the heap space, and YARN is very
-	 * fast at killing processes that use memory beyond their limit, we have to come
-	 * up with a good heapsize.
-	 * This code takes 85% of the given amount of memory (in MB). If the amount we removed by these 85%
-	 * more than 500MB (the current HEAP_LIMIT_CAP), we'll just subtract 500 MB.
-	 * 
-	 */
-	public static int calculateHeapSize(int memory) {
-		int heapLimit = (int)((float)memory*0.80);
-		if( (memory - heapLimit) > HEAP_LIMIT_CAP) {
-			heapLimit = memory-HEAP_LIMIT_CAP;
-		}
-		return heapLimit;
-	}
-	
-	public static void getFlinkConfiguration(String confDir) {
-		GlobalConfiguration.loadConfiguration(confDir);
-	}
-	
-	private static void addPathToConfig(Configuration conf, File path) {
-		// chain-in a new classloader
-		URL fileUrl = null;
-		try {
-			fileUrl = path.toURI().toURL();
-		} catch (MalformedURLException e) {
-			throw new RuntimeException("Erroneous config file path", e);
-		}
-		URL[] urls = {fileUrl};
-		ClassLoader cl = new URLClassLoader(urls, conf.getClassLoader());
-		conf.setClassLoader(cl);
-	}
-	
-	private static void setDefaultConfValues(Configuration conf) {
-		if(conf.get("fs.hdfs.impl",null) == null) {
-			conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-		}
-		if(conf.get("fs.file.impl",null) == null) {
-			conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
-		}
-	}
-	
-	public static Configuration initializeYarnConfiguration() {
-		Configuration conf = new YarnConfiguration();
-		String configuredHadoopConfig = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
-		if(configuredHadoopConfig != null) {
-			LOG.info("Using hadoop configuration path from " + ConfigConstants.PATH_HADOOP_CONFIG + " setting.");
-			addPathToConfig(conf, new File(configuredHadoopConfig));
-			setDefaultConfValues(conf);
-			return conf;
-		}
-		String[] envs = { "YARN_CONF_DIR", "HADOOP_CONF_DIR", "HADOOP_CONF_PATH" };
-		for(int i = 0; i < envs.length; ++i) {
-			String confPath = System.getenv(envs[i]);
-			if (confPath != null) {
-				LOG.info("Found "+envs[i]+", adding it to configuration");
-				addPathToConfig(conf, new File(confPath));
-				setDefaultConfValues(conf);
-				return conf;
-			}
-		}
-		LOG.info("Could not find HADOOP_CONF_PATH, using HADOOP_HOME.");
-		String hadoopHome = null;
-		try {
-			hadoopHome = Shell.getHadoopHome();
-		} catch (IOException e) {
-			LOG.error("Unable to get hadoop home. Please set HADOOP_HOME variable!", e);
-			System.exit(1);
-		}
-		File tryConf = new File(hadoopHome+"/etc/hadoop");
-		if(tryConf.exists()) {
-			LOG.info("Found configuration using hadoop home.");
-			addPathToConfig(conf, tryConf);
-		} else {
-			tryConf = new File(hadoopHome+"/conf");
-			if(tryConf.exists()) {
-				addPathToConfig(conf, tryConf);
-			}
-		}
-		setDefaultConfValues(conf);
-		return conf;
-	}
-	
-	public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv) {
-		addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*");
-		for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
-			addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
-		}
-	}
-	
-	
-	/**
-	 * 
-	 * @return Path to remote file (usually hdfs)
-	 * @throws IOException
-	 */
-	public static Path setupLocalResource(Configuration conf, FileSystem fs, String appId, Path localRsrcPath, LocalResource appMasterJar, Path homedir)
-			throws IOException {
-		// copy to HDFS
-		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
-		
-		Path dst = new Path(homedir, suffix);
-		
-		LOG.info("Copying from "+localRsrcPath+" to "+dst );
-		fs.copyFromLocalFile(localRsrcPath, dst);
-		registerLocalResource(fs, dst, appMasterJar);
-		return dst;
-	}
-	
-	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
-		FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
-		localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
-		localResource.setSize(jarStat.getLen());
-		localResource.setTimestamp(jarStat.getModificationTime());
-		localResource.setType(LocalResourceType.FILE);
-		localResource.setVisibility(LocalResourceVisibility.APPLICATION);
-	}
-
-	public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
-		Credentials credentials = new Credentials();
-		// for HDFS
-		TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
-		// for user
-		UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
-		
-		Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
-		for(Token<? extends TokenIdentifier> token : usrTok) {
-			final Text id = new Text(token.getIdentifier());
-			LOG.info("Adding user token "+id+" with "+token);
-			credentials.addToken(id, token);
-		}
-		DataOutputBuffer dob = new DataOutputBuffer();
-		credentials.writeTokenStorageToStream(dob);
-
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
-		}
-
-		ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-		amContainer.setTokens(securityTokens);
-	}
-	
-	public static void logFilesInCurrentDirectory(final Logger logger) {
-		new File(".").list(new FilenameFilter() {
-			
-			@Override
-			public boolean accept(File dir, String name) {
-				logger.info(dir.getAbsolutePath()+"/"+name);
-				return true;
-			}
-		});
-	}
-	
-	/**
-	 * Copied from org.apache.hadoop.yarn.util.Apps.
-	 * That method was broken by YARN-1824 (2.4.0) and fixed for 2.4.1
-	 * by https://issues.apache.org/jira/browse/YARN-1931.
-	 */
-	public static void addToEnvironment(Map<String, String> environment,
-			String variable, String value) {
-		String val = environment.get(variable);
-		if (val == null) {
-			val = value;
-		} else {
-			val = val + File.pathSeparator + value;
-		}
-		environment.put(StringInterner.weakIntern(variable),
-				StringInterner.weakIntern(val));
-	}
-	
-	/**
-	 * Valid ports are 1024 - 65535.
-	 * We offset the incoming port by the application ID to avoid port collisions if YARN
-	 * allocates two ApplicationMasters on the same physical hardware.
-	 */
-	public static int offsetPort(int port, int appId) {
-		if(port > 65535) {
-			LOG.warn("The specified YARN RPC port ("+port+") is above the maximum possible port 65535. "
-					+ "Setting it to "+64535);
-			port = 64535;
-		}
-		if(port + (appId % 1000) > 65535) {
-			LOG.warn("The specified YARN RPC port ("+port+") is, when offset by the ApplicationID ("+appId+"), above "
-					+ "the maximum possible port 65535. Setting it to "+(port - 1000));
-			port = port - 1000;
-		}
-		return port + (appId % 1000);
-	}
-}
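
For reference, the removed Utils.offsetPort shifts a configured port by appId % 1000 so that two ApplicationMasters scheduled onto the same machine end up on different RPC ports. A minimal, self-contained sketch of that arithmetic (the class name PortOffsetExample is hypothetical, not part of Flink):

public class PortOffsetExample {

	// Mirrors the removed Utils.offsetPort(int, int): results stay within the valid port range.
	static int offsetPort(int port, int appId) {
		if (port > 65535) {
			// clamp so that adding the offset (at most 999) cannot overflow the port range
			port = 64535;
		}
		if (port + (appId % 1000) > 65535) {
			port = port - 1000;
		}
		return port + (appId % 1000);
	}

	public static void main(String[] args) {
		System.out.println(offsetPort(6123, 41));   // 6164 - two AMs with consecutive
		System.out.println(offsetPort(6123, 42));   // 6165   app ids get distinct ports
		System.out.println(offsetPort(65500, 999)); // 65499 - shifted down before offsetting
	}
}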

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
deleted file mode 100644
index 33a8942..0000000
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn.appMaster;
-
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.Arrays;
-import java.util.Map;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import org.apache.flink.yarn.YarnUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.yarn.Client;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import scala.Tuple2;
-
-
-public class YarnTaskManagerRunner {
-
-	private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunner.class);
-
-	public static void main(final String[] args) throws IOException {
-		Map<String, String> envs = System.getenv();
-		final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
-		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
-
-		// configure local directory
-		final String[] newArgs = Arrays.copyOf(args, args.length + 2);
-		newArgs[newArgs.length-2] = "--tempDir";
-		newArgs[newArgs.length-1] = localDirs;
-		LOG.info("Setting temporary directory to "+localDirs);
-		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"'; setting"
-				+ " user to execute the Flink TaskManager to '"+yarnClientUsername+"'");
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
-		}
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
-					Tuple2<ActorSystem, ActorRef> tuple = YarnUtils
-							.startActorSystemAndTaskManager(newArgs);
-
-					tuple._1().awaitTermination();
-				} catch (Exception e) {
-					LOG.error("Error while running the TaskManager", e);
-				}
-				return null;
-			}
-		});
-	}
-}
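
The runner above executes the TaskManager start-up inside UserGroupInformation.doAs(...), so the process acts as the user who submitted the session while keeping the container user's delegation tokens. A minimal sketch of that pattern, assuming a hypothetical user name and payload:

import java.security.PrivilegedAction;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

public class DoAsSketch {

	public static void main(String[] args) throws Exception {
		// Hypothetical target user; the runner reads it from ENV_CLIENT_USERNAME.
		UserGroupInformation ugi = UserGroupInformation.createRemoteUser("alice");

		// Hand the current (container) user's tokens, e.g. HDFS delegation tokens, to the proxy user.
		for (Token<? extends TokenIdentifier> token : UserGroupInformation.getCurrentUser().getTokens()) {
			ugi.addToken(token);
		}

		ugi.doAs(new PrivilegedAction<Void>() {
			@Override
			public Void run() {
				// Everything in here runs as "alice"; the runner starts the TaskManager actor system.
				System.out.println("running privileged action");
				return null;
			}
		});
	}
}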

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
deleted file mode 100644
index 4a6e8cb..0000000
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import java.io.{File, FileOutputStream}
-import java.util.Properties
-
-import akka.actor._
-import akka.camel.{Consumer, CamelMessage}
-import org.apache.flink.client.CliFrontend
-import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.yarn.Messages._
-import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState,
-ApplicationId}
-import org.apache.hadoop.yarn.client.api.YarnClient
-import scala.concurrent.duration._
-
-class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient,
-                        confDirPath: String, slots: Int, numTaskManagers: Int,
-                        dynamicPropertiesEncoded: String, timeout: FiniteDuration)
-  extends Actor with Consumer with ActorLogMessages with ActorLogging {
-  import context._
-
-  val INITIAL_POLLING_DELAY = 0 seconds
-  val WAIT_FOR_YARN_INTERVAL = 500 milliseconds
-  val POLLING_INTERVAL = 3 seconds
-
-  val waitingChars = Array[Char]('/', '|', '\\', '-')
-
-  var jobManager: Option[ActorRef] = None
-  var pollingTimer: Option[Cancellable] = None
-  var running = false
-  var waitingCharsIndex = 0
-
-  def endpointUri = "stream:in"
-
-  override def preStart(): Unit = {
-    super.preStart()
-    pollingTimer = Some(context.system.scheduler.schedule(INITIAL_POLLING_DELAY,
-      WAIT_FOR_YARN_INTERVAL, self, PollYarnReport))
-  }
-
-  override def postStop(): Unit = {
-    log.info("Stopped Application client.")
-    pollingTimer foreach {
-      _.cancel()
-    }
-
-    pollingTimer = None
-  }
-
-  override def receiveWithLogMessages: Receive = {
-    case PollYarnReport => {
-      val report = yarnClient.getApplicationReport(appId)
-
-      report.getYarnApplicationState match {
-        case YarnApplicationState.FINISHED | YarnApplicationState.KILLED | YarnApplicationState
-          .FAILED => {
-          log.info(s"Terminate polling.")
-
-          context.system.shutdown()
-        }
-        case YarnApplicationState.RUNNING if !running => {
-          val address = s"${report.getHost}:$port"
-          log.info(s"Flink JobManager is now running on $address")
-          log.info(s"JobManager Web Interface: ${report.getTrackingUrl}")
-
-          writeYarnProperties(address)
-
-          jobManager = Some(AkkaUtils.getReference(JobManager.getRemoteAkkaURL(address))(system,
-            timeout))
-          jobManager.get ! RegisterMessageListener
-
-          pollingTimer foreach {
-            _.cancel()
-          }
-
-          pollingTimer = Some(context.system.scheduler.schedule(INITIAL_POLLING_DELAY,
-            POLLING_INTERVAL, self, PollYarnReport))
-
-          running = true
-        }
-        case _ =>
-      }
-
-      if(!running){
-        print(waitingChars(waitingCharsIndex) + "\r")
-        waitingCharsIndex += 1
-
-        if(waitingCharsIndex >= waitingChars.length){
-          waitingCharsIndex = 0
-        }
-      }
-    }
-    case msg: YarnMessage => {
-      println(msg)
-    }
-    case msg: StopYarnSession => {
-      log.info("Stop yarn session.")
-      jobManager foreach {
-        _ forward msg
-      }
-    }
-    case msg: CamelMessage => {
-      msg.bodyAs[String] match {
-        case "stop" | "quit" | "exit" => self ! StopYarnSession(FinalApplicationStatus.KILLED)
-        case "help" => printHelp
-        case msg => println(s"Unknown command ${msg}.")
-      }
-    }
-  }
-
-  def printHelp: Unit = {
-    println(
-      """Available commands:
-        |stop : Stop the YARN session
-      """.stripMargin)
-  }
-
-  def writeYarnProperties(address: String): Unit = {
-    val yarnProps = new Properties()
-    yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, address)
-
-    if(slots > 0){
-      yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, (slots * numTaskManagers).toString )
-    }
-
-    if(dynamicPropertiesEncoded != null){
-      yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
-        dynamicPropertiesEncoded)
-    }
-
-    val yarnPropertiesFile = new File(confDirPath + CliFrontend.YARN_PROPERTIES_FILE)
-
-    val out = new FileOutputStream(yarnPropertiesFile)
-    yarnProps.store(out, "Generated YARN properties file")
-    out.close()
-    yarnPropertiesFile.setReadable(true, false)
-  }
-}
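
The ApplicationClient above drives its state machine by periodically fetching the YARN application report until the ApplicationMaster is RUNNING, then reads the JobManager host and tracking URL from it. The same polling loop, re-expressed as a plain Java sketch (the application id construction is hypothetical; the real client receives the id at submission time):

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ReportPollingSketch {

	public static void main(String[] args) throws Exception {
		YarnClient yarnClient = YarnClient.createYarnClient();
		yarnClient.init(new YarnConfiguration());
		yarnClient.start();

		// Hypothetical id; a real one comes from the submitted application.
		ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);

		while (true) {
			ApplicationReport report = yarnClient.getApplicationReport(appId);
			YarnApplicationState state = report.getYarnApplicationState();
			if (state == YarnApplicationState.RUNNING) {
				System.out.println("JobManager host: " + report.getHost());
				System.out.println("Web interface:   " + report.getTrackingUrl());
				break;
			}
			if (state == YarnApplicationState.FINISHED
					|| state == YarnApplicationState.KILLED
					|| state == YarnApplicationState.FAILED) {
				break;
			}
			Thread.sleep(500); // matches the 500 ms WAIT_FOR_YARN_INTERVAL above
		}
		yarnClient.stop();
	}
}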

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
deleted file mode 100644
index 64db0ad..0000000
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import java.io.{PrintWriter, FileWriter, BufferedWriter}
-import java.security.PrivilegedAction
-
-import akka.actor._
-import org.apache.flink.client.CliFrontend
-import org.apache.flink.configuration.{GlobalConfiguration, ConfigConstants}
-import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager}
-import org.apache.flink.yarn.Messages.StartYarnSession
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.slf4j.LoggerFactory
-
-import scala.io.Source
-
-object ApplicationMaster{
-  import scala.collection.JavaConversions._
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  val CONF_FILE = "flink-conf.yaml"
-  val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
-
-  def main(args: Array[String]): Unit ={
-    val yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME)
-    LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " +
-      s"' setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}'")
-
-    val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
-
-    for(token <- UserGroupInformation.getCurrentUser.getTokens){
-      ugi.addToken(token)
-    }
-
-    ugi.doAs(new PrivilegedAction[Object] {
-      override def run(): Object = {
-        var actorSystem: ActorSystem = null
-        var jobManager: ActorRef = ActorRef.noSender
-
-        try {
-          val conf = Utils.initializeYarnConfiguration()
-
-          val env = System.getenv()
-
-          val currDir = env.get(Environment.PWD.key())
-          require(currDir != null, "Current directory unknown.")
-
-          val logDirs = env.get(Environment.LOG_DIRS.key())
-
-          val ownHostname = env.get(Environment.NM_HOST.key())
-          require(ownHostname != null, s"Own hostname not set.")
-
-          val taskManagerCount = env.get(Client.ENV_TM_COUNT).toInt
-          val slots = env.get(Client.ENV_SLOTS).toInt
-          val dynamicPropertiesEncodedString = env.get(Client.ENV_DYNAMIC_PROPERTIES)
-
-          val appNumber = env.get(Client.ENV_APP_NUMBER).toInt
-
-          val jobManagerPort = GlobalConfiguration.getInteger(
-            ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-            ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) match {
-            case x if x <= 0 => x
-            case x => x + appNumber
-          }
-
-          val jobManagerWebPort = GlobalConfiguration.getInteger(ConfigConstants
-            .JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)
-
-          generateConfigurationFile(currDir, ownHostname, jobManagerPort, jobManagerWebPort,
-            logDirs, slots, taskManagerCount, dynamicPropertiesEncodedString)
-
-          val (system, actor) = startJobManager(currDir)
-
-          actorSystem = system
-          jobManager = actor
-
-          LOG.info("Start yarn session on job manager.")
-          jobManager ! StartYarnSession(conf)
-
-          LOG.info("Await termination of actor system.")
-          actorSystem.awaitTermination()
-        }catch{
-          case t: Throwable =>
-            LOG.error("Error while running the application master.", t)
-
-            if(actorSystem != null){
-              actorSystem.shutdown()
-              actorSystem.awaitTermination()
-
-              actorSystem = null
-            }
-        }
-
-        null
-      }
-    })
-
-  }
-
-  def generateConfigurationFile(currDir: String, ownHostname: String, jobManagerPort: Int,
-                               jobManagerWebPort: Int, logDirs: String, slots: Int,
-                               taskManagerCount: Int, dynamicPropertiesEncodedString: String)
-  : Unit = {
-    LOG.info("Generate configuration file for application master.")
-    val output = new PrintWriter(new BufferedWriter(
-      new FileWriter(s"$currDir/$MODIFIED_CONF_FILE"))
-    )
-
-    for (line <- Source.fromFile(s"$currDir/$CONF_FILE").getLines() if !(line.contains
-      (ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY))) {
-      output.println(line)
-    }
-
-    output.println(s"${ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY}: $ownHostname")
-    output.println(s"${ConfigConstants.JOB_MANAGER_IPC_PORT_KEY}: $jobManagerPort")
-    output.println(s"${ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY}: $logDirs")
-    output.println(s"${ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}: $jobManagerWebPort")
-
-    if(slots != -1){
-      output.println(s"${ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS}: $slots")
-      output.println(
-        s"${ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY}: ${slots*taskManagerCount}")
-    }
-
-    // add dynamic properties
-    val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
-
-    import scala.collection.JavaConverters._
-
-    for(property <- dynamicProperties.asScala){
-      output.println(s"${property.f0}: ${property.f1}")
-    }
-
-    output.close()
-  }
-
-  def startJobManager(currDir: String): (ActorSystem, ActorRef) = {
-    LOG.info("Start job manager for yarn")
-    val pathToConfig = s"$currDir/$MODIFIED_CONF_FILE"
-    val args = Array[String]("--configDir", pathToConfig)
-
-    LOG.info(s"Config path: ${pathToConfig}.")
-    val (hostname, port, configuration, _) = JobManager.parseArgs(args)
-
-    implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port, configuration)
-
-    LOG.info("Start job manager actor.")
-    (jobManagerSystem, JobManager.startActor(Props(new JobManager(configuration) with
-      WithWebServer with YarnJobManager)))
-  }
-}
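
The ApplicationMaster rewrites flink-conf.yaml before starting the JobManager: it copies every line except the jobmanager.rpc.address entry and appends the host and offset port determined inside the container. A minimal sketch of that rewrite, with hypothetical file names and values:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.PrintWriter;

public class ConfRewriteSketch {

	public static void main(String[] args) throws Exception {
		// The AM works on $PWD/flink-conf.yaml and writes flink-conf-modified.yaml.
		try (BufferedReader reader = new BufferedReader(new FileReader("flink-conf.yaml"));
				PrintWriter writer = new PrintWriter(new FileWriter("flink-conf-modified.yaml"))) {

			String line;
			// Drop the address key that is about to be overridden; keep everything else.
			while ((line = reader.readLine()) != null) {
				if (!line.contains("jobmanager.rpc.address")) {
					writer.println(line);
				}
			}

			// Values determined at container start-up (hypothetical here).
			writer.println("jobmanager.rpc.address: nodemanager-host-17");
			writer.println("jobmanager.rpc.port: 6164");
		}
	}
}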

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
deleted file mode 100644
index cc92165..0000000
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import java.util.Date
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-
-object Messages {
-  case class YarnMessage(message: String, date: Date = new Date())
-  case class ApplicationMasterStatus(numTaskManagers: Int, numSlots: Int)
-  case object RegisterMessageListener
-
-  case class StopYarnSession(status: FinalApplicationStatus)
-  case class StartYarnSession(configuration: Configuration)
-
-  case object PollContainerCompletion
-  case object PollYarnReport
-  case object CheckForUserCommand
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
deleted file mode 100644
index 63c9b71..0000000
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import java.io.{IOException, File}
-import java.nio.ByteBuffer
-import java.util.{ Collections}
-
-import akka.actor.{ActorRef}
-import org.apache.flink.configuration.ConfigConstants
-import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.yarn.Messages._
-import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.{NMClient, AMRMClient}
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.Records
-
-import scala.concurrent.duration._
-
-
-trait YarnJobManager extends ActorLogMessages {
-  that: JobManager =>
-
-  import context._
-  import scala.collection.JavaConverters._
-
-  val ALLOCATION_DELAY = 100 milliseconds
-  val COMPLETION_DELAY = 5 seconds
-
-  var rmClientOption: Option[AMRMClient[ContainerRequest]] = None
-  var nmClientOption: Option[NMClient] = None
-  var messageListener:Option[ActorRef] = None
-  var containerLaunchContext: Option[ContainerLaunchContext] = None
-
-  var allocatedContainers = 0
-  var completedContainers = 0
-  var numTaskManager = 0
-
-
-  abstract override def receiveWithLogMessages: Receive = {
-    receiveYarnMessages orElse super.receiveWithLogMessages
-  }
-
-  def receiveYarnMessages: Receive = {
-    case StopYarnSession(status) =>
-      log.info("Stopping Yarn Session.")
-
-      instanceManager.getAllRegisteredInstances.asScala foreach {
-        instance =>
-          instance.getTaskManager ! StopYarnSession(status)
-      }
-
-      rmClientOption foreach {
-        rmClient =>
-          rmClient.unregisterApplicationMaster(status, "", "")
-          rmClient.close()
-      }
-
-      rmClientOption = None
-
-      nmClientOption foreach {
-        _.close()
-      }
-
-      nmClientOption = None
-
-      context.system.shutdown()
-
-    case RegisterMessageListener =>
-      messageListener = Some(sender)
-
-    case StartYarnSession(conf) => {
-      log.info("Start yarn session.")
-      val memoryPerTaskManager = env.get(Client.ENV_TM_MEMORY).toInt
-      val heapLimit = Utils.calculateHeapSize(memoryPerTaskManager)
-
-      val applicationMasterHost = env.get(Environment.NM_HOST.key)
-      require(applicationMasterHost != null, s"Application master host (${Environment.NM_HOST}) not set.")
-
-      numTaskManager = env.get(Client.ENV_TM_COUNT).toInt
-      log.info(s"Requesting ${numTaskManager} task managers.")
-
-      val coresPerTaskManager = env.get(Client.ENV_TM_CORES).toInt
-      val remoteFlinkJarPath = env.get(Client.FLINK_JAR_PATH)
-      val fs = FileSystem.get(conf)
-      val appId = env.get(Client.ENV_APP_ID)
-      val currDir = env.get(Environment.PWD.key())
-      val clientHomeDir = env.get(Client.ENV_CLIENT_HOME_DIR)
-      val shipListString = env.get(Client.ENV_CLIENT_SHIP_FILES)
-      val yarnClientUsername = env.get(Client.ENV_CLIENT_USERNAME)
-
-      val jobManagerWebPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
-        ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)
-
-      val rm = AMRMClient.createAMRMClient[ContainerRequest]()
-      rm.init(conf)
-      rm.start()
-
-      rmClientOption = Some(rm)
-
-      val nm = NMClient.createNMClient()
-      nm.init(conf)
-      nm.start()
-      nm.cleanupRunningContainersOnStop(true)
-
-      nmClientOption = Some(nm)
-
-      // Register with ResourceManager
-      val url = s"http://$applicationMasterHost:$jobManagerWebPort"
-      log.info(s"Registering ApplicationMaster with tracking url $url.")
-      rm.registerApplicationMaster(applicationMasterHost, 0, url)
-
-
-      // Priority for worker containers - priorities are intra-application
-      val priority = Records.newRecord(classOf[Priority])
-      priority.setPriority(0)
-
-      // Resource requirements for worker containers
-      val capability = Records.newRecord(classOf[Resource])
-      capability.setMemory(memoryPerTaskManager)
-      capability.setVirtualCores(coresPerTaskManager)
-
-      // Make container requests to ResourceManager
-      for (i <- 0 until numTaskManager) {
-        val containerRequest = new ContainerRequest(capability, null, null, priority)
-        log.info(s"Requesting TaskManager container $i.")
-        rm.addContainerRequest(containerRequest)
-      }
-
-      val flinkJar = Records.newRecord(classOf[LocalResource])
-      val flinkConf = Records.newRecord(classOf[LocalResource])
-
-      // register Flink Jar with remote HDFS
-      val remoteJarPath = new Path(remoteFlinkJarPath)
-      Utils.registerLocalResource(fs, remoteJarPath, flinkJar)
-
-      // register conf with local fs
-      Utils.setupLocalResource(conf, fs, appId, new Path(s"file://$currDir/flink-conf-modified" +
-        s".yaml"), flinkConf, new Path(clientHomeDir))
-      log.info(s"Prepared local resource for modified yaml: $flinkConf")
-
-      val hasLogback = new File(s"$currDir/logback.xml").exists()
-      val hasLog4j = new File(s"$currDir/log4j.properties").exists()
-
-      // prepare files to be shipped
-      val resources = shipListString.split(",") flatMap {
-        pathStr =>
-          if (pathStr.isEmpty) {
-            None
-          } else {
-            val resource = Records.newRecord(classOf[LocalResource])
-            val path = new Path(pathStr)
-            Utils.registerLocalResource(fs, path, resource)
-            Some((path.getName, resource))
-          }
-      } toList
-
-      val taskManagerLocalResources = ("flink.jar", flinkJar) ::("flink-conf.yaml",
-        flinkConf) :: resources toMap
-
-      allocatedContainers = 0
-      completedContainers = 0
-
-      containerLaunchContext = Some(createContainerLaunchContext(heapLimit, hasLogback, hasLog4j,
-        yarnClientUsername, conf, taskManagerLocalResources))
-
-      context.system.scheduler.scheduleOnce(ALLOCATION_DELAY, self, PollContainerCompletion)
-    }
-
-    case PollContainerCompletion => {
-      rmClientOption match {
-        case Some(rmClient) => {
-          val response = rmClient.allocate(completedContainers.toFloat / numTaskManager)
-
-          for (container <- response.getAllocatedContainers.asScala) {
-            log.info(s"Got new container for TM ${container.getId} on host ${
-              container.getNodeId.getHost}")
-
-            allocatedContainers += 1
-
-            log.info(s"Launching container #$allocatedContainers.")
-            nmClientOption match {
-              case Some(nmClient) => {
-                containerLaunchContext match {
-                  case Some(ctx) => nmClient.startContainer(container, ctx)
-                  case None => {
-                    log.error("The ContainerLaunchContext was not set.")
-                    self ! StopYarnSession(FinalApplicationStatus.FAILED)
-                  }
-                }
-              }
-              case None => {
-                log.error("The NMClient was not set.")
-                self ! StopYarnSession(FinalApplicationStatus.FAILED)
-              }
-            }
-          }
-
-          for (status <- response.getCompletedContainersStatuses.asScala) {
-            completedContainers += 1
-            log.info(s"Completed container ${status.getContainerId}. Total completed " +
-              s"$completedContainers.")
-            log.info(s"Diagnostics ${status.getDiagnostics}.")
-
-            messageListener foreach {
-              _ ! YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
-                s"state=${status.getState}.\n${status.getDiagnostics}")
-            }
-          }
-
-          if (allocatedContainers < numTaskManager) {
-            context.system.scheduler.scheduleOnce(ALLOCATION_DELAY, self, PollContainerCompletion)
-          } else if (completedContainers < numTaskManager) {
-            context.system.scheduler.scheduleOnce(COMPLETION_DELAY, self, PollContainerCompletion)
-          } else {
-            self ! StopYarnSession(FinalApplicationStatus.FAILED)
-          }
-        }
-        case None => {
-          log.error("The AMRMClient was not set.")
-          self ! StopYarnSession(FinalApplicationStatus.FAILED)
-        }
-      }
-    }
-  }
-
-  def createContainerLaunchContext(heapLimit: Int, hasLogback: Boolean, hasLog4j: Boolean,
-                                   yarnClientUsername: String, yarnConf: Configuration,
-                                   taskManagerLocalResources: Map[String, LocalResource]):
-  ContainerLaunchContext = {
-    log.info("Create container launch context.")
-    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-
-    val javaOpts = configuration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
-    val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xmx${heapLimit}m $javaOpts")
-
-    if (hasLogback || hasLog4j) {
-      tmCommand ++=
-        s""" -Dlog.file="${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.log""""
-    }
-
-    if (hasLogback) {
-      tmCommand ++= s" -Dlogback.configurationFile=file:logback.xml"
-    }
-
-    if (hasLog4j) {
-      tmCommand ++= s" -Dlog4j.configuration=file:log4j.properties"
-    }
-
-    tmCommand ++= s" ${classOf[YarnTaskManagerRunner].getName} --configDir . 1> " +
-      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stdout.log 2> " +
-      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stderr.log"
-
-    ctx.setCommands(Collections.singletonList(tmCommand.toString()))
-
-    log.info(s"Starting TM with command=${tmCommand.toString()}")
-
-    ctx.setLocalResources(taskManagerLocalResources.asJava)
-
-    // Setup classpath for container ( = TaskManager )
-    val containerEnv = new java.util.HashMap[String, String]()
-    Utils.setupEnv(yarnConf, containerEnv)
-    containerEnv.put(Client.ENV_CLIENT_USERNAME, yarnClientUsername)
-    ctx.setEnvironment(containerEnv)
-
-    val user = UserGroupInformation.getCurrentUser
-
-    try {
-      val credentials = user.getCredentials
-      val dob = new DataOutputBuffer()
-      credentials.writeTokenStorageToStream(dob)
-      val securityTokens = ByteBuffer.wrap(dob.getData, 0, dob.getLength)
-      ctx.setTokens(securityTokens)
-    } catch {
-      case e: IOException =>
-        log.warning("Getting current user info failed when trying to launch the container", e)
-    }
-
-    ctx
-  }
-
-  def env = System.getenv()
-}
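
The StartYarnSession handler above follows the standard ApplicationMaster protocol: register with the ResourceManager, add one ContainerRequest per TaskManager, then heartbeat via allocate() and launch each granted container. A condensed sketch of that loop (host names, sizes and counts are hypothetical; the actual container launch is left as a comment):

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class AmAllocationSketch {

	public static void main(String[] args) throws Exception {
		AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
		rmClient.init(new YarnConfiguration());
		rmClient.start();

		// Hypothetical AM host and tracking URL.
		rmClient.registerApplicationMaster("am-host", 0, "http://am-host:8081");

		// Ask for two 1024 MB / 1 vcore worker containers at intra-application priority 0.
		Resource capability = Records.newRecord(Resource.class);
		capability.setMemory(1024);
		capability.setVirtualCores(1);
		Priority priority = Records.newRecord(Priority.class);
		priority.setPriority(0);
		for (int i = 0; i < 2; i++) {
			rmClient.addContainerRequest(new ContainerRequest(capability, null, null, priority));
		}

		// Heartbeat until every requested container has been granted.
		int allocated = 0;
		while (allocated < 2) {
			AllocateResponse response = rmClient.allocate(0.0f);
			for (Container container : response.getAllocatedContainers()) {
				allocated++;
				// The JobManager trait launches a TaskManager here via NMClient.startContainer(...).
				System.out.println("Got container " + container.getId()
						+ " on " + container.getNodeId().getHost());
			}
			Thread.sleep(100); // matches the 100 ms ALLOCATION_DELAY above
		}
	}
}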

