gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ibuen...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-332] Fetching Hive tokens in TokenUtils
Date Wed, 06 Dec 2017 16:59:47 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 571746c2b -> 0afdc45c3


[GOBBLIN-332] Fetching Hive tokens in TokenUtils

Closes #2184 from autumnust/hivefortokenutils


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0afdc45c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0afdc45c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0afdc45c

Branch: refs/heads/master
Commit: 0afdc45c39a80944d9d6bcb1ec9767749ae6dfef
Parents: 571746c
Author: Lei Sun <autumnust@gmail.com>
Authored: Wed Dec 6 08:59:40 2017 -0800
Committer: Issac Buenrostro <ibuenros@apache.org>
Committed: Wed Dec 6 08:59:40 2017 -0800

----------------------------------------------------------------------
 .../gobblin/azkaban/AzkabanJobLauncher.java     |   7 +-
 gobblin-utility/build.gradle                    |   1 +
 .../apache/gobblin/util/hadoop/TokenUtils.java  | 159 +++++++++++++++++--
 3 files changed, 152 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0afdc45c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 5c9fc1d..89c7646 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.azkaban;
 
+import com.google.common.base.Optional;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -35,6 +36,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
@@ -171,7 +173,10 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
       // see javadoc for more information
       LOG.info(String.format("Job type %s does not provide Hadoop tokens. Negotiating Hadoop
tokens.",
           props.getProperty(JOB_TYPE)));
-      File tokenFile = TokenUtils.getHadoopTokens(new State(props));
+
+      File tokenFile = File.createTempFile("mr-azkaban", ".token");
+      TokenUtils.getHadoopTokens(new State(props), Optional.of(tokenFile), new Credentials());
+
       System.setProperty(HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath());
       System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath());
       this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0afdc45c/gobblin-utility/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle
index cd79bde..9d7505b 100644
--- a/gobblin-utility/build.gradle
+++ b/gobblin-utility/build.gradle
@@ -30,6 +30,7 @@ dependencies {
   compile externalDependency.guava
   compile externalDependency.slf4j
   compile externalDependency.avro
+  compile externalDependency.hiveMetastore
   compile externalDependency.jodaTime
   compile externalDependency.jacksonCore
   compile externalDependency.jasypt

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0afdc45c/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
index 5cb9723..fb45e14 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
@@ -17,6 +17,9 @@
 
 package org.apache.gobblin.util.hadoop;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -25,12 +28,15 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
-
 import java.util.regex.Pattern;
+import org.apache.gobblin.configuration.State;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -46,16 +52,12 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Logger;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-
-import org.apache.gobblin.configuration.State;
+import org.apache.thrift.TException;
 
 
 /**
@@ -83,14 +85,65 @@ public class TokenUtils {
   private static final String KERBEROS_REALM = "kerberos.realm";
 
   /**
+   * The key that will be used to set the proper signature for each hcat token when multiple
hcat
+   * tokens are required to be fetched.
+   */
+  private static final String HIVE_TOKEN_SIGNATURE_KEY = "hive.metastore.token.signature";
+  /**
+   * Users can specify the hcat locations that they use specifically. It may contain additional
hcat locations,
+   * comma-separated.
+   */
+  private static final String USER_DEFINED_HIVE_LOCATIONS = "user.defined.hcatLocation";
+
+  /**
+   * Get Hadoop tokens (tokens for job history server, job tracker, hive and HDFS) using a
Kerberos keytab,
+   * on behalf of a proxy user, embed tokens into a {@link UserGroupInformation} as the returned
result, and persist the in-memory
+   * credentials if a tokenFile is specified.
+   *
+   * Note that when a super-user is fetching tokens for other users,
+   * {@link #fetchHcatToken(String, HiveConf, String, IMetaStoreClient)} getDelegationToken}
explicitly
+   * contains a string parameter indicating proxy user, while other hadoop services require
impersonation first.
+   *
+   * @param state A {@link State} object that should contain properties.
+   * @param tokenFile If present, the file will store materialized credentials.
+   * @param ugi The {@link UserGroupInformation} that is used to impersonate the proxy
user via a "doAs" block.
+   * @param targetUser The user to be impersonated as, for fetching hadoop tokens.
+   * @return A {@link UserGroupInformation} containing negotiated credentials.
+   */
+  public static UserGroupInformation getHadoopAndHiveTokensForProxyUser(final State state,
Optional<File> tokenFile,
+      UserGroupInformation ugi, IMetaStoreClient client, String targetUser) throws IOException,
InterruptedException {
+    final Credentials cred = new Credentials();
+    ugi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        getHadoopTokens(state, Optional.absent(), cred);
+        return null;
+      }
+    });
+
+    ugi.getCredentials().addAll(cred);
+    // Will add hive tokens into ugi in this method.
+    getHiveToken(state, client, cred, targetUser, ugi);
+
+    if (tokenFile.isPresent()){
+      persistTokens(cred, tokenFile.get());
+    }
+    // At this point, tokens in ugi can be a superset of those in the Credentials object.
+    // NOTE(review): getHiveToken appears to add hive tokens to both cred and ugi — confirm this claim.
+    return ugi;
+  }
+
+  /**
    * Get Hadoop tokens (tokens for job history server, job tracker and HDFS) using Kerberos
keytab.
    *
    * @param state A {@link State} object that should contain property {@link #USER_TO_PROXY},
    * {@link #KEYTAB_USER} and {@link #KEYTAB_LOCATION}. To obtain tokens for
    * other namenodes, use property {@link #OTHER_NAMENODES} with comma separated HDFS URIs.
-   * @return A {@link File} containing the negotiated credentials.
+   * @param tokenFile If present, the file will store materialized credentials.
+   * @param cred An in-memory representation of credentials.
    */
