flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/3] flink git commit: [FLINK-5091] Formalize the Mesos AppMaster environment for docker compatibility
Date Mon, 05 Dec 2016 23:29:11 GMT
[FLINK-5091] Formalize the Mesos AppMaster environment for docker compatibility

- introduced ContainerSpecification.
- reworked how the TM container environment is constructed; eliminated
- special-case environment variables, file layout.
- added dynamic configuration support to GlobalConfiguration.
- integrated the SecurityContext into AM/TM runners.
- added config setting for Mesos framework user.
- support DCOS side-channel authentication.
- set the FS default scheme.
- made the artifact server more generic (no assumption about existence
- of dispatcher, Path-based).
- moved some test code related to overriding the JVM’s env.
- moved the Mesos containerizer config code to the MesosTaskManagerParameters.

This closes #2915.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/230bf17b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/230bf17b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/230bf17b

Branch: refs/heads/master
Commit: 230bf17bac3d76959a5cb6aa73ac685757c51cab
Parents: 3b85f42
Author: wrighe3 <eron.wright@emc.com>
Authored: Thu Dec 1 00:21:28 2016 -0800
Committer: Maximilian Michels <mxm@apache.org>
Committed: Tue Dec 6 00:29:25 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  15 +
 .../configuration/GlobalConfiguration.java      |  27 +-
 flink-dist/src/main/assemblies/bin.xml          |   6 +
 .../main/flink-bin/mesos-bin/mesos-appmaster.sh |  51 +++
 .../flink-bin/mesos-bin/mesos-taskmanager.sh    |  60 +++
 .../main/java/org/apache/flink/mesos/Utils.java |  21 +
 .../clusterframework/LaunchableMesosWorker.java | 106 ++++-
 .../MesosApplicationMasterRunner.java           | 456 ++++++-------------
 .../clusterframework/MesosConfigKeys.java       |  25 +-
 .../MesosFlinkResourceManager.java              |  45 +-
 .../MesosTaskManagerParameters.java             | 106 ++++-
 .../MesosTaskManagerRunner.java                 |  73 ++-
 .../flink/mesos/util/MesosArtifactResolver.java |  31 ++
 .../flink/mesos/util/MesosArtifactServer.java   | 146 ++++--
 .../MesosFlinkResourceManagerTest.java          |  19 +-
 .../clusterframework/BootstrapTools.java        |  36 ++
 .../ContainerSpecification.java                 | 206 +++++++++
 .../overlays/AbstractContainerOverlay.java      |  72 +++
 .../overlays/CompositeContainerOverlay.java     |  49 ++
 .../overlays/ContainerOverlay.java              |  37 ++
 .../overlays/FlinkDistributionOverlay.java      | 126 +++++
 .../overlays/HadoopConfOverlay.java             | 147 ++++++
 .../overlays/HadoopUserOverlay.java             |  83 ++++
 .../overlays/KeytabOverlay.java                 | 102 +++++
 .../overlays/Krb5ConfOverlay.java               | 111 +++++
 .../overlays/SSLStoreOverlay.java               | 124 +++++
 .../flink/runtime/security/SecurityUtils.java   |   4 +-
 .../overlays/ContainerOverlayTestBase.java      |  73 +++
 .../overlays/FlinkDistributionOverlayTest.java  | 117 +++++
 .../overlays/HadoopConfOverlayTest.java         | 119 +++++
 .../overlays/HadoopUserOverlayTest.java         |  73 +++
 .../overlays/KeytabOverlayTest.java             |  71 +++
 .../overlays/Krb5ConfOverlayTest.java           |  59 +++
 .../overlays/SSLStoreOverlayTest.java           |  78 ++++
 .../flink/core/testutils/CommonTestUtils.java   |  39 ++
 .../apache/flink/test/util/TestBaseUtils.java   |  38 +-
 36 files changed, 2450 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 6bc5e2e..a515c33 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -440,6 +440,11 @@ public final class ConfigConstants {
 	// ------------------------ Mesos Configuration ------------------------
 
 	/**
+	 * The initial number of Mesos tasks to allocate.
+	 */
+	public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks";
+
+	/**
 	 * The maximum number of failed Mesos tasks before entirely stopping
 	 * the Mesos session / job on Mesos.
 	 *
@@ -484,6 +489,8 @@ public final class ConfigConstants {
 
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret";
 
+	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user";
+
 	/**
 	 * The cpus to acquire from Mesos.
 	 *
@@ -1186,6 +1193,8 @@ public final class ConfigConstants {
 
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*";
 
+	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
+
 	/** Default value to override SSL support for the Artifact Server */
 	public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
 
@@ -1405,6 +1414,12 @@ public final class ConfigConstants {
 	/** The environment variable name which contains the location of the lib folder */
 	public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
 
+	/** The environment variable name which contains the location of the bin directory */
+	public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR";
+
+	/** The environment variable name which contains the Flink installation root directory */
+	public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
+
 	// -------------------------------- Security -------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index ecfbc72..dca6307 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -39,12 +39,31 @@ public final class GlobalConfiguration {
 
 	public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
 
+
 	// --------------------------------------------------------------------------------------------
 
 	private GlobalConfiguration() {}
 
 	// --------------------------------------------------------------------------------------------
 
+	private static Configuration dynamicProperties = null;
+
+	/**
+	 * Set the process-wide dynamic properties to be merged with the loaded configuration.
+     */
+	public static void setDynamicProperties(Configuration dynamicProperties) {
+		GlobalConfiguration.dynamicProperties = new Configuration(dynamicProperties);
+	}
+
+	/**
+	 * Get the dynamic properties.
+     */
+	public static Configuration getDynamicProperties() {
+		return GlobalConfiguration.dynamicProperties;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
 	/**
 	 * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an
 	 * empty configuration object if the environment variable is not set. In production this variable is set but
@@ -90,7 +109,13 @@ public final class GlobalConfiguration {
 					"' (" + confDirFile.getAbsolutePath() + ") does not exist.");
 		}
 
-		return loadYAMLResource(yamlConfigFile);
+		Configuration conf = loadYAMLResource(yamlConfigFile);
+
+		if(dynamicProperties != null) {
+			conf.addAll(dynamicProperties);
+		}
+
+		return conf;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index b4291d3..901cac9 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -82,6 +82,12 @@ under the License.
 			<outputDirectory>bin</outputDirectory>
 			<fileMode>0755</fileMode>
 		</fileSet>
+		<!-- copy Mesos start scripts -->
+		<fileSet>
+			<directory>src/main/flink-bin/mesos-bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
 		
 		<!-- copy default configuration -->
 		<fileSet>

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
new file mode 100755
index 0000000..d65c6b0
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+# auxilliary function to construct a lightweight classpath for the
+# Flink AppMaster
+constructAppMasterClassPath() {
+
+    while read -d '' -r jarfile ; do
+        if [[ $CC_CLASSPATH = "" ]]; then
+            CC_CLASSPATH="$jarfile";
+        else
+            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
+        fi
+    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+
+    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
+}
+
+CC_CLASSPATH=`manglePathList $(constructAppMasterClassPath)`
+
+log=flink-appmaster.log
+log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner "$@"
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
new file mode 100755
index 0000000..ff03abd
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+# auxilliary function to construct a lightweight classpath for the
+# Flink TaskManager
+constructTaskManagerClassPath() {
+
+    while read -d '' -r jarfile ; do
+        if [[ $CC_CLASSPATH = "" ]]; then
+            CC_CLASSPATH="$jarfile";
+        else
+            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
+        fi
+    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+
+    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
+}
+
+CC_CLASSPATH=`manglePathList $(constructTaskManagerClassPath)`
+
+log=flink-taskmanager.log
+log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+# Add precomputed memory JVM options
+if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then
+    FLINK_ENV_JAVA_OPTS_MEM=""
+fi
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}"
+
+# Add TaskManager-specific JVM options
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager "$@"
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
index 173ae33..7787e40 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.mesos;
 
+import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.mesos.Protos;
+import scala.Option;
 
 import java.net.URL;
 import java.util.Arrays;
@@ -46,6 +49,24 @@ public class Utils {
 	}
 
 	/**
+	 * Construct a Mesos URI.
+	 */
+	public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) {
+		Option<URL> url = resolver.resolve(artifact.dest);
+		if(url.isEmpty()) {
+			throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest);
+		}
+
+		return Protos.CommandInfo.URI.newBuilder()
+			.setValue(url.get().toExternalForm())
+			.setOutputFile(artifact.dest.toString())
+			.setExtract(artifact.extract)
+			.setCache(artifact.cachable)
+			.setExecutable(artifact.executable)
+			.build();
+	}
+
+	/**
 	 * Construct a scalar resource value.
 	 */
 	public static Protos.Resource scalar(String name, double value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 5f940b5..c6e51f1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -23,8 +23,11 @@ import com.netflix.fenzo.TaskAssignmentResult;
 import com.netflix.fenzo.TaskRequest;
 import com.netflix.fenzo.VMTaskFitnessCalculator;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.Utils;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.mesos.Protos;
 
 import java.util.Collections;
@@ -38,7 +41,10 @@ import static org.apache.flink.mesos.Utils.ranges;
 import static org.apache.flink.mesos.Utils.scalar;
 
 /**
- * Specifies how to launch a Mesos worker.
+ * Implements the launch of a Mesos worker.
+ *
+ * Translates the abstract {@link ContainerSpecification} into a concrete
+ * Mesos-specific {@link org.apache.mesos.Protos.TaskInfo}.
  */
 public class LaunchableMesosWorker implements LaunchableTask {
 
@@ -49,20 +55,24 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		"taskmanager.rpc.port",
 		"taskmanager.data.port" };
 
+	private final MesosArtifactResolver resolver;
+	private final ContainerSpecification containerSpec;
 	private final MesosTaskManagerParameters params;
-	private final Protos.TaskInfo.Builder template;
 	private final Protos.TaskID taskID;
 	private final Request taskRequest;
 
 	/**
 	 * Construct a launchable Mesos worker.
 	 * @param params the TM parameters such as memory, cpu to acquire.
-	 * @param template a template for the TaskInfo to be constructed at launch time.
+	 * @param containerSpec an abstract container specification for launch time.
 	 * @param taskID the taskID for this worker.
 	 */
-	public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+	public LaunchableMesosWorker(
+		MesosArtifactResolver resolver, MesosTaskManagerParameters params,
+		ContainerSpecification containerSpec, Protos.TaskID taskID) {
+		this.resolver = resolver;
 		this.params = params;
-		this.template = template;
+		this.containerSpec = containerSpec;
 		this.taskID = taskID;
 		this.taskRequest = new Request();
 	}
@@ -157,17 +167,25 @@ public class LaunchableMesosWorker implements LaunchableTask {
 	@Override
 	public Protos.TaskInfo launch(Protos.SlaveID slaveId, TaskAssignmentResult assignment) {
 
+		ContaineredTaskManagerParameters tmParams = params.containeredParameters();
+
 		final Configuration dynamicProperties = new Configuration();
 
-		// specialize the TaskInfo template with assigned resources, environment variables, etc
-		final Protos.TaskInfo.Builder taskInfo = template
-			.clone()
+		// incorporate the dynamic properties set by the template
+		dynamicProperties.addAll(containerSpec.getDynamicConfiguration());
+
+		// build a TaskInfo with assigned resources, environment variables, etc
+		final Protos.TaskInfo.Builder taskInfo = Protos.TaskInfo.newBuilder()
 			.setSlaveId(slaveId)
 			.setTaskId(taskID)
 			.setName(taskID.getValue())
 			.addResources(scalar("cpus", assignment.getRequest().getCPUs()))
 			.addResources(scalar("mem", assignment.getRequest().getMemory()));
 
+		final Protos.CommandInfo.Builder cmd = taskInfo.getCommandBuilder();
+		final Protos.Environment.Builder env = cmd.getEnvironmentBuilder();
+		final StringBuilder jvmArgs = new StringBuilder();
+
 		// use the assigned ports for the TM
 		if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) {
 			throw new IllegalArgumentException("unsufficient # of ports assigned");
@@ -179,17 +197,69 @@ public class LaunchableMesosWorker implements LaunchableTask {
 			dynamicProperties.setInteger(key, port);
 		}
 
-		// finalize environment variables
-		final Protos.Environment.Builder environmentBuilder = taskInfo.getCommandBuilder().getEnvironmentBuilder();
+		// ship additional files
+		for(ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) {
+			cmd.addUris(Utils.uri(resolver, artifact));
+		}
 
-		// propagate the Mesos task ID to the TM
-		environmentBuilder
-			.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue()));
+		// propagate environment variables
+		for (Map.Entry<String, String> entry : params.containeredParameters().taskManagerEnv().entrySet()) {
+			env.addVariables(variable(entry.getKey(), entry.getValue()));
+		}
+		for (Map.Entry<String, String> entry : containerSpec.getEnvironmentVariables().entrySet()) {
+			env.addVariables(variable(entry.getKey(), entry.getValue()));
+		}
 
-		// propagate the dynamic configuration properties to the TM
-		String dynamicPropertiesEncoded = FlinkMesosSessionCli.encodeDynamicProperties(dynamicProperties);
-		environmentBuilder
-			.addVariables(variable(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded));
+		// propagate the Mesos task ID to the TM
+		env.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue()));
+
+		// finalize the memory parameters
+		jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
+		jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
+		jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
+
+		// pass dynamic system properties
+		jvmArgs.append(' ').append(
+			ContainerSpecification.formatSystemProperties(containerSpec.getSystemProperties()));
+
+		// finalize JVM args
+		env.addVariables(variable(MesosConfigKeys.ENV_JVM_ARGS, jvmArgs.toString()));
+
+		// build the launch command w/ dynamic application properties
+		StringBuilder launchCommand = new StringBuilder("$FLINK_HOME/bin/mesos-taskmanager.sh ");
+		launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
+		cmd.setValue(launchCommand.toString());
+
+		// build the container info
+		Protos.ContainerInfo.Builder containerInfo = null;
+		switch(params.containerType()) {
+			case MESOS:
+				if(params.containerImageName().isDefined()) {
+					containerInfo = Protos.ContainerInfo.newBuilder()
+						.setType(Protos.ContainerInfo.Type.MESOS)
+						.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
+						.setImage(Protos.Image.newBuilder()
+							.setType(Protos.Image.Type.DOCKER)
+							.setDocker(Protos.Image.Docker.newBuilder()
+								.setName(params.containerImageName().get()))));
+				}
+				break;
+
+			case DOCKER:
+				assert(params.containerImageName().isDefined());
+				containerInfo = Protos.ContainerInfo.newBuilder()
+					.setType(Protos.ContainerInfo.Type.DOCKER)
+					.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
+						.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
+						.setImage(params.containerImageName().get()));
+				break;
+
+			default:
+				throw new IllegalStateException("unsupported container type");
+		}
+		if(containerInfo != null) {
+			taskInfo.setContainer(containerInfo);
+		}
 
 		return taskInfo.build();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index ef58250..4b9bd82 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -22,14 +22,17 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
