flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [10/14] flink git commit: [FLINK-6496] [security] Port SSL config parameters to ConfigOptions
Date Sat, 01 Jul 2017 07:36:01 GMT
[FLINK-6496] [security] Port SSL config parameters to ConfigOptions

This closes #3855.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67bf467a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67bf467a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67bf467a

Branch: refs/heads/master
Commit: 67bf467a10f68a26ad573d5426c307f0402423bd
Parents: 9ed9ea6
Author: zentol <chesnay@apache.org>
Authored: Tue May 9 10:46:18 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Sat Jul 1 15:33:42 2017 +0800

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 60 +++++++++++++----
 .../configuration/HistoryServerOptions.java     |  2 +-
 .../flink/configuration/SecurityOptions.java    | 69 ++++++++++++++++++++
 .../overlays/SSLStoreOverlay.java               | 14 ++--
 .../org/apache/flink/runtime/net/SSLUtils.java  | 62 ++++++------------
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 32 +++------
 .../flink/runtime/blob/BlobClientSslTest.java   | 31 +++++----
 .../overlays/SSLStoreOverlayTest.java           | 10 +--
 .../network/netty/NettyClientServerSslTest.java | 18 ++---
 .../apache/flink/runtime/net/SSLUtilsTest.java  | 68 +++++++++----------
 .../flink/runtime/akka/AkkaSslITCase.scala      | 46 ++++++-------
 11 files changed, 240 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67bf467a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 476797e..35d3d13 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -837,35 +837,57 @@ public final class ConfigConstants {
 	// ----------------------------- Transport SSL Settings--------------------
 
 	/**
-	 * Enable SSL support
+	 * @deprecated use {@link SecurityOptions#SSL_ENABLED} instead
 	 */
+	@Deprecated
 	public static final String SECURITY_SSL_ENABLED = "security.ssl.enabled";
 
-	/** The Java keystore file containing the flink endpoint key and certificate */
+	/**
+	 * @deprecated use {@link SecurityOptions#SSL_KEYSTORE} instead
+	 */
+	@Deprecated
 	public static final String SECURITY_SSL_KEYSTORE = "security.ssl.keystore";
 
-	/** secret to decrypt the keystore file */
+	/**
+	 * @deprecated use {@link SecurityOptions#SSL_KEYSTORE_PASSWORD} instead
+	 */
+	@Deprecated
 	public static final String SECURITY_SSL_KEYSTORE_PASSWORD = "security.ssl.keystore-password";
 
-	/** secret to decrypt the server key */
+	/**
+	 * @deprecated use {@link SecurityOptions#SSL_KEY_PASSWORD} instead
+	 */
+	@Deprecated
 	public static final String SECURITY_SSL_KEY_PASSWORD = "security.ssl.key-password";
 
-	/** The truststore file containing the public CA certificates to verify the ssl peers */
+	/**
+	 * @deprecated use {@link SecurityOptions#SSL_TRUSTSTORE} instead
+	 */
+	@Deprecated
 	public static final String SECURITY_SSL_TRUSTSTORE = "security.ssl.truststore";
 
-	/** Secret to decrypt the truststore */
+	/**
+	 * @deprecated use {@link SecurityOptions#SSL_TRUSTSTORE_PASSWORD} instead
+	 */
+	@Deprecated
 	public static final String SECURITY_SSL_TRUSTSTORE_PASSWORD = "security.ssl.truststore-password";
 
-	/** SSL protocol version to be supported */
+	/**
+	 * @deprecated use {@link SecurityOptions#SSL_PROTOCOL} instead
+	 */
+	@Deprecated
 	public static final String SECURITY_SSL_PROTOCOL = "security.ssl.protocol";
 
 	/**
-	 * The standard SSL algorithms to be supported
-	 * More options here - http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites
-	 * */
+	 * @deprecated use {@link SecurityOptions#SSL_ALGORITHMS} instead
+	 */
+	@Deprecated
 	public static final String SECURITY_SSL_ALGORITHMS = "security.ssl.algorithms";
 
-	/** Flag to enable/disable hostname verification for the ssl connections */
+	/**
+	 * @deprecated use {@link SecurityOptions#SSL_VERIFY_HOSTNAME} instead
+	 */
+	@Deprecated
 	public static final String SECURITY_SSL_VERIFY_HOSTNAME = "security.ssl.verify-hostname";
 
 	// ----------------------------- Streaming --------------------------------
@@ -1541,12 +1563,28 @@ public final class ConfigConstants {
 
 	// ----------------------------- SSL Values --------------------------------
 
+	/**
+	 * @deprecated use {@link SecurityOptions#SSL_ENABLED} instead
+	 */
+	@Deprecated
 	public static boolean DEFAULT_SECURITY_SSL_ENABLED = false;
 
+	/**
+	 * @deprecated use {@link SecurityOptions#SSL_PROTOCOL} instead
+	 */
+	@Deprecated
 	public static String DEFAULT_SECURITY_SSL_PROTOCOL = "TLSv1.2";
 
+	/**
+	 * @deprecated use {@link SecurityOptions#SSL_ALGORITHMS} instead
+	 */
+	@Deprecated
 	public static String DEFAULT_SECURITY_SSL_ALGORITHMS = "TLS_RSA_WITH_AES_128_CBC_SHA";
 
+	/**
+	 * @deprecated use {@link SecurityOptions#SSL_VERIFY_HOSTNAME} instead
+	 */
+	@Deprecated
 	public static boolean DEFAULT_SECURITY_SSL_VERIFY_HOSTNAME = true;
 
 	// ----------------------------- Streaming Values --------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/67bf467a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index ebe4f2b..27c56d4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -71,7 +71,7 @@ public class HistoryServerOptions {
 
 	/**
 	 * Enables/Disables SSL support for the HistoryServer web-frontend. Only relevant if
-	 * {@link ConfigConstants#SECURITY_SSL_ENABLED} is enabled.
+	 * {@link SecurityOptions#SSL_ENABLED} is enabled.
 	 */
 	public static final ConfigOption<Boolean> HISTORY_SERVER_WEB_SSL_ENABLED =
 		key("historyserver.web.ssl.enabled")

http://git-wip-us.apache.org/repos/asf/flink/blob/67bf467a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 3763198..2c353d8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -66,4 +66,73 @@ public class SecurityOptions {
 	public static final ConfigOption<String> ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME =
 		key("zookeeper.sasl.login-context-name")
 			.defaultValue("Client");
+
+	// ------------------------------------------------------------------------
+	//  SSL Security Options
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Enable SSL support.
+	 */
+	public static final ConfigOption<Boolean> SSL_ENABLED =
+		key("security.ssl.enabled")
+			.defaultValue(false);
+
+	/**
+	 * The Java keystore file containing the flink endpoint key and certificate.
+	 */
+	public static final ConfigOption<String> SSL_KEYSTORE =
+		key("security.ssl.keystore")
+			.noDefaultValue();
+
+	/**
+	 * Secret to decrypt the keystore file.
+	 */
+	public static final ConfigOption<String> SSL_KEYSTORE_PASSWORD =
+		key("security.ssl.keystore-password")
+			.noDefaultValue();
+
+	/**
+	 * Secret to decrypt the server key.
+	 */
+	public static final ConfigOption<String> SSL_KEY_PASSWORD =
+		key("security.ssl.key-password")
+			.noDefaultValue();
+
+	/**
+	 * The truststore file containing the public CA certificates to verify the ssl peers.
+	 */
+	public static final ConfigOption<String> SSL_TRUSTSTORE =
+		key("security.ssl.truststore")
+			.noDefaultValue();
+
+	/**
+	 * Secret to decrypt the truststore.
+	 */
+	public static final ConfigOption<String> SSL_TRUSTSTORE_PASSWORD =
+		key("security.ssl.truststore-password")
+			.noDefaultValue();
+
+	/**
+	 * SSL protocol version to be supported.
+	 */
+	public static final ConfigOption<String> SSL_PROTOCOL =
+		key("security.ssl.protocol")
+			.defaultValue("TLSv1.2");
+
+	/**
+	 * The standard SSL algorithms to be supported.
+	 *
+	 * <p>More options here - http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites
+	 */
+	public static final ConfigOption<String> SSL_ALGORITHMS =
+		key("security.ssl.algorithms")
+			.defaultValue("TLS_RSA_WITH_AES_128_CBC_SHA");
+
+	/**
+	 * Flag to enable/disable hostname verification for the ssl connections.
+	 */
+	public static final ConfigOption<Boolean> SSL_VERIFY_HOSTNAME =
+		key("security.ssl.verify-hostname")
+			.defaultValue(true);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67bf467a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
index dd79ca1..84d407b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework.overlays;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.slf4j.Logger;
@@ -64,7 +64,7 @@ public class SSLStoreOverlay extends AbstractContainerOverlay {
 				.setDest(TARGET_KEYSTORE_PATH)
 				.setCachable(false)
 				.build());
-			container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath());
+			container.getDynamicConfiguration().setString(SecurityOptions.SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath());
 		}
 		if(truststore != null) {
 			container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
@@ -72,7 +72,7 @@ public class SSLStoreOverlay extends AbstractContainerOverlay {
 				.setDest(TARGET_TRUSTSTORE_PATH)
 				.setCachable(false)
 				.build());
-			container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
TARGET_TRUSTSTORE_PATH.getPath());
+			container.getDynamicConfiguration().setString(SecurityOptions.SSL_TRUSTSTORE, TARGET_TRUSTSTORE_PATH.getPath());
 		}
 	}
 