-  public static File getHadoopTokens(final State state) throws IOException, InterruptedException
{
+  public static void getHadoopTokens(final State state, Optional<File> tokenFile, Credentials
cred)
+      throws IOException, InterruptedException {
 
     Preconditions.checkArgument(state.contains(KEYTAB_USER), "Missing required property "
+ KEYTAB_USER);
     Preconditions.checkArgument(state.contains(KEYTAB_LOCATION), "Missing required property
" + KEYTAB_LOCATION);
@@ -103,17 +156,15 @@ public class TokenUtils {
     final Optional<String> userToProxy = Strings.isNullOrEmpty(state.getProp(USER_TO_PROXY))
? Optional.<String>absent()
         : Optional.fromNullable(state.getProp(USER_TO_PROXY));
     final Configuration conf = new Configuration();
-    final Credentials cred = new Credentials();
 
     LOG.info("Getting tokens for " + userToProxy);
 
     getJhToken(conf, cred);
     getFsAndJtTokens(state, conf, userToProxy, cred);
 
-    File tokenFile = File.createTempFile("mr-azkaban", ".token");
-    persistTokens(cred, tokenFile);
-
-    return tokenFile;
+    if (tokenFile.isPresent()) {
+      persistTokens(cred, tokenFile.get());
+    }
   }
 
   /**
@@ -132,6 +183,86 @@ public class TokenUtils {
     }
   }
 
+  /**
+   *
+   * @param userToProxy The user that hiveClient impersonates in order to fetch the delegation
tokens.
+   * @param ugi The {@link UserGroupInformation} to which negotiated credentials will be added.
+   */
+  public static void getHiveToken(final State state, IMetaStoreClient hiveClient, Credentials
cred,
+      final String userToProxy, UserGroupInformation ugi) {
+    try {
+      // Fetch and save the default hcat token.
+      LOG.info("Fetching default Hive MetaStore token from hive");
+      HiveConf hiveConf = new HiveConf();
+
+      Token<DelegationTokenIdentifier> hcatToken = fetchHcatToken(userToProxy, hiveConf,
null, hiveClient);
+      cred.addToken(hcatToken.getService(), hcatToken);
+      ugi.addToken(hcatToken);
+
+      // Fetch extra Hcat location user specified.
+      final List<String> extraHcatLocations =
+          state.contains(USER_DEFINED_HIVE_LOCATIONS) ? state.getPropAsList(USER_DEFINED_HIVE_LOCATIONS)
+              : Collections.EMPTY_LIST;
+      if (!extraHcatLocations.isEmpty()) {
+        LOG.info("Need to fetch extra metaStore tokens from hive.");
+
+        // start to process the user inputs.
+        for (final String thriftUrl : extraHcatLocations) {
+          LOG.info("Fetching metaStore token from : " + thriftUrl);
+
+          hiveConf = new HiveConf();
+          hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
+          hcatToken = fetchHcatToken(userToProxy, hiveConf, thriftUrl, hiveClient);
+          cred.addToken(hcatToken.getService(), hcatToken);
+          ugi.addToken(hcatToken);
+
+          LOG.info("Successfully fetched token for:" + thriftUrl);
+        }
+      }
+    } catch (final Throwable t) {
+      final String message = "Failed to get hive metastore token." + t.getMessage() + t.getCause();
+      LOG.error(message, t);
+      throw new RuntimeException(message);
+    }
+  }
+
+  /**
+   * Fetches an hcat token as per the specified hive configuration and then stores
the token
+   * into the specified credential store.
+   *
+   * @param userToProxy String value indicating the name of the user the token will be fetched
for.
+   * @param hiveConf the configuration based off which the hive client will be initialized.
+   */
+  private static Token<DelegationTokenIdentifier> fetchHcatToken(final String userToProxy,
final HiveConf hiveConf,
+      final String tokenSignatureOverwrite, final IMetaStoreClient hiveClient)
+      throws IOException, TException, InterruptedException {
+
+    LOG.info(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname + ": " + hiveConf.get(
+        HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname));
+
+    LOG.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": " + hiveConf.get(
+        HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
+
+    final Token<DelegationTokenIdentifier> hcatToken = new Token<>();
+
+    hcatToken.decodeFromUrlString(
+        hiveClient.getDelegationToken(userToProxy, UserGroupInformation.getLoginUser().getShortUserName()));
+
+    // overwrite the value of the service property of the token if the signature
+    // override is specified.
+    // If the service field is set, do not overwrite that
+    if (hcatToken.getService().getLength() <= 0 && tokenSignatureOverwrite !=
null
+        && tokenSignatureOverwrite.trim().length() > 0) {
+      hcatToken.setService(new Text(tokenSignatureOverwrite.trim().toLowerCase()));
+
+      LOG.info(HIVE_TOKEN_SIGNATURE_KEY + ":" + tokenSignatureOverwrite);
+    }
+
+    LOG.info("Created hive metastore token for user:" + userToProxy + " with kind[" + hcatToken.getKind()
+ "]"
+        + " and service[" + hcatToken.getService() + "]");
+    return hcatToken;
+  }
+
   private static void getJhToken(Configuration conf, Credentials cred) throws IOException
{
     YarnRPC rpc = YarnRPC.create(conf);
     final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);


Mime
View raw message