hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject hbase git commit: HBASE-12493 Make User and TokenUtil public
Date Tue, 16 Dec 2014 01:27:08 GMT
Repository: hbase
Updated Branches:
  refs/heads/0.98 a16c01c51 -> 4fcde44d2


HBASE-12493 Make User and TokenUtil public

Conflicts:
	hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4fcde44d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4fcde44d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4fcde44d

Branch: refs/heads/0.98
Commit: 4fcde44d223af7b7916e00ce7b779b424051d26f
Parents: a16c01c
Author: Gary Helmling <gary@cask.co>
Authored: Mon Dec 15 17:03:46 2014 -0800
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Mon Dec 15 17:07:24 2014 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/security/token/TokenUtil.java  | 381 +++++++++++++++++++
 .../org/apache/hadoop/hbase/security/User.java  |  37 +-
 .../hadoop/hbase/mapred/TableMapReduceUtil.java |  39 +-
 .../hbase/mapreduce/TableMapReduceUtil.java     |  59 +--
 .../hadoop/hbase/security/token/TokenUtil.java  | 198 ----------
 .../security/token/TestTokenAuthentication.java |  37 +-
 6 files changed, 475 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4fcde44d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
new file mode 100644
index 0000000..e46841a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+
+import com.google.protobuf.ServiceException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Utility methods for obtaining authentication tokens.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TokenUtil {
+  // This class is referenced indirectly by User out in common; instances are created by
reflection
+  private static Log LOG = LogFactory.getLog(TokenUtil.class);
+
+  /**
+   * Obtain and return an authentication token for the current user.
+   * @param conf the configuration for connecting to the cluster
+   * @return the authentication token instance
+   * @deprecated Replaced by {@link #obtainToken(HConnection)}
+   */
+  @Deprecated
+  public static Token<AuthenticationTokenIdentifier> obtainToken(
+      Configuration conf) throws IOException {
+    HConnection connection = null;
+    try {
+      connection = HConnectionManager.createConnection(conf);
+      return obtainToken(connection);
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  /**
+   * Obtain and return an authentication token for the current user.
+   * @param conn The HBase cluster connection
+   * @return the authentication token instance
+   */
+  public static Token<AuthenticationTokenIdentifier> obtainToken(
+      HConnection conn) throws IOException {
+    HTableInterface meta = null;
+    try {
+      meta = conn.getTable(TableName.META_TABLE_NAME);
+      CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
+      AuthenticationProtos.AuthenticationService.BlockingInterface service =
+          AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
+      AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
+          AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
+
+      return ProtobufUtil.toToken(response.getToken());
+    } catch (ServiceException se) {
+      ProtobufUtil.toIOException(se);
+    } finally {
+      if (meta != null) {
+        meta.close();
+      }
+    }
+    // dummy return for ServiceException block
+    return null;
+  }
+
+  /**
+   * Obtain and return an authentication token for the current user.
+   * @param conn The HBase cluster connection
+   * @return the authentication token instance
+   */
+  public static Token<AuthenticationTokenIdentifier> obtainToken(
+      final HConnection conn, User user) throws IOException, InterruptedException {
+    return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>()
{
+      @Override
+      public Token<AuthenticationTokenIdentifier> run() throws Exception {
+        return obtainToken(conn);
+      }
+    });
+  }
+
+
+  private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
+      throws IOException {
+    return token.getService() != null
+        ? token.getService() : new Text("default");
+  }
+
+  /**
+   * Obtain an authentication token for the given user and add it to the
+   * user's credentials.
+   * @param conf The configuration for connecting to the cluster
+   * @param user The user for whom to obtain the token
+   * @throws IOException If making a remote call to the authentication service fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   * @deprecated Replaced by {@link #obtainAndCacheToken(HConnection,User)}
+   */
+  @Deprecated
+  public static void obtainAndCacheToken(final Configuration conf,
+                                         UserGroupInformation user)
+      throws IOException, InterruptedException {
+    HConnection conn = HConnectionManager.createConnection(conf);
+    try {
+      UserProvider userProvider = UserProvider.instantiate(conf);
+      obtainAndCacheToken(conn, userProvider.create(user));
+    } finally {
+      conn.close();
+    }
+  }
+
+  /**
+   * Obtain an authentication token for the given user and add it to the
+   * user's credentials.
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @throws IOException If making a remote call to the authentication service fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void obtainAndCacheToken(final HConnection conn,
+      User user)
+      throws IOException, InterruptedException {
+    try {
+      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
+
+      if (token == null) {
+        throw new IOException("No token returned for user " + user.getName());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
+            user.getName());
+      }
+      user.addToken(token);
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (InterruptedException ie) {
+      throw ie;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new UndeclaredThrowableException(e,
+          "Unexpected exception obtaining token for user " + user.getName());
+    }
+  }
+
+  /**
+   * Obtain an authentication token on behalf of the given user and add it to
+   * the credentials for the given map reduce job.
+   * @param conf The configuration for connecting to the cluster
+   * @param user The user for whom to obtain the token
+   * @param job The job instance in which the token should be stored
+   * @throws IOException If making a remote call to the authentication service fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   * @deprecated Replaced by {@link #obtainTokenForJob(HConnection,User,Job)}
+   */
+  @Deprecated
+  public static void obtainTokenForJob(final Configuration conf,
+                                       UserGroupInformation user, Job job)
+      throws IOException, InterruptedException {
+    HConnection conn = HConnectionManager.createConnection(conf);
+    try {
+      UserProvider userProvider = UserProvider.instantiate(conf);
+      obtainTokenForJob(conn, userProvider.create(user), job);
+    } finally {
+      conn.close();
+    }
+  }
+
+  /**
+   * Obtain an authentication token on behalf of the given user and add it to
+   * the credentials for the given map reduce job.
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @param job The job instance in which the token should be stored
+   * @throws IOException If making a remote call to the authentication service fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void obtainTokenForJob(final HConnection conn,
+      User user, Job job)
+      throws IOException, InterruptedException {
+    try {
+      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
+
+      if (token == null) {
+        throw new IOException("No token returned for user " + user.getName());
+      }
+      Text clusterId = getClusterId(token);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
+            user.getName() + " on cluster " + clusterId.toString());
+      }
+      job.getCredentials().addToken(clusterId, token);
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (InterruptedException ie) {
+      throw ie;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new UndeclaredThrowableException(e,
+          "Unexpected exception obtaining token for user " + user.getName());
+    }
+  }
+
+  /**
+   * Obtain an authentication token on behalf of the given user and add it to
+   * the credentials for the given map reduce job.
+   * @param user The user for whom to obtain the token
+   * @param job The job configuration in which the token should be stored
+   * @throws IOException If making a remote call to the authentication service fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   * @deprecated Replaced by {@link #obtainTokenForJob(HConnection,JobConf,User)}
+   */
+  @Deprecated
+  public static void obtainTokenForJob(final JobConf job,
+                                       UserGroupInformation user)
+      throws IOException, InterruptedException {
+    HConnection conn = HConnectionManager.createConnection(job);
+    try {
+      UserProvider userProvider = UserProvider.instantiate(job);
+      obtainTokenForJob(conn, job, userProvider.create(user));
+    } finally {
+      conn.close();
+    }
+  }
+
+  /**
+   * Obtain an authentication token on behalf of the given user and add it to
+   * the credentials for the given map reduce job.
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @param job The job configuration in which the token should be stored
+   * @throws IOException If making a remote call to the authentication service fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void obtainTokenForJob(final HConnection conn, final JobConf job, User user)
+      throws IOException, InterruptedException {
+    try {
+      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
+
+      if (token == null) {
+        throw new IOException("No token returned for user " + user.getName());
+      }
+      Text clusterId = getClusterId(token);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
+            user.getName() + " on cluster " + clusterId.toString());
+      }
+      job.getCredentials().addToken(clusterId, token);
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (InterruptedException ie) {
+      throw ie;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new UndeclaredThrowableException(e,
+          "Unexpected exception obtaining token for user "+user.getName());
+    }
+  }
+
+  /**
+   * Checks for an authentication token for the given user, obtaining a new token if necessary,
+   * and adds it to the credentials for the given map reduce job.
+   *
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @param job The job configuration in which the token should be stored
+   * @throws IOException If making a remote call to the authentication service fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void addTokenForJob(final HConnection conn, final JobConf job, User user)
+      throws IOException, InterruptedException {
+
+    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(),
user);
+    if (token == null) {
+      token = obtainToken(conn, user);
+    }
+    job.getCredentials().addToken(token.getService(), token);
+  }
+
+  /**
+   * Checks for an authentication token for the given user, obtaining a new token if necessary,
+   * and adds it to the credentials for the given map reduce job.
+   *
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @param job The job instance in which the token should be stored
+   * @throws IOException If making a remote call to the authentication service fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void addTokenForJob(final HConnection conn, User user, Job job)
+      throws IOException, InterruptedException {
+    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(),
user);
+    if (token == null) {
+      token = obtainToken(conn, user);
+    }
+    job.getCredentials().addToken(token.getService(), token);
+  }
+
+  /**
+   * Checks if an authentication tokens exists for the connected cluster,
+   * obtaining one if needed and adding it to the user's credentials.
+   *
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @throws IOException If making a remote call to the authentication service fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   * @return true if the token was added, false if it already existed
+   */
+  public static boolean addTokenIfMissing(HConnection conn, User user)
+      throws IOException, InterruptedException {
+    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(),
user);
+    if (token == null) {
+      token = obtainToken(conn, user);
+      user.getUGI().addToken(token.getService(), token);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Get the authentication token of the user for the cluster specified in the configuration
+   * @return null if the user does not have the token, otherwise the auth token for the cluster.
+   */
+  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf,
User user)
+      throws IOException, InterruptedException {
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
+    try {
+      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
+      if (clusterId == null) {
+        throw new IOException("Failed to get cluster ID");
+      }
+      return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } finally {
+      zkw.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4fcde44d/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
index 0c001f6..6c99d81 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
@@ -23,17 +23,20 @@ import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 
 /**
  * Wrapper to abstract out usage of user and group information in HBase.
@@ -46,7 +49,8 @@ import org.apache.hadoop.security.token.Token;
  * HBase, but can be extended as needs change.
  * </p>
  */
-@InterfaceAudience.Private
+@InterfaceAudience.Public
+@InterfaceStability.Stable
 public abstract class User {
   public static final String HBASE_SECURITY_CONF_KEY =
       "hbase.security.authentication";
@@ -62,6 +66,7 @@ public abstract class User {
   /**
    * Returns the full user name.  For Kerberos principals this will include
    * the host and realm portions of the principal name.
+   *
    * @return User full name.
    */
   public String getName() {
@@ -80,6 +85,7 @@ public abstract class User {
   /**
    * Returns the shortened version of the user name -- the portion that maps
    * to an operating system user name.
+   *
    * @return Short name
    */
   public abstract String getShortName();
@@ -100,7 +106,10 @@ public abstract class User {
    * user's credentials.
    *
    * @throws IOException
+   * @deprecated Use {@code TokenUtil.obtainAuthTokenForJob(HConnection,User,Job)}
+   *     instead.
    */
+  @Deprecated
   public abstract void obtainAuthTokenForJob(Configuration conf, Job job)
       throws IOException, InterruptedException;
 
@@ -109,7 +118,10 @@ public abstract class User {
    * user's credentials.
    *
    * @throws IOException
+   * @deprecated Use {@code TokenUtil.obtainAuthTokenForJob(HConnection,JobConf,User)}
+   *     instead.
    */
+  @Deprecated
   public abstract void obtainAuthTokenForJob(JobConf job)
       throws IOException, InterruptedException;
 
@@ -122,16 +134,31 @@ public abstract class User {
    * @return the token of the specified kind.
    */
   public Token<?> getToken(String kind, String service) throws IOException {
-    for (Token<?> token: ugi.getTokens()) {
+    for (Token<?> token : ugi.getTokens()) {
       if (token.getKind().toString().equals(kind) &&
-          (service != null && token.getService().toString().equals(service)))
-      {
+          (service != null && token.getService().toString().equals(service))) {
         return token;
       }
     }
     return null;
   }
 
+  /**
+   * Returns all the tokens stored in the user's credentials.
+   */
+  public Collection<Token<? extends TokenIdentifier>> getTokens() {
+    return ugi.getTokens();
+  }
+
+  /**
+   * Adds the given Token to the user's credentials.
+   *
+   * @param token the token to add
+   */
+  public void addToken(Token<? extends TokenIdentifier> token) {
+    ugi.addToken(token);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4fcde44d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
index c05fad5..410fd31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -20,32 +20,26 @@ package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
 import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.security.token.Token;
-import org.apache.zookeeper.KeeperException;
-import org.cliffc.high_scale_lib.Counter;
 
 /**
  * Utility for {@link TableMap} and {@link TableReduce}
@@ -236,40 +230,21 @@ public class TableMapReduceUtil {
     }
 
     if (userProvider.isHBaseSecurityEnabled()) {
+      HConnection conn = HConnectionManager.createConnection(job);
       try {
         // login the server principal (if using secure Hadoop)
         User user = userProvider.getCurrent();
-        Token<AuthenticationTokenIdentifier> authToken = getAuthToken(job, user);
-        if (authToken == null) {
-          user.obtainAuthTokenForJob(job);
-        } else {
-          job.getCredentials().addToken(authToken.getService(), authToken);
-        }
+        TokenUtil.addTokenForJob(conn, job, user);
       } catch (InterruptedException ie) {
         ie.printStackTrace();
         Thread.currentThread().interrupt();
+      } finally {
+        conn.close();
       }
     }
   }
 
   /**
-   * Get the authentication token of the user for the cluster specified in the configuration
-   * @return null if the user does not have the token, otherwise the auth token for the cluster.
-   */
-  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf,
User user)
-      throws IOException, InterruptedException {
-    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
-    try {
-      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
-      return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getUGI().getTokens());
-    } catch (KeeperException e) {
-      throw new IOException(e);
-    } finally {
-      zkw.close();
-    }
-  }
-
-  /**
    * Ensures that the given number of reduce tasks for the given job
    * configuration does not exceed the number of regions for the given table.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/4fcde44d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index eed29b4..bfb978a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -52,22 +54,15 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.KeeperException;
-import org.cliffc.high_scale_lib.Counter;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -426,10 +421,20 @@ public class TableMapReduceUtil {
         if (quorumAddress != null) {
           Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
           ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
-          obtainAuthTokenForJob(job, peerConf, user);
+          HConnection peerConn = HConnectionManager.createConnection(peerConf);
+          try {
+            TokenUtil.addTokenForJob(peerConn, user, job);
+          } finally {
+            peerConn.close();
+          }
         }
 
-        obtainAuthTokenForJob(job, job.getConfiguration(), user);
+        HConnection conn = HConnectionManager.createConnection(job.getConfiguration());
+        try {
+          TokenUtil.addTokenForJob(conn, user, job);
+        } finally {
+          conn.close();
+        }
       } catch (InterruptedException ie) {
         LOG.info("Interrupted obtaining user authentication token");
         Thread.currentThread().interrupt();
@@ -455,7 +460,12 @@ public class TableMapReduceUtil {
       try {
         Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
         ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
-        obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent());
+        HConnection peerConn = HConnectionManager.createConnection(peerConf);
+        try {
+          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
+        } finally {
+          peerConn.close();
+        }
       } catch (InterruptedException e) {
         LOG.info("Interrupted obtaining user authentication token");
         Thread.interrupted();
@@ -463,33 +473,6 @@ public class TableMapReduceUtil {
     }
   }
 
-  private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
-      throws IOException, InterruptedException {
-    Token<AuthenticationTokenIdentifier> authToken = getAuthToken(conf, user);
-    if (authToken == null) {
-      user.obtainAuthTokenForJob(conf, job);
-    } else {
-      job.getCredentials().addToken(authToken.getService(), authToken);
-    }
-  }
-
-  /**
-   * Get the authentication token of the user for the cluster specified in the configuration
-   * @return null if the user does not have the token, otherwise the auth token for the cluster.
-   */
-  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf,
User user)
-      throws IOException, InterruptedException {
-    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
-    try {
-      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
-      return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getUGI().getTokens());
-    } catch (KeeperException e) {
-      throw new IOException(e);
-    } finally {
-      zkw.close();
-    }
-  }
-
   /**
    * Writes the given scan into a Base64 encoded string.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/4fcde44d/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
deleted file mode 100644
index bc10cec..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.security.token;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.security.PrivilegedExceptionAction;
-
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-
-/**
- * Utility methods for obtaining authentication tokens.
- */
-@InterfaceAudience.Private
-public class TokenUtil {
-  private static Log LOG = LogFactory.getLog(TokenUtil.class);
-
-  /**
-   * Obtain and return an authentication token for the current user.
-   * @param conf The configuration for connecting to the cluster
-   * @return the authentication token instance
-   */
-  public static Token<AuthenticationTokenIdentifier> obtainToken(
-      Configuration conf) throws IOException {
-    HTable meta = null;
-    try {
-      meta = new HTable(conf, TableName.META_TABLE_NAME);
-      CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
-      AuthenticationProtos.AuthenticationService.BlockingInterface service =
-          AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
-      AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
-          AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
-
-      return ProtobufUtil.toToken(response.getToken());
-    } catch (ServiceException se) {
-      ProtobufUtil.toIOException(se);
-    } finally {
-      if (meta != null) {
-        meta.close();
-      }
-    }
-    // dummy return for ServiceException catch block
-    return null;
-  }
-
-  private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
-      throws IOException {
-    return token.getService() != null
-        ? token.getService() : new Text("default");
-  }
-
-  /**
-   * Obtain an authentication token for the given user and add it to the
-   * user's credentials.
-   * @param conf The configuration for connecting to the cluster
-   * @param user The user for whom to obtain the token
-   * @throws IOException If making a remote call to the {@link TokenProvider} fails
-   * @throws InterruptedException If executing as the given user is interrupted
-   */
-  public static void obtainAndCacheToken(final Configuration conf,
-      UserGroupInformation user)
-      throws IOException, InterruptedException {
-    try {
-      Token<AuthenticationTokenIdentifier> token =
-          user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>()
{
-            public Token<AuthenticationTokenIdentifier> run() throws Exception {
-              return obtainToken(conf);
-            }
-          });
-
-      if (token == null) {
-        throw new IOException("No token returned for user "+user.getUserName());
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Obtained token "+token.getKind().toString()+" for user "+
-            user.getUserName());
-      }
-      user.addToken(token);
-    } catch (IOException ioe) {
-      throw ioe;
-    } catch (InterruptedException ie) {
-      throw ie;
-    } catch (RuntimeException re) {
-      throw re;
-    } catch (Exception e) {
-      throw new UndeclaredThrowableException(e,
-          "Unexpected exception obtaining token for user "+user.getUserName());
-    }
-  }
-
-  /**
-   * Obtain an authentication token on behalf of the given user and add it to
-   * the credentials for the given map reduce job.
-   * @param conf The configuration for connecting to the cluster
-   * @param user The user for whom to obtain the token
-   * @param job The job instance in which the token should be stored
-   * @throws IOException If making a remote call to the {@link TokenProvider} fails
-   * @throws InterruptedException If executing as the given user is interrupted
-   */
-  public static void obtainTokenForJob(final Configuration conf,
-      UserGroupInformation user, Job job)
-      throws IOException, InterruptedException {
-    try {
-      Token<AuthenticationTokenIdentifier> token =
-          user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>()
{
-            public Token<AuthenticationTokenIdentifier> run() throws Exception {
-              return obtainToken(conf);
-            }
-          });
-
-      if (token == null) {
-        throw new IOException("No token returned for user "+user.getUserName());
-      }
-      Text clusterId = getClusterId(token);
-      LOG.info("Obtained token "+token.getKind().toString()+" for user "+
-          user.getUserName() + " on cluster "+clusterId.toString());
-      job.getCredentials().addToken(clusterId, token);
-    } catch (IOException ioe) {
-      throw ioe;
-    } catch (InterruptedException ie) {
-      throw ie;
-    } catch (RuntimeException re) {
-      throw re;
-    } catch (Exception e) {
-      throw new UndeclaredThrowableException(e,
-          "Unexpected exception obtaining token for user "+user.getUserName());
-    }
-  }
-
-  /**
-   * Obtain an authentication token on behalf of the given user and add it to
-   * the credentials for the given map reduce job.
-   * @param user The user for whom to obtain the token
-   * @param job The job configuration in which the token should be stored
-   * @throws IOException If making a remote call to the {@link TokenProvider} fails
-   * @throws InterruptedException If executing as the given user is interrupted
-   */
-  public static void obtainTokenForJob(final JobConf job,
-      UserGroupInformation user)
-      throws IOException, InterruptedException {
-    try {
-      Token<AuthenticationTokenIdentifier> token =
-          user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>()
{
-            public Token<AuthenticationTokenIdentifier> run() throws Exception {
-              return obtainToken(job);
-            }
-          });
-
-      if (token == null) {
-        throw new IOException("No token returned for user "+user.getUserName());
-      }
-      Text clusterId = getClusterId(token);
-      LOG.info("Obtained token "+token.getKind().toString()+" for user "+
-          user.getUserName()+" on cluster "+clusterId.toString());
-      job.getCredentials().addToken(clusterId, token);
-    } catch (IOException ioe) {
-      throw ioe;
-    } catch (InterruptedException ie) {
-      throw ie;
-    } catch (RuntimeException re) {
-      throw re;
-    } catch (Exception e) {
-      throw new UndeclaredThrowableException(e,
-          "Unexpected exception obtaining token for user "+user.getUserName());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4fcde44d/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index 387d0a4..4a47caa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.security.token;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -42,6 +44,8 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
@@ -391,11 +395,11 @@ public class TestTokenAuthentication {
                 System.currentTimeMillis());
         try {
           BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
-            User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+              User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
           AuthenticationProtos.AuthenticationService.BlockingInterface stub =
-            AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
+              AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
           AuthenticationProtos.WhoAmIResponse response =
-            stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
+              stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
           String myname = response.getUsername();
           assertEquals("testuser", myname);
           String authMethod = response.getAuthMethod();
@@ -407,4 +411,31 @@ public class TestTokenAuthentication {
       }
     });
   }
+
+  @Test
+  public void testUseExistingToken() throws Exception {
+    User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
+        new String[]{"testgroup"});
+    Token<AuthenticationTokenIdentifier> token =
+        secretManager.generateToken(user.getName());
+    assertNotNull(token);
+    user.addToken(token);
+
+    // make sure we got a token
+    Token<AuthenticationTokenIdentifier> firstToken =
+        new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
+    assertNotNull(firstToken);
+    assertEquals(token, firstToken);
+
+    HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
+    try {
+      assertFalse(TokenUtil.addTokenIfMissing(conn, user));
+      // make sure we still have the same token
+      Token<AuthenticationTokenIdentifier> secondToken =
+          new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
+      assertEquals(firstToken, secondToken);
+    } finally {
+      conn.close();
+    }
+  }
 }


Mime
View raw message