flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [4/9] flink git commit: [FLINK-1984] Mesos ResourceManager - T1 milestone
Date Mon, 29 Aug 2016 15:33:24 GMT
[FLINK-1984] Mesos ResourceManager - T1 milestone

Implemented Mesos AppMaster including:
- runners for AppMaster and TaskManager
- MesosFlinkResourceManager as a Mesos framework
- ZK persistent storage for Mesos tasks
- reusable scheduler actors for:
  - offer handling using Netflix Fenzo (LaunchCoordinator)
  - reconciliation (ReconciliationCoordinator)
  - task monitoring (TaskMonitor)
  - connection monitoring (ConnectionMonitor)
- lightweight HTTP server to serve artifacts to the Mesos fetcher (ArtifactServer)
- scenario-based logging for:
  - connectivity issues
  - offer handling (receive, process, decline, rescind, accept)
- incorporated FLINK-4152, FLINK-3904, FLINK-4141, FLINK-3675, FLINK-4166


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9b2be05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9b2be05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9b2be05

Branch: refs/heads/master
Commit: d9b2be054f7dadf902d74622352e3ec8dfdcd584
Parents: 578e80e
Author: wrighe3 <eron.wright@emc.com>
Authored: Thu Jul 14 00:12:49 2016 -0700
Committer: Maximilian Michels <mxm@apache.org>
Committed: Mon Aug 29 17:27:10 2016 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  76 ++
 flink-dist/pom.xml                              |   7 +-
 flink-mesos/pom.xml                             | 294 ++++++++
 .../main/java/org/apache/flink/mesos/Utils.java |  49 ++
 .../flink/mesos/cli/FlinkMesosSessionCli.java   |  41 ++
 .../clusterframework/LaunchableMesosWorker.java | 187 +++++
 .../MesosApplicationMasterRunner.java           | 600 +++++++++++++++
 .../clusterframework/MesosConfigKeys.java       |  26 +
 .../MesosFlinkResourceManager.java              | 737 +++++++++++++++++++
 .../MesosTaskManagerParameters.java             |  51 ++
 .../MesosTaskManagerRunner.java                 |  99 +++
 .../RegisteredMesosWorkerNode.scala             |  15 +
 .../store/MesosWorkerStore.java                 | 134 ++++
 .../store/StandaloneMesosWorkerStore.java       |  69 ++
 .../store/ZooKeeperMesosWorkerStore.java        | 272 +++++++
 .../flink/mesos/scheduler/LaunchableTask.java   |  24 +
 .../flink/mesos/scheduler/SchedulerProxy.java   |  87 +++
 .../mesos/scheduler/TaskSchedulerBuilder.java   |  16 +
 .../mesos/scheduler/messages/AcceptOffers.java  |  56 ++
 .../mesos/scheduler/messages/Connected.java     |   8 +
 .../mesos/scheduler/messages/Disconnected.java  |  12 +
 .../flink/mesos/scheduler/messages/Error.java   |  24 +
 .../scheduler/messages/OfferRescinded.java      |  26 +
 .../mesos/scheduler/messages/ReRegistered.java  |  30 +
 .../mesos/scheduler/messages/Registered.java    |  39 +
 .../scheduler/messages/ResourceOffers.java      |  30 +
 .../mesos/scheduler/messages/SlaveLost.java     |  26 +
 .../mesos/scheduler/messages/StatusUpdate.java  |  27 +
 .../flink/mesos/util/MesosArtifactServer.java   | 286 +++++++
 .../flink/mesos/util/MesosConfiguration.java    | 108 +++
 .../apache/flink/mesos/util/ZooKeeperUtils.java |  22 +
 flink-mesos/src/main/resources/log4j.properties |  27 +
 .../clusterframework/MesosJobManager.scala      |  66 ++
 .../clusterframework/MesosTaskManager.scala     |  47 ++
 .../mesos/scheduler/ConnectionMonitor.scala     | 108 +++
 .../mesos/scheduler/LaunchCoordinator.scala     | 331 +++++++++
 .../scheduler/ReconciliationCoordinator.scala   | 164 +++++
 .../flink/mesos/scheduler/TaskMonitor.scala     | 240 ++++++
 .../apache/flink/mesos/scheduler/Tasks.scala    |  96 +++
 .../ContaineredJobManager.scala                 | 174 +++++
 .../MesosFlinkResourceManagerTest.java          | 697 ++++++++++++++++++
 .../src/test/resources/log4j-test.properties    |  32 +
 flink-mesos/src/test/resources/logback-test.xml |  37 +
 .../scala/org/apache/flink/mesos/Utils.scala    |  34 +
 .../mesos/scheduler/LaunchCoordinatorTest.scala | 421 +++++++++++
 .../ReconciliationCoordinatorTest.scala         | 214 ++++++
 .../flink/mesos/scheduler/TaskMonitorTest.scala | 237 ++++++
 .../org/apache/flink/runtime/akka/FSMSpec.scala |  40 +
 .../flink/runtime/util/ZooKeeperUtils.java      |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   3 +-
 pom.xml                                         |   1 +
 51 files changed, 6446 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 514c730..2fe27e0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -426,6 +426,60 @@ public final class ConfigConstants {
 	public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port";
 
 
+	// ------------------------ Mesos Configuration ------------------------
+
+	/**
+	 * The maximum number of failed Mesos tasks before entirely stopping
+	 * the Mesos session / job on Mesos.
+	 *
+	 * By default, we take the number of initially requested tasks.
+	 */
+	public static final String MESOS_MAX_FAILED_TASKS = "mesos.maximum-failed-tasks";
+
+	/**
+	 * The Mesos master URL.
+	 *
+	 * The value should be in one of the following forms:
+	 * <pre>
+	 * {@code
+	 *     host:port
+	 *     zk://host1:port1,host2:port2,.../path
+	 *     zk://username:password@host1:port1,host2:port2,.../path
+	 *     file:///path/to/file (where file contains one of the above)
+	 * }
+	 * </pre>
+	 *
+	 */
+	public static final String MESOS_MASTER_URL = "mesos.master";
+
+	/**
+	 * The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down.
+	 *
+	 * The default value is 600 (seconds).
+	 */
+	public static final String MESOS_FAILOVER_TIMEOUT_SECONDS = "mesos.failover-timeout";
+
+	/**
+	 * The config parameter defining the Mesos artifact server port to use.
+	 * Setting the port to 0 will let the OS choose an available port.
+	 */
+	public static final String MESOS_ARTIFACT_SERVER_PORT_KEY = "mesos.resourcemanager.artifactserver.port";
+
+	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "mesos.resourcemanager.framework.name";
+
+	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "mesos.resourcemanager.framework.role";
+
+	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL = "mesos.resourcemanager.framework.principal";
+
+	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret";
+
+	/**
+	 * The cpus to acquire from Mesos.
+	 *
+	 * By default, we use the number of requested task slots.
+	 */
+	public static final String MESOS_RESOURCEMANAGER_TASKS_CPUS = "mesos.resourcemanager.tasks.cpus";
+
 	// ------------------------ Hadoop Configuration ------------------------
 
 	/**
@@ -736,6 +790,9 @@ public final class ConfigConstants {
 	@Deprecated
 	public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "recovery.zookeeper.path.checkpoint-counter";
 
+	/** ZooKeeper root path (ZNode) for Mesos workers. */
+	public static final String ZOOKEEPER_MESOS_WORKERS_PATH = "recovery.zookeeper.path.mesos-workers";
+
 	/** Deprecated in favour of {@link #HA_ZOOKEEPER_SESSION_TIMEOUT}. */
 	@Deprecated
 	public static final String ZOOKEEPER_SESSION_TIMEOUT = "recovery.zookeeper.client.session-timeout";
@@ -983,6 +1040,23 @@ public final class ConfigConstants {
 	 */
 	public static final String DEFAULT_YARN_JOB_MANAGER_PORT = "0";
 
+	// ------ Mesos-Specific Configuration ------
+
+	/** The default failover timeout provided to Mesos (10 mins) */
+	public static final int DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS = 10 * 60;
+
+	/**
+	 * The default network port to listen on for the Mesos artifact server.
+	 */
+	public static final int DEFAULT_MESOS_ARTIFACT_SERVER_PORT = 0;
+
+	/**
+	 * The default Mesos framework name for the ResourceManager to use.
+	 */
+	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "Flink";
+
+	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*";
+
 	// ------------------------ File System Behavior ------------------------
 
 	/**
@@ -1131,6 +1205,8 @@ public final class ConfigConstants {
 
 	public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter";
 
+	public static final String DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH = "/mesos-workers";
+
 	public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;
 
 	public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 539ca8e..ec84adc 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -119,8 +119,13 @@ under the License.
 			<artifactId>flink-metrics-jmx</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+        
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-mesos_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 		
-
 	</dependencies>
 
 	<!-- See main pom.xml for explanation of profiles -->

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
new file mode 100644
index 0000000..c344ab2
--- /dev/null
+++ b/flink-mesos/pom.xml
@@ -0,0 +1,294 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+	
+	<artifactId>flink-mesos_2.10</artifactId>
+	<name>flink-mesos</name>
+	<packaging>jar</packaging>
+
+    <properties>
+        <mesos.version>0.27.1</mesos.version>
+    </properties>
+	
+    <dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>hadoop-core</artifactId>
+					<groupId>org.apache.hadoop</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>${shading-artifact.name}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-actor_${scala.binary.version}</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-remote_${scala.binary.version}</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+		</dependency>
+
+		<!--<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-camel_${scala.binary.version}</artifactId>
+		</dependency>-->
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-curator-recipes</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<!-- ===================================================
+        Dependencies for Mesos
+		=================================================== -->
+
+		<dependency>
+			<groupId>org.apache.mesos</groupId>
+			<artifactId>mesos</artifactId>
+			<version>${mesos.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.netflix.fenzo</groupId>
+			<artifactId>fenzo-core</artifactId>
+			<version>0.9.3</version>
+		</dependency>
+
+		<dependency>
+			<groupId>tv.cntt</groupId>
+			<artifactId>netty-router</artifactId>
+			<version>1.10</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.scalatest</groupId>
+			<artifactId>scalatest_${scala.binary.version}</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Scala Code Style, most of the configuration done via plugin management -->
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<configuration>
+					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+				</configuration>
+			</plugin>
+
+			<!-- Relocate curator -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<includes combine.children="append">
+									<include>org.apache.flink:flink-shaded-curator-recipes</include>
+								</includes>
+							</artifactSet>
+							<relocations combine.children="append">
+								<relocation>
+									<pattern>org.apache.curator</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
new file mode 100644
index 0000000..2509465
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
@@ -0,0 +1,49 @@
+package org.apache.flink.mesos;
+
+import org.apache.mesos.Protos;
+
+import java.net.URL;
+import java.util.Arrays;
+
+public class Utils {
+	/**
+	 * Construct a Mesos environment variable.
+     */
+	public static Protos.Environment.Variable variable(String name, String value) {
+		return Protos.Environment.Variable.newBuilder()
+			.setName(name)
+			.setValue(value)
+			.build();
+	}
+
+	/**
+	 * Construct a Mesos URI.
+     */
+	public static Protos.CommandInfo.URI uri(URL url, boolean cacheable) {
+		return Protos.CommandInfo.URI.newBuilder()
+			.setValue(url.toExternalForm())
+			.setExtract(false)
+			.setCache(cacheable)
+			.build();
+	}
+
+	public static Protos.Resource scalar(String name, double value) {
+		return Protos.Resource.newBuilder()
+			.setName(name)
+			.setType(Protos.Value.Type.SCALAR)
+			.setScalar(Protos.Value.Scalar.newBuilder().setValue(value))
+			.build();
+	}
+
+	public static Protos.Value.Range range(long begin, long end) {
+		return Protos.Value.Range.newBuilder().setBegin(begin).setEnd(end).build();
+	}
+
+	public static Protos.Resource ranges(String name, Protos.Value.Range... ranges) {
+		return Protos.Resource.newBuilder()
+			.setName(name)
+			.setType(Protos.Value.Type.RANGES)
+			.setRanges(Protos.Value.Ranges.newBuilder().addAllRange(Arrays.asList(ranges)).build())
+			.build();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
new file mode 100644
index 0000000..b767344
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
@@ -0,0 +1,41 @@
+package org.apache.flink.mesos.cli;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class FlinkMesosSessionCli {
+	/** Jackson mapper shared by the encode and decode helpers. */
+	private static final ObjectMapper mapper = new ObjectMapper();
+
+	/** Decodes a JSON map of string properties into a Configuration; null input yields an empty one. */
+	public static Configuration decodeDynamicProperties(String dynamicPropertiesEncoded) {
+		final Configuration decoded = new Configuration();
+		if (dynamicPropertiesEncoded == null) {
+			return decoded;
+		}
+		final Map<String, String> entries;
+		try {
+			entries = mapper.readValue(dynamicPropertiesEncoded, new TypeReference<Map<String, String>>() {});
+		} catch (IOException ex) {
+			throw new IllegalArgumentException("unreadable encoded properties", ex);
+		}
+		for (Map.Entry<String, String> entry : entries.entrySet()) {
+			decoded.setString(entry.getKey(), entry.getValue());
+		}
+		return decoded;
+	}
+
+	/** Encodes the configuration's key/value map as a JSON string. */
+	public static String encodeDynamicProperties(Configuration configuration) {
+		try {
+			return mapper.writeValueAsString(configuration.toMap());
+		} catch (JsonProcessingException ex) {
+			throw new IllegalArgumentException("unwritable properties", ex);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
new file mode 100644
index 0000000..8abd79a
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -0,0 +1,187 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+	/**
+	 * The set of configuration keys to be dynamically configured with a port allocated from Mesos.
+	 */
+	private static final String[] TM_PORT_KEYS = { "taskmanager.rpc.port", "taskmanager.data.port" };
+
+	private final MesosTaskManagerParameters params;
+	private final Protos.TaskInfo.Builder template;
+	private final Protos.TaskID taskID;
+	private final Request taskRequest;
+
+	/**
+	 * Construct a launchable Mesos worker.
+	 * @param params the TM parameters such as memory, cpu to acquire.
+	 * @param template a template for the TaskInfo to be constructed at launch time.
+	 * @param taskID the taskID for this worker.
+	 */
+	public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+		this.params = params;
+		this.template = template;
+		this.taskID = taskID;
+		this.taskRequest = new Request();
+	}
+
+	/** Returns the Mesos task ID of this worker. */
+	public Protos.TaskID taskID() {
+		return taskID;
+	}
+
+	/** Returns the Fenzo task request describing the resources this worker asks for. */
+	@Override
+	public TaskRequest taskRequest() {
+		return taskRequest;
+	}
+
+	/**
+	 * The Fenzo task request for this worker: reports the CPU, memory and port count
+	 * to acquire, and holds the assigned resources once they have been set.
+	 */
+	class Request implements TaskRequest {
+		private final AtomicReference<TaskRequest.AssignedResources> assignedResources = new AtomicReference<>();
+
+		@Override
+		public String getId() {
+			return taskID.getValue();
+		}
+
+		@Override
+		public String taskGroupName() {
+			return "";
+		}
+
+		@Override
+		public double getCPUs() {
+			return params.cpus();
+		}
+
+		@Override
+		public double getMemory() {
+			return params.containeredParameters().taskManagerTotalMemoryMB();
+		}
+
+		@Override
+		public double getNetworkMbps() {
+			return 0.0;
+		}
+
+		@Override
+		public double getDisk() {
+			return 0.0;
+		}
+
+		@Override
+		public int getPorts() {
+			return TM_PORT_KEYS.length;
+		}
+
+		@Override
+		public Map<String, NamedResourceSetRequest> getCustomNamedResources() {
+			return Collections.emptyMap();
+		}
+
+		@Override
+		public List<? extends ConstraintEvaluator> getHardConstraints() {
+			return null;
+		}
+
+		@Override
+		public List<? extends VMTaskFitnessCalculator> getSoftConstraints() {
+			return null;
+		}
+
+		@Override
+		public void setAssignedResources(AssignedResources assignedResources) {
+			this.assignedResources.set(assignedResources);
+		}
+
+		@Override
+		public AssignedResources getAssignedResources() {
+			return assignedResources.get();
+		}
+
+		@Override
+		public String toString() {
+			return "Request{cpus=" + getCPUs() + ", memory=" + getMemory() + '}';
+		}
+	}
+
+	/**
+	 * Construct the TaskInfo needed to launch the worker.
+	 * @param slaveId the assigned slave.
+	 * @param assignment the assignment details.
+	 * @return a fully-baked TaskInfo.
+	 * @throws IllegalArgumentException if the number of assigned ports differs from the number of port keys.
+	 */
+	@Override
+	public Protos.TaskInfo launch(Protos.SlaveID slaveId, TaskAssignmentResult assignment) {
+
+		final Configuration dynamicProperties = new Configuration();
+
+		// specialize the TaskInfo template with assigned resources, environment variables, etc
+		final Protos.TaskInfo.Builder taskInfo = template
+			.clone()
+			.setSlaveId(slaveId)
+			.setTaskId(taskID)
+			.setName(taskID.getValue())
+			.addResources(scalar("cpus", assignment.getRequest().getCPUs()))
+			.addResources(scalar("mem", assignment.getRequest().getMemory()));
+		//.addResources(scalar("disk", assignment.getRequest.getDisk).setRole("Flink"))
+
+		// use the assigned ports for the TM
+		if (assignment.getAssignedPorts().size() != TM_PORT_KEYS.length) {
+			throw new IllegalArgumentException("insufficient # of ports assigned");
+		}
+		for (int i = 0; i < TM_PORT_KEYS.length; i++) {
+			// the i-th assigned port becomes a single-port range and the value of the i-th config key
+			int port = assignment.getAssignedPorts().get(i);
+			String key = TM_PORT_KEYS[i];
+			taskInfo.addResources(ranges("ports", range(port, port)));
+			dynamicProperties.setInteger(key, port);
+		}
+
+		// finalize environment variables
+		final Protos.Environment.Builder environmentBuilder = taskInfo.getCommandBuilder().getEnvironmentBuilder();
+
+		// propagate the Mesos task ID to the TM
+		environmentBuilder
+			.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue()));
+
+		// propagate the dynamic configuration properties to the TM
+		String dynamicPropertiesEncoded = FlinkMesosSessionCli.encodeDynamicProperties(dynamicProperties);
+		environmentBuilder
+			.addVariables(variable(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded));
+
+		return taskInfo.build();
+	}
+
+	@Override
+	public String toString() {
+		return "LaunchableMesosWorker{taskID=" + taskID + ", taskRequest=" + taskRequest + '}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
new file mode 100644
index 0000000..30f2258
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -0,0 +1,600 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application Master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+	/** Logger of the runner; it is also handed to the components that this class bootstraps */
+	protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+	/** The maximum time that TaskManagers may wait for their registration at the JobManager
+	 * before they quit; it is written into the generated TaskManager configuration */
+	private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	/** The process environment variables, read once when this class is initialized */
+	private static final Map<String, String> ENV = System.getenv();
+
+	/** The exit code returned if the initialization of the application master failed */
+	private static final int INIT_ERROR_EXIT_CODE = 31;
+
+	/** The exit code handed to the process reapers, used if a critical actor died */
+	private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+	// ------------------------------------------------------------------------
+	//  Program entry point
+	// ------------------------------------------------------------------------
+
+	/**
+	 * JVM entry point of the Mesos AppMaster: logs the environment information, registers
+	 * the signal handler and terminates the JVM with the exit code produced by
+	 * {@link #run(String[])}.
+	 *
+	 * @param args The command line arguments.
+	 */
+	public static void main(String[] args) {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos AppMaster", args);
+		SignalHandler.register(LOG);
+
+		// the result of the runner doubles as the exit status of the process
+		final MesosApplicationMasterRunner runner = new MesosApplicationMasterRunner();
+		System.exit(runner.run(args));
+	}
+
+	/**
+	 * The instance entry point for the Mesos AppMaster. Obtains user group
+	 * information and calls the main work method {@link #runPrivileged()} as a
+	 * privileged action.
+	 *
+	 * @param args The command line arguments.
+	 * @return The process exit code.
+	 */
+	protected int run(String[] args) {
+		try {
+			LOG.debug("All environment variables: {}", ENV);
+
+			final UserGroupInformation currentUser;
+			try {
+				currentUser = UserGroupInformation.getCurrentUser();
+			} catch (Throwable t) {
+				throw new Exception("Cannot access UserGroupInformation information for current user", t);
+			}
+
+			LOG.info("Running Flink as user {}", currentUser.getShortUserName());
+
+			// run the actual work in a secured privileged action
+			return currentUser.doAs(new PrivilegedAction<Integer>() {
+				@Override
+				public Integer run() {
+					return runPrivileged();
+				}
+			});
+		}
+		catch (Throwable t) {
+			// make sure that everything whatever ends up in the log
+			LOG.error("Mesos AppMaster initialization failed", t);
+			return INIT_ERROR_EXIT_CODE;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Core work method
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The main work method, must run as a privileged action.
+	 *
+	 * <p>Loads and validates the configuration, starts the actor system and the artifact server,
+	 * prepares the TaskManager task info template, starts the JobManager, the web monitor, the
+	 * Mesos resource manager and the process reapers, and then blocks until the actor system
+	 * terminates.
+	 *
+	 * @return The return code for the Java process: {@code 0} after the actor system has
+	 *         terminated, {@code INIT_ERROR_EXIT_CODE} if any of the start-up steps failed.
+	 */
+	protected int runPrivileged() {
+
+		ActorSystem actorSystem = null;
+		WebMonitor webMonitor = null;
+		MesosArtifactServer artifactServer = null;
+
+		try {
+			// ------- (1) load and parse / validate all configurations -------
+
+			// loading all config values here has the advantage that the program fails fast, if any
+			// configuration problem occurs
+
+			final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
+			require(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+
+			final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID);
+			require(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID);
+
+			// Note that we use the "appMasterHostname" given by the system, to make sure
+			// we use the hostnames consistently throughout akka.
+			// for akka "localhost" and "localhost.localdomain" are different actors.
+			final String appMasterHostname = InetAddress.getLocalHost().getHostName();
+
+			// Flink configuration
+			final Configuration dynamicProperties =
+				FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+
+			final Configuration config = createConfiguration(workingDir, dynamicProperties);
+
+			// Mesos configuration
+			final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
+
+			// environment values related to TM
+			final int taskManagerContainerMemory = parseIntFromEnv(MesosConfigKeys.ENV_TM_MEMORY);
+			final int numInitialTaskManagers = parseIntFromEnv(MesosConfigKeys.ENV_TM_COUNT);
+			final int slotsPerTaskManager = parseIntFromEnv(MesosConfigKeys.ENV_SLOTS);
+
+			final ContaineredTaskManagerParameters containeredParameters =
+				ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager);
+
+			final MesosTaskManagerParameters taskManagerParameters =
+				MesosTaskManagerParameters.create(config, containeredParameters);
+
+			LOG.info("TaskManagers will be created with {} task slots",
+				taskManagerParameters.containeredParameters().numSlots());
+			LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " +
+					"JVM direct memory limit {} MB, {} cpus",
+				taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
+				taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
+				taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
+				taskManagerParameters.cpus());
+
+			// JM endpoint, which should be explicitly configured by the dispatcher (based on acquired net resources)
+			final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+			// 65535 is the largest valid port number
+			require(listeningPort >= 0 && listeningPort <= 65535, "Config parameter \"" +
+				ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + "\" is invalid, it must be between 0 and 65535");
+
+			// ----------------- (2) start the actor system -------------------
+
+			// try to start the actor system, JobManager and JobManager actor system
+			// using the configured address and ports
+			actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, listeningPort, LOG);
+
+			final String akkaHostname = AkkaUtils.getAddress(actorSystem).host().get();
+			final int akkaPort = (Integer) AkkaUtils.getAddress(actorSystem).port().get();
+
+			LOG.info("Actor system bound to hostname {}.", akkaHostname);
+
+			// try to start the artifact server
+			LOG.debug("Starting Artifact Server");
+			final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
+				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
+			artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort);
+
+			// ----------------- (3) Generate the configuration for the TaskManagers -------------------
+
+			final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
+				config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
+			LOG.debug("TaskManager configuration: {}", taskManagerConfig);
+
+			final Protos.TaskInfo.Builder taskManagerContext = createTaskManagerContext(
+				config, mesosConfig, ENV,
+				taskManagerParameters, taskManagerConfig,
+				workingDir, getTaskManagerClass(), artifactServer, LOG);
+
+			// ----------------- (4) start the actors -------------------
+
+			// 1) JobManager & Archive (in non-HA case, the leader service takes this)
+			// 2) Web Monitor (we need its port to register)
+			// 3) Resource Master for Mesos
+			// 4) Process reapers for the JobManager and Resource Master
+
+			// 1: the JobManager
+			LOG.debug("Starting JobManager actor");
+
+			// we start the JobManager with its standard name
+			ActorRef jobManager = JobManager.startJobManagerActors(
+				config, actorSystem,
+				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
+				scala.Option.<String>empty(),
+				getJobManagerClass(),
+				getArchivistClass())._1();
+
+
+			// 2: the web monitor
+			LOG.debug("Starting Web Frontend");
+
+			webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG);
+			if(webMonitor != null) {
+				final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
+				mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
+			}
+
+			// 3: Flink's Mesos ResourceManager
+			LOG.debug("Starting Mesos Flink Resource Manager");
+
+			// create the worker store to persist task information across restarts
+			MesosWorkerStore workerStore = createWorkerStore(config);
+
+			// we need the leader retrieval service here to be informed of new
+			// leader session IDs, even though there can be only one leader ever
+			LeaderRetrievalService leaderRetriever =
+				LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
+
+			Props resourceMasterProps = MesosFlinkResourceManager.createActorProps(
+				getResourceManagerClass(),
+				config,
+				mesosConfig,
+				workerStore,
+				leaderRetriever,
+				taskManagerParameters,
+				taskManagerContext,
+				numInitialTaskManagers,
+				LOG);
+
+			ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master");
+
+
+			// 4: Process reapers
+			// The process reapers ensure that upon unexpected actor death, the process exits
+			// and does not stay lingering around unresponsive
+
+			LOG.debug("Starting process reapers for JobManager");
+
+			actorSystem.actorOf(
+				Props.create(ProcessReaper.class, resourceMaster, LOG, ACTOR_DIED_EXIT_CODE),
+				"Mesos_Resource_Master_Process_Reaper");
+
+			actorSystem.actorOf(
+				Props.create(ProcessReaper.class, jobManager, LOG, ACTOR_DIED_EXIT_CODE),
+				"JobManager_Process_Reaper");
+		}
+		catch (Throwable t) {
+			// make sure that whatever went wrong ends up in the log
+			LOG.error("Mesos JobManager initialization failed", t);
+
+			// best-effort cleanup of everything that was started before the failure
+			if (actorSystem != null) {
+				try {
+					actorSystem.shutdown();
+				} catch (Throwable tt) {
+					LOG.error("Error shutting down actor system", tt);
+				}
+			}
+
+			if (webMonitor != null) {
+				try {
+					webMonitor.stop();
+				} catch (Throwable ignored) {
+					LOG.warn("Failed to stop the web frontend", ignored);
+				}
+			}
+
+			if(artifactServer != null) {
+				try {
+					artifactServer.stop();
+				} catch (Throwable ignored) {
+					LOG.error("Failed to stop the artifact server", ignored);
+				}
+			}
+
+			return INIT_ERROR_EXIT_CODE;
+		}
+
+		// everything started, we can wait until all is done or the process is killed
+		LOG.info("Mesos JobManager started");
+
+		// wait until everything is done
+		actorSystem.awaitTermination();
+
+		// if we get here, everything worked out all right, and we even exited smoothly
+		if (webMonitor != null) {
+			try {
+				webMonitor.stop();
+			} catch (Throwable t) {
+				LOG.error("Failed to stop the web frontend", t);
+			}
+		}
+
+		try {
+			artifactServer.stop();
+		} catch (Throwable t) {
+			LOG.error("Failed to stop the artifact server", t);
+		}
+
+		return 0;
+	}
+
+	/**
+	 * Reads the given variable from the process environment and parses it as an integer.
+	 *
+	 * @param key The name of the environment variable.
+	 * @return The integer value of the variable.
+	 * @throws RuntimeException Thrown if the variable is not set or is not a valid integer;
+	 *                          the original {@link NumberFormatException} is attached as the cause.
+	 */
+	private static int parseIntFromEnv(String key) {
+		try {
+			return Integer.parseInt(ENV.get(key));
+		} catch (NumberFormatException e) {
+			// same message as before, but the cause is kept so the stack trace reaches the log
+			throw new RuntimeException("Invalid value for " + key + " : "
+				+ e.getMessage(), e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Overridable hooks (for testing): subclasses may substitute the classes
+	//  used for the ResourceManager, JobManager, archivist and TaskManager
+	// ------------------------------------------------------------------------
+
+	protected Class<? extends MesosFlinkResourceManager> getResourceManagerClass() {
+		return MesosFlinkResourceManager.class; // used to create the resource manager actor props
+	}
+
+	protected Class<? extends JobManager> getJobManagerClass() {
+		return MesosJobManager.class; // passed to JobManager.startJobManagerActors
+	}
+
+	protected Class<? extends MemoryArchivist> getArchivistClass() {
+		return MemoryArchivist.class; // passed to JobManager.startJobManagerActors
+	}
+
+	protected Class<? extends TaskManager> getTaskManagerClass() {
+		return MesosTaskManager.class; // main class of the TaskManager launch command
+	}
+
+	/**
+	 * Validates a condition, throwing a RuntimeException if the condition is violated.
+	 *
+	 * @param condition The condition.
+	 * @param message The message for the runtime exception, with format variables as defined by
+	 *                {@link String#format(String, Object...)}.
+	 * @param values The format arguments.
+	 */
+	private static void require(boolean condition, String message, Object... values) {
+		if (!condition) {
+			throw new RuntimeException(String.format(message, values));
+		}
+	}
+
+	/**
+	 *
+	 * @param baseDirectory
+	 * @param additional
+	 *
+	 * @return The configuration to be used by the TaskManagers.
+	 */
+	@SuppressWarnings("deprecation")
+	private static Configuration createConfiguration(String baseDirectory, Configuration additional) {
+		LOG.info("Loading config from directory " + baseDirectory);
+
+		Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
+
+		configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
+
+		// add dynamic properties to JobManager configuration.
+		configuration.addAll(additional);
+
+		return configuration;
+	}
+
+	/**
+	 * Builds the Mesos configuration of the ResourceManager from the given Flink configuration.
+	 *
+	 * <p>The Mesos master URL is mandatory. If a framework principal is configured, the framework
+	 * secret must be configured as well and both are turned into a credential; without a
+	 * principal, no credential is created.
+	 *
+	 * @param flinkConfig The Flink configuration to read the Mesos settings from.
+	 * @param hostname The hostname to announce in the framework info.
+	 * @return The Mesos configuration (master URL, framework info and optional credential).
+	 * @throws IllegalConfigurationException Thrown if the master URL is missing, or if a
+	 *                                       principal is configured without a secret.
+	 */
+	public static MesosConfiguration createMesosConfig(Configuration flinkConfig, String hostname) {
+
+		final Protos.FrameworkInfo.Builder framework = Protos.FrameworkInfo.newBuilder()
+			.setUser("")
+			.setHostname(hostname);
+
+		// there is no default for the master URL
+		if (!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
+			throw new IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be configured.");
+		}
+		final String masterUrl = flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null);
+
+		final int failoverTimeoutSeconds = flinkConfig.getInteger(
+			ConfigConstants.MESOS_FAILOVER_TIMEOUT_SECONDS,
+			ConfigConstants.DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS);
+		final Duration failoverTimeout = FiniteDuration.apply(failoverTimeoutSeconds, TimeUnit.SECONDS);
+		framework.setFailoverTimeout(failoverTimeout.toSeconds());
+
+		final String frameworkName = flinkConfig.getString(
+			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_NAME,
+			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME);
+		framework.setName(frameworkName);
+
+		final String frameworkRole = flinkConfig.getString(
+			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE,
+			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE);
+		framework.setRole(frameworkRole);
+
+		// the credential stays null (i.e. an empty Option) unless a principal is configured
+		Protos.Credential.Builder credential = null;
+		if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+			framework.setPrincipal(flinkConfig.getString(
+				ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
+
+			if (!flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+				throw new IllegalConfigurationException(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET + " must be configured.");
+			}
+
+			credential = Protos.Credential.newBuilder()
+				.setPrincipal(framework.getPrincipal())
+				.setSecret(flinkConfig.getString(
+					ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
+		}
+
+		return new MesosConfiguration(masterUrl, framework, Option.apply(credential));
+	}
+
+	/**
+	 * Creates the store that persists the Mesos worker information, depending on the
+	 * recovery mode configured in the given Flink configuration.
+	 *
+	 * @param flinkConfig The Flink configuration that determines the recovery mode.
+	 * @return A standalone worker store for the standalone recovery mode, or a ZooKeeper-based
+	 *         worker store for the ZooKeeper recovery mode.
+	 * @throws IllegalConfigurationException Thrown if the recovery mode is neither of the two.
+	 * @throws Exception Thrown if the ZooKeeper client or the ZooKeeper-based store cannot be created.
+	 */
+	private MesosWorkerStore createWorkerStore(Configuration flinkConfig) throws Exception {
+		MesosWorkerStore workerStore;
+		RecoveryMode recoveryMode = RecoveryMode.fromConfig(flinkConfig);
+		if (recoveryMode == RecoveryMode.STANDALONE) {
+			workerStore = new StandaloneMesosWorkerStore();
+		}
+		else if (recoveryMode == RecoveryMode.ZOOKEEPER) {
+			// note: the store is responsible for closing the client.
+			CuratorFramework client = ZooKeeperUtils.startCuratorFramework(flinkConfig);
+			workerStore = ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig);
+		}
+		else {
+			// the mode is quoted on both sides (the closing quote was missing before)
+			throw new IllegalConfigurationException("Unexpected recovery mode '" + recoveryMode + "'.");
+		}
+
+		return workerStore;
+	}
+
+	/**
+	 * Creates a Mesos task info template, which describes how to bring up a TaskManager process as
+	 * a Mesos task.
+	 *
+	 * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
+	 * needs (such as JAR file, config file, ...) and all environment variables in a task info record.
+	 * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
+	 * A lightweight HTTP server serves the artifacts to the fetcher.
+	 *
+	 * <p>We do this work before we start the ResourceManager actor in order to fail early if
+	 * any of the operations here fail.
+	 *
+	 * @param flinkConfig
+	 *         The Flink configuration object.
+	 * @param mesosConfig
+	 *         The Mesos configuration object (currently not read by this method).
+	 * @param env
+	 *         The environment variables; must contain the ship file list, the client user name
+	 *         and the Flink classpath.
+	 * @param tmParams
+	 *         The TaskManager container memory parameters.
+	 * @param taskManagerConfig
+	 *         The configuration for the TaskManagers.
+	 * @param workingDirectory
+	 *         The current application master container's working directory.
+	 * @param taskManagerMainClass
+	 *         The class with the main method.
+	 * @param artifactServer
+	 *         The artifact server.
+	 * @param log
+	 *         The logger.
+	 *
+	 * @return The task info template for the TaskManager processes.
+	 *
+	 * @throws Exception Thrown if the task info could not be created, for example if
+	 *                   the resources could not be copied.
+	 */
+	public static Protos.TaskInfo.Builder createTaskManagerContext(
+		Configuration flinkConfig,
+		MesosConfiguration mesosConfig,
+		Map<String, String> env,
+		MesosTaskManagerParameters tmParams,
+		Configuration taskManagerConfig,
+		String workingDirectory,
+		Class<?> taskManagerMainClass,
+		MesosArtifactServer artifactServer,
+		Logger log) throws Exception {
+
+
+		Protos.TaskInfo.Builder info = Protos.TaskInfo.newBuilder();
+		Protos.CommandInfo.Builder cmd = Protos.CommandInfo.newBuilder();
+
+		log.info("Setting up artifacts for TaskManagers");
+
+		String shipListString = env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
+		require(shipListString != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
+
+		String clientUsername = env.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
+		require(clientUsername != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_USERNAME);
+
+		String classPathString = env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH);
+		require(classPathString != null, "Environment variable %s not set", MesosConfigKeys.ENV_FLINK_CLASSPATH);
+
+		// register the Flink jar
+		final File flinkJarFile = new File(workingDirectory, "flink.jar");
+		cmd.addUris(uri(artifactServer.addFile(flinkJarFile, "flink.jar"), true));
+
+		// register the TaskManager configuration
+		// (written under a random file name, but served under the standard config file name)
+		final File taskManagerConfigFile =
+			new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
+		// use the logger passed by the caller, like the other log statements of this method
+		log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
+		BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
+		cmd.addUris(uri(artifactServer.addFile(taskManagerConfigFile, GlobalConfiguration.FLINK_CONF_FILENAME), true));
+
+		// prepare additional files to be shipped
+		for (String pathStr : shipListString.split(",")) {
+			if (!pathStr.isEmpty()) {
+				File shipFile = new File(workingDirectory, pathStr);
+				cmd.addUris(uri(artifactServer.addFile(shipFile, shipFile.getName()), true));
+			}
+		}
+
+		log.info("Creating task info for TaskManagers");
+
+		// build the launch command
+		boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
+		boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
+
+		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
+			flinkConfig, tmParams.containeredParameters(), ".", ".",
+			hasLogback, hasLog4j, taskManagerMainClass);
+		cmd.setValue(launchCommand);
+
+		// build the environment variables
+		Protos.Environment.Builder envBuilder = Protos.Environment.newBuilder();
+		for (Map.Entry<String, String> entry : tmParams.containeredParameters().taskManagerEnv().entrySet()) {
+			envBuilder.addVariables(variable(entry.getKey(), entry.getValue()));
+		}
+		envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, classPathString));
+		envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLIENT_USERNAME, clientUsername));
+
+		cmd.setEnvironment(envBuilder);
+
+		info.setCommand(cmd);
+
+		return info;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
new file mode 100644
index 0000000..3173286
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -0,0 +1,26 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+/**
+ * The names of the environment variables used to pass settings to the Mesos containers.
+ */
+public class MesosConfigKeys {
+	// ------------------------------------------------------------------------
+	//  Environment variable names
+	// ------------------------------------------------------------------------
+
+	public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY"; // TaskManager container memory, parsed as an integer
+	public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT"; // initial number of TaskManagers, parsed as an integer
+	public static final String ENV_SLOTS = "_SLOTS"; // slots per TaskManager, parsed as an integer
+	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES"; // comma-separated list of files to ship
+	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME"; // forwarded to the TaskManager environment
+	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES"; // encoded dynamic configuration properties
+	public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID"; // carries the Mesos task ID to the TaskManager
+	public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
+	public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH"; // its value is passed on as CLASSPATH
+	public static final String ENV_CLASSPATH = "CLASSPATH"; // set in the TaskManager environment
+	public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX"; // sandbox (working) directory
+	public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID"; // session ID, handed to the artifact server
+
+	/** Private constructor to prevent instantiation */
+	private MesosConfigKeys() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
new file mode 100644
index 0000000..483c7b7
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -0,0 +1,737 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
+
+	/** The Mesos configuration (master and framework info) */
+	private final MesosConfiguration mesosConfig;
+
+	/** The TaskManager container parameters (like container memory size) */
+	private final MesosTaskManagerParameters taskManagerParameters;
+
+	/** Context information used to start a TaskManager Java process */
+	private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+	/** Number of failed Mesos tasks before stopping the application. A negative value (e.g. -1) means infinite. */
+	private final int maxFailedTasks;
+
+	/** Callback handler for the asynchronous Mesos scheduler */
+	private SchedulerProxy schedulerCallbackHandler;
+
+	/** Mesos scheduler driver */
+	private SchedulerDriver schedulerDriver;
+
+	/** Child actor that is told about registration, re-registration and disconnection events */
+	private ActorRef connectionMonitor;
+
+	/** Child actor that is told about task goal state changes and task status updates */
+	private ActorRef taskRouter;
+
+	/** Child actor that receives resource offers and the launch/assign/unassign requests */
+	private ActorRef launchCoordinator;
+
+	/** Child actor that receives reconciliation requests and task status updates */
+	private ActorRef reconciliationCoordinator;
+
+	/** Store holding the framework ID and the worker records across incarnations of the RM */
+	private MesosWorkerStore workerStore;
+
+	/** Workers that have been planned (state New) but not yet launched, keyed by resource ID */
+	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
+	/** Workers that have been launched but whose TaskManager has not registered yet */
+	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
+	/** Workers that have been released and whose Mesos task has not terminated yet */
+	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
+
+	/** The number of failed tasks since the master became active */
+	private int failedTasksSoFar;
+
+	/**
+	 * Creates the resource manager actor state. No connection to Mesos is made here;
+	 * that happens in {@link #initialize()}.
+	 *
+	 * @param flinkConfig The Flink configuration object.
+	 * @param mesosConfig The Mesos configuration (must not be null).
+	 * @param workerStore The store for the framework ID and worker records (must not be null).
+	 * @param leaderRetrievalService The service passed on to the generic resource manager.
+	 * @param taskManagerParameters The TaskManager container parameters (must not be null).
+	 * @param taskManagerLaunchContext The launch context for TaskManager processes (must not be null).
+	 * @param maxFailedTasks Number of tolerated failed tasks; a negative value means infinite.
+	 * @param numInitialTaskManagers The initial number of TaskManagers to allocate.
+	 */
+	public MesosFlinkResourceManager(
+		Configuration flinkConfig,
+		MesosConfiguration mesosConfig,
+		MesosWorkerStore workerStore,
+		LeaderRetrievalService leaderRetrievalService,
+		MesosTaskManagerParameters taskManagerParameters,
+		Protos.TaskInfo.Builder taskManagerLaunchContext,
+		int maxFailedTasks,
+		int numInitialTaskManagers) {
+
+		super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
+
+		// mandatory collaborators
+		this.mesosConfig = requireNonNull(mesosConfig);
+		this.workerStore = requireNonNull(workerStore);
+		this.taskManagerParameters = requireNonNull(taskManagerParameters);
+		this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
+
+		this.maxFailedTasks = maxFailedTasks;
+
+		// in-memory view of the worker life cycle, initially empty
+		this.workersInNew = new HashMap<>();
+		this.workersInLaunch = new HashMap<>();
+		this.workersBeingReturned = new HashMap<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mesos-specific behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Starts the worker store, creates the Mesos scheduler driver and the supporting
+	 * actors, recovers previously persisted workers and finally starts the driver.
+	 *
+	 * @throws Exception if the worker store or the recovery fails.
+	 */
+	@Override
+	protected void initialize() throws Exception {
+		LOG.info("Initializing Mesos resource master");
+
+		workerStore.start();
+
+		// create the scheduler driver to communicate with Mesos
+		schedulerCallbackHandler = new SchedulerProxy(self());
+
+		// register with Mesos
+		FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
+			.clone()
+			.setCheckpoint(true);
+
+		// a stored framework ID indicates that a prior incarnation already registered
+		Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
+		if(frameworkID.isEmpty()) {
+			LOG.info("Registering as new framework.");
+		}
+		else {
+			LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
+			frameworkInfo.setId(frameworkID.get());
+		}
+
+		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
+		// NOTE(review): the 'false' flag presumably disables implicit acknowledgements, since
+		// status updates are acknowledged explicitly in taskStatusUpdated -- confirm.
+		schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
+
+		// create supporting actors (several of them are handed the scheduler driver created above)
+		connectionMonitor = createConnectionMonitor();
+		launchCoordinator = createLaunchCoordinator();
+		reconciliationCoordinator = createReconciliationCoordinator();
+		taskRouter = createTaskRouter();
+
+		// recovery sends messages to the task router and the launch coordinator,
+		// so it must run after the actors have been created
+		recoverWorkers();
+
+		connectionMonitor.tell(new ConnectionMonitor.Start(), self());
+		schedulerDriver.start();
+	}
+
+	protected ActorRef createConnectionMonitor() {
+		return context().actorOf(
+			ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
+			"connectionMonitor");
+	}
+
+	protected ActorRef createTaskRouter() {
+		return context().actorOf(
+			Tasks.createActorProps(Tasks.class, config, schedulerDriver, TaskMonitor.class),
+			"tasks");
+	}
+
+	protected ActorRef createLaunchCoordinator() {
+		return context().actorOf(
+			LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver, createOptimizer()),
+			"launchCoordinator");
+	}
+
+	protected ActorRef createReconciliationCoordinator() {
+		return context().actorOf(
+			ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config, schedulerDriver),
+			"reconciliationCoordinator");
+	}
+
+	/**
+	 * Logs that the resource master is stopping and then delegates to the superclass.
+	 */
+	@Override
+	public void postStop() {
+		LOG.info("Stopping Mesos resource master");
+		super.postStop();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Actor messages
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected void handleMessage(Object message) {
+
+		// check for Mesos-specific actor messages first
+
+		// --- messages about Mesos connection
+		if (message instanceof Registered) {
+			registered((Registered) message);
+		} else if (message instanceof ReRegistered) {
+			reregistered((ReRegistered) message);
+		} else if (message instanceof Disconnected) {
+			disconnected((Disconnected) message);
+		} else if (message instanceof Error) {
+			error(((Error) message).message());
+
+		// --- messages about offers
+		} else if (message instanceof ResourceOffers || message instanceof OfferRescinded) {
+			launchCoordinator.tell(message, self());
+		} else if (message instanceof AcceptOffers) {
+			acceptOffers((AcceptOffers) message);
+
+		// --- messages about tasks
+		} else if (message instanceof StatusUpdate) {
+			taskStatusUpdated((StatusUpdate) message);
+		} else if (message instanceof ReconciliationCoordinator.Reconcile) {
+			// a reconciliation request from a task
+			reconciliationCoordinator.tell(message, self());
+		} else if (message instanceof TaskMonitor.TaskTerminated) {
+			// a termination message from a task
+			TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
+			taskTerminated(msg.taskID(), msg.status());
+
+		} else  {
+			// message handled by the generic resource master code
+			super.handleMessage(message);
+		}
+	}
+
+	/**
+	 * Shuts down the cluster (this is not a failover situation): stops the scheduler
+	 * driver, cleans up the worker store and stops this actor. A failure in either of
+	 * the first two steps is logged as a warning and does not prevent the later steps.
+	 *
+	 * @param finalStatus The application status to report (not used by this implementation).
+	 * @param optionalDiagnostics An optional diagnostics message (not used by this implementation).
+	 */
+	@Override
+	protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+		LOG.info("Shutting down and unregistering as a Mesos framework.");
+
+		// step 1: unregister the framework, which implicitly removes all tasks.
+		try {
+			schedulerDriver.stop(false);
+		} catch (Exception ex) {
+			LOG.warn("unable to unregister the framework", ex);
+		}
+
+		// step 2: remove the persisted state
+		try {
+			workerStore.cleanup();
+		} catch (Exception ex) {
+			LOG.warn("unable to cleanup the ZooKeeper state", ex);
+		}
+
+		// step 3: stop this actor
+		context().stop(self());
+	}
+
+	/**
+	 * Logs the fatal error and terminates the JVM with {@code EXIT_CODE_FATAL_ERROR}.
+	 * The framework is deliberately not unregistered.
+	 *
+	 * @param message A description of the failure.
+	 * @param error The cause of the failure.
+	 */
+	@Override
+	protected void fatalError(String message, Throwable error) {
+		// we do not unregister, but cause a hard fail of this process, to have it
+		// restarted by the dispatcher
+		LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + message, error);
+		LOG.error("Shutting down process");
+
+		// kill this process, this will make an external supervisor (the dispatcher) restart the process
+		System.exit(EXIT_CODE_FATAL_ERROR);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Worker Management
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Recover framework/worker information persisted by a prior incarnation of the RM.
+	 *
+	 * <p>Every recovered worker is put back into the in-memory map matching its state and
+	 * its goal state is announced to the task router. Afterwards the launch coordinator
+	 * is told about prior host assignments (Launched workers) and about tasks that still
+	 * have to be launched (New workers).
+	 *
+	 * @throws Exception if the workers cannot be read from the worker store.
+	 */
+	private void recoverWorkers() throws Exception {
+		// if this application master starts as part of an ApplicationMaster/JobManager recovery,
+		// then some worker tasks are most likely still alive and we can re-obtain them
+		final List<MesosWorkerStore.Worker> recovered = workerStore.recoverWorkers();
+		if (recovered.isEmpty()) {
+			return;
+		}
+
+		LOG.info("Retrieved {} TaskManagers from previous attempt", recovered.size());
+
+		final List<Tuple2<TaskRequest,String>> priorAssignments = new ArrayList<>(recovered.size());
+		final List<LaunchableTask> pendingLaunches = new ArrayList<>(recovered.size());
+
+		for (final MesosWorkerStore.Worker worker : recovered) {
+			final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
+			final ResourceID resourceID = extractResourceID(worker.taskID());
+
+			switch (worker.state()) {
+				case New:
+					workersInNew.put(resourceID, worker);
+					pendingLaunches.add(launchable);
+					break;
+				case Launched:
+					workersInLaunch.put(resourceID, worker);
+					priorAssignments.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
+					break;
+				case Released:
+					workersBeingReturned.put(resourceID, worker);
+					break;
+			}
+
+			final TaskMonitor.TaskGoalStateUpdated goalStateUpdate =
+				new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker));
+			taskRouter.tell(goalStateUpdate, self());
+		}
+
+		// tell the launch coordinator about prior assignments
+		if (!priorAssignments.isEmpty()) {
+			launchCoordinator.tell(new LaunchCoordinator.Assign(priorAssignments), self());
+		}
+		// tell the launch coordinator to launch any new tasks
+		if (!pendingLaunches.isEmpty()) {
+			launchCoordinator.tell(new LaunchCoordinator.Launch(pendingLaunches), self());
+		}
+	}
+
+	/**
+	 * Plan for some additional workers to be launched.
+	 *
+	 * @param numWorkers The number of workers to allocate.
+     */
+	@Override
+	protected void requestNewWorkers(int numWorkers) {
+
+		try {
+			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(numWorkers);
+			List<LaunchableTask> toLaunch = new ArrayList<>(numWorkers);
+
+			// generate new workers into persistent state and launch associated actors
+			for (int i = 0; i < numWorkers; i++) {
+				MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newTask(workerStore.newTaskID());
+				workerStore.putWorker(worker);
+				workersInNew.put(extractResourceID(worker.taskID()), worker);
+
+				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
+
+				LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).",
+					launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
+
+				toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+				toLaunch.add(launchable);
+			}
+
+			// tell the task router about the new plans
+			for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
+				taskRouter.tell(update, self());
+			}
+
+			// tell the launch coordinator to launch the new tasks
+			if(toLaunch.size() >= 1) {
+				launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
+			}
+		}
+		catch(Exception ex) {
+			fatalError("unable to request new workers", ex);
+		}
+	}
+
+	/**
+	 * Accept offers as advised by the launch coordinator.
+	 *
+	 * <p>Acceptance is routed through the RM to update the persistent state before
+	 * forwarding the message to Mesos: every task of a LAUNCH operation is moved from
+	 * {@code workersInNew} to {@code workersInLaunch} and written to the worker store.
+	 *
+	 * <p>A launch for a task that is not in {@code workersInNew} is an inconsistency
+	 * between the launch coordinator and this actor. It is reported with a descriptive
+	 * {@link IllegalStateException} and, like any other exception in this method,
+	 * escalated via {@code fatalError}. An explicit check is used instead of
+	 * {@code assert} so that the behavior does not depend on the JVM's {@code -ea} flag.
+	 *
+	 * @param msg The offers to accept together with the operations to perform on them.
+	 */
+	private void acceptOffers(AcceptOffers msg) {
+
+		try {
+			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
+
+			// transition the persistent state of some tasks to Launched
+			for (Protos.Offer.Operation op : msg.operations()) {
+				if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) {
+					continue;
+				}
+				for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
+					MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId()));
+					if (worker == null) {
+						throw new IllegalStateException("Cannot launch Mesos task "
+							+ info.getTaskId().getValue() + " because it is not in state New.");
+					}
+
+					worker = worker.launchTask(info.getSlaveId(), msg.hostname());
+					workerStore.putWorker(worker);
+					workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+
+					LOG.info("Launching Mesos task {} on host {}.",
+						worker.taskID().getValue(), worker.hostname().get());
+
+					toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+				}
+			}
+
+			// tell the task router about the new plans
+			for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
+				taskRouter.tell(update, self());
+			}
+
+			// send the acceptance message to Mesos
+			schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
+		}
+		catch(Exception ex) {
+			fatalError("unable to accept offers", ex);
+		}
+	}
+
+	/**
+	 * Handle a task status change.
+     */
+	private void taskStatusUpdated(StatusUpdate message) {
+		taskRouter.tell(message, self());
+		reconciliationCoordinator.tell(message, self());
+		schedulerDriver.acknowledgeStatusUpdate(message.status());
+	}
+
+	/**
+	 * Accept the given started worker into the internal state.
+	 *
+	 * @param resourceID The worker resource id
+	 * @return A registered worker node record.
+	 */
+	@Override
+	protected RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) {
+		MesosWorkerStore.Worker inLaunch = workersInLaunch.remove(resourceID);
+		if (inLaunch == null) {
+			// Worker was not in state "being launched", this can indicate that the TaskManager
+			// in this worker was already registered or that the container was not started
+			// by this resource manager. Simply ignore this resourceID.
+			return null;
+		}
+		return new RegisteredMesosWorkerNode(inLaunch);
+	}
+
+	/**
+	 * Accept the given registered workers into the internal state.
+	 *
+	 * @param toConsolidate The worker IDs known previously to the JobManager.
+	 * @return A collection of registered worker node records.
+     */
+	@Override
+	protected Collection<RegisteredMesosWorkerNode> reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
+
+		// we check for each task manager if we recognize its Mesos task ID
+		List<RegisteredMesosWorkerNode> accepted = new ArrayList<>(toConsolidate.size());
+		for (ResourceID resourceID : toConsolidate) {
+			MesosWorkerStore.Worker worker = workersInLaunch.remove(resourceID);
+			if (worker != null) {
+				LOG.info("Mesos worker consolidation recognizes TaskManager {}.", resourceID);
+				accepted.add(new RegisteredMesosWorkerNode(worker));
+			}
+			else {
+				if(isStarted(resourceID)) {
+					LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
+				}
+				else {
+					LOG.info("Mesos worker consolidation does not recognize TaskManager {}.", resourceID);
+				}
+			}
+		}
+		return accepted;
+	}
+
+	/**
+	 * Release the given pending worker.
+	 */
+	@Override
+	protected void releasePendingWorker(ResourceID id) {
+		MesosWorkerStore.Worker worker = workersInLaunch.remove(id);
+		if (worker != null) {
+			releaseWorker(worker);
+		} else {
+			LOG.error("Cannot find worker {} to release. Ignoring request.", id);
+		}
+	}
+
+	/**
+	 * Release the given started worker by planning the removal of its underlying task.
+	 *
+	 * @param worker The registered worker node to release.
+	 */
+	@Override
+	protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
+		releaseWorker(worker.task());
+	}
+
+	/**
+	 * Plan for the removal of the given worker.
+     */
+	private void releaseWorker(MesosWorkerStore.Worker worker) {
+		try {
+			LOG.info("Releasing worker {}", worker.taskID());
+
+			// update persistent state of worker to Released
+			worker = worker.releaseTask();
+			workerStore.putWorker(worker);
+			workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
+			taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
+
+			if (worker.hostname().isDefined()) {
+				// tell the launch coordinator that the task is being unassigned from the host, for planning purposes
+				launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), self());
+			}
+		}
+		catch (Exception ex) {
+			fatalError("unable to release worker", ex);
+		}
+	}
+
+	/**
+	 * @return the number of workers that are planned but not yet launched
+	 *         (the size of {@code workersInNew}).
+	 */
+	@Override
+	protected int getNumWorkerRequestsPending() {
+		return workersInNew.size();
+	}
+
+	/**
+	 * @return the number of workers that are launched but not yet registered
+	 *         (the size of {@code workersInLaunch}).
+	 */
+	@Override
+	protected int getNumWorkersPendingRegistration() {
+		return workersInLaunch.size();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Callbacks from the Mesos Master
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Called when connected to Mesos as a new framework.
+	 */
+	private void registered(Registered message) {
+		connectionMonitor.tell(message, self());
+
+		try {
+			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
+		}
+		catch(Exception ex) {
+			fatalError("unable to store the assigned framework ID", ex);
+			return;
+		}
+
+		launchCoordinator.tell(message, self());
+		reconciliationCoordinator.tell(message, self());
+		taskRouter.tell(message, self());
+	}
+
+	/**
+	 * Called when reconnected to Mesos following a failover event.
+	 */
+	private void reregistered(ReRegistered message) {
+		connectionMonitor.tell(message, self());
+		launchCoordinator.tell(message, self());
+		reconciliationCoordinator.tell(message, self());
+		taskRouter.tell(message, self());
+	}
+
+	/**
+	 * Called when disconnected from Mesos.
+	 */
+	private void disconnected(Disconnected message) {
+		connectionMonitor.tell(message, self());
+		launchCoordinator.tell(message, self());
+		reconciliationCoordinator.tell(message, self());
+		taskRouter.tell(message, self());
+	}
+
+	/**
+	 * Called when an error is reported by the scheduler callback.
+	 */
+	private void error(String message) {
+		self().tell(new FatalErrorOccurred("Connection to Mesos failed", new Exception(message)), self());
+	}
+
+	/**
+	 * Invoked when a Mesos task reaches a terminal status.
+	 *
+	 * <p>The worker is removed from the worker store. A worker found in
+	 * {@code workersBeingReturned} is treated as a regular release; any other worker is
+	 * counted as a failure. Once the failure count exceeds {@code maxFailedTasks}
+	 * (and that limit is not negative), a {@link StopCluster} message is sent to this
+	 * actor; otherwise a check of the workers is triggered.
+	 *
+	 * @param taskID The ID of the terminated Mesos task.
+	 * @param status The terminal status reported for the task.
+	 */
+	private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus status) {
+		// this callback occurs for failed containers and for released containers alike
+
+		final ResourceID id = extractResourceID(taskID);
+
+		try {
+			workerStore.removeWorker(taskID);
+		}
+		catch(Exception ex) {
+			fatalError("unable to remove worker", ex);
+			return;
+		}
+
+		// check if this is a failed task or a released task
+		if (workersBeingReturned.remove(id) != null) {
+			// regular finished worker that we released
+			LOG.info("Worker {} finished successfully with diagnostics: {}",
+				id, status.getMessage());
+		} else {
+			// failed worker, either at startup, or running
+			final MesosWorkerStore.Worker launched = workersInLaunch.remove(id);
+			if (launched != null) {
+				LOG.info("Mesos task {} failed, with a TaskManager in launch or registration. " +
+					"State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());
+				// we will trigger re-acquiring new workers at the end
+			} else {
+				// failed registered worker
+				// NOTE(review): this branch is reached for every ID that is in neither
+				// workersBeingReturned nor workersInLaunch; it is assumed here that such a
+				// worker is a registered one -- confirm.
+				LOG.info("Mesos task {} failed, with a registered TaskManager. " +
+					"State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());
+
+				// notify the generic logic, which notifies the JobManager, etc.
+				notifyWorkerFailed(id, "Mesos task " + id + " failed.  State: " + status.getState());
+			}
+
+			// general failure logging
+			failedTasksSoFar++;
+
+			String diagMessage = String.format("Diagnostics for task %s in state %s : " +
+					"reason=%s message=%s",
+				id, status.getState(), status.getReason(), status.getMessage());
+			sendInfoMessage(diagMessage);
+
+			LOG.info(diagMessage);
+			LOG.info("Total number of failed tasks so far: " + failedTasksSoFar);
+
+			// maxFailedTasks == -1 is infinite number of retries.
+			// The comparison is strict, so exactly maxFailedTasks failures are still tolerated.
+			if (maxFailedTasks >= 0 && failedTasksSoFar > maxFailedTasks) {
+				String msg = "Stopping Mesos session because the number of failed tasks ("
+					+ failedTasksSoFar + ") exceeded the maximum failed tasks ("
+					+ maxFailedTasks + "). This number is controlled by the '"
+					+ ConfigConstants.MESOS_MAX_FAILED_TASKS + "' configuration setting. "
+					+ "By default its the number of requested tasks.";
+
+				LOG.error(msg);
+				self().tell(decorateMessage(new StopCluster(ApplicationStatus.FAILED, msg)),
+					ActorRef.noSender());
+
+				// no need to do anything else
+				return;
+			}
+		}
+
+		// in case failed containers were among the finished containers, make
+		// sure we re-examine and request new ones
+		triggerCheckWorkers();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the launchable description of a worker from the TaskManager parameters,
+	 * the TaskManager launch context and the given task ID.
+	 *
+	 * @param taskID the Mesos task ID of the worker.
+	 * @return the launchable worker.
+	 */
+	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
+		return new LaunchableMesosWorker(taskManagerParameters, taskManagerLaunchContext, taskID);
+	}
+
+	/**
+	 * Extracts a unique ResourceID from the Mesos task.
+	 *
+	 * @param taskId the Mesos TaskID
+	 * @return The ResourceID for the container
+	 */
+	static ResourceID extractResourceID(Protos.TaskID taskId) {
+		return new ResourceID(taskId.getValue());
+	}
+
+	/**
+	 * Extracts the Mesos task goal state from the worker information.
+	 *
+	 * @param worker the persistent worker information.
+	 * @return goal state information for the {@link TaskMonitor}.
+	 * @throws IllegalArgumentException if the worker is in a state other than
+	 *         New, Launched or Released.
+	 */
+	static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
+		switch(worker.state()) {
+			case New: return new TaskMonitor.New(worker.taskID());
+			case Launched: return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
+			case Released: return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
+			// name the offending state so that the failure can be diagnosed from the log
+			default: throw new IllegalArgumentException("Unsupported worker state: " + worker.state());
+		}
+	}
+
+	/**
+	 * Creates the Fenzo optimizer (builder).
+	 * The builder is an indirection to facilitate unit testing of the Launch Coordinator.
+	 *
+	 * @return a builder that delegates to a {@link TaskScheduler.Builder}.
+	 */
+	private static TaskSchedulerBuilder createOptimizer() {
+		return new TaskSchedulerBuilder() {
+			/** The Fenzo builder that does the actual work. */
+			private final TaskScheduler.Builder delegate = new TaskScheduler.Builder();
+
+			@Override
+			public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action) {
+				delegate.withLeaseRejectAction(action);
+				return this;
+			}
+
+			@Override
+			public TaskScheduler build() {
+				return delegate.build();
+			}
+		};
+	}
+
+	/**
+	 * Creates the props needed to instantiate this actor.
+	 *
+	 * <p>Rather than extracting and validating parameters in the constructor, this factory
+	 * method takes care of that. That way, errors occur synchronously, and are not
+	 * swallowed simply in a failed asynchronous attempt to start the actor.
+	 *
+	 * <p>The maximum number of failed tasks is read from the
+	 * {@code ConfigConstants.MESOS_MAX_FAILED_TASKS} setting and defaults to
+	 * {@code numInitialTaskManagers}.
+	 *
+	 * @param actorClass
+	 *             The actor class, to allow overriding this actor with subclasses for testing.
+	 * @param flinkConfig
+	 *             The Flink configuration object.
+	 * @param mesosConfig
+	 *             The Mesos configuration (master and framework info).
+	 * @param workerStore
+	 *             The store for the framework ID and the worker records.
+	 * @param leaderRetrievalService
+	 *             The leader retrieval service handed to the resource manager.
+	 * @param taskManagerParameters
+	 *             The parameters for launching TaskManager containers.
+	 * @param taskManagerLaunchContext
+	 *             The parameters for launching the TaskManager processes in the TaskManager containers.
+	 * @param numInitialTaskManagers
+	 *             The initial number of TaskManagers to allocate.
+	 * @param log
+	 *             The logger to log to.
+	 *
+	 * @return The Props object to instantiate the MesosFlinkResourceManager actor.
+	 */
+	public static Props createActorProps(Class<? extends MesosFlinkResourceManager> actorClass,
+			Configuration flinkConfig,
+			MesosConfiguration mesosConfig,
+			MesosWorkerStore workerStore,
+			LeaderRetrievalService leaderRetrievalService,
+			MesosTaskManagerParameters taskManagerParameters,
+			Protos.TaskInfo.Builder taskManagerLaunchContext,
+			int numInitialTaskManagers,
+			Logger log)
+	{
+		// by default, as many failures are tolerated as TaskManagers are requested initially
+		final int toleratedFailures =
+			flinkConfig.getInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers);
+
+		// a negative value means "unlimited" and is not logged
+		if (toleratedFailures >= 0) {
+			log.info("Mesos framework tolerates {} failed tasks before giving up", toleratedFailures);
+		}
+
+		// the argument order must match the constructor of the actor class
+		return Props.create(
+			actorClass,
+			flinkConfig,
+			mesosConfig,
+			workerStore,
+			leaderRetrievalService,
+			taskManagerParameters,
+			taskManagerLaunchContext,
+			toleratedFailures,
+			numInitialTaskManagers);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
new file mode 100644
index 0000000..b3956aa
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -0,0 +1,51 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The parameters for launching a TaskManager as a Mesos task: the CPU setting
+ * together with the generic containered TaskManager parameters.
+ *
+ * <p>Instances are immutable.
+ */
+public class MesosTaskManagerParameters {
+
+	/** The CPU setting for a TaskManager task. */
+	private final double cpus;
+
+	/** The generic parameters of a containered TaskManager. */
+	private final ContaineredTaskManagerParameters containeredParameters;
+
+	/**
+	 * Creates the parameters from explicit values.
+	 *
+	 * @param cpus the CPU setting for a TaskManager task.
+	 * @param containeredParameters the containered parameters (must not be null).
+	 * @throws NullPointerException if {@code containeredParameters} is null.
+	 */
+	public MesosTaskManagerParameters(double cpus, ContaineredTaskManagerParameters containeredParameters) {
+		this.containeredParameters = requireNonNull(containeredParameters);
+		this.cpus = cpus;
+	}
+
+	/**
+	 * @return the CPU setting for a TaskManager task.
+	 */
+	public double cpus() {
+		return cpus;
+	}
+
+	/**
+	 * @return the generic parameters of a containered TaskManager.
+	 */
+	public ContaineredTaskManagerParameters containeredParameters() {
+		return containeredParameters;
+	}
+
+	@Override
+	public String toString() {
+		return "MesosTaskManagerParameters{" +
+			"cpus=" + cpus +
+			", containeredParameters=" + containeredParameters +
+			'}';
+	}
+
+	/**
+	 * Create the Mesos TaskManager parameters.
+	 *
+	 * <p>The CPU setting is read from the
+	 * {@code ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CPUS} setting; if it is not
+	 * configured, the number of slots is used, but at least 1.0.
+	 *
+	 * @param flinkConfig the TM configuration.
+	 * @param containeredParameters additional containered parameters.
+	 * @return the Mesos TaskManager parameters.
+	 */
+	public static MesosTaskManagerParameters create(
+		Configuration flinkConfig,
+		ContaineredTaskManagerParameters containeredParameters) {
+
+		double cpus = flinkConfig.getDouble(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CPUS,
+			Math.max(containeredParameters.numSlots(), 1.0));
+
+		return new MesosTaskManagerParameters(cpus, containeredParameters);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
new file mode 100644
index 0000000..5dfc75e
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -0,0 +1,99 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ */
+public class MesosTaskManagerRunner {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class);
+
+	/** The process environment variables */
+	private static final Map<String, String> ENV = System.getenv();
+
+	/**
+	 * Loads the TaskManager configuration, applies the settings passed through the process
+	 * environment, and runs the TaskManager as the user named in the environment.
+	 *
+	 * <p>If the configuration cannot be loaded or the TaskManager fails to start, the error
+	 * is logged and the JVM exits with {@code TaskManager.STARTUP_FAILURE_RETURN_CODE()}.
+	 *
+	 * @param args the command line arguments.
+	 * @param taskManager the TaskManager class to run.
+	 * @throws IOException propagated from the calls made here, e.g. looking up the current user.
+	 * @throws IllegalArgumentException if the client username environment variable is missing or empty.
+	 * @throws NullPointerException if the container ID environment variable is missing.
+	 */
+	public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws IOException {
+		EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args);
+		org.apache.flink.runtime.util.SignalHandler.register(LOG);
+
+		// try to parse the command line arguments
+		final Configuration configuration;
+		try {
+			configuration = TaskManager.parseArgsAndLoadConfig(args);
+
+			// add dynamic properties to TaskManager configuration.
+			final Configuration dynamicProperties =
+				FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+			configuration.addAll(dynamicProperties);
+		}
+		catch (Throwable t) {
+			// use a fixed message: t.getMessage() may be null, which would log a bare "null"
+			LOG.error("Failed to load the TaskManager configuration", t);
+			System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+			// not reached in practice; lets the compiler treat 'configuration' as assigned below
+			return;
+		}
+
+		// read the environment variables
+		final String effectiveUsername = ENV.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
+		final String tmpDirs = ENV.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
+
+		// configure local directory
+		String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+		if (flinkTempDirs != null) {
+			LOG.info("Overriding Mesos temporary file directories with those " +
+				"specified in the Flink config: " + flinkTempDirs);
+		}
+		else if (tmpDirs != null) {
+			LOG.info("Setting directories for temporary files to: " + tmpDirs);
+			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
+		}
+
+		// fail with a descriptive message instead of the opaque error that
+		// UserGroupInformation.createRemoteUser raises for a null or empty user
+		Preconditions.checkArgument(effectiveUsername != null && !effectiveUsername.isEmpty(),
+			"The environment variable " + MesosConfigKeys.ENV_CLIENT_USERNAME + " is not set");
+
+		LOG.info("Mesos task runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() +
+			"', setting user to execute Flink TaskManager to '" + effectiveUsername + "'");
+
+		// tell akka to die in case of an error
+		configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+
+		// run as the effective user, carrying over the tokens of the current user
+		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(effectiveUsername);
+		for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
+			ugi.addToken(toks);
+		}
+
+		// Infer the resource identifier from the environment variable
+		String containerID = Preconditions.checkNotNull(ENV.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID),
+			"The environment variable " + MesosConfigKeys.ENV_FLINK_CONTAINER_ID + " is not set");
+		final ResourceID resourceId = new ResourceID(containerID);
+		LOG.info("ResourceID assigned for this container: {}", resourceId);
+
+		ugi.doAs(new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				try {
+					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
+				}
+				catch (Throwable t) {
+					LOG.error("Error while starting the TaskManager", t);
+					System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+				}
+				return null;
+			}
+		});
+	}
+}


Mime
View raw message