@@ -98,19 +98,19 @@ public class SSLStoreOverlay extends AbstractContainerOverlay {
 		 */
 		public Builder fromEnvironment(Configuration globalConfiguration)  {
 
-			String keystore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_KEYSTORE,
null);
+			String keystore = globalConfiguration.getString(SecurityOptions.SSL_KEYSTORE);
 			if(keystore != null) {
 				keystorePath = new File(keystore);
 				if(!keystorePath.exists()) {
-					throw new IllegalStateException("Invalid configuration for " + ConfigConstants.SECURITY_SSL_KEYSTORE);
+					throw new IllegalStateException("Invalid configuration for " + SecurityOptions.SSL_KEYSTORE.key());
 				}
 			}
 
-			String truststore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
null);
+			String truststore = globalConfiguration.getString(SecurityOptions.SSL_TRUSTSTORE);
 			if(truststore != null) {
 				truststorePath = new File(truststore);
 				if(!truststorePath.exists()) {
-					throw new IllegalStateException("Invalid configuration for " + ConfigConstants.SECURITY_SSL_TRUSTSTORE);
+					throw new IllegalStateException("Invalid configuration for " + SecurityOptions.SSL_TRUSTSTORE.key());
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/67bf467a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index 2267eac..015b3d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.net;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -54,8 +54,7 @@ public class SSLUtils {
 
 		Preconditions.checkNotNull(sslConfig);
 
-		return sslConfig.getBoolean( ConfigConstants.SECURITY_SSL_ENABLED,
-			ConfigConstants.DEFAULT_SECURITY_SSL_ENABLED);
+		return sslConfig.getBoolean(SecurityOptions.SSL_ENABLED);
 	}
 
 	/**
@@ -67,13 +66,9 @@ public class SSLUtils {
 	 */
 	public static void setSSLVerAndCipherSuites(ServerSocket socket, Configuration config) {
 		if (socket instanceof SSLServerSocket) {
-			final String[] protocols = config.getString(
-					ConfigConstants.SECURITY_SSL_PROTOCOL,
-					ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(",");
+			final String[] protocols = config.getString(SecurityOptions.SSL_PROTOCOL).split(",");
 
-			final String[] cipherSuites = config.getString(
-					ConfigConstants.SECURITY_SSL_ALGORITHMS,
-					ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(",");
+			final String[] cipherSuites = config.getString(SecurityOptions.SSL_ALGORITHMS).split(",");
 
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Configuring TLS version and cipher suites on SSL socket {} / {}",
@@ -93,12 +88,8 @@ public class SSLUtils {
 	 *        The application configuration
 	 */
 	public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) {
-		engine.setEnabledProtocols(config.getString(
-			ConfigConstants.SECURITY_SSL_PROTOCOL,
-			ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(","));
-		engine.setEnabledCipherSuites(config.getString(
-			ConfigConstants.SECURITY_SSL_ALGORITHMS,
-			ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(","));
+		engine.setEnabledProtocols(config.getString(SecurityOptions.SSL_PROTOCOL).split(","));
+		engine.setEnabledCipherSuites(config.getString(SecurityOptions.SSL_ALGORITHMS).split(","));
 	}
 
 	/**
@@ -114,8 +105,7 @@ public class SSLUtils {
 		Preconditions.checkNotNull(sslConfig);
 		Preconditions.checkNotNull(sslParams);
 
-		boolean verifyHostname = sslConfig.getBoolean(ConfigConstants.SECURITY_SSL_VERIFY_HOSTNAME,
-			ConfigConstants.DEFAULT_SECURITY_SSL_VERIFY_HOSTNAME);
+		boolean verifyHostname = sslConfig.getBoolean(SecurityOptions.SSL_VERIFY_HOSTNAME);
 		if (verifyHostname) {
 			sslParams.setEndpointIdentificationAlgorithm("HTTPS");
 		}
@@ -139,18 +129,12 @@ public class SSLUtils {
 		if (getSSLEnabled(sslConfig)) {
 			LOG.debug("Creating client SSL context from configuration");
 
-			String trustStoreFilePath = sslConfig.getString(
-				ConfigConstants.SECURITY_SSL_TRUSTSTORE,
-				null);
-			String trustStorePassword = sslConfig.getString(
-				ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD,
-				null);
-			String sslProtocolVersion = sslConfig.getString(
-				ConfigConstants.SECURITY_SSL_PROTOCOL,
-				ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL);
+			String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
+			String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+			String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
 
-			Preconditions.checkNotNull(trustStoreFilePath, ConfigConstants.SECURITY_SSL_TRUSTSTORE
+ " was not configured.");
-			Preconditions.checkNotNull(trustStorePassword, ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD
+ " was not configured.");
+			Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() +
" was not configured.");
+			Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key()
+ " was not configured.");
 
 			KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
 
@@ -193,25 +177,17 @@ public class SSLUtils {
 		if (getSSLEnabled(sslConfig)) {
 			LOG.debug("Creating server SSL context from configuration");
 
-			String keystoreFilePath = sslConfig.getString(
-				ConfigConstants.SECURITY_SSL_KEYSTORE,
-				null);
+			String keystoreFilePath = sslConfig.getString(SecurityOptions.SSL_KEYSTORE);
 
-			String keystorePassword = sslConfig.getString(
-				ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD,
-				null);
+			String keystorePassword = sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD);
 
-			String certPassword = sslConfig.getString(
-				ConfigConstants.SECURITY_SSL_KEY_PASSWORD,
-				null);
+			String certPassword = sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD);
 
-			String sslProtocolVersion = sslConfig.getString(
-				ConfigConstants.SECURITY_SSL_PROTOCOL,
-				ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL);
+			String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
 
-			Preconditions.checkNotNull(keystoreFilePath, ConfigConstants.SECURITY_SSL_KEYSTORE + "
was not configured.");
-			Preconditions.checkNotNull(keystorePassword, ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD
+ " was not configured.");
-			Preconditions.checkNotNull(certPassword, ConfigConstants.SECURITY_SSL_KEY_PASSWORD + "
was not configured.");
+			Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was
not configured.");
+			Preconditions.checkNotNull(keystorePassword, SecurityOptions.SSL_KEYSTORE_PASSWORD.key()
+ " was not configured.");
+			Preconditions.checkNotNull(certPassword, SecurityOptions.SSL_KEY_PASSWORD.key() + " was
not configured.");
 
 			KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
 			FileInputStream keyStoreFile = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/67bf467a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 60a33ba..2f8445a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -26,9 +26,9 @@ import akka.actor._
 import akka.pattern.{ask => akkaAsk}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, Configuration, SecurityOptions}
 import org.apache.flink.runtime.net.SSLUtils
-import org.apache.flink.util.{ConfigurationException, NetUtils, Preconditions}
+import org.apache.flink.util.NetUtils
 import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
 import org.slf4j.LoggerFactory
 
@@ -301,33 +301,19 @@ object AkkaUtils {
 
     val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off"
 
-    val akkaSSLKeyStore = configuration.getString(
-      ConfigConstants.SECURITY_SSL_KEYSTORE,
-      null)
+    val akkaSSLKeyStore = configuration.getString(SecurityOptions.SSL_KEYSTORE)
 
-    val akkaSSLKeyStorePassword = configuration.getString(
-      ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD,
-      null)
+    val akkaSSLKeyStorePassword = configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD)
 
-    val akkaSSLKeyPassword = configuration.getString(
-      ConfigConstants.SECURITY_SSL_KEY_PASSWORD,
-      null)
+    val akkaSSLKeyPassword = configuration.getString(SecurityOptions.SSL_KEY_PASSWORD)
 
-    val akkaSSLTrustStore = configuration.getString(
-      ConfigConstants.SECURITY_SSL_TRUSTSTORE,
-      null)
+    val akkaSSLTrustStore = configuration.getString(SecurityOptions.SSL_TRUSTSTORE)
 
-    val akkaSSLTrustStorePassword = configuration.getString(
-      ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD,
-      null)
+    val akkaSSLTrustStorePassword = configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD)
 
-    val akkaSSLProtocol = configuration.getString(
-      ConfigConstants.SECURITY_SSL_PROTOCOL,
-      ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL)
+    val akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL)
 
-    val akkaSSLAlgorithmsString = configuration.getString(
-      ConfigConstants.SECURITY_SSL_ALGORITHMS,
-      ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS)
+    val akkaSSLAlgorithmsString = configuration.getString(SecurityOptions.SSL_ALGORITHMS)
     val akkaSSLAlgorithms = akkaSSLAlgorithmsString.split(",").toList.mkString("[", ",",
"]")
 
     val configString =

http://git-wip-us.apache.org/repos/asf/flink/blob/67bf467a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index f9052e1..340ac42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -32,9 +32,9 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -67,17 +67,16 @@ public class BlobClientSslTest extends TestLogger {
 	@BeforeClass
 	public static void startSSLServer() throws IOException {
 		Configuration config = new Configuration();
-		config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-		config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		config.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
 		BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
 
-
 		sslClientConfig = new Configuration();
-		sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-		sslClientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
-		sslClientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password");
+		sslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
+		sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
 	}
 
 	/**
@@ -86,18 +85,18 @@ public class BlobClientSslTest extends TestLogger {
 	@BeforeClass
 	public static void startNonSSLServer() throws IOException {
 		Configuration config = new Configuration();
-		config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		config.setBoolean(SecurityOptions.SSL_ENABLED, true);
 		config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
-		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-		config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
 		BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
 
 		clientConfig = new Configuration();
-		clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
 		clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false);
-		clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
-		clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password");
+		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
+		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/67bf467a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
index 0894ce6..ce48ce4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework.overlays;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.junit.Rule;
 import org.junit.Test;
@@ -46,10 +46,10 @@ public class SSLStoreOverlayTest extends ContainerOverlayTestBase {
 		ContainerSpecification spec = new ContainerSpecification();
 		overlay.configure(spec);
 
-		assertEquals(TARGET_KEYSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_KEYSTORE,
null));
+		assertEquals(TARGET_KEYSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.SSL_KEYSTORE));
 		checkArtifact(spec, TARGET_KEYSTORE_PATH);
 
-		assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
null));
+		assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.SSL_TRUSTSTORE));
 		checkArtifact(spec, TARGET_TRUSTSTORE_PATH);
 	}
 
@@ -68,8 +68,8 @@ public class SSLStoreOverlayTest extends ContainerOverlayTestBase {
 		File keystore = tempFolder.newFile();
 		File truststore = tempFolder.newFile();
 
-		conf.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, keystore.getAbsolutePath());
-		conf.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, truststore.getAbsolutePath());
+		conf.setString(SecurityOptions.SSL_KEYSTORE, keystore.getAbsolutePath());
+		conf.setString(SecurityOptions.SSL_TRUSTSTORE, truststore.getAbsolutePath());
 
 		SSLStoreOverlay.Builder builder = SSLStoreOverlay.newBuilder().fromEnvironment(conf);
 		assertEquals(builder.keystorePath, keystore);

http://git-wip-us.apache.org/repos/asf/flink/blob/67bf467a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index da678bd..d0e875b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -22,8 +22,8 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.handler.codec.string.StringDecoder;
 import io.netty.handler.codec.string.StringEncoder;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.util.NetUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -87,7 +87,7 @@ public class NettyClientServerSslTest {
 
 		Configuration config = createSslConfig();
 		// Modify the keystore password to an incorrect one
-		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "invalidpassword");
+		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "invalidpassword");
 
 		NettyConfig nettyConfig = new NettyConfig(
 			InetAddress.getLoopbackAddress(),
@@ -126,7 +126,7 @@ public class NettyClientServerSslTest {
 		Configuration config = createSslConfig();
 
 		// Use a server certificate which is not present in the truststore
-		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/untrusted.keystore");
+		config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/untrusted.keystore");
 
 		NettyConfig nettyConfig = new NettyConfig(
 			InetAddress.getLoopbackAddress(),
@@ -149,12 +149,12 @@ public class NettyClientServerSslTest {
 	private Configuration createSslConfig() throws Exception {
 
 		Configuration flinkConfig = new Configuration();
-		flinkConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-		flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-		flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-		flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-		flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
-		flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password");
+		flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		flinkConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		flinkConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
+		flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+		flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
+		flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
 		return flinkConfig;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67bf467a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index d28d693..a3c2b7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.flink.runtime.net;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -40,9 +40,9 @@ public class SSLUtilsTest {
 	public void testCreateSSLClientContext() throws Exception {
 
 		Configuration clientConfig = new Configuration();
-		clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-		clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
-		clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password");
+		clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
+		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
 
 		SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
 		Assert.assertNotNull(clientContext);
@@ -55,7 +55,7 @@ public class SSLUtilsTest {
 	public void testCreateSSLClientContextWithSSLDisabled() throws Exception {
 
 		Configuration clientConfig = new Configuration();
-		clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, false);
+		clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, false);
 
 		SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
 		Assert.assertNull(clientContext);
@@ -68,9 +68,9 @@ public class SSLUtilsTest {
 	public void testCreateSSLClientContextMisconfiguration() {
 
 		Configuration clientConfig = new Configuration();
-		clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-		clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
-		clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "badpassword");
+		clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
+		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "badpassword");
 
 		try {
 			SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
@@ -87,10 +87,10 @@ public class SSLUtilsTest {
 	public void testCreateSSLServerContext() throws Exception {
 
 		Configuration serverConfig = new Configuration();
-		serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
+		serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
 
 		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 		Assert.assertNotNull(serverContext);
@@ -103,7 +103,7 @@ public class SSLUtilsTest {
 	public void testCreateSSLServerContextWithSSLDisabled() throws Exception {
 
 		Configuration serverConfig = new Configuration();
-		serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, false);
+		serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, false);
 
 		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 		Assert.assertNull(serverContext);
@@ -116,10 +116,10 @@ public class SSLUtilsTest {
 	public void testCreateSSLServerContextMisconfiguration() {
 
 		Configuration serverConfig = new Configuration();
-		serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "badpassword");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "badpassword");
+		serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "badpassword");
+		serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "badpassword");
 
 		try {
 			SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
@@ -136,11 +136,11 @@ public class SSLUtilsTest {
 	public void testCreateSSLServerContextWithMultiProtocols() {
 
 		Configuration serverConfig = new Configuration();
-		serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1,TLSv1.2");
+		serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
+		serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+		serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1,TLSv1.2");
 
 		try {
 			SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
@@ -157,12 +157,12 @@ public class SSLUtilsTest {
 	public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exception {
 
 		Configuration serverConfig = new Configuration();
-		serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1.1");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256");
+		serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
+		serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+		serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1.1");
+		serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256");
 
 		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 		ServerSocket socket = null;
@@ -198,12 +198,12 @@ public class SSLUtilsTest {
 	public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception {
 
 		Configuration serverConfig = new Configuration();
-		serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1");
-		serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
+		serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
+		serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+		serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1");
+		serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
 
 		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 		SSLEngine engine = serverContext.createSSLEngine();

http://git-wip-us.apache.org/repos/asf/flink/blob/67bf467a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index daf0f47..4671981 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.akka
 
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
-import org.apache.flink.runtime.testingUtils.{TestingCluster, TestingUtils, ScalaTestingUtils}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, SecurityOptions}
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 import org.scalatest.junit.JUnitRunner
@@ -54,15 +54,15 @@ class AkkaSslITCase(_system: ActorSystem)
       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
 
-      config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
-      config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE,
+      config.setBoolean(SecurityOptions.SSL_ENABLED, true)
+      config.setString(SecurityOptions.SSL_KEYSTORE,
         getClass.getResource("/local127.keystore").getPath)
-      config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password")
-      config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password")
-      config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
+      config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password")
+      config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password")
+      config.setString(SecurityOptions.SSL_TRUSTSTORE,
         getClass.getResource("/local127.truststore").getPath)
 
-      config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password")
+      config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password")
 
       val cluster = new TestingCluster(config, false)
 
@@ -81,16 +81,16 @@ class AkkaSslITCase(_system: ActorSystem)
         config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
 
-        config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
-        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE,
+        config.setBoolean(SecurityOptions.SSL_ENABLED, true)
+        config.setString(SecurityOptions.SSL_KEYSTORE,
           getClass.getResource("/local127.keystore").getPath)
-        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password")
-        config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password")
-        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
+        config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password")
+        config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password")
+        config.setString(SecurityOptions.SSL_TRUSTSTORE,
           getClass.getResource("/local127.truststore").getPath)
 
-        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password")
-        config.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLSv1,TLSv1.1")
+        config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password")
+        config.setString(SecurityOptions.SSL_ALGORITHMS, "TLSv1,TLSv1.1")
 
         val cluster = new TestingCluster(config, false)
 
@@ -103,7 +103,7 @@ class AkkaSslITCase(_system: ActorSystem)
       val config = new Configuration()
       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
-      config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, false)
+      config.setBoolean(SecurityOptions.SSL_ENABLED, false)
 
       val cluster = new TestingCluster(config, false)
 
@@ -121,12 +121,12 @@ class AkkaSslITCase(_system: ActorSystem)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
         config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
 
-        config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
-        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "invalid.keystore")
-        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password")
-        config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password")
-        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "invalid.keystore")
-        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password")
+        config.setBoolean(SecurityOptions.SSL_ENABLED, true)
+        config.setString(SecurityOptions.SSL_KEYSTORE, "invalid.keystore")
+        config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password")
+        config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password")
+        config.setString(SecurityOptions.SSL_TRUSTSTORE, "invalid.keystore")
+        config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password")
 
         val cluster = new TestingCluster(config, false)
 
@@ -143,7 +143,7 @@ class AkkaSslITCase(_system: ActorSystem)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
         config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
 
-        config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
+        config.setBoolean(SecurityOptions.SSL_ENABLED, true)
 
         val cluster = new TestingCluster(config, false)
 


Mime
View raw message