hive-commits mailing list archives

From vihan...@apache.org
Subject [2/3] hive git commit: HIVE-17371 : Move tokenstores to metastore module (Vihang Karajgaonkar, reviewed by Alan Gates, Thejas M Nair)
Date Tue, 17 Oct 2017 02:43:04 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
deleted file mode 100644
index f6e2420..0000000
--- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.thrift;
-
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
-
-/**
- * A delegation token selector that is specialized for Hive
- */
-
-public class DelegationTokenSelector
-    extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{
-
-  public DelegationTokenSelector() {
-    super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND);
-  }
-}
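
For readers skimming the hunk above: a token selector's job is to pick the matching token kind out of a user's credential set on the client side. A minimal usage sketch, using the pre-HIVE-17371 package names as in the deleted file; the service string is a placeholder:

import java.io.IOException;
import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;

public class TokenSelectorExample {
  public static void main(String[] args) throws IOException {
    DelegationTokenSelector selector = new DelegationTokenSelector();
    // selectToken() scans the credentials for a token whose kind is
    // HIVE_DELEGATION_KIND and whose service field matches.
    Token<DelegationTokenIdentifier> token = selector.selectToken(
        new Text("hiveserver2ClientToken"), // placeholder service name
        UserGroupInformation.getCurrentUser().getTokens());
    System.out.println(token != null ? "Hive token found" : "no Hive token");
  }
}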

http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
deleted file mode 100644
index 867b4ed..0000000
--- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.thrift;
-
-import java.io.Closeable;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
-
-/**
- * Interface for pluggable token store that can be implemented with shared external
- * storage for load balancing and high availability (for example using ZooKeeper).
- * Internal, store specific errors are translated into {@link TokenStoreException}.
- */
-public interface DelegationTokenStore extends Configurable, Closeable {
-
-  /**
-   * Exception for internal token store errors that typically cannot be handled by the caller.
-   */
-  public static class TokenStoreException extends RuntimeException {
-    private static final long serialVersionUID = -8693819817623074083L;
-
-    public TokenStoreException(Throwable cause) {
-      super(cause);
-    }
-
-    public TokenStoreException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Add new master key. The token store assigns and returns the sequence number.
-   * Caller needs to use the identifier to update the key (since it is embedded in the key).
-   *
-   * @param s
-   * @return sequence number for new key
-   */
-  int addMasterKey(String s) throws TokenStoreException;
-
-  /**
-   * Update master key (for expiration and setting store assigned sequence within key)
-   * @param keySeq
-   * @param s
-   * @throws TokenStoreException
-   */
-  void updateMasterKey(int keySeq, String s) throws TokenStoreException;
-
-  /**
-   * Remove key for given id.
-   * @param keySeq
-   * @return false if key no longer present, true otherwise.
-   */
-  boolean removeMasterKey(int keySeq);
-
-  /**
-   * Return all master keys.
-   * @return
-   * @throws TokenStoreException
-   */
-  String[] getMasterKeys() throws TokenStoreException;
-
-  /**
-   * Add token. If identifier is already present, token won't be added.
-   * @param tokenIdentifier
-   * @param token
-   * @return true if token was added, false for existing identifier
-   */
-  boolean addToken(DelegationTokenIdentifier tokenIdentifier,
-      DelegationTokenInformation token) throws TokenStoreException;
-
-  /**
-   * Get token. Returns null if the token does not exist.
-   * @param tokenIdentifier
-   * @return
-   */
-  DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
-      throws TokenStoreException;
-
-  /**
-   * Remove token. Return value can be used by caller to detect concurrency.
-   * @param tokenIdentifier
-   * @return true if token was removed, false if it was already removed.
-   * @throws TokenStoreException
-   */
-  boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException;
-
-  /**
-   * List of all token identifiers in the store. This is used to remove expired tokens
-   * and a potential scalability improvement would be to partition by master key id
-   * @return
-   */
-  List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() throws TokenStoreException;
-
-  /**
-   * @param hmsHandler ObjectStore used by DBTokenStore
-   * @param smode Indicate whether this is a metastore or hiveserver2 token store
-   */
-  void init(Object hmsHandler, ServerMode smode);
-
-}
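
Since the interface above is Configurable, a concrete store is selected and configured entirely through the Hadoop Configuration. A rough wiring sketch for a shared store; the ZooKeeper store class and quorum string are illustrative values, not a tested setup, and the keys are the ones defined in HiveDelegationTokenManager further below:

import org.apache.hadoop.conf.Configuration;

public class TokenStoreWiringExample {
  public static Configuration sharedStoreConf() {
    Configuration conf = new Configuration();
    // Any implementation of DelegationTokenStore may be named here;
    // when unset, the in-memory MemoryTokenStore is used.
    conf.set("hive.cluster.delegation.token.store.class",
        "org.apache.hadoop.hive.thrift.ZooKeeperTokenStore");
    conf.set("hive.cluster.delegation.token.store.zookeeper.connectString",
        "zk1:2181,zk2:2181,zk3:2181"); // illustrative quorum
    conf.set("hive.cluster.delegation.token.store.zookeeper.znode",
        "/hivedelegation");
    return conf;
  }
}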

http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
deleted file mode 100644
index 2d39bea..0000000
--- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
+++ /dev/null
@@ -1,689 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.thrift;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.security.PrivilegedAction;
-import java.security.PrivilegedExceptionAction;
-import java.util.Locale;
-import java.util.Map;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.commons.codec.binary.Base64;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport;
-import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
-import org.apache.hadoop.hive.thrift.DelegationTokenSecretManager;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSaslClientTransport;
-import org.apache.thrift.transport.TSaslServerTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
-
-/**
- * Functions that bridge Thrift's SASL transports to Hadoop's
- * SASL callback handlers and authentication classes.
- * HIVE-11378 This class is not directly used anymore.  It now exists only as a shell to be
- * extended by HadoopThriftAuthBridge23 in 0.23 shims.  I have made it abstract
- * to avoid maintenance errors.
- */
-public abstract class HadoopThriftAuthBridge {
-  private static final Logger LOG = LoggerFactory.getLogger(HadoopThriftAuthBridge.class);
-
-  public Client createClient() {
-    return new Client();
-  }
-
-  public Client createClientWithConf(String authMethod) {
-    UserGroupInformation ugi;
-    try {
-      ugi = UserGroupInformation.getLoginUser();
-    } catch(IOException e) {
-      throw new IllegalStateException("Unable to get current login user: " + e, e);
-    }
-    if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
-      LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
-      return new Client();
-    } else {
-      LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
-      Configuration conf = new Configuration();
-      conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
-      UserGroupInformation.setConfiguration(conf);
-      return new Client();
-    }
-  }
-
-  public Server createServer(String keytabFile, String principalConf, String clientConf) throws TTransportException {
-    return new Server(keytabFile, principalConf, clientConf);
-  }
-
-  public String getServerPrincipal(String principalConfig, String host)
-      throws IOException {
-    String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
-    String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
-    if (names.length != 3) {
-      throw new IOException(
-          "Kerberos principal name does NOT have the expected hostname part: "
-              + serverPrincipal);
-    }
-    return serverPrincipal;
-  }
-
-  /**
-   * Method to get canonical-ized hostname, given a hostname (possibly a CNAME).
-   * This should allow for service-principals to use simplified CNAMEs.
-   * @param hostName The hostname to be canonical-ized.
-   * @return Given a CNAME, the canonical-ized hostname is returned. If not found, the original hostname is returned.
-   */
-  public String getCanonicalHostName(String hostName) {
-    try {
-      return InetAddress.getByName(hostName).getCanonicalHostName();
-    }
-    catch(UnknownHostException exception) {
-      LOG.warn("Could not retrieve canonical hostname for " + hostName, exception);
-      return hostName;
-    }
-  }
-
-  public UserGroupInformation getCurrentUGIWithConf(String authMethod)
-      throws IOException {
-    UserGroupInformation ugi;
-    try {
-      ugi = UserGroupInformation.getCurrentUser();
-    } catch(IOException e) {
-      throw new IllegalStateException("Unable to get current user: " + e, e);
-    }
-    if (loginUserHasCurrentAuthMethod(ugi, authMethod)) {
-      LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
-      return ugi;
-    } else {
-      LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
-      Configuration conf = new Configuration();
-      conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
-      UserGroupInformation.setConfiguration(conf);
-      return UserGroupInformation.getCurrentUser();
-    }
-  }
-
-  /**
-   * Return true if the current login user is already using the given authMethod.
-   *
-   * Used above to ensure we do not create a new Configuration object and as such
-   * lose other settings such as the cluster to which the JVM is connected. Required
-   * for oozie since it does not have a core-site.xml see HIVE-7682
-   */
-  private boolean loginUserHasCurrentAuthMethod(UserGroupInformation ugi, String sAuthMethod) {
-    AuthenticationMethod authMethod;
-    try {
-      // based on SecurityUtil.getAuthenticationMethod()
-      authMethod = Enum.valueOf(AuthenticationMethod.class, sAuthMethod.toUpperCase(Locale.ENGLISH));
-    } catch (IllegalArgumentException iae) {
-      throw new IllegalArgumentException("Invalid attribute value for " +
-          HADOOP_SECURITY_AUTHENTICATION + " of " + sAuthMethod, iae);
-    }
-    LOG.debug("Current authMethod = " + ugi.getAuthenticationMethod());
-    return ugi.getAuthenticationMethod().equals(authMethod);
-  }
-
-
-  /**
-   * Read and return Hadoop SASL configuration which can be configured using
-   * "hadoop.rpc.protection"
-   * @param conf
-   * @return Hadoop SASL configuration
-   */
-
-  public abstract Map<String, String> getHadoopSaslProperties(Configuration conf);
-
-  public static class Client {
-    /**
-     * Create a client-side SASL transport that wraps an underlying transport.
-     *
-     * @param methodStr The authentication method to use. Currently only KERBEROS is
-     *               supported.
-     * @param principalConfig The Kerberos principal of the target server.
-     * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
-     * @param saslProps the sasl properties to create the client with
-     */
-
-
-    public TTransport createClientTransport(
-        String principalConfig, String host,
-        String methodStr, String tokenStrForm, final TTransport underlyingTransport,
-        final Map<String, String> saslProps) throws IOException {
-      final AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
-
-      TTransport saslTransport = null;
-      switch (method) {
-      case DIGEST:
-        Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
-        t.decodeFromUrlString(tokenStrForm);
-        saslTransport = new TSaslClientTransport(
-            method.getMechanismName(),
-            null,
-            null, SaslRpcServer.SASL_DEFAULT_REALM,
-            saslProps, new SaslClientCallbackHandler(t),
-            underlyingTransport);
-        return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
-
-      case KERBEROS:
-        String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
-        final String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
-        if (names.length != 3) {
-          throw new IOException(
-              "Kerberos principal name does NOT have the expected hostname part: "
-                  + serverPrincipal);
-        }
-        try {
-          return UserGroupInformation.getCurrentUser().doAs(
-              new PrivilegedExceptionAction<TUGIAssumingTransport>() {
-                @Override
-                public TUGIAssumingTransport run() throws IOException {
-                  TTransport saslTransport = new TSaslClientTransport(
-                    method.getMechanismName(),
-                    null,
-                    names[0], names[1],
-                    saslProps, null,
-                    underlyingTransport);
-                  return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
-                }
-              });
-        } catch (InterruptedException | SaslException se) {
-          throw new IOException("Could not instantiate SASL transport", se);
-        }
-
-      default:
-        throw new IOException("Unsupported authentication method: " + method);
-      }
-    }
-    private static class SaslClientCallbackHandler implements CallbackHandler {
-      private final String userName;
-      private final char[] userPassword;
-
-      public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
-        this.userName = encodeIdentifier(token.getIdentifier());
-        this.userPassword = encodePassword(token.getPassword());
-      }
-
-
-      @Override
-      public void handle(Callback[] callbacks)
-          throws UnsupportedCallbackException {
-        NameCallback nc = null;
-        PasswordCallback pc = null;
-        RealmCallback rc = null;
-        for (Callback callback : callbacks) {
-          if (callback instanceof RealmChoiceCallback) {
-            continue;
-          } else if (callback instanceof NameCallback) {
-            nc = (NameCallback) callback;
-          } else if (callback instanceof PasswordCallback) {
-            pc = (PasswordCallback) callback;
-          } else if (callback instanceof RealmCallback) {
-            rc = (RealmCallback) callback;
-          } else {
-            throw new UnsupportedCallbackException(callback,
-                "Unrecognized SASL client callback");
-          }
-        }
-        if (nc != null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("SASL client callback: setting username: " + userName);
-          }
-          nc.setName(userName);
-        }
-        if (pc != null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("SASL client callback: setting userPassword");
-          }
-          pc.setPassword(userPassword);
-        }
-        if (rc != null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("SASL client callback: setting realm: "
-                + rc.getDefaultText());
-          }
-          rc.setText(rc.getDefaultText());
-        }
-      }
-
-      static String encodeIdentifier(byte[] identifier) {
-        return new String(Base64.encodeBase64(identifier));
-      }
-
-      static char[] encodePassword(byte[] password) {
-        return new String(Base64.encodeBase64(password)).toCharArray();
-      }
-    }
-  }
-
-  public static class Server {
-    public enum ServerMode {
-      HIVESERVER2, METASTORE
-    };
-
-    protected final UserGroupInformation realUgi;
-    protected final UserGroupInformation clientValidationUGI;
-    protected DelegationTokenSecretManager secretManager;
-
-    public Server() throws TTransportException {
-      try {
-        realUgi = UserGroupInformation.getCurrentUser();
-        clientValidationUGI = UserGroupInformation.getCurrentUser();
-      } catch (IOException ioe) {
-        throw new TTransportException(ioe);
-      }
-    }
-    /**
-     * Create a server with a kerberos keytab/principal.
-     */
-    protected Server(String keytabFile, String principalConf, String clientConf)
-        throws TTransportException {
-      if (keytabFile == null || keytabFile.isEmpty()) {
-        throw new TTransportException("No keytab specified");
-      }
-      if (principalConf == null || principalConf.isEmpty()) {
-        throw new TTransportException("No principal specified");
-      }
-      if (clientConf == null || clientConf.isEmpty()) {
-        // Don't bust existing setups.
-        LOG.warn("Client-facing principal not set. Using server-side setting: " + principalConf);
-        clientConf = principalConf;
-      }
-
-      // Login from the keytab
-      String kerberosName;
-      try {
-        LOG.info("Logging in via CLIENT based principal ");
-        kerberosName =
-            SecurityUtil.getServerPrincipal(clientConf, "0.0.0.0");
-        UserGroupInformation.loginUserFromKeytab(
-            kerberosName, keytabFile);
-        clientValidationUGI = UserGroupInformation.getLoginUser();
-        assert clientValidationUGI.isFromKeytab();
-
-        LOG.info("Logging in via SERVER based principal ");
-        kerberosName =
-            SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
-        UserGroupInformation.loginUserFromKeytab(
-            kerberosName, keytabFile);
-        realUgi = UserGroupInformation.getLoginUser();
-        assert realUgi.isFromKeytab();
-      } catch (IOException ioe) {
-        throw new TTransportException(ioe);
-      }
-    }
-
-    public void setSecretManager(DelegationTokenSecretManager secretManager) {
-      this.secretManager = secretManager;
-    }
-
-    /**
-     * Create a TTransportFactory that, upon connection of a client socket,
-     * negotiates a Kerberized SASL transport. The resulting TTransportFactory
-     * can be passed as both the input and output transport factory when
-     * instantiating a TThreadPoolServer, for example.
-     *
-     * @param saslProps Map of SASL properties
-     */
-
-    public TTransportFactory createTransportFactory(Map<String, String> saslProps)
-        throws TTransportException {
-
-      TSaslServerTransport.Factory transFactory = createSaslServerTransportFactory(saslProps);
-
-      return new TUGIAssumingTransportFactory(transFactory, clientValidationUGI);
-    }
-
-    /**
-     * Create a TSaslServerTransport.Factory that, upon connection of a client
-     * socket, negotiates a Kerberized SASL transport.
-     *
-     * @param saslProps Map of SASL properties
-     */
-    public TSaslServerTransport.Factory createSaslServerTransportFactory(
-        Map<String, String> saslProps) throws TTransportException {
-      // Parse out the kerberos principal, host, realm.
-      String kerberosName = clientValidationUGI.getUserName();
-      final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
-      if (names.length != 3) {
-        throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
-      }
-
-      TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
-      transFactory.addServerDefinition(
-          AuthMethod.KERBEROS.getMechanismName(),
-          names[0], names[1],  // two parts of kerberos principal
-          saslProps,
-          new SaslRpcServer.SaslGssCallbackHandler());
-      transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
-          null, SaslRpcServer.SASL_DEFAULT_REALM,
-          saslProps, new SaslDigestCallbackHandler(secretManager));
-
-      return transFactory;
-    }
-
-    /**
-     * Wrap a TTransportFactory in such a way that, before processing any RPC, it
-     * assumes the UserGroupInformation of the user authenticated by
-     * the SASL transport.
-     */
-    public TTransportFactory wrapTransportFactory(TTransportFactory transFactory) {
-      return new TUGIAssumingTransportFactory(transFactory, realUgi);
-    }
-
-    /**
-     * Wrap a TProcessor in such a way that, before processing any RPC, it
-     * assumes the UserGroupInformation of the user authenticated by
-     * the SASL transport.
-     */
-
-    public TProcessor wrapProcessor(TProcessor processor) {
-      return new TUGIAssumingProcessor(processor, secretManager, true);
-    }
-
-    /**
-     * Wrap a TProcessor to capture the client information like connecting userid, ip etc
-     */
-
-    public TProcessor wrapNonAssumingProcessor(TProcessor processor) {
-      return new TUGIAssumingProcessor(processor, secretManager, false);
-    }
-
-    final static ThreadLocal<InetAddress> remoteAddress =
-        new ThreadLocal<InetAddress>() {
-
-      @Override
-      protected InetAddress initialValue() {
-        return null;
-      }
-    };
-
-    public InetAddress getRemoteAddress() {
-      return remoteAddress.get();
-    }
-
-    final static ThreadLocal<AuthenticationMethod> authenticationMethod =
-        new ThreadLocal<AuthenticationMethod>() {
-
-      @Override
-      protected AuthenticationMethod initialValue() {
-        return AuthenticationMethod.TOKEN;
-      }
-    };
-
-    private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () {
-
-      @Override
-      protected String initialValue() {
-        return null;
-      }
-    };
-
-
-    public String getRemoteUser() {
-      return remoteUser.get();
-    }
-
-    private final static ThreadLocal<String> userAuthMechanism =
-        new ThreadLocal<String>() {
-
-      @Override
-      protected String initialValue() {
-        return AuthMethod.KERBEROS.getMechanismName();
-      }
-    };
-
-    public String getUserAuthMechanism() {
-      return userAuthMechanism.get();
-    }
-    /** CallbackHandler for SASL DIGEST-MD5 mechanism */
-    // This code is pretty much completely based on Hadoop's
-    // SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not
-    // use that Hadoop class as-is was because it needs a Server.Connection object
-    // which is relevant in hadoop rpc but not here in the metastore - so the
-    // code below does not deal with the Connection Server.object.
-    static class SaslDigestCallbackHandler implements CallbackHandler {
-      private final DelegationTokenSecretManager secretManager;
-
-      public SaslDigestCallbackHandler(
-          DelegationTokenSecretManager secretManager) {
-        this.secretManager = secretManager;
-      }
-
-      private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken {
-        return encodePassword(secretManager.retrievePassword(tokenid));
-      }
-
-      private char[] encodePassword(byte[] password) {
-        return new String(Base64.encodeBase64(password)).toCharArray();
-      }
-      /** {@inheritDoc} */
-
-      @Override
-      public void handle(Callback[] callbacks) throws InvalidToken,
-      UnsupportedCallbackException {
-        NameCallback nc = null;
-        PasswordCallback pc = null;
-        AuthorizeCallback ac = null;
-        for (Callback callback : callbacks) {
-          if (callback instanceof AuthorizeCallback) {
-            ac = (AuthorizeCallback) callback;
-          } else if (callback instanceof NameCallback) {
-            nc = (NameCallback) callback;
-          } else if (callback instanceof PasswordCallback) {
-            pc = (PasswordCallback) callback;
-          } else if (callback instanceof RealmCallback) {
-            continue; // realm is ignored
-          } else {
-            throw new UnsupportedCallbackException(callback,
-                "Unrecognized SASL DIGEST-MD5 Callback");
-          }
-        }
-        if (pc != null) {
-          DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
-              getIdentifier(nc.getDefaultName(), secretManager);
-          char[] password = getPassword(tokenIdentifier);
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("SASL server DIGEST-MD5 callback: setting password "
-                + "for client: " + tokenIdentifier.getUser());
-          }
-          pc.setPassword(password);
-        }
-        if (ac != null) {
-          String authid = ac.getAuthenticationID();
-          String authzid = ac.getAuthorizationID();
-          if (authid.equals(authzid)) {
-            ac.setAuthorized(true);
-          } else {
-            ac.setAuthorized(false);
-          }
-          if (ac.isAuthorized()) {
-            if (LOG.isDebugEnabled()) {
-              String username =
-                  SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
-              LOG.debug("SASL server DIGEST-MD5 callback: setting "
-                  + "canonicalized client ID: " + username);
-            }
-            ac.setAuthorizedID(authzid);
-          }
-        }
-      }
-    }
-
-    /**
-     * Processor that pulls the SaslServer object out of the transport, and
-     * assumes the remote user's UGI before calling through to the original
-     * processor.
-     *
-     * This is used on the server side to set the UGI for each specific call.
-     */
-    protected class TUGIAssumingProcessor implements TProcessor {
-      final TProcessor wrapped;
-      DelegationTokenSecretManager secretManager;
-      boolean useProxy;
-      TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager,
-          boolean useProxy) {
-        this.wrapped = wrapped;
-        this.secretManager = secretManager;
-        this.useProxy = useProxy;
-      }
-
-
-      @Override
-      public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
-        TTransport trans = inProt.getTransport();
-        if (!(trans instanceof TSaslServerTransport)) {
-          throw new TException("Unexpected non-SASL transport " + trans.getClass());
-        }
-        TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
-        SaslServer saslServer = saslTrans.getSaslServer();
-        String authId = saslServer.getAuthorizationID();
-        LOG.debug("AUTH ID ======>" + authId);
-        String endUser = authId;
-
-        Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
-        remoteAddress.set(socket.getInetAddress());
-
-        String mechanismName = saslServer.getMechanismName();
-        userAuthMechanism.set(mechanismName);
-        if (AuthMethod.PLAIN.getMechanismName().equalsIgnoreCase(mechanismName)) {
-          remoteUser.set(endUser);
-          return wrapped.process(inProt, outProt);
-        }
-
-        authenticationMethod.set(AuthenticationMethod.KERBEROS);
-        if(AuthMethod.TOKEN.getMechanismName().equalsIgnoreCase(mechanismName)) {
-          try {
-            TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
-                secretManager);
-            endUser = tokenId.getUser().getUserName();
-            authenticationMethod.set(AuthenticationMethod.TOKEN);
-          } catch (InvalidToken e) {
-            throw new TException(e.getMessage());
-          }
-        }
-
-        UserGroupInformation clientUgi = null;
-        try {
-          if (useProxy) {
-            clientUgi = UserGroupInformation.createProxyUser(
-                endUser, UserGroupInformation.getLoginUser());
-            remoteUser.set(clientUgi.getShortUserName());
-            LOG.debug("Set remoteUser :" + remoteUser.get());
-            return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
-
-              @Override
-              public Boolean run() {
-                try {
-                  return wrapped.process(inProt, outProt);
-                } catch (TException te) {
-                  throw new RuntimeException(te);
-                }
-              }
-            });
-          } else {
-            // use the short user name for the request
-            UserGroupInformation endUserUgi = UserGroupInformation.createRemoteUser(endUser);
-            remoteUser.set(endUserUgi.getShortUserName());
-            LOG.debug("Set remoteUser :" + remoteUser.get() + ", from endUser :" + endUser);
-            return wrapped.process(inProt, outProt);
-          }
-        } catch (RuntimeException rte) {
-          if (rte.getCause() instanceof TException) {
-            throw (TException)rte.getCause();
-          }
-          throw rte;
-        } catch (InterruptedException ie) {
-          throw new RuntimeException(ie); // unexpected!
-        } catch (IOException ioe) {
-          throw new RuntimeException(ioe); // unexpected!
-        }
-        finally {
-          if (clientUgi != null) {
-            try { FileSystem.closeAllForUGI(clientUgi); }
-            catch(IOException exception) {
-              LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
-            }
-          }
-        }
-      }
-    }
-
-    /**
-     * A TransportFactory that wraps another one, but assumes a specified UGI
-     * before calling through.
-     *
-     * This is used on the server side to assume the server's Principal when accepting
-     * clients.
-     */
-    static class TUGIAssumingTransportFactory extends TTransportFactory {
-      private final UserGroupInformation ugi;
-      private final TTransportFactory wrapped;
-
-      public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
-        assert wrapped != null;
-        assert ugi != null;
-        this.wrapped = wrapped;
-        this.ugi = ugi;
-      }
-
-
-      @Override
-      public TTransport getTransport(final TTransport trans) {
-        return ugi.doAs(new PrivilegedAction<TTransport>() {
-          @Override
-          public TTransport run() {
-            return wrapped.getTransport(trans);
-          }
-        });
-      }
-    }
-  }
-}
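
To see how the pieces above fit together on the server side, here is a condensed wiring sketch: the concrete bridge comes from the shims layer, logs in from a keytab, and yields both a SASL transport factory and a processor wrapper that runs each RPC as the authenticated caller. The keytab and principal values are placeholders; real deployments read them from HiveConf:

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
import org.apache.thrift.TProcessor;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;

public class SecureServerWiring {
  public static TTransportFactory wire(Configuration conf, TProcessor processor)
      throws TTransportException {
    HadoopThriftAuthBridge bridge = ShimLoader.getHadoopThriftAuthBridge();
    HadoopThriftAuthBridge.Server server = bridge.createServer(
        "/etc/security/keytabs/hive.keytab", // placeholder keytab
        "hive/_HOST@EXAMPLE.COM",            // server-facing principal
        "hive/_HOST@EXAMPLE.COM");           // client-facing principal
    Map<String, String> saslProps = bridge.getHadoopSaslProperties(conf);
    // wrapProcessor() makes each RPC run inside a doAs() for the caller's UGI;
    // 'secured' and the returned factory would both go to a TThreadPoolServer.
    TProcessor secured = server.wrapProcessor(processor);
    return server.createTransportFactory(saslProps);
  }
}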

http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HiveDelegationTokenManager.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HiveDelegationTokenManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HiveDelegationTokenManager.java
deleted file mode 100644
index b3e4a76..0000000
--- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HiveDelegationTokenManager.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.hadoop.hive.thrift;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class HiveDelegationTokenManager {
-
-  public static final String  DELEGATION_TOKEN_GC_INTERVAL =
-      "hive.cluster.delegation.token.gc-interval";
-  private final static long DELEGATION_TOKEN_GC_INTERVAL_DEFAULT = 3600000; // 1 hour
-  // Delegation token related keys
-  public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY =
-      "hive.cluster.delegation.key.update-interval";
-  public static final long    DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
-      24*60*60*1000; // 1 day
-  public static final String  DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
-      "hive.cluster.delegation.token.renew-interval";
-  public static final long    DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
-      24*60*60*1000;  // 1 day
-  public static final String  DELEGATION_TOKEN_MAX_LIFETIME_KEY =
-      "hive.cluster.delegation.token.max-lifetime";
-  public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
-      7*24*60*60*1000; // 7 days
-  public static final String DELEGATION_TOKEN_STORE_CLS =
-      "hive.cluster.delegation.token.store.class";
-  public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
-      "hive.cluster.delegation.token.store.zookeeper.connectString";
-  // Alternate connect string specification configuration
-  public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE =
-      "hive.zookeeper.quorum";
-
-  public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS =
-      "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis";
-  public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
-      "hive.cluster.delegation.token.store.zookeeper.znode";
-  public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
-      "hive.cluster.delegation.token.store.zookeeper.acl";
-  public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT =
-      "/hivedelegation";
-
-  protected DelegationTokenSecretManager secretManager;
-
-  public HiveDelegationTokenManager() {
-  }
-
-  public DelegationTokenSecretManager getSecretManager() {
-    return secretManager;
-  }
-
-  public void startDelegationTokenSecretManager(Configuration conf, Object hms, ServerMode smode)
-      throws IOException {
-    long secretKeyInterval =
-        conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
-    long tokenMaxLifetime =
-        conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
-    long tokenRenewInterval =
-        conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
-    long tokenGcInterval =
-        conf.getLong(DELEGATION_TOKEN_GC_INTERVAL, DELEGATION_TOKEN_GC_INTERVAL_DEFAULT);
-
-    DelegationTokenStore dts = getTokenStore(conf);
-    dts.setConf(conf);
-    dts.init(hms, smode);
-    secretManager =
-        new TokenStoreDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime,
-            tokenRenewInterval, tokenGcInterval, dts);
-    secretManager.startThreads();
-  }
-
-  public String getDelegationToken(final String owner, final String renewer, String remoteAddr)
-      throws IOException,
-      InterruptedException {
-    /**
-     * If the user asking the token is same as the 'owner' then don't do
-     * any proxy authorization checks. For cases like oozie, where it gets
-     * a delegation token for another user, we need to make sure oozie is
-     * authorized to get a delegation token.
-     */
-    // Do all checks on short names
-    UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
-    UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner);
-    if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) {
-      // in the case of proxy users, the getCurrentUser will return the
-      // real user (for e.g. oozie) due to the doAs that happened just before the
-      // server started executing the method getDelegationToken in the MetaStore
-      ownerUgi = UserGroupInformation.createProxyUser(owner, UserGroupInformation.getCurrentUser());
-      ProxyUsers.authorize(ownerUgi, remoteAddr, null);
-    }
-    return ownerUgi.doAs(new PrivilegedExceptionAction<String>() {
-
-      @Override
-      public String run() throws IOException {
-        return secretManager.getDelegationToken(renewer);
-      }
-    });
-  }
-
-  public String getDelegationTokenWithService(String owner, String renewer, String service, String remoteAddr)
-      throws IOException, InterruptedException {
-    String token = getDelegationToken(owner, renewer, remoteAddr);
-    return Utils.addServiceToToken(token, service);
-  }
-
-  public long renewDelegationToken(String tokenStrForm)
-      throws IOException {
-    return secretManager.renewDelegationToken(tokenStrForm);
-  }
-
-  public String getUserFromToken(String tokenStr) throws IOException {
-    return secretManager.getUserFromToken(tokenStr);
-  }
-
-  public void cancelDelegationToken(String tokenStrForm) throws IOException {
-    secretManager.cancelDelegationToken(tokenStrForm);
-  }
-
-  /**
-   * Verify token string
-   * @param tokenStrForm
-   * @return user name
-   * @throws IOException
-   */
-  public String verifyDelegationToken(String tokenStrForm) throws IOException {
-    return secretManager.verifyDelegationToken(tokenStrForm);
-  }
-
-  private DelegationTokenStore getTokenStore(Configuration conf) throws IOException {
-    String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
-    if (StringUtils.isBlank(tokenStoreClassName)) {
-      return new MemoryTokenStore();
-    }
-    try {
-      Class<? extends DelegationTokenStore> storeClass =
-          Class.forName(tokenStoreClassName).asSubclass(DelegationTokenStore.class);
-      return ReflectionUtils.newInstance(storeClass, conf);
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, e);
-    }
-  }
-
-
-}
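
The manager above is essentially a facade over the secret manager: start it once, then issue, renew, and cancel token strings. A minimal lifecycle sketch, where the owner, renewer, and remote address are placeholder values and hmsHandler is whatever object the configured store's init() expects:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.hive.thrift.HiveDelegationTokenManager;

public class TokenLifecycleExample {
  public static void run(Configuration conf, Object hmsHandler)
      throws IOException, InterruptedException {
    HiveDelegationTokenManager mgr = new HiveDelegationTokenManager();
    // Starts the secret manager with the store named by
    // hive.cluster.delegation.token.store.class (memory store by default).
    mgr.startDelegationTokenSecretManager(conf, hmsHandler, ServerMode.METASTORE);

    // Issue a token for "alice" renewable by "hive"; the remote address is
    // used for proxy-user authorization when owner != current user.
    String tokenStrForm = mgr.getDelegationToken("alice", "hive", "10.0.0.1");
    long nextExpiry = mgr.renewDelegationToken(tokenStrForm);
    System.out.println("token renewed until " + nextExpiry
        + " for " + mgr.getUserFromToken(tokenStrForm));
    mgr.cancelDelegationToken(tokenStrForm);
  }
}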

http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
deleted file mode 100644
index 9d837b8..0000000
--- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.thrift;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Default in-memory token store implementation.
- */
-public class MemoryTokenStore implements DelegationTokenStore {
-  private static final Logger LOG = LoggerFactory.getLogger(MemoryTokenStore.class);
-
-  private final Map<Integer, String> masterKeys
-      = new ConcurrentHashMap<Integer, String>();
-
-  private final ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
-      = new ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
-
-  private final AtomicInteger masterKeySeq = new AtomicInteger();
-  private Configuration conf;
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return this.conf;
-  }
-
-  @Override
-  public int addMasterKey(String s) {
-    int keySeq = masterKeySeq.getAndIncrement();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("addMasterKey: s = " + s + ", keySeq = " + keySeq);
-    }
-    masterKeys.put(keySeq, s);
-    return keySeq;
-  }
-
-  @Override
-  public void updateMasterKey(int keySeq, String s) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq);
-    }
-    masterKeys.put(keySeq, s);
-  }
-
-  @Override
-  public boolean removeMasterKey(int keySeq) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("removeMasterKey: keySeq = " + keySeq);
-    }
-    return masterKeys.remove(keySeq) != null;
-  }
-
-  @Override
-  public String[] getMasterKeys() {
-    return masterKeys.values().toArray(new String[0]);
-  }
-
-  @Override
-  public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
-    DelegationTokenInformation token) {
-    DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", added = " + (tokenInfo == null));
-    }
-    return (tokenInfo == null);
-  }
-
-  @Override
-  public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
-    DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + (tokenInfo != null));
-    }
-    return tokenInfo != null;
-  }
-
-  @Override
-  public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
-    DelegationTokenInformation result = tokens.get(tokenIdentifier);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result);
-    }
-    return result;
-  }
-
-  @Override
-  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
-    List<DelegationTokenIdentifier> result = new ArrayList<DelegationTokenIdentifier>(
-        tokens.size());
-    for (DelegationTokenIdentifier id : tokens.keySet()) {
-        result.add(id);
-    }
-    return result;
-  }
-
-  @Override
-  public void close() throws IOException {
-    //no-op
-  }
-
-  @Override
-  public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException {
-    // no-op
-  }
-}
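
MemoryTokenStore's add/remove return values encode the put-if-absent semantics that TokenStoreDelegationTokenSecretManager relies on below. A small self-contained exercise; the identifier and password are fabricated purely for illustration:

import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
import org.apache.hadoop.hive.thrift.MemoryTokenStore;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;

public class MemoryTokenStoreExample {
  public static void main(String[] args) throws Exception {
    try (MemoryTokenStore store = new MemoryTokenStore()) {
      DelegationTokenIdentifier id = new DelegationTokenIdentifier(
          new Text("alice"), new Text("hive"), new Text());
      DelegationTokenInformation info = new DelegationTokenInformation(
          System.currentTimeMillis() + 3600 * 1000L, "secret".getBytes());

      System.out.println(store.addToken(id, info));   // true: newly added
      System.out.println(store.addToken(id, info));   // false: already present
      System.out.println(store.getToken(id) != null); // true
      System.out.println(store.removeToken(id));      // true: removed
      System.out.println(store.removeToken(id));      // false: already gone
    }
  }
}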

http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
deleted file mode 100644
index 4719b85..0000000
--- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.thrift;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Extension of {@link DelegationTokenSecretManager} to support alternative to default in-memory
- * token management for fail-over and clustering through plug-able token store (ZooKeeper etc.).
- * Delegation tokens will be retrieved from the store on-demand and (unlike base class behavior) not
- * cached in memory. This avoids complexities related to token expiration. The security token is
- * needed only at the time the transport is opened (as opposed to per interface operation). The
- * assumption therefore is low cost of interprocess token retrieval (for random read efficient store
- * such as ZooKeeper) compared to overhead of synchronizing per-process in-memory token caches.
- * The wrapper incorporates the token store abstraction within the limitations of current
- * Hive/Hadoop dependency (.20S) with minimum code duplication.
- * Eventually this should be supported by Hadoop security directly.
- */
-public class TokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName());
-
-  final private long keyUpdateInterval;
-  final private long tokenRemoverScanInterval;
-  private Thread tokenRemoverThread;
-
-  final private DelegationTokenStore tokenStore;
-
-  public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval,
-      long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
-      long delegationTokenRemoverScanInterval,
-      DelegationTokenStore sharedStore) {
-    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval,
-        delegationTokenRemoverScanInterval);
-    this.keyUpdateInterval = delegationKeyUpdateInterval;
-    this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
-
-    this.tokenStore = sharedStore;
-  }
-
-  protected Map<Integer, DelegationKey> reloadKeys() {
-    // read keys from token store
-    String[] allKeys = tokenStore.getMasterKeys();
-    Map<Integer, DelegationKey> keys
-        = new HashMap<Integer, DelegationKey>(allKeys.length);
-    for (String keyStr : allKeys) {
-      DelegationKey key = new DelegationKey();
-      try {
-        decodeWritable(key, keyStr);
-        keys.put(key.getKeyId(), key);
-      } catch (IOException ex) {
-        LOGGER.error("Failed to load master key.", ex);
-      }
-    }
-    synchronized (this) {
-        super.allKeys.clear();
-        super.allKeys.putAll(keys);
-    }
-    return keys;
-  }
-
-  @Override
-  public byte[] retrievePassword(DelegationTokenIdentifier identifier) throws InvalidToken {
-      DelegationTokenInformation info = this.tokenStore.getToken(identifier);
-      if (info == null) {
-          throw new InvalidToken("token expired or does not exist: " + identifier);
-      }
-      // must reuse super as info.getPassword is not accessible
-      synchronized (this) {
-        try {
-          super.currentTokens.put(identifier, info);
-          return super.retrievePassword(identifier);
-        } finally {
-          super.currentTokens.remove(identifier);
-        }
-      }
-  }
-
-  @Override
-  public DelegationTokenIdentifier cancelToken(Token<DelegationTokenIdentifier> token,
-      String canceller) throws IOException {
-    DelegationTokenIdentifier id = getTokenIdentifier(token);
-    LOGGER.info("Token cancelation requested for identifier: "+id);
-    this.tokenStore.removeToken(id);
-    return id;
-  }
-
-  /**
-   * Create the password and add it to shared store.
-   */
-  @Override
-  protected byte[] createPassword(DelegationTokenIdentifier id) {
-    byte[] password;
-    DelegationTokenInformation info;
-    synchronized (this) {
-      password = super.createPassword(id);
-      // add new token to shared store
-      // need to persist expiration along with password
-      info = super.currentTokens.remove(id);
-      if (info == null) {
-        throw new IllegalStateException("Failed to retrieve token after creation");
-      }
-    }
-    this.tokenStore.addToken(id, info);
-    return password;
-  }
-
-  @Override
-  public long renewToken(Token<DelegationTokenIdentifier> token,
-      String renewer) throws InvalidToken, IOException {
-    // since renewal is KERBEROS authenticated token may not be cached
-    final DelegationTokenIdentifier id = getTokenIdentifier(token);
-    DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id);
-    if (tokenInfo == null) {
-        throw new InvalidToken("token does not exist: " + id); // no token found
-    }
-    // ensure associated master key is available
-    if (!super.allKeys.containsKey(id.getMasterKeyId())) {
-      LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.",
-        id.getMasterKeyId());
-      reloadKeys();
-    }
-    // reuse super renewal logic
-    synchronized (this) {
-      super.currentTokens.put(id,  tokenInfo);
-      try {
-        return super.renewToken(token, renewer);
-      } finally {
-        super.currentTokens.remove(id);
-      }
-    }
-  }
-
-  public static String encodeWritable(Writable key) throws IOException {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(bos);
-    key.write(dos);
-    dos.flush();
-    return Base64.encodeBase64URLSafeString(bos.toByteArray());
-  }
-
-  public static void decodeWritable(Writable w, String idStr) throws IOException {
-    DataInputStream in = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(idStr)));
-    w.readFields(in);
-  }
-
-  /**
-   * Synchronize master key updates / sequence generation for multiple nodes.
-   * NOTE: {@link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need
-   * to utilize this "hook" to manipulate the key through the object reference.
-   * This .20S workaround should cease to exist when Hadoop supports token store.
-   */
-  @Override
-  protected void logUpdateMasterKey(DelegationKey key) throws IOException {
-    int keySeq = this.tokenStore.addMasterKey(encodeWritable(key));
-    // update key with assigned identifier
-    DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey());
-    String keyStr = encodeWritable(keyWithSeq);
-    this.tokenStore.updateMasterKey(keySeq, keyStr);
-    decodeWritable(key, keyStr);
-    LOGGER.info("New master key with key id={}", key.getKeyId());
-    super.logUpdateMasterKey(key);
-  }
-
-  @Override
-  public synchronized void startThreads() throws IOException {
-    try {
-      // updateCurrentKey needs to be called to initialize the master key
-      // (there should be a null check added in the future in rollMasterKey)
-      // updateCurrentKey();
-      Method m = AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey");
-      m.setAccessible(true);
-      m.invoke(this);
-    } catch (Exception e) {
-      throw new IOException("Failed to initialize master key", e);
-    }
-    running = true;
-    tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
-    tokenRemoverThread.start();
-  }
-
-  @Override
-  public synchronized void stopThreads() {
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Stopping expired delegation token remover thread");
-    }
-    running = false;
-    if (tokenRemoverThread != null) {
-      tokenRemoverThread.interrupt();
-    }
-  }
-
-  /**
-   * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager}
-   * that cannot be reused due to private method access. The logic here deals more
-   * efficiently with an external token store by loading only the minimum data into memory.
-   */
-  protected void removeExpiredTokens() {
-    long now = System.currentTimeMillis();
-    Iterator<DelegationTokenIdentifier> i = tokenStore.getAllDelegationTokenIdentifiers()
-        .iterator();
-    while (i.hasNext()) {
-      DelegationTokenIdentifier id = i.next();
-      if (now > id.getMaxDate()) {
-        this.tokenStore.removeToken(id); // no need to look at token info
-      } else {
-        // get token info to check renew date
-        DelegationTokenInformation tokenInfo = tokenStore.getToken(id);
-        if (tokenInfo != null) {
-          if (now > tokenInfo.getRenewDate()) {
-            this.tokenStore.removeToken(id);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Extension of rollMasterKey to remove expired keys from store.
-   *
-   * @throws IOException
-   */
-  protected void rollMasterKeyExt() throws IOException {
-    Map<Integer, DelegationKey> keys = reloadKeys();
-    int currentKeyId = super.currentId;
-    HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this);
-    List<DelegationKey> keysAfterRoll = Arrays.asList(getAllKeys());
-    for (DelegationKey key : keysAfterRoll) {
-      keys.remove(key.getKeyId());
-      if (key.getKeyId() == currentKeyId) {
-        tokenStore.updateMasterKey(currentKeyId, encodeWritable(key));
-      }
-    }
-    for (DelegationKey expiredKey : keys.values()) {
-      LOGGER.info("Removing expired key id={}", expiredKey.getKeyId());
-      try {
-        tokenStore.removeMasterKey(expiredKey.getKeyId());
-      } catch (Exception e) {
-        LOGGER.error("Error removing expired key id={}", expiredKey.getKeyId(), e);
-      }
-    }
-  }
-
-  /**
-   * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access
-   * restriction (there would be no need to clone the remover thread if the removal logic
-   * were protected/extensible).
-   */
-  protected class ExpiredTokenRemover extends Thread {
-    private long lastMasterKeyUpdate;
-    private long lastTokenCacheCleanup;
-
-    @Override
-    public void run() {
-      LOGGER.info("Starting expired delegation token remover thread, "
-          + "tokenRemoverScanInterval=" + tokenRemoverScanInterval
-          / (60 * 1000) + " min(s)");
-      while (running) {
-        try {
-          long now = System.currentTimeMillis();
-          if (lastMasterKeyUpdate + keyUpdateInterval < now) {
-            try {
-              rollMasterKeyExt();
-              lastMasterKeyUpdate = now;
-            } catch (IOException e) {
-              LOGGER.error("Master key updating failed. "
-                  + StringUtils.stringifyException(e));
-            }
-          }
-          if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) {
-            removeExpiredTokens();
-            lastTokenCacheCleanup = now;
-          }
-          try {
-            Thread.sleep(5000); // 5 seconds
-          } catch (InterruptedException ie) {
-            LOGGER.error("InterruptedException received for ExpiredTokenRemover thread "
-                + ie);
-          }
-        } catch (Throwable t) {
-          LOGGER.error("ExpiredTokenRemover thread received unexpected exception. "
-                           + t, t);
-          // Wait 5 seconds too in case of an exception, so we do not end up in busy waiting for
-          // the solution for this exception
-          try {
-            Thread.sleep(5000); // 5 seconds
-          } catch (InterruptedException ie) {
-            LOGGER.error("InterruptedException received for ExpiredTokenRemover thread during " +
-                "wait in exception sleep " + ie);
-          }
-        }
-      }
-    }
-  }
-
-}
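
The encodeWritable/decodeWritable pair above is the serialization backbone of this secret manager: identifiers and master keys are written through the Writable interface and wrapped in URL-safe Base64 so the strings can double as store keys, including ZooKeeper node names. Below is a minimal round-trip sketch, assuming only hadoop-common and commons-codec on the classpath; Text is a stand-in Writable, while the real code applies the same pattern to DelegationTokenIdentifier and DelegationKey.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class WritableBase64RoundTrip {
  static String encode(Writable w) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(bos);
    w.write(dos);              // serialize the Writable to bytes
    dos.flush();
    // URL-safe Base64 keeps the result legal as a ZooKeeper node name
    return Base64.encodeBase64URLSafeString(bos.toByteArray());
  }

  static void decode(Writable w, String s) throws IOException {
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(Base64.decodeBase64(s)));
    w.readFields(in);          // repopulate the Writable from the bytes
  }

  public static void main(String[] args) throws IOException {
    Text original = new Text("hive/metastore@EXAMPLE.COM");
    String encoded = encode(original);
    Text decoded = new Text();
    decode(decoded, encoded);
    System.out.println(encoded + " -> " + decoded); // round-trips losslessly
  }
}

The URL-safe variant matters because standard Base64 emits '/' characters, which ZooKeeper would treat as path separators.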

http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
deleted file mode 100644
index 885ec56..0000000
--- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
+++ /dev/null
@@ -1,476 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.thrift;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
-import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ZooKeeper token store implementation.
- */
-public class ZooKeeperTokenStore implements DelegationTokenStore {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName());
-
-  protected static final String ZK_SEQ_FORMAT = "%010d";
-  private static final String NODE_KEYS = "/keys";
-  private static final String NODE_TOKENS = "/tokens";
-
-  private String rootNode = "";
-  private volatile CuratorFramework zkSession;
-  private String zkConnectString;
-  private int connectTimeoutMillis;
-  private List<ACL> newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS));
-
-  /**
-   * ACLProvider permissions will be used in case parent directories need to be created.
-   */
-  private final ACLProvider aclDefaultProvider =  new ACLProvider() {
-
-    @Override
-    public List<ACL> getDefaultAcl() {
-      return newNodeAcl;
-    }
-
-    @Override
-    public List<ACL> getAclForPath(String path) {
-      return getDefaultAcl();
-    }
-  };
-
-
-  private ServerMode serverMode;
-
-  private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled "
-      + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
-
-  private Configuration conf;
-
-  /**
-   * Default constructor for dynamic instantiation with Configurable
-   * (ReflectionUtils does not support Configuration constructor injection).
-   */
-  protected ZooKeeperTokenStore() {
-  }
-
-  private CuratorFramework getSession() {
-    if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
-      synchronized (this) {
-        if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
-          zkSession =
-              CuratorFrameworkFactory.builder().connectString(zkConnectString)
-                  .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider)
-                  .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
-          zkSession.start();
-        }
-      }
-    }
-    return zkSession;
-  }
-
-  private void setupJAASConfig(Configuration conf) throws IOException {
-    if (!UserGroupInformation.getLoginUser().isFromKeytab()) {
-      // The process has not logged in using keytab
-      // this should be a test mode, can't use keytab to authenticate
-      // with zookeeper.
-      LOGGER.warn("Login is not from keytab");
-      return;
-    }
-
-    String principal;
-    String keytab;
-    switch (serverMode) {
-    case METASTORE:
-      principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal");
-      keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file");
-      break;
-    case HIVESERVER2:
-      principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal");
-      keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab");
-      break;
-    default:
-      throw new AssertionError("Unexpected server mode " + serverMode);
-    }
-    Utils.setZookeeperClientKerberosJaasConfig(principal, keytab);
-  }
-
-  private String getNonEmptyConfVar(Configuration conf, String param) throws IOException {
-    String val = conf.get(param);
-    if (val == null || val.trim().isEmpty()) {
-      throw new IOException("Configuration parameter " + param + " should be set, "
-          + WHEN_ZK_DSTORE_MSG);
-    }
-    return val;
-  }
-
-  /**
-   * Create a path if it does not already exist ("mkdir -p")
-   * @param path string with '/' separator
-   * @param acl list of ACL entries
-   * @throws TokenStoreException
-   */
-  public void ensurePath(String path, List<ACL> acl)
-      throws TokenStoreException {
-    try {
-      CuratorFramework zk = getSession();
-      String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
-          .withACL(acl).forPath(path);
-      LOGGER.info("Created path: {} ", node);
-    } catch (KeeperException.NodeExistsException e) {
-      // node already exists
-    } catch (Exception e) {
-      throw new TokenStoreException("Error creating path " + path, e);
-    }
-  }
-
-  /**
-   * Parse an ACL permission string (adapted from a ZooKeeperMain private method).
-   * @param permString permission characters, e.g. "cdrwa"
-   * @return bitmask of {@link ZooDefs.Perms} values
-   */
-  public static int getPermFromString(String permString) {
-    int perm = 0;
-    for (int i = 0; i < permString.length(); i++) {
-      switch (permString.charAt(i)) {
-      case 'r':
-        perm |= ZooDefs.Perms.READ;
-        break;
-      case 'w':
-        perm |= ZooDefs.Perms.WRITE;
-        break;
-      case 'c':
-        perm |= ZooDefs.Perms.CREATE;
-        break;
-      case 'd':
-        perm |= ZooDefs.Perms.DELETE;
-        break;
-      case 'a':
-        perm |= ZooDefs.Perms.ADMIN;
-        break;
-      default:
-        LOGGER.error("Unknown perm type: " + permString.charAt(i));
-      }
-    }
-    return perm;
-  }
-
-  /**
-   * Parse a comma separated list of ACL entries used to secure generated nodes, e.g.
-   * <code>sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa</code>
-   * @param aclString comma separated ACL entries
-   * @return parsed ACL list
-   */
-  public static List<ACL> parseACLs(String aclString) {
-    String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ",");
-    List<ACL> acl = new ArrayList<ACL>(aclComps.length);
-    for (String a : aclComps) {
-      if (StringUtils.isBlank(a)) {
-         continue;
-      }
-      a = a.trim();
-      // from ZooKeeperMain private method
-      int firstColon = a.indexOf(':');
-      int lastColon = a.lastIndexOf(':');
-      if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
-         LOGGER.error(a + " does not have the form scheme:id:perm");
-         continue;
-      }
-      ACL newAcl = new ACL();
-      newAcl.setId(new Id(a.substring(0, firstColon), a.substring(
-          firstColon + 1, lastColon)));
-      newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
-      acl.add(newAcl);
-    }
-    return acl;
-  }
-
-  private void initClientAndPaths() {
-    if (this.zkSession != null) {
-      this.zkSession.close();
-    }
-    ensurePath(rootNode + NODE_KEYS, newNodeAcl);
-    ensurePath(rootNode + NODE_TOKENS, newNodeAcl);
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    if (conf == null) {
-      throw new IllegalArgumentException("conf is null");
-    }
-    this.conf = conf;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return null; // not required
-  }
-
-  private Map<Integer, byte[]> getAllKeys() throws KeeperException, InterruptedException {
-
-    String masterKeyNode = rootNode + NODE_KEYS;
-
-    // get children of key node
-    List<String> nodes = zkGetChildren(masterKeyNode);
-
-    // read each child node, add to results
-    Map<Integer, byte[]> result = new HashMap<Integer, byte[]>();
-    for (String node : nodes) {
-      String nodePath = masterKeyNode + "/" + node;
-      byte[] data = zkGetData(nodePath);
-      if (data != null) {
-        result.put(getSeq(node), data);
-      }
-    }
-    return result;
-  }
-
-  private List<String> zkGetChildren(String path) {
-    CuratorFramework zk = getSession();
-    try {
-      return zk.getChildren().forPath(path);
-    } catch (Exception e) {
-      throw new TokenStoreException("Error getting children for " + path, e);
-    }
-  }
-
-  private byte[] zkGetData(String nodePath) {
-    CuratorFramework zk = getSession();
-    try {
-      return zk.getData().forPath(nodePath);
-    } catch (KeeperException.NoNodeException ex) {
-      return null;
-    } catch (Exception e) {
-      throw new TokenStoreException("Error reading " + nodePath, e);
-    }
-  }
-
-  private int getSeq(String path) {
-    String[] pathComps = path.split("/");
-    return Integer.parseInt(pathComps[pathComps.length-1]);
-  }
-
-  @Override
-  public int addMasterKey(String s) {
-    String keysPath = rootNode + NODE_KEYS + "/";
-    CuratorFramework zk = getSession();
-    String newNode;
-    try {
-      newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl)
-          .forPath(keysPath, s.getBytes());
-    } catch (Exception e) {
-      throw new TokenStoreException("Error creating new node with path " + keysPath, e);
-    }
-    LOGGER.info("Added key {}", newNode);
-    return getSeq(newNode);
-  }
-
-  @Override
-  public void updateMasterKey(int keySeq, String s) {
-    CuratorFramework zk = getSession();
-    String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq);
-    try {
-      zk.setData().forPath(keyPath, s.getBytes());
-    } catch (Exception e) {
-      throw new TokenStoreException("Error setting data in " + keyPath, e);
-    }
-  }
-
-  @Override
-  public boolean removeMasterKey(int keySeq) {
-    String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq);
-    zkDelete(keyPath);
-    return true;
-  }
-
-  private void zkDelete(String path) {
-    CuratorFramework zk = getSession();
-    try {
-      zk.delete().forPath(path);
-    } catch (KeeperException.NoNodeException ex) {
-      // already deleted
-    } catch (Exception e) {
-      throw new TokenStoreException("Error deleting " + path, e);
-    }
-  }
-
-  @Override
-  public String[] getMasterKeys() {
-    try {
-      Map<Integer, byte[]> allKeys = getAllKeys();
-      String[] result = new String[allKeys.size()];
-      int resultIdx = 0;
-      for (byte[] keyBytes : allKeys.values()) {
-        result[resultIdx++] = new String(keyBytes);
-      }
-      return result;
-    } catch (KeeperException | InterruptedException ex) {
-      throw new TokenStoreException(ex);
-    }
-  }
-
-
-  private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) {
-    try {
-      return rootNode + NODE_TOKENS + "/"
-          + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier);
-    } catch (IOException ex) {
-      throw new TokenStoreException("Failed to encode token identifier", ex);
-    }
-  }
-
-  @Override
-  public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
-      DelegationTokenInformation token) {
-    byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
-    String tokenPath = getTokenPath(tokenIdentifier);
-    CuratorFramework zk = getSession();
-    String newNode;
-    try {
-      newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl)
-          .forPath(tokenPath, tokenBytes);
-    } catch (Exception e) {
-      throw new TokenStoreException("Error creating new node with path " + tokenPath, e);
-    }
-
-    LOGGER.info("Added token: {}", newNode);
-    return true;
-  }
-
-  @Override
-  public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
-    String tokenPath = getTokenPath(tokenIdentifier);
-    zkDelete(tokenPath);
-    return true;
-  }
-
-  @Override
-  public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
-    byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier));
-    if (tokenBytes == null) {
-      // The token is already removed.
-      return null;
-    }
-    try {
-      return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
-    } catch (Exception ex) {
-      throw new TokenStoreException("Failed to decode token", ex);
-    }
-  }
-
-  @Override
-  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
-    String containerNode = rootNode + NODE_TOKENS;
-    final List<String> nodes = zkGetChildren(containerNode);
-    List<DelegationTokenIdentifier> result =
-        new ArrayList<DelegationTokenIdentifier>(nodes.size());
-    for (String node : nodes) {
-      DelegationTokenIdentifier id = new DelegationTokenIdentifier();
-      try {
-        TokenStoreDelegationTokenSecretManager.decodeWritable(id, node);
-        result.add(id);
-      } catch (Exception e) {
-        LOGGER.warn("Failed to decode token '{}'", node);
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (this.zkSession != null) {
-      this.zkSession.close();
-    }
-  }
-
-  @Override
-  public void init(Object hmsHandler, ServerMode smode) {
-    this.serverMode = smode;
-    zkConnectString =
-        conf.get(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
-    if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
-      // try alternate config param
-      zkConnectString =
-          conf.get(
-              HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE,
-              null);
-      if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
-        throw new IllegalArgumentException("Zookeeper connect string has to be specified through "
-            + "either " + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
-            + " or "
-            + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE
-            + " " + WHEN_ZK_DSTORE_MSG);
-      }
-    }
-    connectTimeoutMillis =
-        conf.getInt(
-            HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
-            CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
-    String aclStr = conf.get(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null);
-    if (StringUtils.isNotBlank(aclStr)) {
-      this.newNodeAcl = parseACLs(aclStr);
-    }
-    rootNode =
-        conf.get(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE,
-            HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
-
-    try {
-      // Install the JAAS Configuration for the runtime
-      setupJAASConfig(conf);
-    } catch (IOException e) {
-      throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client "
-          + e.getMessage(), e);
-    }
-    initClientAndPaths();
-  }
-
-}
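
The master-key sequence numbers in this store come straight from ZooKeeper: addMasterKey() creates a PERSISTENT_SEQUENTIAL znode and getSeq() parses the counter ZooKeeper appends to the node name. Here is a minimal sketch of that pattern with Curator, assuming a plain unsecured ZooKeeper reachable at localhost:2181 (a hypothetical test setup; the real store attaches SASL ACLs and installs a JAAS login as shown above).

import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class SequentialKeyNodeSketch {
  public static void main(String[] args) throws Exception {
    CuratorFramework zk = CuratorFrameworkFactory.builder()
        .connectString("localhost:2181")
        .retryPolicy(new ExponentialBackoffRetry(1000, 3))
        .build();
    zk.start();
    try {
      // "mkdir -p" equivalent of ensurePath()
      zk.create().creatingParentsIfNeeded().forPath("/demo/keys", new byte[0]);
    } catch (org.apache.zookeeper.KeeperException.NodeExistsException e) {
      // parent already present, same outcome as ensurePath()
    }
    // ZooKeeper appends a zero-padded counter, e.g. /demo/keys/key0000000007;
    // parsing that suffix yields the integer sequence the store hands back.
    String node = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
        .forPath("/demo/keys/key", "master-key-bytes".getBytes(StandardCharsets.UTF_8));
    String[] comps = node.split("/");
    int seq = Integer.parseInt(comps[comps.length - 1].replace("key", ""));
    System.out.println(node + " -> seq " + seq);
    zk.close();
  }
}

The store itself creates the sequential node under a path ending in "/", so the node name is the bare counter and getSeq() can parse it directly.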

http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DBTokenStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DBTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DBTokenStore.java
new file mode 100644
index 0000000..2f26010
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DBTokenStore.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.security.token.delegation.MetastoreDelegationTokenSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DBTokenStore implements DelegationTokenStore {
+  private static final Logger LOG = LoggerFactory.getLogger(DBTokenStore.class);
+  private Configuration conf;
+
+  @Override
+  public int addMasterKey(String s) throws TokenStoreException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("addMasterKey: s = " + s);
+    }
+    return (Integer)invokeOnTokenStore("addMasterKey", new Object[]{s}, String.class);
+  }
+
+  @Override
+  public void updateMasterKey(int keySeq, String s) throws TokenStoreException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq);
+    }
+    invokeOnTokenStore("updateMasterKey", new Object[] {Integer.valueOf(keySeq), s},
+        Integer.class, String.class);
+  }
+
+  @Override
+  public boolean removeMasterKey(int keySeq) {
+    return (Boolean)invokeOnTokenStore("removeMasterKey", new Object[] {Integer.valueOf(keySeq)},
+      Integer.class);
+  }
+
+  @Override
+  public String[] getMasterKeys() throws TokenStoreException {
+    return (String[])invokeOnTokenStore("getMasterKeys", new Object[0]);
+  }
+
+  @Override
+  public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+      DelegationTokenInformation token) throws TokenStoreException {
+
+    try {
+      String identifier = TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier);
+      String tokenStr = Base64.encodeBase64URLSafeString(
+        MetastoreDelegationTokenSupport.encodeDelegationTokenInformation(token));
+      boolean result = (Boolean)invokeOnTokenStore("addToken", new Object[] {identifier, tokenStr},
+        String.class, String.class);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", added = " + result);
+      }
+      return result;
+    } catch (IOException e) {
+      throw new TokenStoreException(e);
+    }
+  }
+
+  @Override
+  public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
+      throws TokenStoreException {
+    try {
+      String tokenStr = (String)invokeOnTokenStore("getToken", new Object[] {
+          TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class);
+      DelegationTokenInformation result = null;
+      if (StringUtils.isNotEmpty(tokenStr)) {
+        result = MetastoreDelegationTokenSupport.decodeDelegationTokenInformation(Base64.decodeBase64(tokenStr));
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result);
+      }
+      return result;
+    } catch (IOException e) {
+      throw new TokenStoreException(e);
+    }
+  }
+
+  @Override
+  public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException {
+    try {
+      boolean result = (Boolean)invokeOnTokenStore("removeToken", new Object[] {
+        TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + result);
+      }
+      return result;
+    } catch (IOException e) {
+      throw new TokenStoreException(e);
+    }
+  }
+
+  @Override
+  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() throws TokenStoreException {
+
+    List<String> tokenIdents = (List<String>)invokeOnTokenStore("getAllTokenIdentifiers", new Object[0]);
+    List<DelegationTokenIdentifier> delTokenIdents = new ArrayList<DelegationTokenIdentifier>(tokenIdents.size());
+
+    for (String tokenIdent : tokenIdents) {
+      DelegationTokenIdentifier delToken = new DelegationTokenIdentifier();
+      try {
+        TokenStoreDelegationTokenSecretManager.decodeWritable(delToken, tokenIdent);
+      } catch (IOException e) {
+        throw new TokenStoreException(e);
+      }
+      delTokenIdents.add(delToken);
+    }
+    return delTokenIdents;
+  }
+
+  private Object handler;
+  private ServerMode serverMode;
+
+  @Override
+  public void init(Object handler, ServerMode serverMode) throws TokenStoreException {
+    this.handler = handler;
+    this.serverMode = serverMode;
+  }
+
+  private Object invokeOnTokenStore(String methName, Object[] params, Class<?> ... paramTypes)
+      throws TokenStoreException {
+    Object tokenStore;
+    try {
+      switch (serverMode) {
+      case METASTORE:
+        tokenStore = handler.getClass().getMethod("getMS").invoke(handler);
+        break;
+      case HIVESERVER2:
+        Object hiveObject = ((Class<?>) handler)
+            .getMethod("get", org.apache.hadoop.conf.Configuration.class, java.lang.Class.class)
+            .invoke(handler, conf, DBTokenStore.class);
+        tokenStore = ((Class<?>) handler).getMethod("getMSC").invoke(hiveObject);
+        break;
+      default:
+        throw new IllegalArgumentException("Unexpected server mode " + serverMode);
+      }
+      return tokenStore.getClass().getMethod(methName, paramTypes).invoke(tokenStore, params);
+    } catch (IllegalArgumentException | SecurityException | IllegalAccessException
+        | NoSuchMethodException e) {
+      throw new TokenStoreException(e);
+    } catch (InvocationTargetException e) {
+      throw new TokenStoreException(e.getCause());
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // No-op.
+  }
+
+}
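
DBTokenStore never references the metastore's store or HiveServer2's session classes at compile time; every call funnels through invokeOnTokenStore(), which resolves the target method by name and parameter types and unwraps InvocationTargetException so callers see the store's own failure rather than a reflection wrapper. A self-contained sketch of that dispatch shape follows, with a hypothetical FakeRawStore standing in for the object the handler returns; the real class throws TokenStoreException where this sketch uses RuntimeException.

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectiveDispatchSketch {
  static class FakeRawStore {
    public int addMasterKey(String key) {
      System.out.println("persisting key: " + key);
      return 42; // pretend sequence number assigned by the database
    }
  }

  static Object invoke(Object target, String name, Object[] params, Class<?>... types) {
    try {
      Method m = target.getClass().getMethod(name, types);
      return m.invoke(target, params);
    } catch (InvocationTargetException e) {
      // unwrap, as DBTokenStore does, so callers see the store's own exception
      throw new RuntimeException(e.getCause());
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    Object store = new FakeRawStore();
    int seq = (Integer) invoke(store, "addMasterKey", new Object[]{"abc"}, String.class);
    System.out.println("assigned seq = " + seq);
  }
}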

http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java
new file mode 100644
index 0000000..d384a37
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
+/**
+ * A delegation token selector that is specialized for Hive.
+ */
+public class DelegationTokenSelector
+    extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
+
+  public DelegationTokenSelector() {
+    super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND);
+  }
+}
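
On the client side, a selector of this shape is handed the caller's credentials and picks out the token whose kind is HIVE_DELEGATION_KIND and whose service matches the requested name; the matching itself lives in Hadoop's AbstractDelegationTokenSelector. A hypothetical usage sketch, assuming it compiles in the same package as the class above; the service string is an illustrative placeholder, not a Hive constant.

package org.apache.hadoop.hive.metastore.security; // placed here for illustration only

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;

public class SelectorUsageSketch {
  public static void main(String[] args) throws Exception {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    // selectToken matches on token kind (fixed to HIVE_DELEGATION_KIND by the
    // constructor above) and on the service name passed in here.
    Token<DelegationTokenIdentifier> token = new DelegationTokenSelector()
        .selectToken(new Text("hive-metastore-service"), ugi.getTokens());
    System.out.println(token == null
        ? "no matching Hive delegation token in credentials"
        : "selected token for service " + token.getService());
  }
}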

