flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-2977] Using reflection to load HBase Kerberos tokens
Date Mon, 16 Nov 2015 13:19:10 GMT
Repository: flink
Updated Branches:
  refs/heads/master feb6994ff -> aba377945


[FLINK-2977] Using reflection to load HBase Kerberos tokens

This closes #1342


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aba37794
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aba37794
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aba37794

Branch: refs/heads/master
Commit: aba377945a66f37198e4a16bb9256c180ffcae79
Parents: feb6994
Author: Niels Basjes <niels@basjes.nl>
Authored: Tue Nov 10 11:04:00 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Mon Nov 16 14:18:02 2015 +0100

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/config.sh     | 10 +++-
 .../main/java/org/apache/flink/yarn/Utils.java  | 55 +++++++++++++++++++-
 2 files changed, 62 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aba37794/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 653944c..3b2b4d7 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -249,7 +249,15 @@ if [ -n "$HADOOP_HOME" ]; then
     fi
 fi
 
-INTERNAL_HADOOP_CLASSPATHS="$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR"
+INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
+
+if [ -n "${HBASE_CONF_DIR}" ]; then
+    # Setup the HBase classpath.
+    INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:`hbase classpath`"
+
+    # We add the HBASE_CONF_DIR last to ensure the right config directory is used.
+    INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
+fi
 
 # Auxilliary function which extracts the name of host from a line which
 # also potentialy includes topology information and the taskManager type

http://git-wip-us.apache.org/repos/asf/flink/blob/aba37794/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index c562cdc..ea49066 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -20,11 +20,11 @@ package org.apache.flink.yarn;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 
-import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -116,6 +117,8 @@ public final class Utils {
 		Credentials credentials = new Credentials();
 		// for HDFS
 		TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
+		// for HBase
+		obtainTokenForHBase(credentials, conf);
 		// for user
 		UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
 		
@@ -135,7 +138,55 @@ public final class Utils {
 		ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
 		amContainer.setTokens(securityTokens);
 	}
-	
+
+	/**
+	 * Obtain Kerberos security token for HBase.
+	 */
+	private static void obtainTokenForHBase(Credentials credentials, Configuration conf) throws IOException {
+		if (UserGroupInformation.isSecurityEnabled()) {
+			LOG.info("Attempting to obtain Kerberos security token for HBase");
+			try {
+				// ----
+				// Intended call: HBaseConfiguration.addHbaseResources(conf);
+				Class
+						.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+						.getMethod("addHbaseResources", Configuration.class )
+						.invoke(null, conf);
+				// ----
+
+				LOG.info("HBase security setting: {}", conf.get("hbase.security.authentication"));
+
+				if (!"kerberos".equals(conf.get("hbase.security.authentication"))) {
+					LOG.info("HBase has not been configured to use Kerberos.");
+					return;
+				}
+
+				LOG.info("Obtaining Kerberos security token for HBase");
+				// ----
+				// Intended call: Token<AuthenticationTokenIdentifier> token = TokenUtil.obtainToken(conf);
+				Token<?> token = (Token<?>) Class
+						.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+						.getMethod("obtainToken", Configuration.class)
+						.invoke(null, conf);
+				// ----
+
+				if (token == null) {
+					LOG.error("No Kerberos security token for HBase available");
+					return;
+				}
+
+				credentials.addToken(token.getService(), token);
+				LOG.info("Added HBase Kerberos security token to credentials.");
+			} catch ( ClassNotFoundException
+					| NoSuchMethodException
+					| IllegalAccessException
+					| InvocationTargetException e) {
+				LOG.info("HBase is not available (not packaged with this application): {} : \"{}\".",
+						e.getClass().getSimpleName(), e.getMessage());
+			}
+		}
+	}
+
 	public static void logFilesInCurrentDirectory(final Logger logger) {
 		new File(".").list(new FilenameFilter() {
 			


Mime
View raw message