hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r1203515 [2/4] - in /hbase/trunk: ./ conf/ security/ security/src/ security/src/main/ security/src/main/java/ security/src/main/java/org/ security/src/main/java/org/apache/ security/src/main/java/org/apache/hadoop/ security/src/main/java/or...
Date Fri, 18 Nov 2011 07:34:45 GMT
Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+
+/**
+ * Implementation of secure Hadoop policy provider for mapping
+ * protocol interfaces to hbase-policy.xml entries.
+ */
+public class HBasePolicyProvider extends PolicyProvider {
+  protected static Service[] services = {
+      new Service("security.client.protocol.acl", HRegionInterface.class),
+      new Service("security.admin.protocol.acl", HMasterInterface.class),
+      new Service("security.masterregion.protocol.acl", HMasterRegionInterface.class)
+  };
+
+  @Override
+  public Service[] getServices() {
+    return services;
+  }
+
+  public static void init(Configuration conf,
+      ServiceAuthorizationManager authManager) {
+    // set service-level authorization security policy
+    conf.set("hadoop.policy.file", "hbase-policy.xml");
+    if (conf.getBoolean(
+          ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+      authManager.refresh(conf, new HBasePolicyProvider());
+    }
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslClient;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslOutputStream;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * A utility class that encapsulates SASL logic for RPC client.
+ * Copied from <code>org.apache.hadoop.security</code>
+ */
+public class HBaseSaslRpcClient {
+  public static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class);
+
+  private final SaslClient saslClient;
+
+  /**
+   * Create a HBaseSaslRpcClient for an authentication method
+   * 
+   * @param method
+   *          the requested authentication method
+   * @param token
+   *          token to use if needed by the authentication method
+   */
+  public HBaseSaslRpcClient(AuthMethod method,
+      Token<? extends TokenIdentifier> token, String serverPrincipal)
+      throws IOException {
+    switch (method) {
+    case DIGEST:
+      if (LOG.isDebugEnabled())
+        LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
+            + " client to authenticate to service at " + token.getService());
+      saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST
+          .getMechanismName() }, null, null, HBaseSaslRpcServer.SASL_DEFAULT_REALM,
+          HBaseSaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(token));
+      break;
+    case KERBEROS:
+      if (LOG.isDebugEnabled()) {
+        LOG
+            .debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
+                + " client. Server's Kerberos principal name is "
+                + serverPrincipal);
+      }
+      if (serverPrincipal == null || serverPrincipal.length() == 0) {
+        throw new IOException(
+            "Failed to specify server's Kerberos principal name");
+      }
+      String names[] = HBaseSaslRpcServer.splitKerberosName(serverPrincipal);
+      if (names.length != 3) {
+        throw new IOException(
+          "Kerberos principal name does NOT have the expected hostname part: "
+                + serverPrincipal);
+      }
+      saslClient = Sasl.createSaslClient(new String[] { AuthMethod.KERBEROS
+          .getMechanismName() }, null, names[0], names[1],
+          HBaseSaslRpcServer.SASL_PROPS, null);
+      break;
+    default:
+      throw new IOException("Unknown authentication method " + method);
+    }
+    if (saslClient == null)
+      throw new IOException("Unable to find SASL client implementation");
+  }
+
+  private static void readStatus(DataInputStream inStream) throws IOException {
+    int id = inStream.readInt(); // read and discard dummy id
+    int status = inStream.readInt(); // read status
+    if (status != SaslStatus.SUCCESS.state) {
+      throw new RemoteException(WritableUtils.readString(inStream),
+          WritableUtils.readString(inStream));
+    }
+  }
+  
+  /**
+   * Do client side SASL authentication with server via the given InputStream
+   * and OutputStream
+   * 
+   * @param inS
+   *          InputStream to use
+   * @param outS
+   *          OutputStream to use
+   * @return true if connection is set up, or false if needs to switch 
+   *             to simple Auth.
+   * @throws IOException
+   */
+  public boolean saslConnect(InputStream inS, OutputStream outS)
+      throws IOException {
+    DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
+    DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
+        outS));
+
+    try {
+      byte[] saslToken = new byte[0];
+      if (saslClient.hasInitialResponse())
+        saslToken = saslClient.evaluateChallenge(saslToken);
+      if (saslToken != null) {
+        outStream.writeInt(saslToken.length);
+        outStream.write(saslToken, 0, saslToken.length);
+        outStream.flush();
+        if (LOG.isDebugEnabled())
+          LOG.debug("Have sent token of size " + saslToken.length
+              + " from initSASLContext.");
+      }
+      if (!saslClient.isComplete()) {
+        readStatus(inStream);
+        int len = inStream.readInt();
+        if (len == HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("Server asks us to fall back to simple auth.");
+          saslClient.dispose();
+          return false;
+        }
+        saslToken = new byte[len];
+        if (LOG.isDebugEnabled())
+          LOG.debug("Will read input token of size " + saslToken.length
+              + " for processing by initSASLContext");
+        inStream.readFully(saslToken);
+      }
+
+      while (!saslClient.isComplete()) {
+        saslToken = saslClient.evaluateChallenge(saslToken);
+        if (saslToken != null) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("Will send token of size " + saslToken.length
+                + " from initSASLContext.");
+          outStream.writeInt(saslToken.length);
+          outStream.write(saslToken, 0, saslToken.length);
+          outStream.flush();
+        }
+        if (!saslClient.isComplete()) {
+          readStatus(inStream);
+          saslToken = new byte[inStream.readInt()];
+          if (LOG.isDebugEnabled())
+            LOG.debug("Will read input token of size " + saslToken.length
+                + " for processing by initSASLContext");
+          inStream.readFully(saslToken);
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client context established. Negotiated QoP: "
+            + saslClient.getNegotiatedProperty(Sasl.QOP));
+      }
+      return true;
+    } catch (IOException e) {
+      try {
+        saslClient.dispose();
+      } catch (SaslException ignored) {
+        // ignore further exceptions during cleanup
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
+   * been called.
+   * 
+   * @param in
+   *          the InputStream to wrap
+   * @return a SASL wrapped InputStream
+   * @throws IOException
+   */
+  public InputStream getInputStream(InputStream in) throws IOException {
+    if (!saslClient.isComplete()) {
+      throw new IOException("Sasl authentication exchange hasn't completed yet");
+    }
+    return new SaslInputStream(in, saslClient);
+  }
+
+  /**
+   * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
+   * been called.
+   * 
+   * @param out
+   *          the OutputStream to wrap
+   * @return a SASL wrapped OutputStream
+   * @throws IOException
+   */
+  public OutputStream getOutputStream(OutputStream out) throws IOException {
+    if (!saslClient.isComplete()) {
+      throw new IOException("Sasl authentication exchange hasn't completed yet");
+    }
+    return new SaslOutputStream(out, saslClient);
+  }
+
+  /** Release resources used by wrapped saslClient */
+  public void dispose() throws SaslException {
+    saslClient.dispose();
+  }
+
+  private static class SaslClientCallbackHandler implements CallbackHandler {
+    private final String userName;
+    private final char[] userPassword;
+
+    public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+      this.userName = HBaseSaslRpcServer.encodeIdentifier(token.getIdentifier());
+      this.userPassword = HBaseSaslRpcServer.encodePassword(token.getPassword());
+    }
+
+    public void handle(Callback[] callbacks)
+        throws UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("SASL client callback: setting username: " + userName);
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("SASL client callback: setting userPassword");
+        pc.setPassword(userPassword);
+      }
+      if (rc != null) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("SASL client callback: setting realm: "
+              + rc.getDefaultText());
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
+import org.apache.hadoop.hbase.ipc.SecureServer;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+
+/**
+ * A utility class for dealing with SASL on RPC server
+ */
+public class HBaseSaslRpcServer {
+  public static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class);
+  public static final String SASL_DEFAULT_REALM = "default";
+  public static final Map<String, String> SASL_PROPS =
+      new TreeMap<String, String>();
+
+  public static final int SWITCH_TO_SIMPLE_AUTH = -88;
+
+  public static enum QualityOfProtection {
+    AUTHENTICATION("auth"),
+    INTEGRITY("auth-int"),
+    PRIVACY("auth-conf");
+
+    public final String saslQop;
+
+    private QualityOfProtection(String saslQop) {
+      this.saslQop = saslQop;
+    }
+
+    public String getSaslQop() {
+      return saslQop;
+    }
+  }
+
+  public static void init(Configuration conf) {
+    QualityOfProtection saslQOP = QualityOfProtection.AUTHENTICATION;
+    String rpcProtection = conf.get("hbase.rpc.protection",
+        QualityOfProtection.AUTHENTICATION.name().toLowerCase());
+    if (QualityOfProtection.INTEGRITY.name().toLowerCase()
+        .equals(rpcProtection)) {
+      saslQOP = QualityOfProtection.INTEGRITY;
+    } else if (QualityOfProtection.PRIVACY.name().toLowerCase().equals(
+        rpcProtection)) {
+      saslQOP = QualityOfProtection.PRIVACY;
+    }
+
+    SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop());
+    SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
+  }
+
+  static String encodeIdentifier(byte[] identifier) {
+    return new String(Base64.encodeBase64(identifier));
+  }
+
+  static byte[] decodeIdentifier(String identifier) {
+    return Base64.decodeBase64(identifier.getBytes());
+  }
+
+  public static <T extends TokenIdentifier> T getIdentifier(String id,
+      SecretManager<T> secretManager) throws InvalidToken {
+    byte[] tokenId = decodeIdentifier(id);
+    T tokenIdentifier = secretManager.createIdentifier();
+    try {
+      tokenIdentifier.readFields(new DataInputStream(new ByteArrayInputStream(
+          tokenId)));
+    } catch (IOException e) {
+      throw (InvalidToken) new InvalidToken(
+          "Can't de-serialize tokenIdentifier").initCause(e);
+    }
+    return tokenIdentifier;
+  }
+
+  static char[] encodePassword(byte[] password) {
+    return new String(Base64.encodeBase64(password)).toCharArray();
+  }
+
+  /** Splitting fully qualified Kerberos name into parts */
+  public static String[] splitKerberosName(String fullName) {
+    return fullName.split("[/@]");
+  }
+
+  public enum SaslStatus {
+    SUCCESS (0),
+    ERROR (1);
+    
+    public final int state;
+    private SaslStatus(int state) {
+      this.state = state;
+    }
+  }
+  
+  /** Authentication method */
+  public static enum AuthMethod {
+    SIMPLE((byte) 80, "", AuthenticationMethod.SIMPLE),
+    KERBEROS((byte) 81, "GSSAPI", AuthenticationMethod.KERBEROS),
+    DIGEST((byte) 82, "DIGEST-MD5", AuthenticationMethod.TOKEN);
+
+    /** The code for this method. */
+    public final byte code;
+    public final String mechanismName;
+    public final AuthenticationMethod authenticationMethod;
+
+    private AuthMethod(byte code, String mechanismName, 
+                       AuthenticationMethod authMethod) {
+      this.code = code;
+      this.mechanismName = mechanismName;
+      this.authenticationMethod = authMethod;
+    }
+
+    private static final int FIRST_CODE = values()[0].code;
+
+    /** Return the object represented by the code. */
+    private static AuthMethod valueOf(byte code) {
+      final int i = (code & 0xff) - FIRST_CODE;
+      return i < 0 || i >= values().length ? null : values()[i];
+    }
+
+    /** Return the SASL mechanism name */
+    public String getMechanismName() {
+      return mechanismName;
+    }
+
+    /** Read from in */
+    public static AuthMethod read(DataInput in) throws IOException {
+      return valueOf(in.readByte());
+    }
+
+    /** Write to out */
+    public void write(DataOutput out) throws IOException {
+      out.write(code);
+    }
+  };
+
+  /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+  public static class SaslDigestCallbackHandler implements CallbackHandler {
+    private SecretManager<TokenIdentifier> secretManager;
+    private SecureServer.SecureConnection connection;
+
+    public SaslDigestCallbackHandler(
+        SecretManager<TokenIdentifier> secretManager,
+        SecureServer.SecureConnection connection) {
+      this.secretManager = secretManager;
+      this.connection = connection;
+    }
+
+    private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
+      return encodePassword(secretManager.retrievePassword(tokenid));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void handle(Callback[] callbacks) throws InvalidToken,
+        UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          continue; // realm is ignored
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL DIGEST-MD5 Callback");
+        }
+      }
+      if (pc != null) {
+        TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager);
+        char[] password = getPassword(tokenIdentifier);
+        UserGroupInformation user = null;
+        user = tokenIdentifier.getUser(); // may throw exception
+        connection.attemptingUser = user;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+              + "for client: " + tokenIdentifier.getUser());
+        }
+        pc.setPassword(password);
+      }
+      if (ac != null) {
+        String authid = ac.getAuthenticationID();
+        String authzid = ac.getAuthorizationID();
+        if (authid.equals(authzid)) {
+          ac.setAuthorized(true);
+        } else {
+          ac.setAuthorized(false);
+        }
+        if (ac.isAuthorized()) {
+          if (LOG.isDebugEnabled()) {
+            String username =
+              getIdentifier(authzid, secretManager).getUser().getUserName();
+            LOG.debug("SASL server DIGEST-MD5 callback: setting "
+                + "canonicalized client ID: " + username);
+          }
+          ac.setAuthorizedID(authzid);
+        }
+      }
+    }
+  }
+
+  /** CallbackHandler for SASL GSSAPI Kerberos mechanism */
+  public static class SaslGssCallbackHandler implements CallbackHandler {
+
+    /** {@inheritDoc} */
+    @Override
+    public void handle(Callback[] callbacks) throws
+        UnsupportedCallbackException {
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL GSSAPI Callback");
+        }
+      }
+      if (ac != null) {
+        String authid = ac.getAuthenticationID();
+        String authzid = ac.getAuthorizationID();
+        if (authid.equals(authzid)) {
+          ac.setAuthorized(true);
+        } else {
+          ac.setAuthorized(false);
+        }
+        if (ac.isAuthorized()) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("SASL server GSSAPI callback: setting "
+                + "canonicalized client ID: " + authzid);
+          ac.setAuthorizedID(authzid);
+        }
+      }
+    }
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import javax.crypto.SecretKey;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Represents a secret key used for signing and verifying authentication tokens
+ * by {@link AuthenticationTokenSecretManager}.
+ */
+public class AuthenticationKey implements Writable {
+  private int id;
+  private long expirationDate;
+  private SecretKey secret;
+
+  public AuthenticationKey() {
+    // for Writable
+  }
+
+  public AuthenticationKey(int keyId, long expirationDate, SecretKey key) {
+    this.id = keyId;
+    this.expirationDate = expirationDate;
+    this.secret = key;
+  }
+
+  public int getKeyId() {
+    return id;
+  }
+
+  public long getExpiration() {
+    return expirationDate;
+  }
+
+  public void setExpiration(long timestamp) {
+    expirationDate = timestamp;
+  }
+
+  SecretKey getKey() {
+    return secret;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof AuthenticationKey)) {
+      return false;
+    }
+    AuthenticationKey other = (AuthenticationKey)obj;
+    return id == other.getKeyId() &&
+        expirationDate == other.getExpiration() &&
+        (secret == null ? other.getKey() == null :
+            other.getKey() != null &&
+                Bytes.equals(secret.getEncoded(), other.getKey().getEncoded()));       
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("AuthenticationKey[ ")
+       .append("id=").append(id)
+       .append(", expiration=").append(expirationDate)
+       .append(" ]");
+    return buf.toString();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, id);
+    WritableUtils.writeVLong(out, expirationDate);
+    if (secret == null) {
+      WritableUtils.writeVInt(out, -1);
+    } else {
+      byte[] keyBytes = secret.getEncoded();
+      WritableUtils.writeVInt(out, keyBytes.length);
+      out.write(keyBytes);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id = WritableUtils.readVInt(in);
+    expirationDate = WritableUtils.readVLong(in);
+    int keyLength = WritableUtils.readVInt(in);
+    if (keyLength < 0) {
+      secret = null;
+    } else {
+      byte[] keyBytes = new byte[keyLength];
+      in.readFully(keyBytes);
+      secret = AuthenticationTokenSecretManager.createSecretKey(keyBytes);
+    }
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Defines a custom RPC protocol for obtaining authentication tokens
+ */
+public interface AuthenticationProtocol extends CoprocessorProtocol {
+  /**
+   * Obtains a token capable of authenticating as the current user for future
+   * connections.
+   * @return an authentication token for the current user
+   * @throws IOException If obtaining a token is denied or encounters an error
+   */
+  public Token<AuthenticationTokenIdentifier> getAuthenticationToken()
+      throws IOException;
+
+  /**
+   * Returns the currently authenticated username.
+   */
+  public String whoami();
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * Represents the identity information stored in an HBase authentication token.
+ */
+public class AuthenticationTokenIdentifier extends TokenIdentifier {
+  public static final byte VERSION = 1;
+  public static final Text AUTH_TOKEN_TYPE = new Text("HBASE_AUTH_TOKEN");
+
+  protected String username;
+  protected int keyId;
+  protected long issueDate;
+  protected long expirationDate;
+  protected long sequenceNumber;
+  
+  public AuthenticationTokenIdentifier() {
+  }
+
+  public AuthenticationTokenIdentifier(String username) {
+    this.username = username;
+  }
+
+  public AuthenticationTokenIdentifier(String username, int keyId,
+      long issueDate, long expirationDate) {
+    this.username = username;
+    this.keyId = keyId;
+    this.issueDate = issueDate;
+    this.expirationDate = expirationDate;
+  }
+
+  @Override
+  public Text getKind() {
+    return AUTH_TOKEN_TYPE;
+  }
+
+  @Override
+  public UserGroupInformation getUser() {
+    if (username == null || "".equals(username)) {
+      return null;
+    }
+    return UserGroupInformation.createRemoteUser(username);
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  void setUsername(String name) {
+    this.username = name;
+  }
+
+  public int getKeyId() {
+    return keyId;
+  }
+
+  void setKeyId(int id) {
+    this.keyId = id;
+  }
+
+  public long getIssueDate() {
+    return issueDate;
+  }
+
+  void setIssueDate(long timestamp) {
+    this.issueDate = timestamp;
+  }
+
+  public long getExpirationDate() {
+    return expirationDate;
+  }
+
+  void setExpirationDate(long timestamp) {
+    this.expirationDate = timestamp;
+  }
+
+  public long getSequenceNumber() {
+    return sequenceNumber;
+  }
+
+  void setSequenceNumber(long seq) {
+    this.sequenceNumber = seq;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeByte(VERSION);
+    WritableUtils.writeString(out, username);
+    WritableUtils.writeVInt(out, keyId);
+    WritableUtils.writeVLong(out, issueDate);
+    WritableUtils.writeVLong(out, expirationDate);
+    WritableUtils.writeVLong(out, sequenceNumber);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte version = in.readByte();
+    if (version != VERSION) {
+      throw new IOException("Version mismatch in deserialization: " +
+          "expected="+VERSION+", got="+version);
+    }
+    username = WritableUtils.readString(in);
+    keyId = WritableUtils.readVInt(in);
+    issueDate = WritableUtils.readVLong(in);
+    expirationDate = WritableUtils.readVLong(in);
+    sequenceNumber = WritableUtils.readVLong(in);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other instanceof AuthenticationTokenIdentifier) {
+      AuthenticationTokenIdentifier ident = (AuthenticationTokenIdentifier)other;
+      return sequenceNumber == ident.getSequenceNumber()
+          && keyId == ident.getKeyId()
+          && issueDate == ident.getIssueDate()
+          && expirationDate == ident.getExpirationDate()
+          && (username == null ? ident.getUsername() == null :
+              username.equals(ident.getUsername()));
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)sequenceNumber;
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import javax.crypto.SecretKey;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKLeaderManager;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Manages an internal list of secret keys used to sign new authentication
+ * tokens as they are generated, and to valid existing tokens used for
+ * authentication.
+ *
+ * <p>
+ * A single instance of {@code AuthenticationTokenSecretManager} will be
+ * running as the "leader" in a given HBase cluster.  The leader is responsible
+ * for periodically generating new secret keys, which are then distributed to
+ * followers via ZooKeeper, and for expiring previously used secret keys that
+ * are no longer needed (as any tokens using them have expired).
+ * </p>
+ */
+public class AuthenticationTokenSecretManager
+    extends SecretManager<AuthenticationTokenIdentifier> {
+
+  static final String NAME_PREFIX = "SecretManager-";
+
+  private static Log LOG = LogFactory.getLog(
+      AuthenticationTokenSecretManager.class);
+
+  private long lastKeyUpdate;
+  private long keyUpdateInterval;
+  private long tokenMaxLifetime;
+  private ZKSecretWatcher zkWatcher;
+  private LeaderElector leaderElector;
+  private ClusterId clusterId;
+
+  private Map<Integer,AuthenticationKey> allKeys =
+      new ConcurrentHashMap<Integer, AuthenticationKey>();
+  private AuthenticationKey currentKey;
+
+  private int idSeq;
+  private AtomicLong tokenSeq = new AtomicLong();
+  private String name;
+
+  /**
+   * Create a new secret manager instance for generating keys.
+   * @param conf Configuration to use
+   * @param zk Connection to zookeeper for handling leader elections
+   * @param keyUpdateInterval Time (in milliseconds) between rolling a new master key for token signing
+   * @param tokenMaxLifetime Maximum age (in milliseconds) before a token expires and is no longer valid
+   */
+  /* TODO: Restrict access to this constructor to make rogues instances more difficult.
+   * For the moment this class is instantiated from
+   * org.apache.hadoop.hbase.ipc.SecureServer so public access is needed.
+   */
+  public AuthenticationTokenSecretManager(Configuration conf,
+      ZooKeeperWatcher zk, String serverName,
+      long keyUpdateInterval, long tokenMaxLifetime) {
+    this.zkWatcher = new ZKSecretWatcher(conf, zk, this);
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenMaxLifetime = tokenMaxLifetime;
+    this.leaderElector = new LeaderElector(zk, serverName);
+    this.name = NAME_PREFIX+serverName;
+    this.clusterId = new ClusterId(zk, zk);
+  }
+
+  public void start() {
+    try {
+      // populate any existing keys
+      this.zkWatcher.start();
+      // try to become leader
+      this.leaderElector.start();
+    } catch (KeeperException ke) {
+      LOG.error("Zookeeper initialization failed", ke);
+    }
+  }
+
+  public void stop() {
+    this.leaderElector.stop("SecretManager stopping");
+  }
+
+  public boolean isMaster() {
+    return leaderElector.isMaster();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  protected byte[] createPassword(AuthenticationTokenIdentifier identifier) {
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    AuthenticationKey secretKey = currentKey;
+    identifier.setKeyId(secretKey.getKeyId());
+    identifier.setIssueDate(now);
+    identifier.setExpirationDate(now + tokenMaxLifetime);
+    identifier.setSequenceNumber(tokenSeq.getAndIncrement());
+    return createPassword(WritableUtils.toByteArray(identifier),
+        secretKey.getKey());
+  }
+
+  @Override
+  public byte[] retrievePassword(AuthenticationTokenIdentifier identifier)
+      throws InvalidToken {
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    if (identifier.getExpirationDate() < now) {
+      throw new InvalidToken("Token has expired");
+    }
+    AuthenticationKey masterKey = allKeys.get(identifier.getKeyId());
+    if (masterKey == null) {
+      throw new InvalidToken("Unknown master key for token (id="+
+          identifier.getKeyId()+")");
+    }
+    // regenerate the password
+    return createPassword(WritableUtils.toByteArray(identifier),
+        masterKey.getKey());
+  }
+
+  @Override
+  public AuthenticationTokenIdentifier createIdentifier() {
+    return new AuthenticationTokenIdentifier();
+  }
+
+  public Token<AuthenticationTokenIdentifier> generateToken(String username) {
+    AuthenticationTokenIdentifier ident =
+        new AuthenticationTokenIdentifier(username);
+    Token<AuthenticationTokenIdentifier> token =
+        new Token<AuthenticationTokenIdentifier>(ident, this);
+    if (clusterId.hasId()) {
+      token.setService(new Text(clusterId.getId()));
+    }
+    return token;
+  }
+
+  public synchronized void addKey(AuthenticationKey key) throws IOException {
+    // ignore zk changes when running as master
+    if (leaderElector.isMaster()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Running as master, ignoring new key "+key.getKeyId());
+      }
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding key "+key.getKeyId());
+    }
+
+    allKeys.put(key.getKeyId(), key);
+    if (currentKey == null || key.getKeyId() > currentKey.getKeyId()) {
+      currentKey = key;
+    }
+    // update current sequence
+    if (key.getKeyId() > idSeq) {
+      idSeq = key.getKeyId();
+    }
+  }
+
+  synchronized void removeKey(Integer keyId) {
+    // ignore zk changes when running as master
+    if (leaderElector.isMaster()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Running as master, ignoring removed key "+keyId);
+      }
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing key "+keyId);
+    }
+
+    allKeys.remove(keyId);
+  }
+
+  AuthenticationKey getCurrentKey() {
+    return currentKey;
+  }
+
+  AuthenticationKey getKey(int keyId) {
+    return allKeys.get(keyId);
+  }
+
+  synchronized void removeExpiredKeys() {
+    if (!leaderElector.isMaster()) {
+      LOG.info("Skipping removeExpiredKeys() because not running as master.");
+      return;
+    }
+
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    Iterator<AuthenticationKey> iter = allKeys.values().iterator();
+    while (iter.hasNext()) {
+      AuthenticationKey key = iter.next();
+      if (key.getExpiration() < now) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing expired key "+key.getKeyId());
+        }
+        iter.remove();
+        zkWatcher.removeKeyFromZK(key);
+      }
+    }
+  }
+
+  synchronized void rollCurrentKey() {
+    if (!leaderElector.isMaster()) {
+      LOG.info("Skipping rollCurrentKey() because not running as master.");
+      return;
+    }
+
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    AuthenticationKey prev = currentKey;
+    AuthenticationKey newKey = new AuthenticationKey(++idSeq,
+        Long.MAX_VALUE, // don't allow to expire until it's replaced by a new key
+        generateSecret());
+    allKeys.put(newKey.getKeyId(), newKey);
+    currentKey = newKey;
+    zkWatcher.addKeyToZK(newKey);
+    lastKeyUpdate = now;
+
+    if (prev != null) {
+      // make sure previous key is still stored
+      prev.setExpiration(now + tokenMaxLifetime);
+      allKeys.put(prev.getKeyId(), prev);
+      zkWatcher.updateKeyInZK(prev);
+    }
+  }
+
+  public static SecretKey createSecretKey(byte[] raw) {
+    return SecretManager.createSecretKey(raw);
+  }
+
+  private class LeaderElector extends Thread implements Stoppable {
+    private boolean stopped = false;
+    /** Flag indicating whether we're in charge of rolling/expiring keys */
+    private boolean isMaster = false;
+    private ZKLeaderManager zkLeader;
+
+    public LeaderElector(ZooKeeperWatcher watcher, String serverName) {
+      setDaemon(true);
+      setName("ZKSecretWatcher-leaderElector");
+      zkLeader = new ZKLeaderManager(watcher,
+          ZKUtil.joinZNode(zkWatcher.getRootKeyZNode(), "keymaster"),
+          Bytes.toBytes(serverName), this);
+    }
+
+    public boolean isMaster() {
+      return isMaster;
+    }
+
+    @Override
+    public boolean isStopped() {
+      return stopped;
+    }
+
+    @Override
+    public void stop(String reason) {
+      stopped = true;
+      // prevent further key generation when stopping
+      if (isMaster) {
+        zkLeader.stepDownAsLeader();
+      }
+      isMaster = false;
+      LOG.info("Stopping leader election, because: "+reason);
+      interrupt();
+    }
+
+    public void run() {
+      zkLeader.start();
+      zkLeader.waitToBecomeLeader();
+      isMaster = true;
+
+      while (!stopped) {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+
+        // clear any expired
+        removeExpiredKeys();
+
+        if (lastKeyUpdate + keyUpdateInterval < now) {
+          // roll a new master key
+          rollCurrentKey();
+        }
+
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException ie) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Interrupted waiting for next update", ie);
+          }
+        }
+      }
+    }
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class AuthenticationTokenSelector
+    implements TokenSelector<AuthenticationTokenIdentifier> {
+
+  public AuthenticationTokenSelector() {
+  }
+
+  @Override
+  public Token<AuthenticationTokenIdentifier> selectToken(Text serviceName,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (serviceName != null) {
+      for (Token ident : tokens) {
+        if (serviceName.equals(ident.getService()) &&
+            AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.equals(ident.getKind())) {
+          return (Token<AuthenticationTokenIdentifier>)ident;
+        }
+      }
+    }
+    return null;
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RequestContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.SecureServer;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Provides a service for obtaining authentication tokens via the
+ * {@link AuthenticationProtocol} coprocessor protocol.
+ */
+public class TokenProvider extends BaseEndpointCoprocessor
+    implements AuthenticationProtocol {
+
+  public static final long VERSION = 0L;
+  private static Log LOG = LogFactory.getLog(TokenProvider.class);
+
+  private AuthenticationTokenSecretManager secretManager;
+
+
+  @Override
+  public void start(CoprocessorEnvironment env) {
+    super.start(env);
+
+    // if running at region
+    if (env instanceof RegionCoprocessorEnvironment) {
+      RegionCoprocessorEnvironment regionEnv =
+          (RegionCoprocessorEnvironment)env;
+      RpcServer server = regionEnv.getRegionServerServices().getRpcServer();
+      if (server instanceof SecureServer) {
+        SecretManager mgr = ((SecureServer)server).getSecretManager();
+        if (mgr instanceof AuthenticationTokenSecretManager) {
+          secretManager = (AuthenticationTokenSecretManager)mgr;
+        }
+      }
+    }
+  }
+
+  @Override
+  public Token<AuthenticationTokenIdentifier> getAuthenticationToken()
+      throws IOException {
+    if (secretManager == null) {
+      throw new IOException(
+          "No secret manager configured for token authentication");
+    }
+
+    User currentUser = RequestContext.getRequestUser();
+    UserGroupInformation ugi = null;
+    if (currentUser != null) {
+      ugi = currentUser.getUGI();
+    }
+    if (currentUser == null) {
+      throw new AccessDeniedException("No authenticated user for request!");
+    } else if (ugi.getAuthenticationMethod() !=
+        UserGroupInformation.AuthenticationMethod.KERBEROS) {
+      LOG.warn("Token generation denied for user="+currentUser.getName()
+          +", authMethod="+ugi.getAuthenticationMethod());
+      throw new AccessDeniedException(
+          "Token generation only allowed for Kerberos authenticated clients");
+    }
+
+    return secretManager.generateToken(currentUser.getName());
+  }
+
+  @Override
+  public String whoami() {
+    return RequestContext.getRequestUserName();
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    if (AuthenticationProtocol.class.getName().equals(protocol)) {
+      return TokenProvider.VERSION;
+    }
+    LOG.warn("Unknown protocol requested: "+protocol);
+    return -1;
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Utility methods for obtaining authentication tokens.
+ */
+public class TokenUtil {
+  private static Log LOG = LogFactory.getLog(TokenUtil.class);
+
+  /**
+   * Obtain and return an authentication token for the current user.
+   * @param conf The configuration for connecting to the cluster
+   * @return the authentication token instance
+   */
+  public static Token<AuthenticationTokenIdentifier> obtainToken(
+      Configuration conf) throws IOException {
+    HTable meta = null;
+    try {
+      meta = new HTable(conf, ".META.");
+      AuthenticationProtocol prot = meta.coprocessorProxy(
+          AuthenticationProtocol.class, HConstants.EMPTY_START_ROW);
+      return prot.getAuthenticationToken();
+    } finally {
+      if (meta != null) {
+        meta.close();
+      }
+    }
+  }
+
+  private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
+      throws IOException {
+    return token.getService() != null
+        ? token.getService() : new Text("default");
+  }
+
+  /**
+   * Obtain an authentication token for the given user and add it to the
+   * user's credentials.
+   * @param conf The configuration for connecting to the cluster
+   * @param user The user for whom to obtain the token
+   * @throws IOException If making a remote call to the {@link TokenProvider} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void obtainAndCacheToken(final Configuration conf,
+      UserGroupInformation user)
+      throws IOException, InterruptedException {
+    try {
+      Token<AuthenticationTokenIdentifier> token =
+          user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
+            public Token<AuthenticationTokenIdentifier> run() throws Exception {
+              return obtainToken(conf);
+            }
+          });
+
+      if (token == null) {
+        throw new IOException("No token returned for user "+user.getUserName());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Obtained token "+token.getKind().toString()+" for user "+
+            user.getUserName());
+      }
+      user.addToken(token);
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (InterruptedException ie) {
+      throw ie;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new UndeclaredThrowableException(e,
+          "Unexpected exception obtaining token for user "+user.getUserName());
+    }
+  }
+
+  /**
+   * Obtain an authentication token on behalf of the given user and add it to
+   * the credentials for the given map reduce job.
+   * @param conf The configuration for connecting to the cluster
+   * @param user The user for whom to obtain the token
+   * @param job The job instance in which the token should be stored
+   * @throws IOException If making a remote call to the {@link TokenProvider} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void obtainTokenForJob(final Configuration conf,
+      UserGroupInformation user, Job job)
+      throws IOException, InterruptedException {
+    try {
+      Token<AuthenticationTokenIdentifier> token =
+          user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
+            public Token<AuthenticationTokenIdentifier> run() throws Exception {
+              return obtainToken(conf);
+            }
+          });
+
+      if (token == null) {
+        throw new IOException("No token returned for user "+user.getUserName());
+      }
+      Text clusterId = getClusterId(token);
+      LOG.info("Obtained token "+token.getKind().toString()+" for user "+
+          user.getUserName() + " on cluster "+clusterId.toString());
+      job.getCredentials().addToken(clusterId, token);
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (InterruptedException ie) {
+      throw ie;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new UndeclaredThrowableException(e,
+          "Unexpected exception obtaining token for user "+user.getUserName());
+    }
+  }
+
+  /**
+   * Obtain an authentication token on behalf of the given user and add it to
+   * the credentials for the given map reduce job.
+   * @param user The user for whom to obtain the token
+   * @param job The job configuration in which the token should be stored
+   * @throws IOException If making a remote call to the {@link TokenProvider} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void obtainTokenForJob(final JobConf job,
+      UserGroupInformation user)
+      throws IOException, InterruptedException {
+    try {
+      Token<AuthenticationTokenIdentifier> token =
+          user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
+            public Token<AuthenticationTokenIdentifier> run() throws Exception {
+              return obtainToken(job);
+            }
+          });
+
+      if (token == null) {
+        throw new IOException("No token returned for user "+user.getUserName());
+      }
+      Text clusterId = getClusterId(token);
+      LOG.info("Obtained token "+token.getKind().toString()+" for user "+
+          user.getUserName()+" on cluster "+clusterId.toString());
+      job.getCredentials().addToken(clusterId, token);
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (InterruptedException ie) {
+      throw ie;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new UndeclaredThrowableException(e,
+          "Unexpected exception obtaining token for user "+user.getUserName());
+    }
+  }
+}

Added: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java (added)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Synchronizes token encryption keys across cluster nodes.
+ */
+public class ZKSecretWatcher extends ZooKeeperListener {
+  private static final String DEFAULT_ROOT_NODE = "tokenauth";
+  private static final String DEFAULT_KEYS_PARENT = "keys";
+  private static Log LOG = LogFactory.getLog(ZKSecretWatcher.class);
+
+  private AuthenticationTokenSecretManager secretManager;
+  private String baseKeyZNode;
+  private String keysParentZNode;
+
+  public ZKSecretWatcher(Configuration conf,
+      ZooKeeperWatcher watcher,
+      AuthenticationTokenSecretManager secretManager) {
+    super(watcher);
+    this.secretManager = secretManager;
+    String keyZNodeParent = conf.get("zookeeper.znode.tokenauth.parent", DEFAULT_ROOT_NODE);
+    this.baseKeyZNode = ZKUtil.joinZNode(watcher.baseZNode, keyZNodeParent);
+    this.keysParentZNode = ZKUtil.joinZNode(baseKeyZNode, DEFAULT_KEYS_PARENT);
+  }
+
+  public void start() throws KeeperException {
+    watcher.registerListener(this);
+    // make sure the base node exists
+    ZKUtil.createWithParents(watcher, keysParentZNode);
+
+    if (ZKUtil.watchAndCheckExists(watcher, keysParentZNode)) {
+      List<ZKUtil.NodeAndData> nodes =
+          ZKUtil.getChildDataAndWatchForNewChildren(watcher, keysParentZNode);
+      refreshNodes(nodes);
+    }
+  }
+
+  @Override
+  public void nodeCreated(String path) {
+    if (path.equals(keysParentZNode)) {
+      try {
+        List<ZKUtil.NodeAndData> nodes =
+            ZKUtil.getChildDataAndWatchForNewChildren(watcher, keysParentZNode);
+        refreshNodes(nodes);
+      } catch (KeeperException ke) {
+        LOG.fatal("Error reading data from zookeeper", ke);
+        watcher.abort("Error reading new key znode "+path, ke);
+      }
+    }
+  }
+
+  @Override
+  public void nodeDeleted(String path) {
+    if (keysParentZNode.equals(ZKUtil.getParent(path))) {
+      String keyId = ZKUtil.getNodeName(path);
+      try {
+        Integer id = new Integer(keyId);
+        secretManager.removeKey(id);
+      } catch (NumberFormatException nfe) {
+        LOG.error("Invalid znode name for key ID '"+keyId+"'", nfe);
+      }
+    }
+  }
+
+  @Override
+  public void nodeDataChanged(String path) {
+    if (keysParentZNode.equals(ZKUtil.getParent(path))) {
+      try {
+        byte[] data = ZKUtil.getDataAndWatch(watcher, path);
+        if (data == null || data.length == 0) {
+          LOG.debug("Ignoring empty node "+path);
+          return;
+        }
+
+        AuthenticationKey key = (AuthenticationKey)Writables.getWritable(data,
+            new AuthenticationKey());
+        secretManager.addKey(key);
+      } catch (KeeperException ke) {
+        LOG.fatal("Error reading data from zookeeper", ke);
+        watcher.abort("Error reading updated key znode "+path, ke);
+      } catch (IOException ioe) {
+        LOG.fatal("Error reading key writables", ioe);
+        watcher.abort("Error reading key writables from znode "+path, ioe);
+      }
+    }
+  }
+
+  @Override
+  public void nodeChildrenChanged(String path) {
+    if (path.equals(keysParentZNode)) {
+      // keys changed
+      try {
+        List<ZKUtil.NodeAndData> nodes =
+            ZKUtil.getChildDataAndWatchForNewChildren(watcher, keysParentZNode);
+        refreshNodes(nodes);
+      } catch (KeeperException ke) {
+        LOG.fatal("Error reading data from zookeeper", ke);
+        watcher.abort("Error reading changed keys from zookeeper", ke);
+      }
+    }
+  }
+
+  public String getRootKeyZNode() {
+    return baseKeyZNode;
+  }
+
+  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
+    for (ZKUtil.NodeAndData n : nodes) {
+      String path = n.getNode();
+      String keyId = ZKUtil.getNodeName(path);
+      try {
+        byte[] data = n.getData();
+        if (data == null || data.length == 0) {
+          LOG.debug("Ignoring empty node "+path);
+          continue;
+        }
+        AuthenticationKey key = (AuthenticationKey)Writables.getWritable(
+            data, new AuthenticationKey());
+        secretManager.addKey(key);
+      } catch (IOException ioe) {
+        LOG.fatal("Failed reading new secret key for id '" + keyId +
+            "' from zk", ioe);
+        watcher.abort("Error deserializing key from znode "+path, ioe);
+      }
+    }
+  }
+
+  private String getKeyNode(int keyId) {
+    return ZKUtil.joinZNode(keysParentZNode, Integer.toString(keyId));
+  }
+
+  public void removeKeyFromZK(AuthenticationKey key) {
+    String keyZNode = getKeyNode(key.getKeyId());
+    try {
+      ZKUtil.deleteNode(watcher, keyZNode);
+    } catch (KeeperException.NoNodeException nne) {
+      LOG.error("Non-existent znode "+keyZNode+" for key "+key.getKeyId(), nne);
+    } catch (KeeperException ke) {
+      LOG.fatal("Failed removing znode "+keyZNode+" for key "+key.getKeyId(),
+          ke);
+      watcher.abort("Unhandled zookeeper error removing znode "+keyZNode+
+          " for key "+key.getKeyId(), ke);
+    }
+  }
+
+  public void addKeyToZK(AuthenticationKey key) {
+    String keyZNode = getKeyNode(key.getKeyId());
+    try {
+      byte[] keyData = Writables.getBytes(key);
+      // TODO: is there any point in retrying beyond what ZK client does?
+      ZKUtil.createSetData(watcher, keyZNode, keyData);
+    } catch (KeeperException ke) {
+      LOG.fatal("Unable to synchronize master key "+key.getKeyId()+
+          " to znode "+keyZNode, ke);
+      watcher.abort("Unable to synchronize secret key "+
+          key.getKeyId()+" in zookeeper", ke);
+    } catch (IOException ioe) {
+      // this can only happen from an error serializing the key
+      watcher.abort("Failed serializing key "+key.getKeyId(), ioe);
+    }
+  }
+
+  public void updateKeyInZK(AuthenticationKey key) {
+    String keyZNode = getKeyNode(key.getKeyId());
+    try {
+      byte[] keyData = Writables.getBytes(key);
+      try {
+        ZKUtil.updateExistingNodeData(watcher, keyZNode, keyData, -1);
+      } catch (KeeperException.NoNodeException ne) {
+        // node was somehow removed, try adding it back
+        ZKUtil.createSetData(watcher, keyZNode, keyData);
+      }
+    } catch (KeeperException ke) {
+      LOG.fatal("Unable to update master key "+key.getKeyId()+
+          " in znode "+keyZNode);
+      watcher.abort("Unable to synchronize secret key "+
+          key.getKeyId()+" in zookeeper", ke);
+    } catch (IOException ioe) {
+      // this can only happen from an error serializing the key
+      watcher.abort("Failed serializing key "+key.getKeyId(), ioe);
+    }
+  }
+}

Added: hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java (added)
+++ hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.RequestContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.SecureRpcEngine;
+import org.apache.hadoop.hbase.ipc.SecureServer;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+
+import org.apache.hadoop.security.token.Token;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for authentication token creation and usage
+ */
+public class TestTokenAuthentication {
+  public static interface IdentityProtocol extends CoprocessorProtocol {
+    public String whoami();
+    public String getAuthMethod();
+  }
+
+  public static class IdentityCoprocessor extends BaseEndpointCoprocessor
+      implements IdentityProtocol {
+    public String whoami() {
+      return RequestContext.getRequestUserName();
+    }
+
+    public String getAuthMethod() {
+      UserGroupInformation ugi = null;
+      User user = RequestContext.getRequestUser();
+      if (user != null) {
+        ugi = user.getUGI();
+      }
+      if (ugi != null) {
+        return ugi.getAuthenticationMethod().toString();
+      }
+      return null;
+    }
+  }
+
+  private static HBaseTestingUtility TEST_UTIL;
+  private static AuthenticationTokenSecretManager secretManager;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(HBaseRPC.RPC_ENGINE_PROP, SecureRpcEngine.class.getName());
+    conf.set("hbase.coprocessor.region.classes",
+        IdentityCoprocessor.class.getName());
+    TEST_UTIL.startMiniCluster();
+    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+    RpcServer server = rs.getRpcServer();
+    assertTrue(server instanceof SecureServer);
+    SecretManager mgr =
+        ((SecureServer)server).getSecretManager();
+    assertTrue(mgr instanceof AuthenticationTokenSecretManager);
+    secretManager = (AuthenticationTokenSecretManager)mgr;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testTokenCreation() throws Exception {
+    Token<AuthenticationTokenIdentifier> token =
+        secretManager.generateToken("testuser");
+
+    AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
+    Writables.getWritable(token.getIdentifier(), ident);
+    assertEquals("Token username should match", "testuser",
+        ident.getUsername());
+    byte[] passwd = secretManager.retrievePassword(ident);
+    assertTrue("Token password and password from secret manager should match",
+        Bytes.equals(token.getPassword(), passwd));
+  }
+
+  // @Test - Disable due to kerberos requirement
+  public void testTokenAuthentication() throws Exception {
+    UserGroupInformation testuser =
+        UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
+
+    testuser.setAuthenticationMethod(
+        UserGroupInformation.AuthenticationMethod.TOKEN);
+    final Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set("hadoop.security.authentication", "kerberos");
+    conf.set("randomkey", UUID.randomUUID().toString());
+    testuser.setConfiguration(conf);
+    Token<AuthenticationTokenIdentifier> token =
+        secretManager.generateToken("testuser");
+    testuser.addToken(token);
+
+    // verify the server authenticates us as this token user
+    testuser.doAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws Exception {
+        HTable table = new HTable(conf, ".META.");
+        IdentityProtocol prot = table.coprocessorProxy(
+            IdentityProtocol.class, HConstants.EMPTY_START_ROW);
+        String myname = prot.whoami();
+        assertEquals("testuser", myname);
+        String authMethod = prot.getAuthMethod();
+        assertEquals("TOKEN", authMethod);
+        return null;
+      }
+    });
+  }
+}

Added: hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/token/TestZKSecretWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/token/TestZKSecretWatcher.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/token/TestZKSecretWatcher.java (added)
+++ hbase/trunk/security/src/test/java/org/apache/hadoop/hbase/security/token/TestZKSecretWatcher.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the synchronization of token authentication master keys through
+ * ZKSecretWatcher
+ */
+public class TestZKSecretWatcher {
+  private static Log LOG = LogFactory.getLog(TestZKSecretWatcher.class);
+  private static HBaseTestingUtility TEST_UTIL;
+  private static AuthenticationTokenSecretManager KEY_MASTER;
+  private static AuthenticationTokenSecretManager KEY_SLAVE;
+  private static AuthenticationTokenSecretManager KEY_SLAVE2;
+  private static AuthenticationTokenSecretManager KEY_SLAVE3;
+
+  private static class MockAbortable implements Abortable {
+    private boolean abort;
+    public void abort(String reason, Throwable e) {
+      LOG.info("Aborting: "+reason, e);
+      abort = true;
+    }
+
+    public boolean isAborted() {
+      return abort;
+    }
+  }
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    TEST_UTIL.startMiniZKCluster();
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    ZooKeeperWatcher zk = newZK(conf, "server1", new MockAbortable());
+    AuthenticationTokenSecretManager[] tmp = new AuthenticationTokenSecretManager[2];
+    tmp[0] = new AuthenticationTokenSecretManager(
+        conf, zk, "server1", 60*60*1000, 60*1000);
+    tmp[0].start();
+
+    zk = newZK(conf, "server2", new MockAbortable());
+    tmp[1] = new AuthenticationTokenSecretManager(
+        conf, zk, "server2", 60*60*1000, 60*1000);
+    tmp[1].start();
+
+    while (KEY_MASTER == null) {
+      for (int i=0; i<2; i++) {
+        if (tmp[i].isMaster()) {
+          KEY_MASTER = tmp[i];
+          KEY_SLAVE = tmp[ i+1 % 2 ];
+          break;
+        }
+      }
+      Thread.sleep(500);
+    }
+    LOG.info("Master is "+KEY_MASTER.getName()+
+        ", slave is "+KEY_SLAVE.getName());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  @Test
+  public void testKeyUpdate() throws Exception {
+    // sanity check
+    assertTrue(KEY_MASTER.isMaster());
+    assertFalse(KEY_SLAVE.isMaster());
+    int maxKeyId = 0;
+
+    KEY_MASTER.rollCurrentKey();
+    AuthenticationKey key1 = KEY_MASTER.getCurrentKey();
+    assertNotNull(key1);
+    LOG.debug("Master current key: "+key1.getKeyId());
+
+    // wait for slave to update
+    Thread.sleep(1000);
+    AuthenticationKey slaveCurrent = KEY_SLAVE.getCurrentKey();
+    assertNotNull(slaveCurrent);
+    assertEquals(key1, slaveCurrent);
+    LOG.debug("Slave current key: "+slaveCurrent.getKeyId());
+
+    // generate two more keys then expire the original
+    KEY_MASTER.rollCurrentKey();
+    AuthenticationKey key2 = KEY_MASTER.getCurrentKey();
+    LOG.debug("Master new current key: "+key2.getKeyId());
+    KEY_MASTER.rollCurrentKey();
+    AuthenticationKey key3 = KEY_MASTER.getCurrentKey();
+    LOG.debug("Master new current key: "+key3.getKeyId());
+
+    // force expire the original key
+    key1.setExpiration(EnvironmentEdgeManager.currentTimeMillis() - 1000);
+    KEY_MASTER.removeExpiredKeys();
+    // verify removed from master
+    assertNull(KEY_MASTER.getKey(key1.getKeyId()));
+
+    // wait for slave to catch up
+    Thread.sleep(1000);
+    // make sure the slave has both new keys
+    AuthenticationKey slave2 = KEY_SLAVE.getKey(key2.getKeyId());
+    assertNotNull(slave2);
+    assertEquals(key2, slave2);
+    AuthenticationKey slave3 = KEY_SLAVE.getKey(key3.getKeyId());
+    assertNotNull(slave3);
+    assertEquals(key3, slave3);
+    slaveCurrent = KEY_SLAVE.getCurrentKey();
+    assertEquals(key3, slaveCurrent);
+    LOG.debug("Slave current key: "+slaveCurrent.getKeyId());
+
+    // verify that the expired key has been removed
+    assertNull(KEY_SLAVE.getKey(key1.getKeyId()));
+
+    // bring up a new slave
+    Configuration conf = TEST_UTIL.getConfiguration();
+    ZooKeeperWatcher zk = newZK(conf, "server3", new MockAbortable());
+    KEY_SLAVE2 = new AuthenticationTokenSecretManager(
+        conf, zk, "server3", 60*60*1000, 60*1000);
+    KEY_SLAVE2.start();
+
+    Thread.sleep(1000);
+    // verify the new slave has current keys (and not expired)
+    slave2 = KEY_SLAVE2.getKey(key2.getKeyId());
+    assertNotNull(slave2);
+    assertEquals(key2, slave2);
+    slave3 = KEY_SLAVE2.getKey(key3.getKeyId());
+    assertNotNull(slave3);
+    assertEquals(key3, slave3);
+    slaveCurrent = KEY_SLAVE2.getCurrentKey();
+    assertEquals(key3, slaveCurrent);
+    assertNull(KEY_SLAVE2.getKey(key1.getKeyId()));
+
+    // test leader failover
+    KEY_MASTER.stop();
+
+    // wait for master to stop
+    Thread.sleep(1000);
+    assertFalse(KEY_MASTER.isMaster());
+
+    // check for a new master
+    AuthenticationTokenSecretManager[] mgrs =
+        new AuthenticationTokenSecretManager[]{ KEY_SLAVE, KEY_SLAVE2 };
+    AuthenticationTokenSecretManager newMaster = null;
+    int tries = 0;
+    while (newMaster == null && tries++ < 5) {
+      for (AuthenticationTokenSecretManager mgr : mgrs) {
+        if (mgr.isMaster()) {
+          newMaster = mgr;
+          break;
+        }
+      }
+      if (newMaster == null) {
+        Thread.sleep(500);
+      }
+    }
+    assertNotNull(newMaster);
+
+    AuthenticationKey current = newMaster.getCurrentKey();
+    // new master will immediately roll the current key, so it's current may be greater
+    assertTrue(current.getKeyId() >= slaveCurrent.getKeyId());
+    LOG.debug("New master, current key: "+current.getKeyId());
+
+    // roll the current key again on new master and verify the key ID increments
+    newMaster.rollCurrentKey();
+    AuthenticationKey newCurrent = newMaster.getCurrentKey();
+    LOG.debug("New master, rolled new current key: "+newCurrent.getKeyId());
+    assertTrue(newCurrent.getKeyId() > current.getKeyId());
+
+    // add another slave
+    ZooKeeperWatcher zk3 = newZK(conf, "server4", new MockAbortable());
+    KEY_SLAVE3 = new AuthenticationTokenSecretManager(
+        conf, zk3, "server4", 60*60*1000, 60*1000);
+    KEY_SLAVE3.start();
+    Thread.sleep(5000);
+
+    // check master failover again
+    newMaster.stop();
+
+    // wait for master to stop
+    Thread.sleep(5000);
+    assertFalse(newMaster.isMaster());
+
+    // check for a new master
+    mgrs = new AuthenticationTokenSecretManager[]{ KEY_SLAVE, KEY_SLAVE2, KEY_SLAVE3 };
+    newMaster = null;
+    tries = 0;
+    while (newMaster == null && tries++ < 5) {
+      for (AuthenticationTokenSecretManager mgr : mgrs) {
+        if (mgr.isMaster()) {
+          newMaster = mgr;
+          break;
+        }
+      }
+      if (newMaster == null) {
+        Thread.sleep(500);
+      }
+    }
+    assertNotNull(newMaster);
+
+    AuthenticationKey current2 = newMaster.getCurrentKey();
+    // new master will immediately roll the current key, so it's current may be greater
+    assertTrue(current2.getKeyId() >= newCurrent.getKeyId());
+    LOG.debug("New master 2, current key: "+current2.getKeyId());
+
+    // roll the current key again on new master and verify the key ID increments
+    newMaster.rollCurrentKey();
+    AuthenticationKey newCurrent2 = newMaster.getCurrentKey();
+    LOG.debug("New master 2, rolled new current key: "+newCurrent2.getKeyId());
+    assertTrue(newCurrent2.getKeyId() > current2.getKeyId());
+  }
+
+  private static ZooKeeperWatcher newZK(Configuration conf, String name,
+      Abortable abort) throws Exception {
+    Configuration copy = HBaseConfiguration.create(conf);
+    ZooKeeperWatcher zk = new ZooKeeperWatcher(copy, name, abort);
+    return zk;
+  }
+}



Mime
View raw message