flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [7/7] flink git commit: [FLINK-3545][yarn] integrate ResourceManager support
Date Tue, 29 Mar 2016 10:52:19 GMT
[FLINK-3545][yarn] integrate ResourceManager support


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4405235e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4405235e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4405235e

Branch: refs/heads/master
Commit: 4405235e5483d3e4ad94f4ba31627aa852580042
Parents: 92ff2b1
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Feb 24 09:20:42 2016 +0100
Committer: Maximilian Michels <mxm@apache.org>
Committed: Tue Mar 29 12:51:46 2016 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |   2 +
 .../flink/yarn/TestingApplicationMaster.java    |  29 +-
 .../yarn/TestingYarnFlinkResourceManager.java   |  56 ++
 .../java/org/apache/flink/yarn/UtilsTest.java   |  26 +-
 .../YARNSessionCapacitySchedulerITCase.java     |  23 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |   6 +-
 .../flink/yarn/TestingYarnJobManager.scala      |   5 +-
 .../flink/yarn/TestingYarnTaskManager.scala     |  20 +-
 .../org/apache/flink/yarn/FlinkYarnClient.java  |   4 +-
 .../apache/flink/yarn/FlinkYarnClientBase.java  |  50 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java |  12 +-
 .../flink/yarn/RegisteredYarnWorkerNode.java    |  51 ++
 .../main/java/org/apache/flink/yarn/Utils.java  |  48 +-
 .../flink/yarn/YarnApplicationMasterRunner.java | 605 ++++++++++++++++
 .../org/apache/flink/yarn/YarnConfigKeys.java   |  48 ++
 .../flink/yarn/YarnContainerInLaunch.java       |  56 ++
 .../flink/yarn/YarnFlinkResourceManager.java    | 670 ++++++++++++++++++
 .../YarnResourceManagerCallbackHandler.java     |  98 +++
 .../flink/yarn/YarnTaskManagerRunner.java       |  21 +-
 .../yarn/messages/ContainersAllocated.java      |  49 ++
 .../flink/yarn/messages/ContainersComplete.java |  50 ++
 .../apache/flink/yarn/ApplicationClient.scala   |  56 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |  38 -
 .../flink/yarn/ApplicationMasterBase.scala      | 289 --------
 .../org/apache/flink/yarn/YarnJobManager.scala  | 699 +------------------
 .../org/apache/flink/yarn/YarnMessages.scala    |  18 +-
 .../flink/yarn/YarnProcessShutDownThread.java   |  82 ---
 .../org/apache/flink/yarn/YarnTaskManager.scala |  28 +-
 28 files changed, 1910 insertions(+), 1229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 18f1f69..755ea7e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -339,6 +339,7 @@ public final class ConfigConstants {
 	 * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
 	 * 	yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 	 * in the flink-conf.yaml.
+	 * @deprecated Please use {@code CONTAINERED_MASTER_ENV_PREFIX}.
 	 */
 	@Deprecated
 	public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env.";
@@ -346,6 +347,7 @@ public final class ConfigConstants {
 	/**
 	 * Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows
 	 * setting custom environment variables.
+	 * @deprecated Please use {@code CONTAINERED_TASK_MANAGER_ENV_PREFIX}.
 	 */
 	@Deprecated
 	public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env.";

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
index 6671eb4..b0757f5 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
@@ -20,13 +20,18 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SignalHandler;
 
 /**
- * Yarn application master which starts the {@link TestingYarnJobManager} and the
- * {@link TestingMemoryArchivist}.
+ * Yarn application master which starts the {@link TestingYarnJobManager},
+ * {@link TestingResourceManager}, and the {@link TestingMemoryArchivist}.
  */
-public class TestingApplicationMaster extends ApplicationMasterBase {
+public class TestingApplicationMaster extends YarnApplicationMasterRunner {
+
 	@Override
 	public Class<? extends JobManager> getJobManagerClass() {
 		return TestingYarnJobManager.class;
@@ -37,9 +42,23 @@ public class TestingApplicationMaster extends ApplicationMasterBase {
 		return TestingMemoryArchivist.class;
 	}
 
+	@Override
+	protected Class<? extends TaskManager> getTaskManagerClass() {
+		return TestingYarnTaskManager.class;
+	}
+
+	@Override
+	public Class<? extends YarnFlinkResourceManager> getResourceManagerClass() {
+		return TestingYarnFlinkResourceManager.class;
+	}
+
 	public static void main(String[] args) {
-		TestingApplicationMaster applicationMaster = new TestingApplicationMaster();
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
+		SignalHandler.register(LOG);
 
-		applicationMaster.run(args);
+		// run and exit with the proper return code
+		int returnCode = new TestingApplicationMaster().run(args);
+		System.exit(returnCode);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
new file mode 100644
index 0000000..5a61b8f
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Flink's testing resource manager for Yarn.
+ */
+public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager {
+
+	public TestingYarnFlinkResourceManager(
+		Configuration flinkConfig,
+		YarnConfiguration yarnConfig,
+		LeaderRetrievalService leaderRetrievalService,
+		String applicationMasterHostName,
+		String webInterfaceURL,
+		ContaineredTaskManagerParameters taskManagerParameters,
+		ContainerLaunchContext taskManagerLaunchContext,
+		int yarnHeartbeatIntervalMillis,
+		int maxFailedContainers,
+		int numInitialTaskManagers) {
+
+		super(
+			flinkConfig,
+			yarnConfig,
+			leaderRetrievalService,
+			applicationMasterHostName,
+			webInterfaceURL,
+			taskManagerParameters,
+			taskManagerLaunchContext,
+			yarnHeartbeatIntervalMillis,
+			maxFailedContainers,
+			numInitialTaskManagers);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
index 968e769..784bf24 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
@@ -57,8 +57,8 @@ public class UtilsTest {
 	@Test
 	public void testHeapCutoff() {
 		Configuration conf = new Configuration();
-		conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
-		conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
+		conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 0.15);
+		conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 384);
 
 		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
 		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
@@ -66,35 +66,43 @@ public class UtilsTest {
 		// test different configuration
 		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "1000");
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.1");
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "1000");
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.1");
 		Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.5");
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.5");
 		Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1");
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+
+		// test also deprecated keys
+		conf = new Configuration();
+		conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
+		conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
+
+		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
+		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void illegalArgument() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1.1");
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1.1");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void illegalArgumentNegative() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "-0.01");
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "-0.01");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void tooMuchCutoff() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "6000");
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "6000");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 17224b9..38e17a5 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -204,7 +204,9 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 
 			// test logfile access
 			String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
-			Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster/JobManager (Version"));
+			Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster"));
+			Assert.assertTrue(logs.contains("Starting JobManager"));
+			Assert.assertTrue(logs.contains("Starting JobManager Web Frontend"));
 		} catch(Throwable e) {
 			LOG.warn("Error while running test",e);
 			Assert.fail(e.getMessage());
@@ -228,7 +230,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
 			for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
 				String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands());
-				if(command.contains(YarnTaskManagerRunner.class.getSimpleName())) {
+				if(command.contains(YarnTaskManager.class.getSimpleName())) {
 					taskManagerContainer = entry.getKey();
 					nodeManager = nm;
 					nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0);
@@ -279,7 +281,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			int killedOff = o.indexOf("Container killed by the ApplicationMaster");
 			if (killedOff != -1) {
 				o = o.substring(killedOff);
-				ok = o.indexOf("Launching container") > 0;
+				ok = o.indexOf("Launching TaskManager") > 0;
 			}
 			sleep(1000);
 		} while(!ok);
@@ -304,9 +306,14 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC);
 
 		// ------ Check if everything happened correctly
-		Assert.assertTrue("Expect to see failed container", eC.contains("New messages from the YARN cluster"));
-		Assert.assertTrue("Expect to see failed container", eC.contains("Container killed by the ApplicationMaster"));
-		Assert.assertTrue("Expect to see new container started", eC.contains("Launching container") && eC.contains("on host"));
+		Assert.assertTrue("Expect to see failed container",
+			eC.contains("New messages from the YARN cluster"));
+
+		Assert.assertTrue("Expect to see failed container",
+			eC.contains("Container killed by the ApplicationMaster"));
+
+		Assert.assertTrue("Expect to see new container started",
+			eC.contains("Launching TaskManager") && eC.contains("on host"));
 
 		// cleanup auth for the subsequent tests.
 		remoteUgi.getTokenIdentifiers().remove(nmIdent);
@@ -502,7 +509,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
 			content = FileUtils.readFileToString(jobmanagerLog);
 			// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
-			String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m";
+			String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms424m -Xmx424m";
 			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
 				content.contains(expected));
 			expected = " (2/2) (attempt #0) to ";

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index db9af8c..d713b73 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -20,9 +20,9 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.client.FlinkYarnSessionCli;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -239,14 +239,14 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			LOG.warn("Failing test", e);
 			Assert.fail("Error while deploying YARN cluster: "+e.getMessage());
 		}
-		FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1);
+		GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
 		for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever"
 			try {
 				Thread.sleep(1000);
 			} catch (InterruptedException e) {
 				LOG.warn("Interrupted", e);
 			}
