flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [10/50] [abbrv] flink git commit: [FLINK-3929] additional fixes for keytab security
Date Wed, 21 Sep 2016 09:52:43 GMT
[FLINK-3929] additional fixes for keytab security

- load flink-jaas.conf from classpath
- avoid using undocumented flink base dir config entry
- enable test cases to run on MacOS
- unify suffix of secure test cases
- fix error logging and reporting

This closes #2275


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68709b08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68709b08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68709b08

Branch: refs/heads/flip-6
Commit: 68709b087570402cacb7bc3088e0eb35d83c8d32
Parents: 285b6f7
Author: Maximilian Michels <mxm@apache.org>
Authored: Tue Sep 20 15:41:35 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Tue Sep 20 22:03:29 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  3 -
 .../src/main/flink-bin/conf/flink-jaas.conf     | 26 --------
 .../flink/runtime/security/SecurityContext.java | 67 ++++++++------------
 .../src/main/resources/flink-jaas.conf          | 26 ++++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  2 -
 .../runtime/security/SecurityContextTest.java   |  4 +-
 .../connectors/fs/RollingSinkSecuredITCase.java |  1 -
 .../kafka/Kafka09SecureRunITCase.java           | 62 ------------------
 .../kafka/Kafka09SecuredRunITCase.java          | 62 ++++++++++++++++++
 .../flink/test/util/SecureTestEnvironment.java  | 23 +++----
 .../org/apache/flink/yarn/YarnTestBase.java     |  3 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  1 -
 .../flink/yarn/YarnTaskManagerRunner.java       |  5 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 32 +++-------
 14 files changed, 138 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 575ffad..0711758 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -161,9 +161,6 @@ public class CliFrontend {
 				"filesystem scheme from configuration.", e);
 		}
 
-		this.config.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDirectory.getAbsolutePath()
-				+ ".." + File.separator);
-
 		this.clientTimeout = AkkaUtils.getClientTimeout(config);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf b/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
deleted file mode 100644
index d476e24..0000000
--- a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
+++ /dev/null
@@ -1,26 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# We are using this file as an workaround for the Kafka and ZK SASL implementation
-# since they explicitly look for java.security.auth.login.config property
-# The file itself is not used by the application since the internal implementation
-# uses a process-wide in-memory java security configuration object.
-# Please do not edit/delete this file - See FLINK-3929
-sample {
-  useKeyTab=false
-  useTicketCache=true;
-};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index 4b8b69b..be6611f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -22,7 +22,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -33,7 +34,12 @@ import org.slf4j.LoggerFactory;
 
 import javax.security.auth.Subject;
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 
@@ -170,15 +176,12 @@ public class SecurityContext {
 	 * Kafka current code behavior.
 	 */
 	private static void populateSystemSecurityProperties(Configuration configuration) {
+		Preconditions.checkNotNull(configuration, "The supplied configuation was null");
 
 		//required to be empty for Kafka but we will override the property
 		//with pseudo JAAS configuration file if SASL auth is enabled for ZK
 		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
 
-		if(configuration == null) {
-			return;
-		}
-
 		boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
 				ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
 		if(disableSaslClient) {
@@ -188,46 +191,26 @@ public class SecurityContext {
 			return;
 		}
 
-		String baseDir = configuration.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
-		if(baseDir == null) {
-		String message = "SASL auth is enabled for ZK but unable to locate pseudo Jaas config " +
-					"since " + ConfigConstants.FLINK_BASE_DIR_PATH_KEY + " is not provided";
-			LOG.error(message);
-			throw new IllegalConfigurationException(message);
-		}
-
-		File f = new File(baseDir);
-		if(!f.exists() || !f.isDirectory()) {
-			LOG.error("Invalid flink base directory {} configuration provided", baseDir);
-			throw new IllegalConfigurationException("Invalid flink base directory configuration provided");
-		}
-
-		File jaasConfigFile = new File(f, JAAS_CONF_FILENAME);
-
-		if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
-
-			//check if there is a conf directory
-			File confDir = new File(f, "conf");
-			if(!confDir.exists() || !confDir.isDirectory()) {
-				LOG.error("Could not locate " + JAAS_CONF_FILENAME);
-				throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME);
-			}
-
-			jaasConfigFile = new File(confDir, JAAS_CONF_FILENAME);
-
-			if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
-				LOG.error("Could not locate " + JAAS_CONF_FILENAME);
-				throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME);
-			}
+		// load Jaas config file to initialize SASL
+		final File jaasConfFile;
+		try {
+			Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, "");
+			InputStream jaasConfStream = SecurityContext.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
+			Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
+			jaasConfFile = jaasConfPath.toFile();
+			jaasConfFile.deleteOnExit();
+		} catch (IOException e) {
+			throw new RuntimeException("SASL auth is enabled for ZK but unable to " +
+				"locate pseudo Jaas config provided with Flink", e);
 		}
 
 		LOG.info("Enabling {} property with pseudo JAAS config file: {}",
-				JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile);
+				JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
 
 		//ZK client module lookup the configuration to handle SASL.
 		//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
