flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/3] flink git commit: [FLINK-5091] Formalize the Mesos AppMaster environment for docker compatibility
Date Mon, 05 Dec 2016 23:29:10 GMT
Repository: flink
Updated Branches:
  refs/heads/master 3b85f42dc -> 8d7c3ff08


http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index d844f5d..2a33c44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -23,6 +23,8 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import com.typesafe.config.Config;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -290,6 +292,40 @@ public class BootstrapTools {
 		config.addAll(replacement);
 	}
 
+	private static final String DYNAMIC_PROPERTIES_OPT = "D";
+
+	/**
+	 * Get an instance of the dynamic properties option.
+	 *
+	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
+	 *  -Dfs.overwrite-files=true  -Dtaskmanager.network.numberOfBuffers=16368
+     */
+	public static Option newDynamicPropertiesOption() {
+		return new Option(DYNAMIC_PROPERTIES_OPT, true, "Dynamic properties");
+	}
+
+	/**
+	 * Parse the dynamic properties (passed on the command line).
+	 */
+	public static Configuration parseDynamicProperties(CommandLine cmd) {
+		final Configuration config = new Configuration();
+
+		String[] values = cmd.getOptionValues(DYNAMIC_PROPERTIES_OPT);
+		if(values != null) {
+			for(String value : values) {
+				String[] pair = value.split("=", 2);
+				if(pair.length == 1) {
+					config.setString(pair[0], Boolean.TRUE.toString());
+				}
+				else if(pair.length == 2) {
+					config.setString(pair[0], pair[1]);
+				}
+			}
+		}
+
+		return config;
+	}
+
 	/**
 	 * Generates the shell command to start a task manager.
 	 * @param flinkConfig The Flink configuration.

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java
new file mode 100644
index 0000000..508a28c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates a container specification, including artifacts, environment variables,
+ * system properties, and Flink configuration settings.
+ *
+ * The specification is mutable.
+ *
+ * Note that the Flink configuration settings are considered dynamic overrides of whatever
+ * static configuration file is present in the container.  For example, a container might be
+ * based on a Docker image with a normal Flink installation with customized settings, which these
+ * settings would (partially) override.
+ *
+ * Artifacts are copied into a sandbox directory within the container, which any Flink process
+ * launched in the container is assumed to use as a working directory.  This assumption allows
+ * for relative paths to be used in certain environment variables.
+ */
+public class ContainerSpecification implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Configuration systemProperties;
+
+	private final List<Artifact> artifacts;
+
+	private final Map<String,String> environmentVariables;
+
+	private final Configuration dynamicConfiguration;
+
+	public ContainerSpecification() {
+		this.artifacts = new LinkedList<>();
+		this.environmentVariables = new HashMap<String,String>();
+		this.systemProperties = new Configuration();
+		this.dynamicConfiguration = new Configuration();
+	}
+
+	/**
+	 * Get the container artifacts.
+     */
+	public List<Artifact> getArtifacts() {
+		return artifacts;
+	}
+
+	/**
+	 * Get the environment variables.
+     */
+	public Map<String, String> getEnvironmentVariables() {
+		return environmentVariables;
+	}
+
+	/**
+	 * Get the dynamic configuration.
+     */
+	public Configuration getDynamicConfiguration() {
+		return dynamicConfiguration;
+	}
+
+	/**
+	 * Get the system properties.
+     */
+	public Configuration getSystemProperties() {
+		return systemProperties;
+	}
+
+	@Override
+	protected Object clone() throws CloneNotSupportedException {
+		ContainerSpecification clone = new ContainerSpecification();
+		clone.artifacts.addAll(this.artifacts);
+		clone.environmentVariables.putAll(this.environmentVariables);
+		clone.systemProperties.addAll(this.systemProperties);
+		clone.dynamicConfiguration.addAll(this.dynamicConfiguration);
+		return clone;
+	}
+
+	@Override
+	public String toString() {
+		return "ContainerSpecification{" +
+			"environmentVariables=" + environmentVariables +
+			", systemProperties=" + systemProperties +
+			", dynamicConfiguration=" + dynamicConfiguration +
+			", artifacts=" + artifacts +
+			'}';
+	}
+
+	/**
+	 * An artifact to be copied into the container.
+	 */
+	public static class Artifact {
+
+		public Artifact(Path source, Path dest, boolean executable, boolean cachable, boolean extract) {
+			checkArgument(source.isAbsolute(), "source must be absolute");
+			checkArgument(!dest.isAbsolute(), "destination must be relative");
+			this.source = source;
+			this.dest = dest;
+			this.executable = executable;
+			this.cachable = cachable;
+			this.extract = extract;
+		}
+
+		public final Path source;
+		public final Path dest;
+		public final boolean executable;
+		public final boolean cachable;
+		public final boolean extract;
+
+		@Override
+		public String toString() {
+			return "Artifact{" +
+				"source=" + source +
+				", dest=" + dest +
+				", executable=" + executable +
+				", cachable=" + cachable +
+				", extract=" + extract +
+				'}';
+		}
+
+		public static Builder newBuilder() { return new Builder(); }
+
+		public static class Builder {
+
+			public Path source;
+			public Path dest;
+			public boolean executable = false;
+			public boolean cachable = true;
+			public boolean extract = false;
+
+			public Builder setSource(Path source) {
+				this.source = source;
+				return this;
+			}
+
+			public Builder setDest(Path dest) {
+				this.dest = dest;
+				return this;
+			}
+
+			public Builder setCachable(boolean cachable) {
+				this.cachable = cachable;
+				return this;
+			}
+
+			public Builder setExtract(boolean extract) {
+				this.extract = extract;
+				return this;
+			}
+
+			public Builder setExecutable(boolean executable) {
+				this.executable = executable;
+				return this;
+			}
+
+			public Artifact build() {
+				return new Artifact(source, dest, executable, cachable, extract);
+			}
+		}
+	}
+
+	/**
+	 * Format the system properties as a shell-compatible command-line argument.
+     */
+	public static String formatSystemProperties(Configuration jvmArgs) {
+		StringBuilder sb = new StringBuilder();
+		for(Map.Entry<String,String> entry : jvmArgs.toMap().entrySet()) {
+			if(sb.length() > 0) {
+				sb.append(" ");
+			}
+			boolean quoted = entry.getValue().contains(" ");
+			if(quoted) {
+				sb.append("\"");
+			}
+			sb.append("-D").append(entry.getKey()).append('=').append(entry.getValue());
+			if(quoted) {
+				sb.append("\"");
+			}
+		}
+		return sb.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java
new file mode 100644
index 0000000..007146a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+
+/**
+ * An abstract container overlay.
+ */
+abstract class AbstractContainerOverlay implements ContainerOverlay {
+
+	/**
+	 * Add a path recursively to the container specification.
+	 *
+	 * If the path is a directory, the directory itself (not just its contents) is added to the target path.
+	 *
+	 * The execute bit is preserved; permissions aren't.
+	 *
+	 * @param sourcePath the path to add.
+	 * @param targetPath the target path.
+	 * @param env the specification to mutate.
+     * @throws IOException
+     */
+	protected void addPathRecursively(
+		final File sourcePath, final Path targetPath, final ContainerSpecification env) throws IOException {
+
+		final java.nio.file.Path sourceRoot = sourcePath.toPath().getParent();
+
+		Files.walkFileTree(sourcePath.toPath(), new SimpleFileVisitor<java.nio.file.Path>() {
+			@Override
+			public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException {
+
+				java.nio.file.Path relativePath = sourceRoot.relativize(file);
+
+				ContainerSpecification.Artifact.Builder artifact = ContainerSpecification.Artifact.newBuilder()
+					.setSource(new Path(file.toUri()))
+					.setDest(new Path(targetPath, relativePath.toString()))
+					.setExecutable(Files.isExecutable(file))
+					.setCachable(true)
+					.setExtract(false);
+
+				env.getArtifacts().add(artifact.build());
+
+				return super.visitFile(file, attrs);
+			}
+		});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java
new file mode 100644
index 0000000..11e8f21
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A composite overlay that delegates to a set of inner overlays.
+ */
+public class CompositeContainerOverlay implements ContainerOverlay {
+
+	private final List<ContainerOverlay> overlays;
+
+	public CompositeContainerOverlay(ContainerOverlay... overlays) {
+		this(Arrays.asList(overlays));
+	}
+
+	public CompositeContainerOverlay(List<ContainerOverlay> overlays) {
+		this.overlays = Collections.unmodifiableList(overlays);
+	}
+
+	@Override
+	public void configure(ContainerSpecification containerConfig) throws IOException {
+		for(ContainerOverlay overlay : overlays) {
+			overlay.configure(containerConfig);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java
new file mode 100644
index 0000000..62826e2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
+import java.io.IOException;
+
+/**
+ * A container overlay to produce a container specification.
+ *
+ * An overlay applies configuration elements, environment variables,
+ * system properties, and artifacts to a container specification.
+ */
+public interface ContainerOverlay {
+
+	/**
+	 * Configure the given container specification.
+     */
+	void configure(ContainerSpecification containerSpecification) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
new file mode 100644
index 0000000..a36cc67
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_HOME_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Overlays Flink into a container, based on supplied bin/conf/lib directories.
+ *
+ * The overlayed Flink is indistinguishable from (and interchangeable with)
+ * a normal installation of Flink.  For a docker image-based container, it should be
+ * possible to bypass this overlay and rely on the normal installation method.
+ *
+ * The following files are copied to the container:
+ *  - flink/bin/
+ *  - flink/conf/
+ *  - flink/lib/
+ */
+public class FlinkDistributionOverlay extends AbstractContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkDistributionOverlay.class);
+
+	static final Path TARGET_ROOT = new Path("flink");
+
+	final File flinkBinPath;
+	final File flinkConfPath;
+	final File flinkLibPath;
+
+	public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, File flinkLibPath) {
+		this.flinkBinPath = checkNotNull(flinkBinPath);
+		this.flinkConfPath = checkNotNull(flinkConfPath);
+		this.flinkLibPath = checkNotNull(flinkLibPath);
+	}
+
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+
+		container.getEnvironmentVariables().put(ENV_FLINK_HOME_DIR, TARGET_ROOT.toString());
+
+		// add the paths to the container specification.
+		addPathRecursively(flinkBinPath, TARGET_ROOT, container);
+		addPathRecursively(flinkConfPath, TARGET_ROOT, container);
+		addPathRecursively(flinkLibPath, TARGET_ROOT, container);
+	}
+
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link FlinkDistributionOverlay}.
+	 */
+	public static class Builder {
+		File flinkBinPath;
+		File flinkConfPath;
+		File flinkLibPath;
+
+		/**
+		 * Configures the overlay using the current environment.
+		 *
+		 * Locates Flink using FLINK_???_DIR environment variables as provided to all Flink processes by config.sh.
+		 *
+		 * @param globalConfiguration the current configuration.
+		 */
+		public Builder fromEnvironment(Configuration globalConfiguration) {
+
+			Map<String,String> env = System.getenv();
+			if(env.containsKey(ENV_FLINK_BIN_DIR)) {
+				flinkBinPath = new File(System.getenv(ENV_FLINK_BIN_DIR));
+			}
+			else {
+				throw new IllegalStateException(String.format("the %s environment variable must be set", ENV_FLINK_BIN_DIR));
+			}
+
+			if(env.containsKey(ENV_FLINK_CONF_DIR)) {
+				flinkConfPath = new File(System.getenv(ENV_FLINK_CONF_DIR));
+			}
+			else {
+				throw new IllegalStateException(String.format("the %s environment variable must be set", ENV_FLINK_CONF_DIR));
+			}
+
+			if(env.containsKey(ENV_FLINK_LIB_DIR)) {
+				flinkLibPath = new File(System.getenv(ENV_FLINK_LIB_DIR));
+			}
+			else {
+				throw new IllegalStateException(String.format("the %s environment variable must be set", ENV_FLINK_LIB_DIR));
+			}
+
+			return this;
+		}
+
+		public FlinkDistributionOverlay build() {
+			return new FlinkDistributionOverlay(flinkBinPath, flinkConfPath, flinkLibPath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java
new file mode 100644
index 0000000..bd79218
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Overlays a Hadoop configuration into a container, based on a supplied Hadoop
+ * configuration directory.
+ *
+ * The following files are copied to the container:
+ *  - hadoop/conf/core-site.xml
+ *  - hadoop/conf/hdfs-site.xml
+ *
+ * The following environment variables are set in the container:
+ *  - HADOOP_CONF_DIR
+ *
+ * The following Flink configuration entries are updated:
+ *  - fs.hdfs.hadoopconf
+ */
+public class HadoopConfOverlay implements ContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopConfOverlay.class);
+
+	/**
+	 * The (relative) directory into which the Hadoop conf is copied.
+	 */
+	static final Path TARGET_CONF_DIR = new Path("hadoop/conf");
+
+	final File hadoopConfDir;
+
+	public HadoopConfOverlay(@Nullable File hadoopConfDir) {
+		this.hadoopConfDir = hadoopConfDir;
+	}
+
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+
+		if(hadoopConfDir == null) {
+			return;
+		}
+
+		File coreSitePath = new File(hadoopConfDir, "core-site.xml");
+		File hdfsSitePath = new File(hadoopConfDir, "hdfs-site.xml");
+
+		container.getEnvironmentVariables().put("HADOOP_CONF_DIR", TARGET_CONF_DIR.toString());
+		container.getDynamicConfiguration().setString(ConfigConstants.PATH_HADOOP_CONFIG, TARGET_CONF_DIR.toString());
+
+		container.getArtifacts().add(ContainerSpecification.Artifact
+			.newBuilder()
+			.setSource(new Path(coreSitePath.toURI()))
+			.setDest(new Path(TARGET_CONF_DIR, coreSitePath.getName()))
+			.setCachable(true)
+			.build());
+
+		container.getArtifacts().add(ContainerSpecification.Artifact
+			.newBuilder()
+			.setSource(new Path(hdfsSitePath.toURI()))
+			.setDest(new Path(TARGET_CONF_DIR, hdfsSitePath.getName()))
+			.setCachable(true)
+			.build());
+	}
+
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link HadoopConfOverlay}.
+	 */
+	public static class Builder {
+
+		File hadoopConfDir;
+
+		/**
+		 * Configures the overlay using the current environment's Hadoop configuration.
+		 *
+		 * The following locations are checked for a Hadoop configuration:
+		 *  - (conf) fs.hdfs.hadoopconf
+		 *  - (env)  HADOOP_CONF_DIR
+		 *  - (env)  HADOOP_HOME/conf
+		 *  - (env)  HADOOP_HOME/etc/hadoop
+		 *
+		 */
+		public Builder fromEnvironment(Configuration globalConfiguration) {
+
+			String[] possibleHadoopConfPaths = new String[4];
+			possibleHadoopConfPaths[0] = globalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+			possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
+
+			if (System.getenv("HADOOP_HOME") != null) {
+				possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME")+"/conf";
+				possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2
+			}
+
+			for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+				if (possibleHadoopConfPath != null) {
+					File confPath = new File(possibleHadoopConfPath);
+
+					File coreSitePath = new File(confPath, "core-site.xml");
+					File hdfsSitePath = new File(confPath, "hdfs-site.xml");
+
+					if (coreSitePath.exists() && hdfsSitePath.exists()) {
+						this.hadoopConfDir = confPath;
+						break;
+					}
+				}
+			}
+
+			if(hadoopConfDir == null) {
+				LOG.warn("Unable to locate a Hadoop configuration; HDFS will use defaults.");
+			}
+
+			return this;
+		}
+
+		public HadoopConfOverlay build() {
+			return new HadoopConfOverlay(hadoopConfDir);
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java
new file mode 100644
index 0000000..7081aea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * Overlays a Hadoop user context into a container.
+ *
+ * The overlay essentially configures Hadoop's {@link UserGroupInformation} class,
+ * establishing the effective username for filesystem calls to HDFS in non-secure clusters.
+ *
+ * In secure clusters, the configured keytab establishes the effective user.
+ *
+ * The following environment variables are set in the container:
+ *  - HADOOP_USER_NAME
+ */
+public class HadoopUserOverlay implements ContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopUserOverlay.class);
+
+	private final UserGroupInformation ugi;
+
+	public HadoopUserOverlay(@Nullable UserGroupInformation ugi) {
+		this.ugi = ugi;
+	}
+
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+		if(ugi != null) {
+			// overlay the Hadoop user identity (w/ tokens)
+			container.getEnvironmentVariables().put("HADOOP_USER_NAME", ugi.getUserName());
+		}
+	}
+
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link HadoopUserOverlay}.
+	 */
+	public static class Builder {
+
+		UserGroupInformation ugi;
+
+		/**
+		 * Configures the overlay using the current Hadoop user information (from {@link UserGroupInformation}).
+         */
+		public Builder fromEnvironment(Configuration globalConfiguration) throws IOException {
+			ugi = UserGroupInformation.getCurrentUser();
+			return this;
+		}
+
+		public HadoopUserOverlay build() {
+			return new HadoopUserOverlay(ugi);
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
new file mode 100644
index 0000000..7fe5b3e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+
+/**
+ * Overlays cluster-level Kerberos credentials (i.e. keytab) into a container.
+ *
+ * The following Flink configuration entries are updated:
+ *  - security.keytab
+ */
+public class KeytabOverlay extends AbstractContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KeytabOverlay.class);
+
+	/** Path (relative to the container working directory) at which the keytab is placed. */
+	static final Path TARGET_PATH = new Path("krb5.keytab");
+
+	/** The local keytab to ship into the container; null means the overlay is a no-op. */
+	final Path keytab;
+
+	public KeytabOverlay(@Nullable File keytab) {
+		this.keytab = keytab != null ? new Path(keytab.toURI()) : null;
+	}
+
+	public KeytabOverlay(@Nullable Path keytab) {
+		this.keytab = keytab;
+	}
+
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+		if(keytab != null) {
+			// ship the keytab as a non-cachable artifact since it carries credentials
+			container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
+				.setSource(keytab)
+				.setDest(TARGET_PATH)
+				.setCachable(false)
+				.build());
+			// point the container's Flink configuration at the shipped keytab location
+			container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_KEYTAB_KEY, TARGET_PATH.getPath());
+		}
+	}
+
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link KeytabOverlay}.
+	 */
+	public static class Builder {
+
+		// package-private for test visibility
+		File keytabPath;
+
+		/**
+		 * Configures the overlay using the current environment (and global configuration).
+		 *
+		 * The following Flink configuration settings are checked for a keytab:
+		 *  - security.keytab
+		 *
+		 * @throws IllegalStateException if a keytab is configured but the file does not exist.
+		 */
+		public Builder fromEnvironment(Configuration globalConfiguration) {
+			String keytab = globalConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+			if(keytab != null) {
+				keytabPath = new File(keytab);
+				if(!keytabPath.exists()) {
+					throw new IllegalStateException("Invalid configuration for " +
+						ConfigConstants.SECURITY_KEYTAB_KEY +
+						"; '" + keytab + "' not found.");
+				}
+			}
+
+			return this;
+		}
+
+		/** Builds the overlay from the gathered keytab path (possibly none). */
+		public KeytabOverlay build() {
+			return new KeytabOverlay(keytabPath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java
new file mode 100644
index 0000000..fb161b9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+
+/**
+ * Overlays a Kerberos configuration file into a container.
+ *
+ * The following files are copied to the container:
+ *  - krb5.conf
+ *
+ * The following Java system properties are set in the container:
+ *  - java.security.krb5.conf
+ */
+public class Krb5ConfOverlay extends AbstractContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Krb5ConfOverlay.class);
+
+	/** The JVM system property used to locate the Kerberos configuration file. */
+	static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
+
+	/** Path (relative to the container working directory) at which krb5.conf is placed. */
+	static final Path TARGET_PATH = new Path("krb5.conf");
+
+	/** The local krb5.conf to ship into the container; null means the overlay is a no-op. */
+	final Path krb5Conf;
+
+	public Krb5ConfOverlay(@Nullable File krb5Conf) {
+		this.krb5Conf = krb5Conf != null ? new Path(krb5Conf.toURI()) : null;
+	}
+
+	public Krb5ConfOverlay(@Nullable Path krb5Conf) {
+		this.krb5Conf = krb5Conf;
+	}
+
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+		if(krb5Conf != null) {
+			// krb5.conf holds no credentials, so it is safe to cache across containers
+			container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
+				.setSource(krb5Conf)
+				.setDest(TARGET_PATH)
+				.setCachable(true)
+				.build());
+			// make the container JVM resolve Kerberos settings from the shipped file
+			container.getSystemProperties().setString(JAVA_SECURITY_KRB5_CONF, TARGET_PATH.getPath());
+		}
+	}
+
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link Krb5ConfOverlay}.
+	 */
+	public static class Builder {
+
+		// package-private for test visibility
+		File krb5ConfPath;
+
+		/**
+		 * Configures the overlay using the current environment.
+		 *
+		 * Locates the krb5.conf configuration file as per
+		 * <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">Java documentation</a>.
+		 * Note that the JRE doesn't support the KRB5_CONFIG environment variable (JDK-7045913).
+		 *
+		 * @throws IllegalStateException if the system property points to a non-existent file.
+		 */
+		public Builder fromEnvironment(Configuration globalConfiguration) {
+
+			// check the system property
+			String krb5Config = System.getProperty(JAVA_SECURITY_KRB5_CONF);
+			if(krb5Config != null && krb5Config.length() != 0) {
+				krb5ConfPath = new File(krb5Config);
+				if(!krb5ConfPath.exists()) {
+					throw new IllegalStateException("java.security.krb5.conf refers to a non-existent file");
+				}
+			}
+
+			// FUTURE: check the well-known paths
+			// - $JAVA_HOME/lib/security
+			// - %WINDIR%\krb5.ini (Windows)
+			// - /etc/krb5.conf (Linux)
+
+			return this;
+		}
+
+		/** Builds the overlay from the gathered krb5.conf path (possibly none). */
+		public Krb5ConfOverlay build() {
+			return new Krb5ConfOverlay(krb5ConfPath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
new file mode 100644
index 0000000..dd79ca1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+
+/**
+ * Overlays an SSL keystore/truststore into a container.
+ *
+ * The following files are placed into the container:
+ *  - keystore.jks
+ *  - truststore.jks
+ *
+ * The following Flink configuration entries are set:
+ *  - security.ssl.keystore
+ *  - security.ssl.truststore
+ */
+public class SSLStoreOverlay extends AbstractContainerOverlay {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SSLStoreOverlay.class);
+
+	/** Paths (relative to the container working directory) at which the stores are placed. */
+	static final Path TARGET_KEYSTORE_PATH = new Path("keystore.jks");
+	static final Path TARGET_TRUSTSTORE_PATH = new Path("truststore.jks");
+
+	/** The local keystore to ship into the container; null skips the keystore. */
+	final Path keystore;
+	/** The local truststore to ship into the container; null skips the truststore. */
+	final Path truststore;
+
+	public SSLStoreOverlay(@Nullable File keystoreFile, @Nullable File truststoreFile) {
+		this.keystore = keystoreFile != null ? new Path(keystoreFile.toURI()) : null;
+		this.truststore = truststoreFile != null ? new Path(truststoreFile.toURI()) : null;
+	}
+
+	@Override
+	public void configure(ContainerSpecification container) throws IOException {
+		if(keystore != null) {
+			// the keystore carries private key material; do not cache it
+			container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
+				.setSource(keystore)
+				.setDest(TARGET_KEYSTORE_PATH)
+				.setCachable(false)
+				.build());
+			container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath());
+		}
+		if(truststore != null) {
+			// shipped alongside the keystore; also marked non-cachable
+			container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
+				.setSource(truststore)
+				.setDest(TARGET_TRUSTSTORE_PATH)
+				.setCachable(false)
+				.build());
+			container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, TARGET_TRUSTSTORE_PATH.getPath());
+		}
+	}
+
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder for the {@link SSLStoreOverlay}.
+	 */
+	public static class Builder {
+
+		// package-private for test visibility
+		File keystorePath;
+
+		// package-private for test visibility
+		File truststorePath;
+
+		/**
+		 * Configures the overlay using the current environment (and global configuration).
+		 *
+		 * The following Flink configuration settings are used to source the keystore and truststore:
+		 *  - security.ssl.keystore
+		 *  - security.ssl.truststore
+		 *
+		 * @throws IllegalStateException if a configured store file does not exist.
+		 */
+		public Builder fromEnvironment(Configuration globalConfiguration)  {
+
+			String keystore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null);
+			if(keystore != null) {
+				keystorePath = new File(keystore);
+				if(!keystorePath.exists()) {
+					throw new IllegalStateException("Invalid configuration for " + ConfigConstants.SECURITY_SSL_KEYSTORE);
+				}
+			}
+
+			String truststore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null);
+			if(truststore != null) {
+				truststorePath = new File(truststore);
+				if(!truststorePath.exists()) {
+					throw new IllegalStateException("Invalid configuration for " + ConfigConstants.SECURITY_SSL_TRUSTSTORE);
+				}
+			}
+
+			return this;
+		}
+
+		/** Builds the overlay from the gathered store paths (either may be absent). */
+		public SSLStoreOverlay build() {
+			return new SSLStoreOverlay(keystorePath, truststorePath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index f6e0a8c..7416cc6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -53,7 +53,7 @@ public class SecurityUtils {
 
 	public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
 
-	private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+	public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
 
 	private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
 
@@ -130,6 +130,8 @@ public class SecurityUtils {
 				loginUser = UserGroupInformation.getLoginUser();
 			}
 
+			LOG.info("Hadoop user set to {}", loginUser.toString());
+
 			boolean delegationToken = false;
 			final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
 			Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java
new file mode 100644
index 0000000..bbea376
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for container overlay tests.
+ *
+ * Snapshots the process environment before each test and restores it afterwards,
+ * so tests may freely mutate environment variables via {@link CommonTestUtils#setEnv}.
+ */
+public class ContainerOverlayTestBase {
+
+	/** The environment as it was before the test, restored in {@link #after()}. */
+	private Map<String, String> originalEnvironment;
+
+	@Before
+	public void before() {
+		originalEnvironment = new HashMap<>(System.getenv());
+	}
+
+	@After
+	public void after() {
+		CommonTestUtils.setEnv(originalEnvironment, true);
+	}
+
+
+	/**
+	 * Create an empty file for each given path.
+	 * @param root the root folder in which to create the files.
+	 * @param paths the relative paths to create.
+	 * @return the created paths, relative to the root (not absolute).
+     */
+	protected static Path[] createPaths(File root, String... paths) throws Exception {
+		Path[] files = new Path[paths.length];
+		for(int i = 0; i < paths.length; i++) {
+			File file = root.toPath().resolve(paths[i]).toFile();
+			file.getParentFile().mkdirs();
+			file.createNewFile();
+			files[i] = new Path(paths[i]);
+		}
+		return files;
+	}
+
+	/**
+	 * Check that an artifact exists for the given remote path.
+	 * @return the matching artifact.
+	 * @throws AssertionError if no artifact with the given destination exists in the spec.
+     */
+	protected static ContainerSpecification.Artifact checkArtifact(ContainerSpecification spec, Path remotePath) {
+		for(ContainerSpecification.Artifact artifact : spec.getArtifacts()) {
+			if(remotePath.equals(artifact.dest)) {
+				return artifact;
+			}
+		}
+		throw new AssertionError("no such artifact (" + remotePath + ")");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
new file mode 100644
index 0000000..e77dd3a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
+
+import static org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay.TARGET_ROOT;
+
+public class FlinkDistributionOverlayTest extends ContainerOverlayTestBase {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/** Verifies that every file under bin/, lib/ (recursively) is added as an artifact under TARGET_ROOT. */
+	@Test
+	public void testConfigure() throws Exception {
+
+		File binFolder = tempFolder.newFolder("bin");
+		File libFolder = tempFolder.newFolder("lib");
+		File confFolder = tempFolder.newFolder("conf");
+
+		// includes nested lib subdirectories to exercise recursive traversal
+		Path[] files = createPaths(
+			tempFolder.getRoot(),
+			"bin/config.sh",
+			"bin/taskmanager.sh",
+			"lib/foo.jar",
+			"lib/A/foo.jar",
+			"lib/B/foo.jar",
+			"lib/B/bar.jar");
+
+		ContainerSpecification containerSpecification = new ContainerSpecification();
+		FlinkDistributionOverlay overlay = new FlinkDistributionOverlay(
+			binFolder,
+			confFolder,
+			libFolder
+		);
+		overlay.configure(containerSpecification);
+
+		for(Path file : files) {
+			checkArtifact(containerSpecification, new Path(TARGET_ROOT, file.toString()));
+		}
+	}
+
+	/** Verifies that the builder picks up the distribution folders from FLINK_*_DIR env variables. */
+	@Test
+	public void testBuilderFromEnvironment() throws Exception {
+		Configuration conf = new Configuration();
+
+		File binFolder = tempFolder.newFolder("bin");
+		File libFolder = tempFolder.newFolder("lib");
+		File confFolder = tempFolder.newFolder("conf");
+
+		// adjust the test environment for the purposes of this test
+		Map<String, String> map = new HashMap<String, String>(System.getenv());
+		map.put(ENV_FLINK_BIN_DIR, binFolder.getAbsolutePath());
+		map.put(ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
+		map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
+ 		CommonTestUtils.setEnv(map);
+
+		FlinkDistributionOverlay.Builder builder = FlinkDistributionOverlay.newBuilder().fromEnvironment(conf);
+
+		assertEquals(binFolder.getAbsolutePath(), builder.flinkBinPath.getAbsolutePath());
+		assertEquals(libFolder.getAbsolutePath(), builder.flinkLibPath.getAbsolutePath());
+		assertEquals(confFolder.getAbsolutePath(), builder.flinkConfPath.getAbsolutePath());
+	}
+
+	/** Verifies that the builder fails fast when the FLINK_*_DIR env variables are absent. */
+	@Test
+	public void testBuilderFromEnvironmentBad() throws Exception {
+		Configuration conf = new Configuration();
+
+		// adjust the test environment for the purposes of this test
+		Map<String, String> map = new HashMap<>(System.getenv());
+		map.remove(ENV_FLINK_BIN_DIR);
+		map.remove(ENV_FLINK_LIB_DIR);
+		map.remove(ENV_FLINK_CONF_DIR);
+		CommonTestUtils.setEnv(map);
+
+		try {
+			FlinkDistributionOverlay.Builder builder = FlinkDistributionOverlay.newBuilder().fromEnvironment(conf);
+			fail();
+		}
+		catch(IllegalStateException e) {
+			// expected
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java
new file mode 100644
index 0000000..c3ea41b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay.TARGET_CONF_DIR;
+
+public class HadoopConfOverlayTest extends ContainerOverlayTestBase {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/** Verifies that the overlay ships the Hadoop config files and sets env/config pointers. */
+	@Test
+	public void testConfigure() throws Exception {
+
+		File confDir = tempFolder.newFolder();
+		initConfDir(confDir);
+
+		HadoopConfOverlay overlay = new HadoopConfOverlay(confDir);
+
+		ContainerSpecification spec = new ContainerSpecification();
+		overlay.configure(spec);
+
+		assertEquals(TARGET_CONF_DIR.getPath(), spec.getEnvironmentVariables().get("HADOOP_CONF_DIR"));
+		assertEquals(TARGET_CONF_DIR.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, null));
+
+		checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml"));
+		checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml"));
+	}
+
+	/** Verifies that a null conf dir makes the overlay a no-op (no exception). */
+	@Test
+	public void testNoConf() throws Exception {
+		HadoopConfOverlay overlay = new HadoopConfOverlay(null);
+
+		ContainerSpecification containerSpecification = new ContainerSpecification();
+		overlay.configure(containerSpecification);
+	}
+
+	@Test
+	public void testBuilderFromEnvironment() throws Exception {
+
+		// verify that the builder picks up various environment locations
+		HadoopConfOverlay.Builder builder;
+		Map<String, String> env;
+
+		// fs.hdfs.hadoopconf
+		File confDir = tempFolder.newFolder();
+		initConfDir(confDir);
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.PATH_HADOOP_CONFIG, confDir.getAbsolutePath());
+		builder = HadoopConfOverlay.newBuilder().fromEnvironment(conf);
+		assertEquals(confDir, builder.hadoopConfDir);
+
+		// HADOOP_CONF_DIR
+		env = new HashMap<String, String>(System.getenv());
+		env.remove("HADOOP_HOME");
+		env.put("HADOOP_CONF_DIR", confDir.getAbsolutePath());
+		CommonTestUtils.setEnv(env);
+		builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration());
+		assertEquals(confDir, builder.hadoopConfDir);
+
+		// HADOOP_HOME/conf
+		File homeDir = tempFolder.newFolder();
+		confDir = initConfDir(new File(homeDir, "conf"));
+		env = new HashMap<String, String>(System.getenv());
+		env.remove("HADOOP_CONF_DIR");
+		env.put("HADOOP_HOME", homeDir.getAbsolutePath());
+		CommonTestUtils.setEnv(env);
+		builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration());
+		assertEquals(confDir, builder.hadoopConfDir);
+
+		// HADOOP_HOME/etc/hadoop
+		homeDir = tempFolder.newFolder();
+		confDir = initConfDir(new File(homeDir, "etc/hadoop"));
+		env = new HashMap<String, String>(System.getenv());
+		env.remove("HADOOP_CONF_DIR");
+		env.put("HADOOP_HOME", homeDir.getAbsolutePath());
+		CommonTestUtils.setEnv(env);
+		builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration());
+		assertEquals(confDir, builder.hadoopConfDir);
+	}
+
+	/** Creates a minimal Hadoop conf dir containing core-site.xml and hdfs-site.xml. */
+	private File initConfDir(File confDir) throws Exception {
+		confDir.mkdirs();
+		new File(confDir, "core-site.xml").createNewFile();
+		new File(confDir, "hdfs-site.xml").createNewFile();
+		return confDir;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java
new file mode 100644
index 0000000..7a463b8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import java.security.PrivilegedAction;
+
+import static org.junit.Assert.assertEquals;
+
+public class HadoopUserOverlayTest extends ContainerOverlayTestBase {
+
+	/** Verifies that the overlay exports the user name as HADOOP_USER_NAME. */
+	@Test
+	public void testConfigure() throws Exception {
+
+		final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test");
+
+		HadoopUserOverlay overlay = new HadoopUserOverlay(ugi);
+
+		ContainerSpecification spec = new ContainerSpecification();
+		overlay.configure(spec);
+
+		assertEquals(ugi.getUserName(), spec.getEnvironmentVariables().get("HADOOP_USER_NAME"));
+	}
+
+	/** Verifies that a null UGI makes the overlay a no-op (no exception). */
+	@Test
+	public void testNoConf() throws Exception {
+		HadoopUserOverlay overlay = new HadoopUserOverlay(null);
+
+		ContainerSpecification containerSpecification = new ContainerSpecification();
+		overlay.configure(containerSpecification);
+	}
+
+	/** Verifies that the builder captures the current Hadoop user from within a doAs scope. */
+	@Test
+	public void testBuilderFromEnvironment() throws Exception {
+
+		final Configuration conf = new Configuration();
+		final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test");
+
+		ugi.doAs(new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				try {
+					HadoopUserOverlay.Builder builder = HadoopUserOverlay.newBuilder().fromEnvironment(conf);
+					assertEquals(ugi, builder.ugi);
+					return null;
+				}
+				catch(Exception ex) {
+					// surface checked exceptions as test failures
+					throw new AssertionError(ex);
+				}
+			}
+		});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
new file mode 100644
index 0000000..0570f28
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay.TARGET_PATH;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@code KeytabOverlay}.
+ */
+public class KeytabOverlayTest extends ContainerOverlayTestBase {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * The overlay should ship the keytab into the container and point the
+	 * dynamic configuration at the target path.
+	 */
+	@Test
+	public void testConfigure() throws Exception {
+
+		File keytab = tempFolder.newFile();
+
+		KeytabOverlay overlay = new KeytabOverlay(keytab);
+
+		ContainerSpecification spec = new ContainerSpecification();
+		overlay.configure(spec);
+
+		assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_KEYTAB_KEY, null));
+		checkArtifact(spec, TARGET_PATH);
+	}
+
+	/**
+	 * With no keytab configured, configure() is expected to complete without throwing.
+	 */
+	@Test
+	public void testNoConf() throws Exception {
+		KeytabOverlay overlay = new KeytabOverlay((Path) null);
+
+		ContainerSpecification containerSpecification = new ContainerSpecification();
+		overlay.configure(containerSpecification);
+	}
+
+	/**
+	 * The builder should read the keytab location from the Flink configuration.
+	 */
+	@Test
+	public void testBuilderFromEnvironment() throws Exception {
+
+		final Configuration conf = new Configuration();
+		File keytab = tempFolder.newFile();
+
+		conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab.getAbsolutePath());
+		KeytabOverlay.Builder builder = KeytabOverlay.newBuilder().fromEnvironment(conf);
+		// JUnit's assertEquals takes (expected, actual): expected value comes first.
+		assertEquals(keytab, builder.keytabPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java
new file mode 100644
index 0000000..1f86b89
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay.JAVA_SECURITY_KRB5_CONF;
+import static org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay.TARGET_PATH;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@code Krb5ConfOverlay}.
+ */
+public class Krb5ConfOverlayTest extends ContainerOverlayTestBase {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * The overlay should ship the krb5.conf into the container and set the
+	 * java.security.krb5.conf system property to the target path.
+	 */
+	@Test
+	public void testConfigure() throws Exception {
+
+		File krb5conf = tempFolder.newFile();
+
+		Krb5ConfOverlay overlay = new Krb5ConfOverlay(krb5conf);
+
+		ContainerSpecification spec = new ContainerSpecification();
+		overlay.configure(spec);
+
+		assertEquals(TARGET_PATH.getPath(), spec.getSystemProperties().getString(JAVA_SECURITY_KRB5_CONF, null));
+		checkArtifact(spec, TARGET_PATH);
+	}
+
+	/**
+	 * With no krb5.conf configured, configure() is expected to complete without throwing.
+	 */
+	@Test
+	public void testNoConf() throws Exception {
+		Krb5ConfOverlay overlay = new Krb5ConfOverlay((Path) null);
+
+		ContainerSpecification containerSpecification = new ContainerSpecification();
+		overlay.configure(containerSpecification);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
new file mode 100644
index 0000000..0894ce6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay.TARGET_KEYSTORE_PATH;
+import static org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay.TARGET_TRUSTSTORE_PATH;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@code SSLStoreOverlay}.
+ */
+public class SSLStoreOverlayTest extends ContainerOverlayTestBase {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * The overlay should ship both stores into the container and point the
+	 * dynamic configuration at their target paths.
+	 */
+	@Test
+	public void testConfigure() throws Exception {
+
+		File keystore = tempFolder.newFile();
+		File truststore = tempFolder.newFile();
+		SSLStoreOverlay overlay = new SSLStoreOverlay(keystore, truststore);
+
+		ContainerSpecification spec = new ContainerSpecification();
+		overlay.configure(spec);
+
+		assertEquals(TARGET_KEYSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null));
+		checkArtifact(spec, TARGET_KEYSTORE_PATH);
+
+		assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null));
+		checkArtifact(spec, TARGET_TRUSTSTORE_PATH);
+	}
+
+	/**
+	 * With no stores configured, configure() is expected to complete without throwing.
+	 */
+	@Test
+	public void testNoConf() throws Exception {
+		SSLStoreOverlay overlay = new SSLStoreOverlay(null, null);
+
+		ContainerSpecification containerSpecification = new ContainerSpecification();
+		overlay.configure(containerSpecification);
+	}
+
+	/**
+	 * The builder should read the store locations from the Flink configuration.
+	 */
+	@Test
+	public void testBuilderFromEnvironment() throws Exception {
+
+		final Configuration conf = new Configuration();
+		File keystore = tempFolder.newFile();
+		File truststore = tempFolder.newFile();
+
+		conf.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, keystore.getAbsolutePath());
+		conf.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, truststore.getAbsolutePath());
+
+		SSLStoreOverlay.Builder builder = SSLStoreOverlay.newBuilder().fromEnvironment(conf);
+		// JUnit's assertEquals takes (expected, actual): expected value comes first.
+		assertEquals(keystore, builder.keystorePath);
+		assertEquals(truststore, builder.truststorePath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index d318a3c..45c5a77 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -29,6 +29,9 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.Map;
 
 import static org.junit.Assert.fail;
 
@@ -114,4 +117,40 @@ public class CommonTestUtils {
 			fail("Cannot determine Java version: " + e.getMessage());
 		}
 	}
+
+	// This code is taken from: http://stackoverflow.com/a/7201825/568695
+	// it changes the environment variables of this JVM. Use only for testing purposes!
+	// NOTE: this mutates the JVM's cached, in-process copy of the environment via
+	// reflection; the OS-level environment of the process is not changed.
+	@SuppressWarnings("unchecked")
+	public static void setEnv(Map<String, String> newenv) {
+		try {
+			Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
+			Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
+			theEnvironmentField.setAccessible(true);
+			Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
+			env.putAll(newenv);
+			// presumably platform-specific (case-insensitive lookup map); if the field is
+			// absent on this JVM, the NoSuchFieldException fallback below handles it
+			Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+			theCaseInsensitiveEnvironmentField.setAccessible(true);
+			Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
+			cienv.putAll(newenv);
+		} catch (NoSuchFieldException e) {
+			// Fallback: reach into the unmodifiable map backing System.getenv() and
+			// rewrite its contents in place.
+			try {
+				Class<?>[] classes = Collections.class.getDeclaredClasses();
+				Map<String, String> env = System.getenv();
+				for (Class<?> cl : classes) {
+					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+						Field field = cl.getDeclaredField("m");
+						field.setAccessible(true);
+						Object obj = field.get(env);
+						Map<String, String> map = (Map<String, String>) obj;
+						// note: clears the existing environment before applying newenv
+						map.clear();
+						map.putAll(newenv);
+					}
+				}
+			} catch (Exception e2) {
+				throw new RuntimeException(e2);
+			}
+		} catch (Exception e1) {
+			throw new RuntimeException(e1);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index aa5e7d3..804b3d4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
@@ -58,7 +59,6 @@ import java.io.FileReader;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.Field;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -542,42 +542,10 @@ public class TestBaseUtils extends TestLogger {
 		return configs;
 	}
 
-	// This code is taken from: http://stackoverflow.com/a/7201825/568695
-	// it changes the environment variables of this JVM. Use only for testing purposes!
-	@SuppressWarnings("unchecked")
 	public static void setEnv(Map<String, String> newenv) {
-		try {
-			Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
-			Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
-			theEnvironmentField.setAccessible(true);
-			Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
-			env.putAll(newenv);
-			Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
-			theCaseInsensitiveEnvironmentField.setAccessible(true);
-			Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
-			cienv.putAll(newenv);
-		} catch (NoSuchFieldException e) {
-			try {
-				Class<?>[] classes = Collections.class.getDeclaredClasses();
-				Map<String, String> env = System.getenv();
-				for (Class<?> cl : classes) {
-					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
-						Field field = cl.getDeclaredField("m");
-						field.setAccessible(true);
-						Object obj = field.get(env);
-						Map<String, String> map = (Map<String, String>) obj;
-						map.clear();
-						map.putAll(newenv);
-					}
-				}
-			} catch (Exception e2) {
-				throw new RuntimeException(e2);
-			}
-		} catch (Exception e1) {
-			throw new RuntimeException(e1);
-		}
+		CommonTestUtils.setEnv(newenv);
 	}
-	
+
 	private static ExecutionContext defaultExecutionContext() {
 		return ExecutionContext$.MODULE$.global();
 	}


Mime
View raw message