-
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
 import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
@@ -38,14 +41,20 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.mesos.util.ZooKeeperUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -53,21 +62,15 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
-
-import org.apache.hadoop.security.UserGroupInformation;
-
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import scala.Option;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URL;
-import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -75,9 +78,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.mesos.Utils.uri;
-import static org.apache.flink.mesos.Utils.variable;
-
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -106,6 +106,18 @@ public class MesosApplicationMasterRunner {
 	private static final int ACTOR_DIED_EXIT_CODE = 32;
 
 	// ------------------------------------------------------------------------
+	//  Command-line options
+	// ------------------------------------------------------------------------
+
+	private static final Options ALL_OPTIONS;
+
+	static {
+		ALL_OPTIONS =
+			new Options()
+			.addOption(BootstrapTools.newDynamicPropertiesOption());
+	}
+
+	// ------------------------------------------------------------------------
 	//  Program entry point
 	// ------------------------------------------------------------------------
 
@@ -126,36 +138,44 @@ public class MesosApplicationMasterRunner {
 
 	/**
 	 * The instance entry point for the Mesos AppMaster. Obtains user group
-	 * information and calls the main work method {@link #runPrivileged(Configuration)} as a
+	 * information and calls the main work method {@link #runPrivileged(Configuration,Configuration)} as a
 	 * privileged action.
 	 *
 	 * @param args The command line arguments.
 	 * @return The process exit code.
 	 */
-	protected int run(String[] args) {
+	protected int run(final String[] args) {
 		try {
 			LOG.debug("All environment variables: {}", ENV);
 
-			final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
-			checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+			// loading all config values here has the advantage that the program fails fast, if any
+			// configuration problem occurs
 
-			// Flink configuration
-			final Configuration dynamicProperties =
-					FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
-			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+			CommandLineParser parser = new PosixParser();
+			CommandLine cmd = parser.parse(ALL_OPTIONS, args);
 
-			final Configuration configuration = createConfiguration(workingDir, dynamicProperties);
+			final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+			GlobalConfiguration.setDynamicProperties(dynamicProperties);
+			final Configuration config = GlobalConfiguration.loadConfiguration();
 
-			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
+			// configure the default filesystem
+			try {
+				FileSystem.setDefaultScheme(config);
+			} catch (IOException e) {
+				throw new IOException("Error while setting the default " +
+					"filesystem scheme from configuration.", e);
+			}
+
+			// configure security
+			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config);
 			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
 			SecurityUtils.install(sc);
 
-			LOG.info("Running Flink as user {}", UserGroupInformation.getCurrentUser().getShortUserName());
-
+			// run the actual work in the installed security context
 			return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
 				@Override
-				public Integer call() {
-					return runPrivileged(configuration);
+				public Integer call() throws Exception {
+					return runPrivileged(config, dynamicProperties);
 				}
 			});
 		}
@@ -175,78 +195,38 @@ public class MesosApplicationMasterRunner {
 	 *
 	 * @return The return code for the Java process.
 	 */
-	protected int runPrivileged(Configuration config) {
+	protected int runPrivileged(Configuration config, Configuration dynamicProperties) {
 
 		ActorSystem actorSystem = null;
 		WebMonitor webMonitor = null;
 		MesosArtifactServer artifactServer = null;
-
-		// ------- (1) load and parse / validate all configurations -------
-
-		// loading all config values here has the advantage that the program fails fast, if any
-		// configuration problem occurs
-
-		final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
-
-		final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID);
-		checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID);
-
-		// Note that we use the "appMasterHostname" given by the system, to make sure
-		// we use the hostnames consistently throughout akka.
-		// for akka "localhost" and "localhost.localdomain" are different actors.
-		final String appMasterHostname;
+		ExecutorService futureExecutor = null;
+		ExecutorService ioExecutor = null;
 
 		try {
-			appMasterHostname = InetAddress.getLocalHost().getHostName();
-		} catch (UnknownHostException uhe) {
-			LOG.error("Could not retrieve the local hostname.", uhe);
+			// ------- (1) load and parse / validate all configurations -------
 
-			return INIT_ERROR_EXIT_CODE;
-		}
+			// Note that we use the "appMasterHostname" given by the system, to make sure
+			// we use the hostnames consistently throughout akka.
+			// for akka "localhost" and "localhost.localdomain" are different actors.
+			final String appMasterHostname = InetAddress.getLocalHost().getHostName();
 
-		// Mesos configuration
-		final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
+			// Mesos configuration
+			final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
 
-		int numberProcessors = Hardware.getNumberCPUCores();
+			// JM configuration
+			int numberProcessors = Hardware.getNumberCPUCores();
 
-		final ExecutorService futureExecutor = Executors.newFixedThreadPool(
-			numberProcessors,
-			new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
+			futureExecutor = Executors.newFixedThreadPool(
+				numberProcessors,
+				new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
 
-		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
-			numberProcessors,
-			new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
+			ioExecutor = Executors.newFixedThreadPool(
+				numberProcessors,
+				new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
 
-		try {
-			// environment values related to TM
-			final int taskManagerContainerMemory;
-			final int numInitialTaskManagers;
-			final int slotsPerTaskManager;
-
-			try {
-				taskManagerContainerMemory = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY));
-			} catch (NumberFormatException e) {
-				throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_MEMORY + " : "
-					+ e.getMessage());
-			}
-			try {
-				numInitialTaskManagers = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT));
-			} catch (NumberFormatException e) {
-				throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_COUNT + " : "
-					+ e.getMessage());
-			}
-			try {
-				slotsPerTaskManager = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS));
-			} catch (NumberFormatException e) {
-				throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_SLOTS + " : "
-					+ e.getMessage());
-			}
-
-			final ContaineredTaskManagerParameters containeredParameters =
-				ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager);
-
-			final MesosTaskManagerParameters taskManagerParameters =
-				MesosTaskManagerParameters.create(config, containeredParameters);
+			// TM configuration
+			final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(config);
 
 			LOG.info("TaskManagers will be created with {} task slots",
 				taskManagerParameters.containeredParameters().numSlots());