-			FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
+			GetClusterStatusResponse status = yarnCluster.getClusterStatus();
 			if(status != null && status.equals(expectedStatus)) {
 				LOG.info("Cluster reached status " + status);
 				break; // all good, cluster started

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 4e6b745..a6289d4 100644
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -78,7 +78,4 @@ class TestingYarnJobManager(
     checkpointRecoveryFactory,
     savepointStore,
     jobRecoveryTimeout)
-  with TestingJobManagerLike {
-
-  override val taskManagerRunnerClass = classOf[TestingYarnTaskManagerRunner]
-}
+  with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index 11c7f88..73ab7eb 100644
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
   * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
   *
   * @param config Configuration object for the actor
+  * @param resourceID The Yarn container id
   * @param connectionInfo Connection information of this actor
   * @param memoryManager MemoryManager which is responsibel for Flink's managed memory allocation
   * @param ioManager IOManager responsible for I/O
@@ -42,6 +44,7 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
   */
 class TestingYarnTaskManager(
     config: TaskManagerConfiguration,
+    resourceID: ResourceID,
     connectionInfo: InstanceConnectionInfo,
     memoryManager: MemoryManager,
     ioManager: IOManager,
@@ -50,10 +53,25 @@ class TestingYarnTaskManager(
     leaderRetrievalService: LeaderRetrievalService)
   extends YarnTaskManager(
     config,
+    resourceID,
     connectionInfo,
     memoryManager,
     ioManager,
     network,
     numberOfSlots,
     leaderRetrievalService)
-  with TestingTaskManagerLike {}
+  with TestingTaskManagerLike {
+
+  object YarnTaskManager {
+
+    /** Entry point (main method) to run the TaskManager on YARN.
+      * @param args The command line arguments.
+      */
+    def main(args: Array[String]): Unit = {
+      YarnTaskManagerRunner.runYarnTaskManager(args, classOf[TestingYarnTaskManager])
+    }
+
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 4225e68..467e06d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -18,11 +18,11 @@
 package org.apache.flink.yarn;
 
 /**
- * Default implementation of {@link FlinkYarnClientBase} which starts an {@link ApplicationMaster}.
+ * Default implementation of {@link FlinkYarnClientBase} which starts a {@link YarnApplicationMasterRunner}.
  */
 public class FlinkYarnClient extends FlinkYarnClientBase {
 	@Override
 	protected Class<?> getApplicationMasterClass() {
-		return ApplicationMaster.class;
+		return YarnApplicationMasterRunner.class;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
index 4e4a2f5..ef02be3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
@@ -85,24 +85,6 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
 
 	/**
-	 * Constants,
-	 * all starting with ENV_ are used as environment variables to pass values from the Client
-	 * to the Application Master.
-	 */
-	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
-	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
-	public final static String ENV_APP_ID = "_APP_ID";
-	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
-	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
-	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
-	public static final String ENV_SLOTS = "_SLOTS";
-	public static final String ENV_DETACHED = "_DETACHED";
-	public static final String ENV_STREAMING_MODE = "_STREAMING_MODE";
-	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
-
-
-	/**
 	 * Minimum memory requirements, checked by the Client.
 	 */
 	private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
@@ -168,6 +150,9 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
 		}
 	}
 
+	/**
+	 * The class to bootstrap the application master of the Yarn cluster (runs main method).
+	 */
 	protected abstract Class<?> getApplicationMasterClass();
 
 	@Override
@@ -495,7 +480,8 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
 		ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
 
 		String amCommand = "$JAVA_HOME/bin/java"
-			+ " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) + "M " +javaOpts;
+			+ " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration)
+			+ "M " + javaOpts;
 
 		if(hasLogback || hasLog4j) {
 			amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
@@ -554,8 +540,8 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
 		// Setup jar for ApplicationMaster
 		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
 		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-		Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
-		Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
+		Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
+		Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
 		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
 		localResources.put("flink.jar", appMasterJar);
 		localResources.put("flink-conf.yaml", flinkConf);
@@ -569,7 +555,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
 			File shipFile = shipFiles.get(i);
 			LocalResource shipResources = Records.newRecord(LocalResource.class);
 			Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
-			paths[2 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
+			paths[2 + i] = Utils.setupLocalResource(fs, appId.toString(),
 				shipLocalPath, shipResources, fs.getHomeDirectory());
 			localResources.put(shipFile.getName(), shipResources);
 
@@ -598,18 +584,18 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
 		// set classpath from YARN configuration
 		Utils.setupEnv(conf, appMasterEnv);
 		// set Flink on YARN internal configuration values
-		appMasterEnv.put(FlinkYarnClient.ENV_TM_COUNT, String.valueOf(taskManagerCount));
-		appMasterEnv.put(FlinkYarnClient.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
-		appMasterEnv.put(FlinkYarnClient.FLINK_JAR_PATH, remotePathJar.toString() );
-		appMasterEnv.put(FlinkYarnClient.ENV_APP_ID, appId.toString());
-		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
-		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
-		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
-		appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, String.valueOf(slots));
-		appMasterEnv.put(FlinkYarnClient.ENV_DETACHED, String.valueOf(detached));
+		appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount));
+		appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
+		appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString() );
+		appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
+		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
+		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
+		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
+		appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
+		appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
 
 		if(dynamicPropertiesEncoded != null) {
-			appMasterEnv.put(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+			appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
 		}
 
 		amContainer.setEnvironment(appMasterEnv);

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 2d7e213..13c19a9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -30,11 +30,12 @@ import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -292,7 +293,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 	 * This method is only available if the cluster hasn't been started in detached mode.
 	 */
 	@Override
-	public FlinkYarnClusterStatus getClusterStatus() {
+	public GetClusterStatusResponse getClusterStatus() {
 		if(!isConnected) {
 			throw new IllegalStateException("The cluster is not connected to the ApplicationMaster.");
 		}
@@ -309,7 +310,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		if(clusterStatus instanceof None$) {
 			return null;
 		} else if(clusterStatus instanceof Some) {
-			return (FlinkYarnClusterStatus) (((Some) clusterStatus).get());
+			return (GetClusterStatusResponse) (((Some) clusterStatus).get());
 		} else {
 			throw new RuntimeException("Unexpected type: " + clusterStatus.getClass().getCanonicalName());
 		}
@@ -395,8 +396,8 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 				} else {
 					Object obj = messageOption.get();
 
-					if(obj instanceof YarnMessages.YarnMessage) {
-						YarnMessages.YarnMessage msg = (YarnMessages.YarnMessage) obj;
+					if(obj instanceof InfoMessage) {
+						InfoMessage msg = (InfoMessage) obj;
 						ret.add("[" + msg.date() + "] " + msg.message());
 					} else {
 						LOG.warn("LocalGetYarnMessage returned unexpected type: " + messageOption);
@@ -445,7 +446,6 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 							new YarnMessages.LocalStopYarnSession(finalStatus,
 									"Flink YARN Client requested shutdown"),
 							new Timeout(akkaDuration));
-
 					Await.ready(response, akkaDuration);
 				} catch(Exception e) {
 					LOG.warn("Error while stopping YARN Application Client", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
new file mode 100644
index 0000000..1fdb32c
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import static java.util.Objects.requireNonNull;
+
+public class RegisteredYarnWorkerNode extends ResourceID {
+	
+	/** The container on which the worker runs */
+	private final Container yarnContainer;
+
+	public RegisteredYarnWorkerNode(
+		ResourceID resourceId, Container yarnContainer)
+	{
+		super(resourceId.getResourceId());
+		this.yarnContainer = requireNonNull(yarnContainer);
+	}
+
+	public Container yarnContainer() {
+		return yarnContainer;
+	}
+	
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "RegisteredYarnWorkerNode{" +
+			"yarnContainer=" + yarnContainer +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index a5dc6ec..4e8f747 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.yarn;
 
 import java.io.File;
@@ -25,9 +26,12 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.flink.configuration.ConfigConstants;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -60,14 +64,26 @@ public final class Utils {
 	 * See documentation
 	 */
 	public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
-		float memoryCutoffRatio = conf.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
-		int minCutoff = conf.getInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_MIN_HEAP_CUTOFF);
+
+		BootstrapTools.substituteDeprecatedConfigKey(conf,
+			ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO);
+		BootstrapTools.substituteDeprecatedConfigKey(conf,
+			ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN);
+
+		float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO,
+			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
+		int minCutoff = conf.getInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN,
+			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
 
 		if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
-			throw new IllegalArgumentException("The configuration value '" + ConfigConstants.YARN_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given=" + memoryCutoffRatio);
+			throw new IllegalArgumentException("The configuration value '"
+				+ ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO
+				+ "' must be between 0 and 1. Value given=" + memoryCutoffRatio);
 		}
 		if (minCutoff > memory) {
-			throw new IllegalArgumentException("The configuration value '" + ConfigConstants.YARN_HEAP_CUTOFF_MIN + "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
+			throw new IllegalArgumentException("The configuration value '"
+				+ ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN
+				+ "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
 		}
 
 		int heapLimit = (int)((float)memory * memoryCutoffRatio);
@@ -77,33 +93,37 @@ public final class Utils {
 		return memory - heapLimit;
 	}
 
-	
+
 	public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv) {
 		addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*");
 		for (String c: conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
 			addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
 		}
 	}
-	
-	
+
+
 	/**
 	 * 
 	 * @return Path to remote file (usually hdfs)
 	 * @throws IOException
 	 */
-	public static Path setupLocalResource(Configuration conf, FileSystem fs, String appId, Path localRsrcPath, LocalResource appMasterJar, Path homedir)
-			throws IOException {
-		// copy to HDFS
+	public static Path setupLocalResource(
+			FileSystem fs,
+			String appId, Path localRsrcPath,
+			LocalResource appMasterJar,
+			Path homedir) throws IOException {
+
+		// copy resource to HDFS
 		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
-		
+
 		Path dst = new Path(homedir, suffix);
-		
+
 		LOG.info("Copying from " + localRsrcPath + " to " + dst);
 		fs.copyFromLocalFile(localRsrcPath, dst);
 		registerLocalResource(fs, dst, appMasterJar);
 		return dst;
 	}
-	
+
 	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
 		FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
 		localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
@@ -121,7 +141,7 @@ public final class Utils {
 		obtainTokenForHBase(credentials, conf);
 		// for user
 		UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
-		
+
 		Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
 		for(Token<? extends TokenIdentifier> token : usrTok) {
 			final Text id = new Text(token.getIdentifier());

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
new file mode 100644
index 0000000..1d0afc4
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link YarnFlinkResourceManager}.
+ * 
+ * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnApplicationMasterRunner {
+
+	/** Logger */
+	protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
+
+	/** The maximum time that TaskManagers may be waiting to register at the JobManager,
+	 * before they quit */
+	private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	/** The process environment variables */
+	private static final Map<String, String> ENV = System.getenv();
+
+	/** The exit code returned if the initialization of the application master failed */
+	private static final int INIT_ERROR_EXIT_CODE = 31;
+
+	/** The exit code returned if the process exits because a critical actor died */
+	private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+
+	// ------------------------------------------------------------------------
+	//  Program entry point
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The entry point for the YARN application master. 
+	 *
+	 * @param args The command line arguments.
+	 */
+	public static void main(String[] args) {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
+		SignalHandler.register(LOG);
+
+		// run and exit with the proper return code
+		int returnCode = new YarnApplicationMasterRunner().run(args);
+		System.exit(returnCode);
+	}
+
+	/**
+	 * The instance entry point for the YARN application master. Obtains user group
+	 * information and calls the main work method {@link #runApplicationMaster()} as a
+	 * privileged action.
+	 *
+	 * @param args The command line arguments.
+	 * @return The process exit code.
+	 */
+	protected int run(String[] args) {
+		try {
+			LOG.debug("All environment variables: {}", ENV);
+
+			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
+			require(yarnClientUsername != null, "YARN client user name environment variable %s not set",
+				YarnConfigKeys.ENV_CLIENT_USERNAME);
+
+			final UserGroupInformation currentUser;
+			try {
+				currentUser = UserGroupInformation.getCurrentUser();
+			} catch (Throwable t) {
+				throw new Exception("Cannot access UserGroupInformation information for current user", t);
+			}
+
+			LOG.info("YARN daemon runs as user {}. Running Flink Application Master/JobManager as user {}",
+				currentUser.getShortUserName(), yarnClientUsername);
+
+			UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+
+			// transfer all security tokens, for example for authenticated HDFS and HBase access
+			for (Token<?> token : currentUser.getTokens()) {
+				ugi.addToken(token);
+			}
+
+			// run the actual work in a secured privileged action
+			return ugi.doAs(new PrivilegedAction<Integer>() {
+				@Override
+				public Integer run() {
+					return runApplicationMaster();
+				}
+			});
+		}
+		catch (Throwable t) {
+			// make sure that any failure, whatever its type, ends up in the log
+			LOG.error("YARN Application Master initialization failed", t);
+			return INIT_ERROR_EXIT_CODE;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Core work method
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The main work method, must run as a privileged action.
+	 * 
+	 * @return The return code for the Java process. 
+	 */
+	protected int runApplicationMaster() {
+		ActorSystem actorSystem = null;
+		WebMonitor webMonitor = null;
+
+		try {
+			// ------- (1) load and parse / validate all configurations -------
+
+			// loading all config values here has the advantage that the program fails fast, if any
+			// configuration problem occurs
+
+			final String currDir = ENV.get(Environment.PWD.key());
+			require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
+
+			// Note that we use the "appMasterHostname" given by YARN here, to make sure
+			// we use the hostnames given by YARN consistently throughout akka.
+			// for akka "localhost" and "localhost.localdomain" are different actors.
+			final String appMasterHostname = ENV.get(Environment.NM_HOST.key());
+			require(appMasterHostname != null,
+				"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
+
+			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
+
+			// Flink configuration
+			final Map<String, String> dynamicProperties =
+				CliFrontend.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+
+			final Configuration config = createConfiguration(currDir, dynamicProperties);
+
+			// Hadoop/Yarn configuration (loads config data automatically from classpath files)
+			final YarnConfiguration yarnConfig = new YarnConfiguration();
+
+			final int taskManagerContainerMemory;
+			final int numInitialTaskManagers;
+			final int slotsPerTaskManager;
+
+			try {
+				taskManagerContainerMemory = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_MEMORY));
+			} catch (NumberFormatException e) {
+				throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_TM_MEMORY + " : "
+					+ e.getMessage(), e);
+			}
+			try {
+				numInitialTaskManagers = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_COUNT));
+			} catch (NumberFormatException e) {
+				throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_TM_COUNT + " : "
+					+ e.getMessage(), e);
+			}
+			try {
+				slotsPerTaskManager = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_SLOTS));
+			} catch (NumberFormatException e) {
+				throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_SLOTS + " : "
+					+ e.getMessage(), e);
+			}
+
+			final ContaineredTaskManagerParameters taskManagerParameters =
+				ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager);
+
+			LOG.info("TaskManagers will be created with {} task slots", taskManagerParameters.numSlots());
+			LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " +
+				"JVM direct memory limit {} MB",
+				taskManagerParameters.taskManagerTotalMemoryMB(),
+				taskManagerParameters.taskManagerHeapSizeMB(),
+				taskManagerParameters.taskManagerDirectMemoryLimitMB());
+
+
+			// ----------------- (2) start the actor system -------------------
+
+			// try to start the actor system, JobManager and JobManager actor system
+			// using the port range definition from the config.
+			final String amPortRange = config.getString(
+					ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+					ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+
+			actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, amPortRange, LOG);
+
+			final String akkaHostname = AkkaUtils.getAddress(actorSystem).host().get();
+			final int akkaPort = (Integer) AkkaUtils.getAddress(actorSystem).port().get();
+
+			LOG.info("Actor system bound to hostname {}.", akkaHostname);
+
+
+			// ---- (3) Generate the configuration for the TaskManagers
+
+			final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
+					config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
+			LOG.debug("TaskManager configuration: {}", taskManagerConfig);
+
+			final ContainerLaunchContext taskManagerContext = createTaskManagerContext(
+				config, yarnConfig, ENV,
+				taskManagerParameters, taskManagerConfig,
+				currDir, getTaskManagerClass(), LOG);
+
+
+			// ---- (4) start the actors and components in this order:
+
+			// 1) JobManager & Archive (in non-HA case, the leader service takes this)
+			// 2) Web Monitor (we need its port to register)
+			// 3) Resource Master for YARN
+			// 4) Process reapers for the JobManager and Resource Master
+
+
+			// 1: the JobManager
+			LOG.debug("Starting JobManager actor");
+
+			// we start the JobManager with its standard name
+			ActorRef jobManager = JobManager.startJobManagerActors(
+				config, actorSystem,
+				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
+				scala.Option.<String>empty(),
+				getJobManagerClass(),
+				getArchivistClass())._1();
+
+
+			// 2: the web monitor
+			LOG.debug("Starting Web Frontend");
+
+			webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG);
+			final String webMonitorURL = webMonitor == null ? null :
+				"http://" + appMasterHostname + ":" + webMonitor.getServerPort();
+
+			// 3: Flink's Yarn ResourceManager
+			LOG.debug("Starting YARN Flink Resource Manager");
+
+			// we need the leader retrieval service here to be informed of new
+			// leader session IDs, even though there can be only one leader ever
+			LeaderRetrievalService leaderRetriever =
+				LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
+
+			Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
+				getResourceManagerClass(),
+				config,
+				yarnConfig,
+				leaderRetriever,
+				appMasterHostname,
+				webMonitorURL,
+				taskManagerParameters,
+				taskManagerContext,
+				numInitialTaskManagers,
+				LOG);
+
+			ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
+
+
+			// 4: Process reapers
+			// The process reapers ensure that upon unexpected actor death, the process exits
+			// and does not stay lingering around unresponsive
+
+			LOG.debug("Starting process reapers for JobManager and YARN Application Master");
+
+			actorSystem.actorOf(
+				Props.create(ProcessReaper.class, resourceMaster, LOG, ACTOR_DIED_EXIT_CODE),
+				"YARN_Resource_Master_Process_Reaper");
+
+			actorSystem.actorOf(
+				Props.create(ProcessReaper.class, jobManager, LOG, ACTOR_DIED_EXIT_CODE),
+				"JobManager_Process_Reaper");
+		}
+		catch (Throwable t) {
+			// make sure that everything whatever ends up in the log
+			LOG.error("YARN Application Master initialization failed", t);
+
+			if (actorSystem != null) {
+				try {
+					actorSystem.shutdown();
+				} catch (Throwable tt) {
+					LOG.error("Error shutting down actor system", tt);
+				}
+			}
+
+			if (webMonitor != null) {
+				try {
+					webMonitor.stop();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the web frontend", tt);
+				}
+			}
+
+			return INIT_ERROR_EXIT_CODE;
+		}
+
+		// everything started, we can wait until all is done or the process is killed
+		LOG.info("YARN Application Master started");
+
+		// wait until everything is done
+		actorSystem.awaitTermination();
+
+		// if we get here, everything worked out all right, and we even exited smoothly
+		if (webMonitor != null) {
+			try {
+				webMonitor.stop();
+			} catch (Throwable t) {
+				LOG.error("Failed to stop the web frontend", t);
+			}
+		}
+		return 0;
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  For testing, this allows to override the actor classes used for
+	//  JobManager and the archive of completed jobs
+	// ------------------------------------------------------------------------
+
+	protected Class<? extends YarnFlinkResourceManager> getResourceManagerClass() {
+		return YarnFlinkResourceManager.class;
+	}
+
+	protected Class<? extends JobManager> getJobManagerClass() {
+		return YarnJobManager.class;
+	}
+
+	protected Class<? extends MemoryArchivist> getArchivistClass() {
+		return MemoryArchivist.class;
+	}
+
+	protected Class<? extends TaskManager> getTaskManagerClass() {
+		return YarnTaskManager.class;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Validates a condition, throwing a RuntimeException if the condition is violated.
+	 * 
+	 * @param condition The condition.
+	 * @param message The message for the runtime exception, with format variables as defined by
+	 *                {@link String#format(String, Object...)}.
+	 * @param values The format arguments.
+	 */
+	private static void require(boolean condition, String message, Object... values) {
+		if (!condition) {
+			throw new RuntimeException(String.format(message, values));
+		}
+	}
+
+	/**
+	 * Loads the Flink configuration from the given directory and applies the dynamic properties.
+	 * @param baseDirectory The directory to load the configuration from; also set as the base dir path.
+	 * @param additional The dynamic properties, set on the configuration after it has been loaded.
+	 * 
+	 * @return The configuration of the application master, with deprecated YARN keys substituted.
+	 */
+	@SuppressWarnings("deprecation")
+	private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional) {
+		LOG.info("Loading config from directory " + baseDirectory);
+
+		GlobalConfiguration.loadConfiguration(baseDirectory);
+		Configuration configuration = GlobalConfiguration.getConfiguration();
+
+		configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
+
+		// add dynamic properties to JobManager configuration.
+		for (Map.Entry<String, String> property : additional.entrySet()) {
+			configuration.setString(property.getKey(), property.getValue());
+		}
+
+		// if a web monitor shall be started, set the port to random binding
+		if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+			configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		}
+
+		// if the user has set the deprecated YARN-specific config keys, we add the
+		// corresponding generic config keys instead. that way, later code need not
+		// deal with deprecated config keys
+
+		BootstrapTools.substituteDeprecatedConfigKey(configuration,
+			ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
+			ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO);
+
+		BootstrapTools.substituteDeprecatedConfigKey(configuration,
+			ConfigConstants.YARN_HEAP_CUTOFF_MIN,
+			ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN);
+
+		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+			ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
+			ConfigConstants.CONTAINERED_MASTER_ENV_PREFIX);
+
+		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
+			ConfigConstants.CONTAINERED_TASK_MANAGER_ENV_PREFIX);
+
+		return configuration;
+	}
+
+	/**
+	 * Creates the launch context, which describes how to bring up a TaskManager process in
+	 * an allocated YARN container.
+	 * 
+	 * <p>This code is extremely YARN specific and registers all the resources that the TaskManager
+	 * needs (such as JAR file, config file, ...) and all environment variables in a YARN
+	 * container launch context. The launch context then ensures that those resources will be
+	 * copied into the container's transient working directory.
+	 * 
+	 * <p>We do this work before we start the ResourceManager actor in order to fail early if
+	 * any of the operations here fail.
+	 * 
+	 * @param flinkConfig
+	 *         The Flink configuration object.
+	 * @param yarnConfig
+	 *         The YARN configuration object.
+	 * @param env
+	 *         The environment variables.
+	 * @param tmParams
+	 *         The TaskManager container memory parameters. 
+	 * @param taskManagerConfig
+	 *         The configuration for the TaskManagers.
+	 * @param workingDirectory
+	 *         The current application master container's working directory. 
+	 * @param taskManagerMainClass
+	 *         The class with the main method.
+	 * @param log
+	 *         The logger.
+	 * 
+	 * @return The launch context for the TaskManager processes.
+	 * 
+	 * @throws Exception Thrown if the launch context could not be created, for example if
+	 *                   the resources could not be copied.
+	 */
+	public static ContainerLaunchContext createTaskManagerContext(
+			Configuration flinkConfig,
+			YarnConfiguration yarnConfig,
+			Map<String, String> env,
+			ContaineredTaskManagerParameters tmParams,
+			Configuration taskManagerConfig,
+			String workingDirectory,
+			Class<?> taskManagerMainClass,
+			Logger log) throws Exception {
+
+		log.info("Setting up resources for TaskManagers");
+
+		// get and validate all relevant variables
+
+		String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
+		require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_JAR_PATH);
+
+		String appId = env.get(YarnConfigKeys.ENV_APP_ID);
+		require(appId != null, "Environment variable %s not set", YarnConfigKeys.ENV_APP_ID);
+
+		String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+		require(clientHomeDir != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+
+		String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+		require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+
+		String yarnClientUsername = env.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
+		require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_USERNAME);
+
+		// obtain a handle to the file system used by YARN
+		final org.apache.hadoop.fs.FileSystem yarnFileSystem;
+		try {
+			yarnFileSystem = org.apache.hadoop.fs.FileSystem.get(yarnConfig);
+		} catch (IOException e) {
+			throw new Exception("Could not access YARN's default file system", e);
+		}
+
+		// register Flink Jar with remote HDFS
+		LocalResource flinkJar = Records.newRecord(LocalResource.class);
+		{
+			Path remoteJarPath = new Path(remoteFlinkJarPath);
+			Utils.registerLocalResource(yarnFileSystem, remoteJarPath, flinkJar);
+		}
+
+		// register conf with local fs
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+		{
+			// write the TaskManager configuration to a local file
+			final File taskManagerConfigFile =
+				new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
+			log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
+			BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
+
+			Utils.setupLocalResource(yarnFileSystem, appId,
+				new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
+
+			log.info("Prepared local resource for modified yaml: {}", flinkConf);
+		}
+
+		Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
+		taskManagerLocalResources.put("flink.jar", flinkJar);
+		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
+
+		// prepare additional files to be shipped
+		for (String pathStr : shipListString.split(",")) {
+			if (!pathStr.isEmpty()) {
+				LocalResource resource = Records.newRecord(LocalResource.class);
+				Path path = new Path(pathStr);
+				Utils.registerLocalResource(yarnFileSystem, path, resource);
+				taskManagerLocalResources.put(path.getName(), resource);
+			}
+		}
+
+		// now that all resources are prepared, we can create the launch context
+
+		log.info("Creating container launch context for TaskManagers");
+
+		boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
+		boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
+
+		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
+			flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+			hasLogback, hasLog4j, taskManagerMainClass);
+
+		log.info("Starting TaskManagers with command: " + launchCommand);
+
+		ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+		ctx.setCommands(Collections.singletonList(launchCommand));
+		ctx.setLocalResources(taskManagerLocalResources);
+
+		Map<String, String> containerEnv = new HashMap<>();
+		containerEnv.putAll(tmParams.taskManagerEnv());
+
+		// add YARN classpath, etc to the container environment
+		Utils.setupEnv(yarnConfig, containerEnv);
+		containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, yarnClientUsername);
+
+		ctx.setEnvironment(containerEnv);
+
+		try {
+			UserGroupInformation user = UserGroupInformation.getCurrentUser();
+			Credentials credentials = user.getCredentials();
+			DataOutputBuffer dob = new DataOutputBuffer();
+			credentials.writeTokenStorageToStream(dob);
+			ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+			ctx.setTokens(securityTokens);
+		}
+		catch (Throwable t) {
+			log.error("Getting current user info failed when trying to launch the container", t);
+		}
+
+		return ctx;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
new file mode 100644
index 0000000..7dc86a5
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+/**
+ * Names of the environment variables used to pass settings to the YARN containers.
+ * This is a pure constants holder; the private constructor prevents instantiation.
+ */
+public final class YarnConfigKeys {
+
+	// ------------------------------------------------------------------------
+	//  Environment variable names
+	// ------------------------------------------------------------------------
+
+	public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
+	public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
+	public static final String ENV_APP_ID = "_APP_ID";
+	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
+	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
+	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
+	public static final String ENV_SLOTS = "_SLOTS";
+	public static final String ENV_DETACHED = "_DETACHED";
+	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
+
+	/** The Flink jar resource location (in HDFS). */
+	public static final String FLINK_JAR_PATH = "_FLINK_JAR_PATH";
+
+	// ------------------------------------------------------------------------
+
+	/** Private constructor to prevent instantiation. */
+	private YarnConfigKeys() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4405235e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
new file mode 100644
index 0000000..87020db
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Describes a YARN container in which a TaskManager is being (or has been) launched,
+ * but whose TaskManager has not yet properly registered.
+ */
+public class YarnContainerInLaunch {
+
+	private final Container container;
+	private final long timestamp;
+
+	/** Creates the record; throws NullPointerException if {@code container} is null. */
+	public YarnContainerInLaunch(Container container, long timestamp) {
+		requireNonNull(container);
+		this.container = container;
+		this.timestamp = timestamp;
+	}
+
+	/** Returns the container passed to the constructor. */
+	public Container container() {
+		return this.container;
+	}
+
+	/** Returns the timestamp passed to the constructor. */
+	public long timestamp() {
+		return this.timestamp;
+	}
+
+	@Override
+	public String toString() {
+		final StringBuilder text = new StringBuilder("ContainerInLaunch @ ");
+		return text.append(timestamp).append(": ").append(container).toString();
+	}
+}


Mime
View raw message