-		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile.getAbsolutePath());
-		System.setProperty(ZOOKEEPER_SASL_CLIENT,"true");
+		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
+		System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
 
 		String zkSaslServiceName = configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null);
 		if(!StringUtils.isBlank(zkSaslServiceName)) {
@@ -250,6 +233,10 @@ public class SecurityContext {
 
 		String principal;
 
+		public SecurityConfiguration() {
+			this.flinkConf = GlobalConfiguration.loadConfiguration();
+		}
+
 		public String getKeytab() {
 			return keytab;
 		}
@@ -310,4 +297,4 @@ public class SecurityContext {
 		T run() throws Exception;
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/resources/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/flink-jaas.conf b/flink-runtime/src/main/resources/flink-jaas.conf
new file mode 100644
index 0000000..7f0f06b
--- /dev/null
+++ b/flink-runtime/src/main/resources/flink-jaas.conf
@@ -0,0 +1,26 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# We are using this file as an workaround for the Kafka and ZK SASL implementation
+# since they explicitly look for java.security.auth.login.config property
+# The file itself is not used by the application since the internal implementation
+# uses a process-wide in-memory java security configuration object.
+# Please do not edit/delete this file - See FLINK-3929
+sample {
+  useKeyTab=false
+  useTicketCache=true;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8534ee1..9e2feb5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1583,8 +1583,6 @@ object TaskManager {
       }
     }
 
-    conf.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, cliConfig.getConfigDir() + "/..")
-
     conf
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
index 5f3d76a..3c48e8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
@@ -35,7 +35,7 @@ public class SecurityContextTest {
 		SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
 		try {
 			SecurityContext.install(sc);
-			assertEquals(UserGroupInformation.getLoginUser().getUserName(),getOSUserName());
+			assertEquals(UserGroupInformation.getLoginUser().getUserName(), getOSUserName());
 		} catch (Exception e) {
 			fail(e.getMessage());
 		}
@@ -59,7 +59,7 @@ public class SecurityContextTest {
 		if( osName.contains( "windows" ) ){
 			className = "com.sun.security.auth.module.NTSystem";
 		}
-		else if( osName.contains( "linux" ) ){
+		else if( osName.contains( "linux" ) || osName.contains( "mac" )  ){
 			className = "com.sun.security.auth.module.UnixSystem";
 		}
 		else if( osName.contains( "solaris" ) || osName.contains( "sunos" ) ){

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 930ddd2..051175a 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -227,7 +227,6 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 			TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
 
 		} catch (Exception e) {
-			LOG.error("Exception occured while creating MiniFlink cluster. Reason: {}", e);
 			throw new RuntimeException(e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
deleted file mode 100644
index d12ec65..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.test.util.SecureTestEnvironment;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/*
- * Kafka Secure Connection (kerberos) IT test case
- */
-public class Kafka09SecureRunITCase extends KafkaConsumerTestBase {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecureRunITCase.class);
-
-	@BeforeClass
-	public static void prepare() throws IOException, ClassNotFoundException {
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    Starting Kafka09SecureRunITCase ");
-		LOG.info("-------------------------------------------------------------------------");
-
-		SecureTestEnvironment.prepare(tempFolder);
-		SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
-
-		startClusters(true);
-	}
-
-	@AfterClass
-	public static void shutDownServices() {
-		shutdownClusters();
-		SecureTestEnvironment.cleanup();
-	}
-
-
-	//timeout interval is large since in Travis, ZK connection timeout occurs frequently
-	//The timeout for the test case is 2 times timeout of ZK connection
-	@Test(timeout = 600000)
-	public void testMultipleTopics() throws Exception {
-		runProduceConsumeMultipleTopics();
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
new file mode 100644
index 0000000..e748537
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/*
+ * Kafka Secure Connection (kerberos) IT test case
+ */
+public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
+
+	@BeforeClass
+	public static void prepare() throws IOException, ClassNotFoundException {
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Starting Kafka09SecuredRunITCase ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		SecureTestEnvironment.prepare(tempFolder);
+		SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
+
+		startClusters(true);
+	}
+
+	@AfterClass
+	public static void shutDownServices() {
+		shutdownClusters();
+		SecureTestEnvironment.cleanup();
+	}
+
+
+	//timeout interval is large since in Travis, ZK connection timeout occurs frequently
+	//The timeout for the test case is 2 times timeout of ZK connection
+	@Test(timeout = 600000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index 00b19f1..b5e622b 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.util;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.junit.rules.TemporaryFolder;
@@ -60,12 +61,10 @@ public class SecureTestEnvironment {
 
 	private static String hadoopServicePrincipal = null;
 
-	private static File baseDirForSecureRun = null;
-
 	public static void prepare(TemporaryFolder tempFolder) {
 
 		try {
-			baseDirForSecureRun = tempFolder.newFolder();
+			File baseDirForSecureRun = tempFolder.newFolder();
 			LOG.info("Base Directory for Secure Environment: {}", baseDirForSecureRun);
 
 			String hostName = "localhost";
@@ -113,19 +112,17 @@ public class SecureTestEnvironment {
 			//See Yarn test case module for reference
 			createJaasConfig(baseDirForSecureRun);
 			SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration();
-			Configuration flinkConfig = new Configuration();
+			Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
 			flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
 			flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
 			flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false);
-			flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirForSecureRun.getAbsolutePath());
 			ctx.setFlinkConfiguration(flinkConfig);
 			TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
 
-			populateSystemEnvVariables();
+			populateJavaPropertyVariables();
 
 		} catch(Exception e) {
-			LOG.error("Exception occured while preparing secure environment. Reason: {}", e);
-			throw new RuntimeException(e);
+			throw new RuntimeException("Exception occured while preparing secure environment.", e);
 		}
 
 	}
@@ -145,14 +142,12 @@ public class SecureTestEnvironment {
 		testPrincipal = null;
 		testZkServerPrincipal = null;
 		hadoopServicePrincipal = null;
-		baseDirForSecureRun = null;
 
 	}
 
-	private static void populateSystemEnvVariables() {
+	private static void populateJavaPropertyVariables() {
 
 		if(LOG.isDebugEnabled()) {
-			System.setProperty("FLINK_JAAS_DEBUG", "true");
 			System.setProperty("sun.security.krb5.debug", "true");
 		}
 
@@ -165,7 +160,6 @@ public class SecureTestEnvironment {
 
 	private static void resetSystemEnvVariables() {
 		System.clearProperty("java.security.krb5.conf");
-		System.clearProperty("FLINK_JAAS_DEBUG");
 		System.clearProperty("sun.security.krb5.debug");
 
 		System.clearProperty("zookeeper.authProvider.1");
@@ -227,7 +221,7 @@ public class SecureTestEnvironment {
 	}
 
 	/*
-	 * Helper method to create a temporary JAAS configuration file to ger around the Kafka and ZK SASL
+	 * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL
 	 * implementation lookup java.security.auth.login.config
 	 */
 	private static void  createJaasConfig(File baseDirForSecureRun) {
@@ -241,8 +235,7 @@ public class SecureTestEnvironment {
 			out.println("useTicketCache=true;");
 			out.println("};");
 		} catch (IOException e) {
-			LOG.error("Exception occured while trying to create JAAS config. Reason: {}", e.getMessage());
-			throw new RuntimeException(e);
+			throw new RuntimeException("Exception occured while trying to create JAAS config.", e);
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 605aa44..afdd400 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -427,8 +427,7 @@ public abstract class YarnTestBase extends TestLogger {
 					out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
 					out.println("");
 				} catch (IOException e) {
-					LOG.error("Exception occured while trying to append the security configurations. Reason: {}", e.getMessage());
-					throw new RuntimeException(e);
+					throw new RuntimeException("Exception occured while trying to append the security configurations.", e);
 				}
 
 				String configDir = tempConfPathForSecureRun.getAbsolutePath();

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index efb658a..b27876b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -176,7 +176,6 @@ public class YarnApplicationMasterRunner {
 				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
 				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
 			}
-			flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
 
 			SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index c70a30b..21ed52e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -127,7 +127,6 @@ public class YarnTaskManagerRunner {
 				configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
 				configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
 			}
-			configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
 
 			SecurityContext.install(sc.setFlinkConfiguration(configuration));
 
@@ -145,9 +144,9 @@ public class YarnTaskManagerRunner {
 				}
 			});
 		} catch(Exception e) {
-			LOG.error("Exception occurred while launching Task Manager. Reason: {}", e);
+			LOG.error("Exception occurred while launching Task Manager", e);
 			throw new RuntimeException(e);
 		}
 
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index b5364f0..d09340c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -24,7 +24,6 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
@@ -463,27 +462,17 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		}
 	}
 
-	public static void main(final String[] args) {
-		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session
-
-		String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
-		GlobalConfiguration.loadConfiguration(confDirPath);
+	public static void main(final String[] args) throws Exception {
+		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
 		Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
-		flinkConfiguration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, confDirPath);
-		try {
-			SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
-			int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
-				@Override
-				public Integer run() {
-					return cli.run(args);
-				}
-			});
-			System.exit(retCode);
-		} catch(Exception e) {
-			e.printStackTrace();
-			LOG.error("Exception Occured. Reason: {}", e);
-			return;
-		}
+		SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
+		int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+			@Override
+			public Integer run() {
+				return cli.run(args);
+			}
+		});
+		System.exit(retCode);
 	}
 
 	@Override
@@ -544,7 +533,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		try {
 			return yarnClusterDescriptor.deploy();
 		} catch (Exception e) {
-			LOG.error("Error while deploying YARN cluster: "+e.getMessage(), e);
 			throw new RuntimeException("Error deploying the YARN cluster", e);
 		}
 


Mime
View raw message