@@ -257,7 +237,7 @@ public class MesosApplicationMasterRunner {
 				taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
 				taskManagerParameters.cpus());
 
-			// JM endpoint, which should be explicitly configured by the dispatcher (based on acquired net resources)
+			// JM endpoint, which should be explicitly configured based on acquired net resources
 			final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
 				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
 			checkState(listeningPort >= 0 && listeningPort <= 65536, "Config parameter \"" +
@@ -279,18 +259,28 @@ public class MesosApplicationMasterRunner {
 			LOG.debug("Starting Artifact Server");
 			final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
 				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
-			artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort, config);
+			final String artifactServerPrefix = UUID.randomUUID().toString();
+			artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config);
 
 			// ----------------- (3) Generate the configuration for the TaskManagers -------------------
 
+			// generate a container spec which conveys the artifacts/vars needed to launch a TM
+			ContainerSpecification taskManagerContainerSpec = new ContainerSpecification();
+
+			// propagate the AM dynamic configuration to the TM
+			taskManagerContainerSpec.getDynamicConfiguration().addAll(dynamicProperties);
+
+			// propagate newly-generated configuration elements
 			final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
-				config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
-			LOG.debug("TaskManager configuration: {}", taskManagerConfig);
+				new Configuration(), akkaHostname, akkaPort, taskManagerParameters.containeredParameters().numSlots(),
+				TASKMANAGER_REGISTRATION_TIMEOUT);
+			taskManagerContainerSpec.getDynamicConfiguration().addAll(taskManagerConfig);
+
+			// apply the overlays
+			applyOverlays(config, taskManagerContainerSpec);
 
-			final Protos.TaskInfo.Builder taskManagerContext = createTaskManagerContext(
-				config, ENV,
-				taskManagerParameters, taskManagerConfig,
-				workingDir, getTaskManagerClass(), artifactServer, LOG);
+			// configure the artifact server to serve the specified artifacts
+			configureArtifactServer(artifactServer, taskManagerContainerSpec);
 
 			// ----------------- (4) start the actors -------------------
 
@@ -341,8 +331,8 @@ public class MesosApplicationMasterRunner {
 				workerStore,
 				leaderRetriever,
 				taskManagerParameters,
-				taskManagerContext,
-				numInitialTaskManagers,
+				taskManagerContainerSpec,
+				artifactServer,
 				LOG);
 
 			ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master");
@@ -389,8 +379,21 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
-			futureExecutor.shutdownNow();
-			ioExecutor.shutdownNow();
+			if(futureExecutor != null) {
+				try {
+					futureExecutor.shutdownNow();
+				} catch (Throwable tt) {
+					LOG.error("Error shutting down future executor", tt);
+				}
+			}
+
+			if(ioExecutor != null) {
+				try {
+					ioExecutor.shutdownNow();
+				} catch (Throwable tt) {
+					LOG.error("Error shutting down io executor", tt);
+				}
+			}
 
 			return INIT_ERROR_EXIT_CODE;
 		}
@@ -442,35 +445,12 @@ public class MesosApplicationMasterRunner {
 		return MemoryArchivist.class;
 	}
 
-	protected Class<? extends TaskManager> getTaskManagerClass() {
-		return MesosTaskManager.class;
-	}
-
-	/**
-	 *
-	 * @param baseDirectory
-	 * @param additional
-	 *
-	 * @return The configuration to be used by the TaskManagers.
-	 */
-	private static Configuration createConfiguration(String baseDirectory, Configuration additional) {
-		LOG.info("Loading config from directory {}", baseDirectory);
-
-		Configuration configuration = GlobalConfiguration.loadConfiguration();
-
-		// add dynamic properties to JobManager configuration.
-		configuration.addAll(additional);
-
-		return configuration;
-	}
-
 	/**
 	 * Loads and validates the ResourceManager Mesos configuration from the given Flink configuration.
 	 */
 	public static MesosConfiguration createMesosConfig(Configuration flinkConfig, String hostname) {
 
 		Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder()
-			.setUser("")
 			.setHostname(hostname);
 		Protos.Credential.Builder credential = null;
 
@@ -494,6 +474,10 @@ public class MesosApplicationMasterRunner {
 			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE,
 			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE));
 
+		frameworkInfo.setUser(flinkConfig.getString(
+			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
+			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
+
 		if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
 			frameworkInfo.setPrincipal(flinkConfig.getString(
 				ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
@@ -501,15 +485,16 @@ public class MesosApplicationMasterRunner {
 			credential = Protos.Credential.newBuilder();
 			credential.setPrincipal(frameworkInfo.getPrincipal());
 
-			if(!flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
-				throw new IllegalConfigurationException(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET + " must be configured.");
+			// some environments use a side-channel to communicate the secret to Mesos,
+			// and thus don't set the 'secret' configuration setting
+			if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+				credential.setSecret(flinkConfig.getString(
+					ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
 			}
-			credential.setSecret(flinkConfig.getString(
-				ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
 		}
 
 		MesosConfiguration mesos =
-			new MesosConfiguration(masterUrl, frameworkInfo, Option.apply(credential));
+			new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential));
 
 		return mesos;
 	}
@@ -533,203 +518,34 @@ public class MesosApplicationMasterRunner {
 	}
 
 	/**
-	 * Creates a Mesos task info template, which describes how to bring up a TaskManager process as
-	 * a Mesos task.
+	 * Generate a container specification as a TaskManager template.
 	 *
 	 * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
-	 * needs (such as JAR file, config file, ...) and all environment variables in a task info record.
+	 * needs (such as JAR file, config file, ...) and all environment variables into a container specification.
 	 * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
 	 * A lightweight HTTP server serves the artifacts to the fetcher.
-	 *
-	 * <p>We do this work before we start the ResourceManager actor in order to fail early if
-	 * any of the operations here fail.
-	 *
-	 * @param flinkConfig
-	 *         The Flink configuration object.
-	 * @param env
-	 *         The environment variables.
-	 * @param tmParams
-	 *         The TaskManager container memory parameters.
-	 * @param taskManagerConfig
-	 *         The configuration for the TaskManagers.
-	 * @param workingDirectory
-	 *         The current application master container's working directory.
-	 * @param taskManagerMainClass
-	 *         The class with the main method.
-	 * @param artifactServer
-	 *         The artifact server.
-	 * @param log
-	 *         The logger.
-	 *
-	 * @return The task info template for the TaskManager processes.
-	 *
-	 * @throws Exception Thrown if the task info could not be created, for example if
-	 *                   the resources could not be copied.
-	 */
-	public static Protos.TaskInfo.Builder createTaskManagerContext(
-		Configuration flinkConfig,
-		Map<String, String> env,
-		MesosTaskManagerParameters tmParams,
-		Configuration taskManagerConfig,
-		String workingDirectory,
-		Class<?> taskManagerMainClass,
-		MesosArtifactServer artifactServer,
-		Logger log) throws Exception {
-
-
-		Protos.TaskInfo.Builder info = Protos.TaskInfo.newBuilder();
-		Protos.CommandInfo.Builder cmd = Protos.CommandInfo.newBuilder();
-
-		log.info("Setting up artifacts for TaskManagers");
-
-		String shipListString = env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
-		checkState(shipListString != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
-
-		String classPathString = env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH);
-		checkState(classPathString != null, "Environment variable %s not set", MesosConfigKeys.ENV_FLINK_CLASSPATH);
-
-		// register the Flink jar
-		final File flinkJarFile = new File(workingDirectory, "flink.jar");
-		cmd.addUris(uri(artifactServer.addFile(flinkJarFile, "flink.jar"), true));
-
-		String hadoopConfDir = env.get("HADOOP_CONF_DIR");
-		LOG.debug("ENV: hadoopConfDir = {}", hadoopConfDir);
-
-		//upload Hadoop configurations to artifact server
-		boolean hadoopConf = false;
-		if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
-			File source = new File(hadoopConfDir);
-			if(source.exists() && source.isDirectory()) {
-				hadoopConf = true;
-				File[] fileList = source.listFiles();
-				for(File file: fileList) {
-					if(file.getName().equals("core-site.xml") || file.getName().equals("hdfs-site.xml")) {
-						LOG.debug("Adding local file: [{}] to artifact server", file);
-						File f = new File(hadoopConfDir, file.getName());
-						cmd.addUris(uri(artifactServer.addFile(f, file.getName()), true));
-					}
-				}
-			}
-		}
-
-		//upload keytab to the artifact server
-		String keytabFileName = null;
-		String keytab = flinkConfig.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
-		if(keytab != null) {
-			File source = new File(keytab);
-			if(source.exists()) {
-				LOG.debug("Adding keytab file: [{}] to artifact server", source);
-				keytabFileName = source.getName();
-				cmd.addUris(uri(artifactServer.addFile(source, source.getName()), true));
-			}
-		}
-
-		String principal = flinkConfig.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
-		if(keytabFileName != null && principal != null) {
-			//reset the configurations since we will use in-memory reference from within the TM instance
-			taskManagerConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,"");
-			taskManagerConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,"");
-		}
-
-		// register the TaskManager configuration
-		final File taskManagerConfigFile =
-			new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
-		LOG.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
-		BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
-		cmd.addUris(uri(artifactServer.addFile(taskManagerConfigFile, GlobalConfiguration.FLINK_CONF_FILENAME), true));
-
-		// prepare additional files to be shipped
-		for (String pathStr : shipListString.split(",")) {
-			if (!pathStr.isEmpty()) {
-				File shipFile = new File(workingDirectory, pathStr);
-				cmd.addUris(uri(artifactServer.addFile(shipFile, shipFile.getName()), true));
-			}
-		}
-
-		log.info("Creating task info for TaskManagers");
-
-		// build the launch command
-		boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
-		boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
-		boolean hasKrb5 = false;
-
-		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
-			flinkConfig, tmParams.containeredParameters(), ".", ".",
-			hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
-		cmd.setValue(launchCommand);
-
-		// build the environment variables
-		Protos.Environment.Builder envBuilder = Protos.Environment.newBuilder();
-		for (Map.Entry<String, String> entry : tmParams.containeredParameters().taskManagerEnv().entrySet()) {
-			envBuilder.addVariables(variable(entry.getKey(), entry.getValue()));
-		}
-		envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, classPathString));
-
-		//add hadoop config directory to the environment
-		if(hadoopConf) {
-			envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_CONF_DIR, "."));
-		}
-
-		//add keytab and principal to environment
-		if(keytabFileName != null && principal != null) {
-			envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB, keytabFileName));
-			envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL, principal));
-		}
-
-		envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_USER_NAME,
-				UserGroupInformation.getCurrentUser().getUserName()));
-
-		cmd.setEnvironment(envBuilder);
-
-		info.setCommand(cmd);
-
-		// Set container for task manager if specified in configs.
-		String tmImageName = flinkConfig.getString(
-			ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_NAME, "");
-
-		if (tmImageName.length() > 0) {
-			String taskManagerContainerType = flinkConfig.getString(
-				ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE,
-				ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_TYPE);
-
-			Protos.ContainerInfo.Builder containerInfo;
-
-			switch (taskManagerContainerType) {
-				case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
-					containerInfo = Protos.ContainerInfo.newBuilder()
-						.setType(Protos.ContainerInfo.Type.MESOS)
-						.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
-							.setImage(Protos.Image.newBuilder()
-								.setType(Protos.Image.Type.DOCKER)
-								.setDocker(Protos.Image.Docker.newBuilder()
-									.setName(tmImageName))));
-					break;
-				case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
-					containerInfo = Protos.ContainerInfo.newBuilder()
-						.setType(Protos.ContainerInfo.Type.DOCKER)
-						.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
-							.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
-							.setImage(tmImageName));
-					break;
-				default:
-					LOG.warn(
-						"Invalid container type '{}' provided for setting {}. Valid values are '{}' or '{}'. " +
-							"Starting task managers now without container.",
-						taskManagerContainerType,
-						ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE,
-						ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS,
-						ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER);
-
-					containerInfo = null;
-
-					break;
-			}
+     */
+	private static void applyOverlays(
+		Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException {
+
+		// create the overlays that will produce the specification
+		CompositeContainerOverlay overlay = new CompositeContainerOverlay(
+			FlinkDistributionOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+			HadoopConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+			HadoopUserOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+			KeytabOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+			Krb5ConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+			SSLStoreOverlay.newBuilder().fromEnvironment(globalConfiguration).build()
+		);
+
+		// apply the overlays
+		overlay.configure(containerSpec);
+	}
 
-			if (containerInfo != null) {
-				info.setContainer(containerInfo);
-			}
+	private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
+		// serve the artifacts associated with the container environment
+		for(ContainerSpecification.Artifact artifact : container.getArtifacts()) {
+			server.addPath(artifact.source, artifact.dest);
 		}
-
-		return info;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index bc6dde4..ebd9af5 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -26,21 +26,20 @@ public class MesosConfigKeys {
 	//  Environment variable names
 	// ------------------------------------------------------------------------
 
-	public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
-	public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
-	public static final String ENV_SLOTS = "_SLOTS";
-	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-	public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
-	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
+	/**
+	 * The Mesos task ID, used by the TM for informational purposes
+	 */
 	public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
+
+	/**
+	 * Reserved for future enhancement
+	 */
 	public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
-	public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH";
-	public static final String ENV_CLASSPATH = "CLASSPATH";
-	public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX";
-	public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID";
-	public static final String ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
-	public static final String ENV_KEYTAB = "_KEYTAB_FILE";
-	public static final String ENV_KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
+
+	/**
+	 * JVM arguments, used by the JM and TM
+	 */
+	public static final String ENV_JVM_ARGS = "JVM_ARGS";
 
 	/** Private constructor to prevent instantiation */
 	private MesosConfigKeys() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index 6b24ee8..a7321a3 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -27,6 +27,7 @@ import com.netflix.fenzo.functions.Action1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
@@ -44,9 +45,11 @@ import org.apache.flink.mesos.scheduler.messages.ReRegistered;
 import org.apache.flink.mesos.scheduler.messages.Registered;
 import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
 import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -76,8 +79,11 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	/** The TaskManager container parameters (like container memory size) */
 	private final MesosTaskManagerParameters taskManagerParameters;
 
-	/** Context information used to start a TaskManager Java process */
-	private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+	/** Container specification for launching a TM */
+	private final ContainerSpecification taskManagerContainerSpec;
+
+	/** Resolver for HTTP artifacts **/
+	private final MesosArtifactResolver artifactResolver;
 
 	/** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
 	private final int maxFailedTasks;
@@ -112,7 +118,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 		MesosWorkerStore workerStore,
 		LeaderRetrievalService leaderRetrievalService,
 		MesosTaskManagerParameters taskManagerParameters,
-		Protos.TaskInfo.Builder taskManagerLaunchContext,
+		ContainerSpecification taskManagerContainerSpec,
+		MesosArtifactResolver artifactResolver,
 		int maxFailedTasks,
 		int numInitialTaskManagers) {
 
@@ -121,9 +128,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 		this.mesosConfig = requireNonNull(mesosConfig);
 
 		this.workerStore = requireNonNull(workerStore);
+		this.artifactResolver = requireNonNull(artifactResolver);
 
 		this.taskManagerParameters = requireNonNull(taskManagerParameters);
-		this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
+		this.taskManagerContainerSpec = requireNonNull(taskManagerContainerSpec);
 		this.maxFailedTasks = maxFailedTasks;
 
 		this.workersInNew = new HashMap<>();
@@ -661,7 +669,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 
 	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
 		LaunchableMesosWorker launchable =
-			new LaunchableMesosWorker(taskManagerParameters, taskManagerLaunchContext, taskID);
+			new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec, taskID);
 		return launchable;
 	}
 
@@ -723,10 +731,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	 *             The Flink configuration object.
 	 * @param taskManagerParameters
 	 *             The parameters for launching TaskManager containers.
-	 * @param taskManagerLaunchContext
-	 *             The parameters for launching the TaskManager processes in the TaskManager containers.
-	 * @param numInitialTaskManagers
-	 *             The initial number of TaskManagers to allocate.
+	 * @param taskManagerContainerSpec
+	 *             The container specification.
+	 * @param artifactResolver
+	 *             The artifact resolver to locate artifacts
 	 * @param log
 	 *             The logger to log to.
 	 *
@@ -738,10 +746,22 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			MesosWorkerStore workerStore,
 			LeaderRetrievalService leaderRetrievalService,
 			MesosTaskManagerParameters taskManagerParameters,
-			Protos.TaskInfo.Builder taskManagerLaunchContext,
-			int numInitialTaskManagers,
+			ContainerSpecification taskManagerContainerSpec,
+			MesosArtifactResolver artifactResolver,
 			Logger log)
 	{
+
+		final int numInitialTaskManagers = flinkConfig.getInteger(
+			ConfigConstants.MESOS_INITIAL_TASKS, 0);
+		if (numInitialTaskManagers >= 0) {
+			log.info("Mesos framework to allocate {} initial tasks",
+				numInitialTaskManagers);
+		}
+		else {
+			throw new IllegalConfigurationException("Invalid value for " +
+				ConfigConstants.MESOS_INITIAL_TASKS + ", which must be at least zero.");
+		}
+
 		final int maxFailedTasks = flinkConfig.getInteger(
 			ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers);
 		if (maxFailedTasks >= 0) {
@@ -755,7 +775,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			workerStore,
 			leaderRetrievalService,
 			taskManagerParameters,
-			taskManagerLaunchContext,
+			taskManagerContainerSpec,
+			artifactResolver,
 			maxFailedTasks,
 			numInitialTaskManagers);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 1b19d08..7fae58c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -19,10 +19,14 @@
 package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import scala.Option;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
  * This class describes the Mesos-specific parameters for launching a TaskManager process.
@@ -32,13 +36,43 @@ import static java.util.Objects.requireNonNull;
  */
 public class MesosTaskManagerParameters {
 
-	private double cpus;
+	public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
+			key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
+			.defaultValue(1);
 
-	private ContaineredTaskManagerParameters containeredParameters;
+	public static final ConfigOption<Integer> MESOS_RM_TASKS_MEMORY_MB =
+			key("mesos.resourcemanager.tasks.mem")
+			.defaultValue(1024);
 
-	public MesosTaskManagerParameters(double cpus, ContaineredTaskManagerParameters containeredParameters) {
+	public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
+			key("mesos.resourcemanager.tasks.cpus")
+			.defaultValue(0.0);
+
+	public static final ConfigOption<String> MESOS_RM_CONTAINER_TYPE =
+		key("mesos.resourcemanager.tasks.container.type")
+			.defaultValue("mesos");
+
+	public static final ConfigOption<String> MESOS_RM_CONTAINER_IMAGE_NAME =
+		key("mesos.resourcemanager.tasks.container.image.name")
+			.noDefaultValue();
+
+	private final double cpus;
+
+	private final ContainerType containerType;
+
+	private final Option<String> containerImageName;
+
+	private final ContaineredTaskManagerParameters containeredParameters;
+
+	public MesosTaskManagerParameters(
+		double cpus,
+		ContainerType containerType,
+		Option<String> containerImageName,
+		ContaineredTaskManagerParameters containeredParameters) {
 		requireNonNull(containeredParameters);
 		this.cpus = cpus;
+		this.containerType = containerType;
+		this.containerImageName = containerImageName;
 		this.containeredParameters = containeredParameters;
 	}
 
@@ -50,6 +84,22 @@ public class MesosTaskManagerParameters {
 	}
 
 	/**
+	 * Get the container type (Mesos or Docker).  The default is Mesos.
+	 *
+	 * Mesos provides a facility for a framework to specify which containerizer to use.
+     */
+	public ContainerType containerType() {
+		return containerType;
+	}
+
+	/**
+	 * Get the container image name.
+     */
+	public Option<String> containerImageName() {
+		return containerImageName;
+	}
+
+	/**
 	 * Get the common containered parameters.
      */
 	public ContaineredTaskManagerParameters containeredParameters() {
@@ -60,6 +110,8 @@ public class MesosTaskManagerParameters {
 	public String toString() {
 		return "MesosTaskManagerParameters{" +
 			"cpus=" + cpus +
+			", containerType=" + containerType +
+			", containerImageName=" + containerImageName +
 			", containeredParameters=" + containeredParameters +
 			'}';
 	}
@@ -67,15 +119,49 @@ public class MesosTaskManagerParameters {
 	/**
 	 * Create the Mesos TaskManager parameters.
 	 * @param flinkConfig the TM configuration.
-	 * @param containeredParameters additional containered parameters.
      */
-	public static MesosTaskManagerParameters create(
-		Configuration flinkConfig,
-		ContaineredTaskManagerParameters containeredParameters) {
+	public static MesosTaskManagerParameters create(Configuration flinkConfig) {
 
-		double cpus = flinkConfig.getDouble(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CPUS,
-			Math.max(containeredParameters.numSlots(), 1.0));
+		// parse the common parameters
+		ContaineredTaskManagerParameters containeredParameters = ContaineredTaskManagerParameters.create(
+			flinkConfig,
+			flinkConfig.getInteger(MESOS_RM_TASKS_MEMORY_MB),
+			flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS));
+
+		double cpus = flinkConfig.getDouble(MESOS_RM_TASKS_CPUS);
+		if(cpus <= 0.0) {
+			cpus = Math.max(containeredParameters.numSlots(), 1.0);
+		}
+
+		// parse the containerization parameters
+		String imageName = flinkConfig.getString(MESOS_RM_CONTAINER_IMAGE_NAME);
+
+		ContainerType containerType;
+		String containerTypeString = flinkConfig.getString(MESOS_RM_CONTAINER_TYPE);
+		switch(containerTypeString) {
+			case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
+				containerType = ContainerType.MESOS;
+				break;
+			case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
+				containerType = ContainerType.DOCKER;
+				if(imageName == null || imageName.length() == 0) {
+					throw new IllegalConfigurationException(MESOS_RM_CONTAINER_IMAGE_NAME.key() +
+						" must be specified for docker container type");
+				}
+				break;
+			default:
+				throw new IllegalConfigurationException("invalid container type: " + containerTypeString);
+		}
+
+		return new MesosTaskManagerParameters(
+			cpus,
+			containerType,
+			Option.apply(imageName),
+			containeredParameters);
+	}
 
-		return new MesosTaskManagerParameters(cpus, containeredParameters);
+	public enum ContainerType {
+		MESOS,
+		DOCKER
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 5100deb..75b5043 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -18,15 +18,20 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -35,7 +40,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,24 +51,33 @@ public class MesosTaskManagerRunner {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class);
 
+	private static final Options ALL_OPTIONS;
+
+	static {
+		ALL_OPTIONS =
+			new Options()
+				.addOption(BootstrapTools.newDynamicPropertiesOption());
+	}
+
 	/** The process environment variables */
 	private static final Map<String, String> ENV = System.getenv();
 
-	public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws IOException {
+	public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws Exception {
 		EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args);
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
 		// try to parse the command line arguments
+		CommandLineParser parser = new PosixParser();
+		CommandLine cmd = parser.parse(ALL_OPTIONS, args);
+
 		final Configuration configuration;
 		try {
-			configuration = TaskManager.parseArgsAndLoadConfig(args);
-
-			// add dynamic properties to TaskManager configuration.
-			final Configuration dynamicProperties =
-				FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+			GlobalConfiguration.setDynamicProperties(dynamicProperties);
 			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
-			configuration.addAll(dynamicProperties);
+
+			configuration = GlobalConfiguration.loadConfiguration();
 		}
 		catch (Throwable t) {
 			LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);
@@ -74,7 +87,6 @@ public class MesosTaskManagerRunner {
 
 		// read the environment variables
 		final Map<String, String> envs = System.getenv();
-		final String effectiveUsername = envs.get(MesosConfigKeys.ENV_HADOOP_USER_NAME);
 		final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
 
 		// configure local directory
@@ -88,20 +100,12 @@ public class MesosTaskManagerRunner {
 			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
 		}
 
-		final String keytab = envs.get(MesosConfigKeys.ENV_KEYTAB);
-		LOG.info("Keytab file:{}", keytab);
-
-		final String principal = envs.get(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL);
-		LOG.info("Keytab principal:{}", principal);
-
-		if(keytab != null && keytab.length() != 0) {
-			File f = new File(".", keytab);
-			if(!f.exists()) {
-				LOG.error("Could not locate keytab file:[" + keytab + "]");
-				System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-			}
-			configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab);
-			configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, principal);
+		// configure the default filesystem
+		try {
+			FileSystem.setDefaultScheme(configuration);
+		} catch (IOException e) {
+			throw new IOException("Error while setting the default " +
+				"filesystem scheme from configuration.", e);
 		}
 
 		// tell akka to die in case of an error
@@ -112,23 +116,17 @@ public class MesosTaskManagerRunner {
 		final ResourceID resourceId = new ResourceID(containerID);
 		LOG.info("ResourceID assigned for this container: {}", resourceId);
 
-		String hadoopConfDir = envs.get(MesosConfigKeys.ENV_HADOOP_CONF_DIR);
-		LOG.info("hadoopConfDir: {}", hadoopConfDir);
-
+		// Run the TM in the security context
 		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
-		if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
-			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
-		}
+		sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
+		SecurityUtils.install(sc);
 
 		try {
-			SecurityUtils.install(sc);
-			LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
-					UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
-			SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() {
+			SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
 				@Override
-				public Object call() throws Exception {
+				public Integer call() throws Exception {
 					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
-					return null;
+					return 0;
 				}
 			});
 		}
@@ -136,6 +134,5 @@ public class MesosTaskManagerRunner {
 			LOG.error("Error while starting the TaskManager", t);
 			System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
 		}
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
new file mode 100644
index 0000000..a6a26dc
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import org.apache.flink.core.fs.Path;
+import scala.Option;
+
+import java.net.URL;
+
+/**
+ * An interface for resolving artifact URIs.
+ */
+public interface MesosArtifactResolver {
+	Option<URL> resolve(Path remoteFile);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index fbf61ac..37cb260 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -26,7 +26,6 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.DefaultFileRegion;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
@@ -43,24 +42,32 @@ import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.stream.ChunkedStream;
+
+import io.netty.handler.stream.ChunkedWriteHandler;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.CharsetUtil;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.jets3t.service.utils.Mimetypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
 
 import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
 import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
@@ -82,7 +89,7 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
  * http://mesos.apache.org/documentation/latest/fetcher/
  * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
  */
-public class MesosArtifactServer {
+public class MesosArtifactServer implements MesosArtifactResolver {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class);
 
@@ -92,17 +99,19 @@ public class MesosArtifactServer {
 
 	private Channel serverChannel;
 
-	private URL baseURL;
+	private final URL baseURL;
+
+	private final Map<Path,URL> paths = new HashMap<>();
 
 	private final SSLContext serverSSLContext;
 
-	public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort, Configuration config)
-			throws Exception {
+	public MesosArtifactServer(String prefix, String serverHostname, int configuredPort, Configuration config)
+		throws Exception {
 		if (configuredPort < 0 || configuredPort > 0xFFFF) {
 			throw new IllegalArgumentException("File server port is invalid: " + configuredPort);
 		}
 
-		// Config to enable https access to the web-ui
+		// Config to enable https access to the artifact server
 		boolean enableSSL = config.getBoolean(
 				ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
 				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
@@ -136,6 +145,7 @@ public class MesosArtifactServer {
 
 				ch.pipeline()
 					.addLast(new HttpServerCodec())
+					.addLast(new ChunkedWriteHandler())
 					.addLast(handler.name(), handler)
 					.addLast(new UnknownFileHandler());
 			}
@@ -159,11 +169,15 @@ public class MesosArtifactServer {
 
 		String httpProtocol = (serverSSLContext != null) ? "https": "http";
 
-		baseURL = new URL(httpProtocol, serverHostname, port, "/" + sessionID + "/");
+		baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/");
 
 		LOG.info("Mesos Artifact Server Base URL: {}, listening at {}:{}", baseURL, address, port);
 	}
 
+	public URL baseURL() {
+		return baseURL;
+	}
+
 	/**
 	 * Get the server port on which the artifact server is listening.
 	 */
@@ -185,13 +199,51 @@ public class MesosArtifactServer {
 	 * @param remoteFile the remote path with which to locate the file.
 	 * @return the fully-qualified remote path to the file.
 	 * @throws MalformedURLException if the remote path is invalid.
+     */
+	public synchronized URL addFile(File localFile, String remoteFile) throws IOException, MalformedURLException {
+		return addPath(new Path(localFile.toURI()), new Path(remoteFile));
+	}
+
+	/**
+	 * Adds a path to the artifact server.
+	 * @param path the qualified FS path to serve (local, hdfs, etc).
+	 * @param remoteFile the remote path with which to locate the file.
+	 * @return the fully-qualified remote path to the file.
+	 * @throws MalformedURLException if the remote path is invalid.
 	 */
-	public synchronized URL addFile(File localFile, String remoteFile) throws MalformedURLException {
-		URL fileURL = new URL(baseURL, remoteFile);
-		router.ANY(fileURL.getPath(), new VirtualFileServerHandler(localFile));
+	public synchronized URL addPath(Path path, Path remoteFile) throws IOException, MalformedURLException {
+		if(paths.containsKey(remoteFile)) {
+			throw new IllegalArgumentException("duplicate path registered");
+		}
+		if(remoteFile.isAbsolute()) {
+			throw new IllegalArgumentException("not expecting an absolute path");
+		}
+		URL fileURL = new URL(baseURL, remoteFile.toString());
+		router.ANY(fileURL.getPath(), new VirtualFileServerHandler(path));
+
+		paths.put(remoteFile, fileURL);
+
 		return fileURL;
 	}
 
+	public synchronized void removePath(Path remoteFile) {
+		if(paths.containsKey(remoteFile)) {
+			URL fileURL = null;
+			try {
+				fileURL = new URL(baseURL, remoteFile.toString());
+			} catch (MalformedURLException e) {
+				throw new RuntimeException(e);
+			}
+			router.removePath(fileURL.getPath());
+		}
+	}
+
+	@Override
+	public synchronized Option<URL> resolve(Path remoteFile) {
+		Option<URL> resolved = Option.apply(paths.get(remoteFile));
+		return resolved;
+	}
+
 	/**
 	 * Stops the artifact server.
 	 * @throws Exception
@@ -215,12 +267,17 @@ public class MesosArtifactServer {
 	@ChannelHandler.Sharable
 	public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<Routed> {
 
-		private final File file;
+		private FileSystem fs;
+		private Path path;
 
-		public VirtualFileServerHandler(File file) {
-			this.file = file;
-			if(!file.exists()) {
-				throw new IllegalArgumentException("no such file: " + file.getAbsolutePath());
+		public VirtualFileServerHandler(Path path) throws IOException {
+			this.path = path;
+			if(!path.isAbsolute()) {
+				throw new IllegalArgumentException("path must be absolute: " + path.toString());
+			}
+			this.fs = path.getFileSystem();
+			if(!fs.exists(path) || fs.getFileStatus(path).isDir()) {
+				throw new IllegalArgumentException("no such file: " + path.toString());
 			}
 		}
 
@@ -230,7 +287,7 @@ public class MesosArtifactServer {
 			HttpRequest request = routed.request();
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} request for file '{}'", request.getMethod(), file.getAbsolutePath());
+				LOG.debug("{} request for file '{}'", request.getMethod(), path);
 			}
 
 			if(!(request.getMethod() == GET || request.getMethod() == HEAD)) {
@@ -238,47 +295,40 @@ public class MesosArtifactServer {
 				return;
 			}
 
-			final RandomAccessFile raf;
+
+			final FileStatus status;
 			try {
-				raf = new RandomAccessFile(file, "r");
+				status = fs.getFileStatus(path);
 			}
-			catch (FileNotFoundException e) {
+			catch (IOException e) {
+				LOG.error("unable to stat file", e);
 				sendError(ctx, GONE);
 				return;
 			}
-			try {
-				long fileLength = raf.length();
 
-				// compose the response
-				HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-				if (HttpHeaders.isKeepAlive(request)) {
-					response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-				}
-				HttpHeaders.setHeader(response, CACHE_CONTROL, "private");
-				HttpHeaders.setHeader(response, CONTENT_TYPE, Mimetypes.MIMETYPE_OCTET_STREAM);
-				HttpHeaders.setContentLength(response, fileLength);
+			// compose the response
+			HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+			HttpHeaders.setHeader(response, CONNECTION, HttpHeaders.Values.CLOSE);
+			HttpHeaders.setHeader(response, CACHE_CONTROL, "private");
+			HttpHeaders.setHeader(response, CONTENT_TYPE, Mimetypes.MIMETYPE_OCTET_STREAM);
+			HttpHeaders.setContentLength(response, status.getLen());
 
-				ctx.write(response);
+			ctx.write(response);
 
-				if (request.getMethod() == GET) {
-					// write the content.  Netty's DefaultFileRegion will close the file.
-					ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
+			if (request.getMethod() == GET) {
+				// write the content.  Netty will close the stream.
+				final FSDataInputStream stream = fs.open(path);
+				try {
+					ctx.write(new ChunkedStream(stream));
 				}
-				else {
-					// close the file immediately in HEAD case
-					raf.close();
+				catch(Exception e) {
+					stream.close();
+					throw e;
 				}
-				ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-
-				// close the connection, if no keep-alive is needed
-				if (!HttpHeaders.isKeepAlive(request)) {
-					lastContentFuture.addListener(ChannelFutureListener.CLOSE);
-				}
-			}
-			catch(Exception ex) {
-				raf.close();
-				throw ex;
 			}
+
+			ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index f287e13..93ccf68 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -33,10 +33,12 @@ import org.apache.flink.mesos.scheduler.LaunchCoordinator;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
 import org.apache.flink.mesos.scheduler.messages.*;
 import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.messages.*;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -79,6 +81,7 @@ public class MesosFlinkResourceManagerTest {
 
 	private static Configuration config = new Configuration() {{
 		setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
+		setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
 	}};
 
 	@BeforeClass
@@ -107,12 +110,13 @@ public class MesosFlinkResourceManagerTest {
 			MesosWorkerStore workerStore,
 			LeaderRetrievalService leaderRetrievalService,
 			MesosTaskManagerParameters taskManagerParameters,
-			Protos.TaskInfo.Builder taskManagerLaunchContext,
+			ContainerSpecification taskManagerContainerSpec,
+			MesosArtifactResolver artifactResolver,
 			int maxFailedTasks,
 			int numInitialTaskManagers) {
 
 			super(flinkConfig, mesosConfig, workerStore, leaderRetrievalService, taskManagerParameters,
-				taskManagerLaunchContext, maxFailedTasks, numInitialTaskManagers);
+				taskManagerContainerSpec, artifactResolver, maxFailedTasks, numInitialTaskManagers);
 		}
 
 		@Override
@@ -141,6 +145,7 @@ public class MesosFlinkResourceManagerTest {
 		public LeaderRetrievalService retrievalService;
 		public MesosConfiguration mesosConfig;
 		public MesosWorkerStore workerStore;
+		public MesosArtifactResolver artifactResolver;
 		public SchedulerDriver schedulerDriver;
 		public TestingMesosFlinkResourceManager resourceManagerInstance;
 		public ActorGateway resourceManager;
@@ -176,6 +181,9 @@ public class MesosFlinkResourceManagerTest {
 				// worker store
 				workerStore = mock(MesosWorkerStore.class);
 				when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty());
+
+				// artifact
+				artifactResolver = mock(MesosArtifactResolver.class);
 			} catch (Exception ex) {
 				throw new RuntimeException(ex);
 			}
@@ -185,15 +193,16 @@ public class MesosFlinkResourceManagerTest {
 		 * Initialize the resource manager.
 		 */
 		public void initialize() {
+			ContainerSpecification containerSpecification = new ContainerSpecification();
 			ContaineredTaskManagerParameters containeredParams =
 				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
-			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(1.0, containeredParams);
-			Protos.TaskInfo.Builder taskInfo = Protos.TaskInfo.newBuilder();
+			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
+				1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams);
 
 			TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef =
 				TestActorRef.create(system, MesosFlinkResourceManager.createActorProps(
 					TestingMesosFlinkResourceManager.class,
-					config, mesosConfig, workerStore, retrievalService, tmParams, taskInfo, 0, LOG));
+					config, mesosConfig, workerStore, retrievalService, tmParams, containerSpecification, artifactResolver, LOG));
 			resourceManagerInstance = resourceManagerRef.underlyingActor();
 			resourceManager = new AkkaActorGateway(resourceManagerRef, null);
 


Mime
View raw message