hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r1203512 [1/4] - in /hbase/branches/0.92: ./ conf/ security/ security/src/ security/src/main/ security/src/main/java/ security/src/main/java/org/ security/src/main/java/org/apache/ security/src/main/java/org/apache/hadoop/ security/src/main...
Date Fri, 18 Nov 2011 07:13:05 GMT
Author: garyh
Date: Fri Nov 18 07:13:03 2011
New Revision: 1203512

URL: http://svn.apache.org/viewvc?rev=1203512&view=rev
Log:
HBASE-2742  Provide strong authentication with a secure RPC engine

Added:
    hbase/branches/0.92/conf/hbase-policy.xml
    hbase/branches/0.92/security/
    hbase/branches/0.92/security/src/
    hbase/branches/0.92/security/src/main/
    hbase/branches/0.92/security/src/main/java/
    hbase/branches/0.92/security/src/main/java/org/
    hbase/branches/0.92/security/src/main/java/org/apache/
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureClient.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureConnectionHeader.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
    hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java
    hbase/branches/0.92/security/src/test/
    hbase/branches/0.92/security/src/test/java/
    hbase/branches/0.92/security/src/test/java/org/
    hbase/branches/0.92/security/src/test/java/org/apache/
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/token/
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
    hbase/branches/0.92/security/src/test/java/org/apache/hadoop/hbase/security/token/TestZKSecretWatcher.java
    hbase/branches/0.92/security/src/test/resources/
    hbase/branches/0.92/security/src/test/resources/hbase-site.xml
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/security/KerberosInfo.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/security/TokenInfo.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java
Modified:
    hbase/branches/0.92/CHANGES.txt
    hbase/branches/0.92/pom.xml
    hbase/branches/0.92/src/assembly/all.xml
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/HServerAddress.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/security/User.java
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
    hbase/branches/0.92/src/main/resources/hbase-default.xml
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java

Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1203512&r1=1203511&r2=1203512&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Fri Nov 18 07:13:03 2011
@@ -798,6 +798,7 @@ Release 0.92.0 - Unreleased
                (Jonathan Hsieh)
    HBASE-4806  Fix logging message in HbaseObjectWritable
                (Jonathan Hsieh via todd)
+   HBASE-2742  Provide strong authentication with a secure RPC engine         
 
 Release 0.90.5 - Unreleased
 

Added: hbase/branches/0.92/conf/hbase-policy.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/conf/hbase-policy.xml?rev=1203512&view=auto
==============================================================================
--- hbase/branches/0.92/conf/hbase-policy.xml (added)
+++ hbase/branches/0.92/conf/hbase-policy.xml Fri Nov 18 07:13:03 2011
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+
+<configuration>
+  <property>
+    <name>security.client.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for HRegionInterface protocol implementations (ie. 
+    clients talking to HRegionServers)
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.admin.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for HMasterInterface protocol implementation (ie. 
+    clients talking to HMaster for admin operations).
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.masterregion.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for HMasterRegionInterface protocol implementations
+    (for HRegionServers communicating with HMaster)
+    The ACL is a comma-separated list of user and group names. The user and 
+    group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+    A special value of "*" means all users are allowed.</description>
+  </property>
+</configuration>

Modified: hbase/branches/0.92/pom.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/pom.xml?rev=1203512&r1=1203511&r2=1203512&view=diff
==============================================================================
--- hbase/branches/0.92/pom.xml (original)
+++ hbase/branches/0.92/pom.xml Fri Nov 18 07:13:03 2011
@@ -244,6 +244,7 @@
         <enabled>false</enabled>
       </snapshots>
     </repository>
+
   </repositories>
 
   <build>
@@ -372,6 +373,14 @@
         </includes>
       </resource>
     </resources>
+    <testResources>
+      <testResource>
+        <directory>src/test/resources</directory>
+        <excludes>
+          <exclude>hbase-site.xml</exclude>
+        </excludes>
+      </testResource>
+    </testResources>
 
     <plugins>
       <plugin>
@@ -660,12 +669,12 @@
               <target>
                 <!-- Complements the assembly -->
 
-                <mkdir dir="${project.build.directory}/${project.artifactId}-${project.version}/${project.artifactId}-${project.version}/lib/native/${build.platform}"/>
+                <mkdir dir="${project.build.directory}/${project.build.finalName}/${project.build.finalName}/lib/native/${build.platform}"/>
 
                 <!-- Using Unix cp to preserve symlinks, using script to handle wildcards -->
                 <echo file="${project.build.directory}/copynativelibs.sh">
                     if [ `ls ${project.build.directory}/nativelib | wc -l` -ne 0 ]; then
-                      cp -PR ${project.build.directory}/nativelib/lib* ${project.build.directory}/${project.artifactId}-${project.version}/${project.artifactId}-${project.version}/lib/native/${build.platform}
+                      cp -PR ${project.build.directory}/nativelib/lib* ${project.build.directory}/${project.build.finalName}/${project.build.finalName}/lib/native/${build.platform}
                     fi
                 </echo>
                 <exec executable="sh" dir="${project.build.directory}" failonerror="true">
@@ -674,9 +683,9 @@
 
                 <!-- Using Unix tar to preserve symlinks -->
                 <exec executable="tar" failonerror="yes"
-                  dir="${project.build.directory}/${project.artifactId}-${project.version}">
+                  dir="${project.build.directory}/${project.build.finalName}">
                     <arg value="czf"/>
-                    <arg value="${project.build.directory}/${project.artifactId}-${project.version}.tar.gz"/>
+                    <arg value="${project.build.directory}/${project.build.finalName}.tar.gz"/>
                     <arg value="."/>
                 </exec>
 
@@ -1292,6 +1301,32 @@
           <scope>test</scope>
         </dependency>
       </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-test-resource</id>
+                <goals>
+                  <goal>add-test-resource</goal>
+                </goals>
+                <configuration>
+                  <resources>
+                    <resource>
+                      <directory>src/test/resources</directory>
+                      <includes>
+                        <include>hbase-site.xml</include>
+                      </includes>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
     
     
@@ -1335,6 +1370,61 @@
       </properties>
     </profile>     
 
+    <!-- profile for building against Hadoop 0.20+security-->
+    <profile>
+      <id>security</id>
+      <build>
+        <finalName>${artifactId}-${version}-security</finalName>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${project.basedir}/security/src/main/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-test-source</id>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${project.basedir}/security/src/test/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-test-resource</id>
+                <goals>
+                  <goal>add-test-resource</goal>
+                </goals>
+                <configuration>
+                  <resources>
+                    <resource>
+                      <directory>${project.basedir}/security/src/test/resources</directory>
+                      <includes>
+                        <include>hbase-site.xml</include>
+                      </includes>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+
     <!--
       profile for building against Hadoop 0.22.0. Activate using:
        mvn -Dhadoop.profile=22
@@ -1479,6 +1569,32 @@
           <scope>test</scope>
         </dependency>
       </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-test-resource</id>
+                <goals>
+                  <goal>add-test-resource</goal>
+                </goals>
+                <configuration>
+                  <resources>
+                    <resource>
+                      <directory>src/test/resources</directory>
+                      <includes>
+                        <include>hbase-site.xml</include>
+                      </includes>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <!--
@@ -1635,6 +1751,32 @@
           <scope>test</scope>
         </dependency>
       </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-test-resource</id>
+                <goals>
+                  <goal>add-test-resource</goal>
+                </goals>
+                <configuration>
+                  <resources>
+                    <resource>
+                      <directory>src/test/resources</directory>
+                      <includes>
+                        <include>hbase-site.xml</include>
+                      </includes>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
   </profiles>
  

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureClient.java?rev=1203512&view=auto
==============================================================================
--- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureClient.java (added)
+++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureClient.java Fri Nov 18 07:13:03 2011
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
+import org.apache.hadoop.hbase.security.KerberosInfo;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
+import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import javax.net.SocketFactory;
+import java.io.*;
+import java.net.*;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A client for an IPC service, which support SASL authentication of connections
+ * using either GSSAPI for Kerberos authentication or DIGEST-MD5 for
+ * authentication using signed tokens.
+ *
+ * <p>
+ * This is a copy of org.apache.hadoop.ipc.Client from secure Hadoop,
+ * reworked to remove code duplicated with
+ * {@link org.apache.hadoop.hbase.ipc.HBaseClient}.  This is part of the loadable
+ * {@link SecureRpcEngine}, and only functions in connection with a
+ * {@link SecureServer} instance.
+ * </p>
+ */
+public class SecureClient extends HBaseClient {
+
+  private static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.ipc.SecureClient");
+
+  protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
+      new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
+  static {
+    tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(),
+        new AuthenticationTokenSelector());
+  }
+
+  /** Thread that reads responses and notifies callers.  Each connection owns a
+   * socket connected to a remote address.  Calls are multiplexed through this
+   * socket: responses may be delivered out of order. */
+  protected class SecureConnection extends Connection {
+    private InetSocketAddress server;             // server ip:port
+    private String serverPrincipal;  // server's krb5 principal name
+    private SecureConnectionHeader header;              // connection header
+    private AuthMethod authMethod; // authentication method
+    private boolean useSasl;
+    private Token<? extends TokenIdentifier> token;
+    private HBaseSaslRpcClient saslRpcClient;
+    private int reloginMaxBackoff; // max pause before relogin on sasl failure
+
+    public SecureConnection(ConnectionId remoteId) throws IOException {
+      super(remoteId);
+      this.server = remoteId.getAddress();
+
+      User ticket = remoteId.getTicket();
+      Class<?> protocol = remoteId.getProtocol();
+      this.useSasl = User.isSecurityEnabled();
+      if (useSasl && protocol != null) {
+        TokenInfo tokenInfo = protocol.getAnnotation(TokenInfo.class);
+        if (tokenInfo != null) {
+          TokenSelector<? extends TokenIdentifier> tokenSelector =
+              tokenHandlers.get(tokenInfo.value());
+          if (tokenSelector != null) {
+            token = tokenSelector.selectToken(new Text(clusterId),
+                ticket.getUGI().getTokens());
+          } else if (LOG.isDebugEnabled()) {
+            LOG.debug("No token selector found for type "+tokenInfo.value());
+          }
+        }
+        KerberosInfo krbInfo = protocol.getAnnotation(KerberosInfo.class);
+        if (krbInfo != null) {
+          String serverKey = krbInfo.serverPrincipal();
+          if (serverKey == null) {
+            throw new IOException(
+                "Can't obtain server Kerberos config key from KerberosInfo");
+          }
+          serverPrincipal = SecurityUtil.getServerPrincipal(
+              conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("RPC Server Kerberos principal name for protocol="
+                + protocol.getCanonicalName() + " is " + serverPrincipal);
+          }
+        }
+      }
+
+      if (!useSasl) {
+        authMethod = AuthMethod.SIMPLE;
+      } else if (token != null) {
+        authMethod = AuthMethod.DIGEST;
+      } else {
+        authMethod = AuthMethod.KERBEROS;
+      }
+
+      header = new SecureConnectionHeader(
+          protocol == null ? null : protocol.getName(), ticket, authMethod);
+
+      if (LOG.isDebugEnabled())
+        LOG.debug("Use " + authMethod + " authentication for protocol "
+            + protocol.getSimpleName());
+
+      reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
+    }
+
+    private synchronized void disposeSasl() {
+      if (saslRpcClient != null) {
+        try {
+          saslRpcClient.dispose();
+          saslRpcClient = null;
+        } catch (IOException ioe) {
+          LOG.info("Error disposing of SASL client", ioe);
+        }
+      }
+    }
+
+    private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
+      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+      UserGroupInformation currentUser =
+        UserGroupInformation.getCurrentUser();
+      UserGroupInformation realUser = currentUser.getRealUser();
+      return authMethod == AuthMethod.KERBEROS &&
+          loginUser != null &&
+          //Make sure user logged in using Kerberos either keytab or TGT
+          loginUser.hasKerberosCredentials() &&
+          // relogin only in case it is the login user (e.g. JT)
+          // or superuser (like oozie).
+          (loginUser.equals(currentUser) || loginUser.equals(realUser));
+    }
+
+    private synchronized boolean setupSaslConnection(final InputStream in2,
+        final OutputStream out2)
+        throws IOException {
+      saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal);
+      return saslRpcClient.saslConnect(in2, out2);
+    }
+
+    /**
+     * If multiple clients with the same principal try to connect
+     * to the same server at the same time, the server assumes a
+     * replay attack is in progress. This is a feature of kerberos.
+     * In order to work around this, what is done is that the client
+     * backs off randomly and tries to initiate the connection
+     * again.
+     * The other problem is to do with ticket expiry. To handle that,
+     * a relogin is attempted.
+     */
+    private synchronized void handleSaslConnectionFailure(
+        final int currRetries,
+        final int maxRetries, final Exception ex, final Random rand,
+        final User user)
+    throws IOException, InterruptedException{
+      user.runAs(new PrivilegedExceptionAction<Object>() {
+        public Object run() throws IOException, InterruptedException {
+          closeConnection();
+          if (shouldAuthenticateOverKrb()) {
+            if (currRetries < maxRetries) {
+              LOG.debug("Exception encountered while connecting to " +
+                  "the server : " + ex);
+              //try re-login
+              if (UserGroupInformation.isLoginKeytabBased()) {
+                UserGroupInformation.getLoginUser().reloginFromKeytab();
+              } else {
+                UserGroupInformation.getLoginUser().reloginFromTicketCache();
+              }
+              disposeSasl();
+              //have granularity of milliseconds
+              //we are sleeping with the Connection lock held but since this
+              //connection instance is being used for connecting to the server
+              //in question, it is okay
+              Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
+              return null;
+            } else {
+              String msg = "Couldn't setup connection for " +
+              UserGroupInformation.getLoginUser().getUserName() +
+              " to " + serverPrincipal;
+              LOG.warn(msg);
+              throw (IOException) new IOException(msg).initCause(ex);
+            }
+          } else {
+            LOG.warn("Exception encountered while connecting to " +
+                "the server : " + ex);
+          }
+          if (ex instanceof RemoteException)
+            throw (RemoteException)ex;
+          throw new IOException(ex);
+        }
+      });
+    }
+
+    @Override
+    protected synchronized void setupIOstreams()
+        throws IOException, InterruptedException {
+      if (socket != null || shouldCloseConnection.get()) {
+        return;
+      }
+
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Connecting to "+server);
+        }
+        short numRetries = 0;
+        final short MAX_RETRIES = 5;
+        Random rand = null;
+        while (true) {
+          setupConnection();
+          InputStream inStream = NetUtils.getInputStream(socket);
+          OutputStream outStream = NetUtils.getOutputStream(socket);
+          writeRpcHeader(outStream);
+          if (useSasl) {
+            final InputStream in2 = inStream;
+            final OutputStream out2 = outStream;
+            User ticket = remoteId.getTicket();
+            if (authMethod == AuthMethod.KERBEROS) {
+              UserGroupInformation ugi = ticket.getUGI();
+              if (ugi != null && ugi.getRealUser() != null) {
+                ticket = User.create(ugi.getRealUser());
+              }
+            }
+            boolean continueSasl = false;
+            try {
+              continueSasl =
+                ticket.runAs(new PrivilegedExceptionAction<Boolean>() {
+                  @Override
+                  public Boolean run() throws IOException {
+                    return setupSaslConnection(in2, out2);
+                  }
+                });
+            } catch (Exception ex) {
+              if (rand == null) {
+                rand = new Random();
+              }
+              handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
+                   ticket);
+              continue;
+            }
+            if (continueSasl) {
+              // Sasl connect is successful. Let's set up Sasl i/o streams.
+              inStream = saslRpcClient.getInputStream(inStream);
+              outStream = saslRpcClient.getOutputStream(outStream);
+            } else {
+              // fall back to simple auth because server told us so.
+              authMethod = AuthMethod.SIMPLE;
+              header = new SecureConnectionHeader(header.getProtocol(),
+                  header.getUser(), authMethod);
+              useSasl = false;
+            }
+          }
+          this.in = new DataInputStream(new BufferedInputStream
+              (new PingInputStream(inStream)));
+          this.out = new DataOutputStream
+          (new BufferedOutputStream(outStream));
+          writeHeader();
+
+          // update last activity time
+          touch();
+
+          // start the receiver thread after the socket connection has been set up
+          start();
+          return;
+        }
+      } catch (IOException e) {
+        markClosed(e);
+        close();
+
+        throw e;
+      }
+    }
+
+    /* Write the RPC header */
+    private void writeRpcHeader(OutputStream outStream) throws IOException {
+      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
+      // Write out the header, version and authentication method
+      out.write(SecureServer.HEADER.array());
+      out.write(SecureServer.CURRENT_VERSION);
+      authMethod.write(out);
+      out.flush();
+    }
+
+    /**
+     * Write the protocol header for each connection
+     * Out is not synchronized because only the first thread does this.
+     */
+    private void writeHeader() throws IOException {
+      // Write out the ConnectionHeader
+      DataOutputBuffer buf = new DataOutputBuffer();
+      header.write(buf);
+
+      // Write out the payload length
+      int bufLen = buf.getLength();
+      out.writeInt(bufLen);
+      out.write(buf.getData(), 0, bufLen);
+    }
+
+    /**
+     * Read one RPC response from the server and deliver it to the matching
+     * pending {@link Call}: a SUCCESS response carries a Writable value, an
+     * ERROR response completes the call exceptionally, and a FATAL response
+     * additionally closes the whole connection.  Runs on the connection's
+     * receiver thread.
+     */
+    @Override
+    protected void receiveResponse() {
+      if (shouldCloseConnection.get()) {
+        return;
+      }
+      // record activity so idle-connection cleanup doesn't reap us
+      touch();
+
+      try {
+        int id = in.readInt();                    // try to read an id
+
+        if (LOG.isDebugEnabled())
+          LOG.debug(getName() + " got value #" + id);
+
+        // NOTE(review): remove() returns null if this call was already
+        // cleaned up (e.g. by cleanupCalls after a timeout); setValue /
+        // setException below would then NPE -- confirm this cannot race.
+        Call call = calls.remove(id);
+
+        int state = in.readInt();     // read call status
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("call #"+id+" state is " + state);
+        }
+        if (state == Status.SUCCESS.state) {
+          Writable value = ReflectionUtils.newInstance(valueClass, conf);
+          value.readFields(in);                 // read value
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("call #"+id+", response is:\n"+value.toString());
+          }
+          call.setValue(value);
+        } else if (state == Status.ERROR.state) {
+          // server-side error scoped to this one call
+          call.setException(new RemoteException(WritableUtils.readString(in),
+                                                WritableUtils.readString(in)));
+        } else if (state == Status.FATAL.state) {
+          // Close the connection
+          markClosed(new RemoteException(WritableUtils.readString(in),
+                                         WritableUtils.readString(in)));
+        }
+      } catch (IOException e) {
+        if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
+          // Clean up open calls but don't treat this as a fatal condition,
+          // since we expect certain responses to not make it by the specified
+          // {@link ConnectionId#rpcTimeout}.
+          closeException = e;
+        } else {
+          // Since the server did not respond within the default ping interval
+          // time, treat this as a fatal condition and close this connection
+          markClosed(e);
+        }
+      } finally {
+        if (remoteId.rpcTimeout > 0) {
+          cleanupCalls(remoteId.rpcTimeout);
+        }
+      }
+    }
+
+    /** Close the connection.  Expects {@code shouldCloseConnection} to have
+     * been set already (e.g. via markClosed); logs and returns otherwise. */
+    protected synchronized void close() {
+      if (!shouldCloseConnection.get()) {
+        LOG.error("The connection is not in the closed state");
+        return;
+      }
+
+      // release the resources
+      // first thing to do: take the connection out of the connection list
+      synchronized (connections) {
+        if (connections.get(remoteId) == this) {
+          connections.remove(remoteId);
+        }
+      }
+
+      // close the streams and therefore the socket
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(in);
+      // release any SASL resources held for this connection
+      disposeSasl();
+
+      // clean up all calls
+      if (closeException == null) {
+        if (!calls.isEmpty()) {
+          LOG.warn(
+              "A connection is closed for no cause and calls are not empty");
+
+          // clean up calls anyway
+          closeException = new IOException("Unexpected closed connection");
+          cleanupCalls();
+        }
+      } else {
+        // log the info
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("closing ipc connection to " + server + ": " +
+              closeException.getMessage(),closeException);
+        }
+
+        // cleanup calls
+        cleanupCalls();
+      }
+      if (LOG.isDebugEnabled())
+        LOG.debug(getName() + ": closed");
+    }
+  }
+
+  /**
+   * Construct an IPC client whose values are of the given {@link org.apache.hadoop.io.Writable}
+   * class.
+   * @param valueClass value class used to deserialize RPC responses
+   * @param conf configuration
+   * @param factory socket factory used to create server connections
+   */
+  public SecureClient(Class<? extends Writable> valueClass, Configuration conf,
+      SocketFactory factory) {
+    super(valueClass, conf, factory);
+  }
+
+  /**
+   * Construct an IPC client with the default SocketFactory.
+   * @param valueClass value class used to deserialize RPC responses
+   * @param conf configuration
+   */
+  public SecureClient(Class<? extends Writable> valueClass, Configuration conf) {
+    this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
+  }
+
+  /**
+   * Get the cached connection for the given (address, protocol, user,
+   * timeout) tuple, creating and caching one if absent, and register
+   * {@code call} on it.  Stream setup happens outside the cache lock so a
+   * slow server does not stall other callers.
+   * @throws IOException if the client has already been stopped
+   */
+  @Override
+  protected SecureConnection getConnection(InetSocketAddress addr,
+                                   Class<? extends VersionedProtocol> protocol,
+                                   User ticket,
+                                   int rpcTimeout,
+                                   Call call)
+                                   throws IOException, InterruptedException {
+    if (!running.get()) {
+      // the client is stopped
+      throw new IOException("The client is stopped");
+    }
+    SecureConnection connection;
+    /* we could avoid this allocation for each RPC by having a
+     * connectionsId object and with set() method. We need to manage the
+     * refs for keys in HashMap properly. For now its ok.
+     */
+    ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
+    do {
+      synchronized (connections) {
+        connection = (SecureConnection)connections.get(remoteId);
+        if (connection == null) {
+          connection = new SecureConnection(remoteId);
+          connections.put(remoteId, connection);
+        }
+      }
+    } while (!connection.addCall(call));   // loop: the connection may be closing concurrently
+
+    //we don't invoke the method below inside "synchronized (connections)"
+    //block above. The reason for that is if the server happens to be slow,
+    //it will take longer to establish a connection and that will slow the
+    //entire system down.
+    connection.setupIOstreams();
+    return connection;
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureConnectionHeader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureConnectionHeader.java?rev=1203512&view=auto
==============================================================================
--- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureConnectionHeader.java (added)
+++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureConnectionHeader.java Fri Nov 18 07:13:03 2011
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * The IPC connection header sent by the client to the server
+ * on connection establishment.  Part of the {@link SecureRpcEngine}
+ * implementation.
+ */
+class SecureConnectionHeader extends ConnectionHeader {
+  // effective user for this connection; null when no user is sent
+  // (e.g. DIGEST/token auth, where identity comes from the token itself)
+  private User user = null;
+  // selects which user fields write() serializes; not itself on the wire
+  private AuthMethod authMethod;
+
+  public SecureConnectionHeader() {}
+
+  /**
+   * Create a new {@link org.apache.hadoop.hbase.ipc.SecureConnectionHeader} with the given <code>protocol</code>,
+   * user and authentication method.
+   * @param protocol protocol used for communication between the IPC client
+   *                 and the server
+   * @param user the {@link org.apache.hadoop.hbase.security.User} of the
+   *             client communicating with the server
+   * @param authMethod the SASL authentication method in use; determines
+   *                   which user fields {@link #write} serializes
+   */
+  public SecureConnectionHeader(String protocol, User user, AuthMethod authMethod) {
+    this.protocol = protocol;
+    this.user = user;
+    this.authMethod = authMethod;
+  }
+
+  /**
+   * Deserialize the header: protocol name, then an optional effective user
+   * name and an optional real (proxying) user name.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    protocol = Text.readString(in);
+    if (protocol.isEmpty()) {
+      protocol = null;     // empty string on the wire means "no protocol"
+    }
+    boolean ugiUsernamePresent = in.readBoolean();
+    if (ugiUsernamePresent) {
+      String username = in.readUTF();
+      boolean realUserNamePresent = in.readBoolean();
+      if (realUserNamePresent) {
+        // effective user acting via a real (proxy) user
+        String realUserName = in.readUTF();
+        UserGroupInformation realUserUgi =
+            UserGroupInformation.createRemoteUser(realUserName);
+        user = User.create(
+            UserGroupInformation.createProxyUser(username, realUserUgi));
+      } else {
+        user = User.create(UserGroupInformation.createRemoteUser(username));
+      }
+    } else {
+      user = null;
+    }
+  }
+
+  /**
+   * Serialize the header.  Which user fields are written depends on the
+   * authentication method: Kerberos sends only the effective user, DIGEST
+   * (token) auth sends no user at all, and simple auth sends the effective
+   * user plus the real user when one is present.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, (protocol == null) ? "" : protocol);
+    if (user != null) {
+      UserGroupInformation ugi = user.getUGI();
+      if (authMethod == AuthMethod.KERBEROS) {
+        // Send effective user for Kerberos auth
+        out.writeBoolean(true);
+        out.writeUTF(ugi.getUserName());
+        out.writeBoolean(false);
+      } else if (authMethod == AuthMethod.DIGEST) {
+        // Don't send user for token auth
+        out.writeBoolean(false);
+      } else {
+        //Send both effective user and real user for simple auth
+        out.writeBoolean(true);
+        out.writeUTF(ugi.getUserName());
+        if (ugi.getRealUser() != null) {
+          out.writeBoolean(true);
+          out.writeUTF(ugi.getRealUser().getUserName());
+        } else {
+          out.writeBoolean(false);
+        }
+      }
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public User getUser() {
+    return user;
+  }
+
+  public String toString() {
+    return protocol + "-" + user;
+  }
+}

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java?rev=1203512&view=auto
==============================================================================
--- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java (added)
+++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java Fri Nov 18 07:13:03 2011
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.security.HBasePolicyProvider;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
+import org.apache.hadoop.hbase.util.Objects;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+
+import javax.net.SocketFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.*;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A loadable RPC engine supporting SASL authentication of connections, using
+ * GSSAPI for Kerberos authentication or DIGEST-MD5 for authentication via
+ * signed tokens.
+ *
+ * <p>
+ * This is a fork of the {@code org.apache.hadoop.ipc.WritableRpcEngine} from
+ * secure Hadoop, reworked to eliminate code duplication with the existing
+ * HBase {@link WritableRpcEngine}.
+ * </p>
+ *
+ * @see SecureClient
+ * @see SecureServer
+ */
+public class SecureRpcEngine implements RpcEngine {
+  // Leave this out in the hadoop ipc package but keep class name.  Do this
+  // so that we don't get the logging of this class's invocations by doing our
+  // blanket enabling DEBUG on the o.a.h.h. package.
+  protected static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.ipc.SecureRpcEngine");
+
+  private SecureRpcEngine() {
+    super();
+  }                                  // no public ctor
+
+  /* Cache a client using its socket factory as the hash key */
+  static private class ClientCache {
+    private Map<SocketFactory, SecureClient> clients =
+      new HashMap<SocketFactory, SecureClient>();
+
+    protected ClientCache() {}
+
+    /**
+     * Construct & cache an IPC client with the user-provided SocketFactory
+     * if no cached client exists.
+     *
+     * @param conf Configuration
+     * @param factory socket factory
+     * @return an IPC client
+     */
+    protected synchronized SecureClient getClient(Configuration conf,
+        SocketFactory factory) {
+      // Construct & cache client.  The configuration is only used for timeout,
+      // and Clients have connection pools.  So we can either (a) lose some
+      // connection pooling and leak sockets, or (b) use the same timeout for all
+      // configurations.  Since the IPC is usually intended globally, not
+      // per-job, we choose (a).
+      // NOTE(review): one client per factory with a shared timeout matches
+      // option (b) as described above -- confirm which option this comment
+      // meant to record.
+      SecureClient client = clients.get(factory);
+      if (client == null) {
+        // Make an hbase client instead of hadoop Client.
+        client = new SecureClient(HbaseObjectWritable.class, conf, factory);
+        clients.put(factory, client);
+      } else {
+        client.incCount();
+      }
+      return client;
+    }
+
+    /**
+     * Construct & cache an IPC client with the default SocketFactory
+     * if no cached client exists.
+     *
+     * @param conf Configuration
+     * @return an IPC client
+     */
+    protected synchronized SecureClient getClient(Configuration conf) {
+      return getClient(conf, SocketFactory.getDefault());
+    }
+
+    /**
+     * Stop a RPC client connection
+     * A RPC client is closed only when its reference count becomes zero.
+     * @param client client to stop
+     */
+    protected void stopClient(SecureClient client) {
+      synchronized (this) {
+        client.decCount();
+        if (client.isZeroReference()) {
+          clients.remove(client.getSocketFactory());
+        }
+      }
+      // stop() runs outside the lock so a slow shutdown does not block
+      // other getClient/stopClient callers
+      if (client.isZeroReference()) {
+        client.stop();
+      }
+    }
+  }
+
+  protected final static ClientCache CLIENTS = new ClientCache();
+
+  /**
+   * Client-side proxy handler: forwards each interface method call as an
+   * {@link Invocation} through a cached {@link SecureClient}.
+   */
+  private static class Invoker implements InvocationHandler {
+    private Class<? extends VersionedProtocol> protocol;
+    private InetSocketAddress address;
+    private User ticket;
+    private SecureClient client;
+    private boolean isClosed = false;
+    final private int rpcTimeout;
+
+    public Invoker(Class<? extends VersionedProtocol> protocol,
+        InetSocketAddress address, User ticket,
+        Configuration conf, SocketFactory factory, int rpcTimeout) {
+      this.protocol = protocol;
+      this.address = address;
+      this.ticket = ticket;
+      this.client = CLIENTS.getClient(conf, factory);
+      this.rpcTimeout = rpcTimeout;
+    }
+
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+      final boolean logDebug = LOG.isDebugEnabled();
+      long startTime = 0;
+      if (logDebug) {
+        startTime = System.currentTimeMillis();
+      }
+      // synchronous call: blocks until the server responds or times out
+      HbaseObjectWritable value = (HbaseObjectWritable)
+        client.call(new Invocation(method, args), address,
+                    protocol, ticket, rpcTimeout);
+      if (logDebug) {
+        long callTime = System.currentTimeMillis() - startTime;
+        LOG.debug("Call: " + method.getName() + " " + callTime);
+      }
+      return value.get();
+    }
+
+    /* close the IPC client that's responsible for this invoker's RPCs */
+    synchronized protected void close() {
+      if (!isClosed) {
+        isClosed = true;
+        CLIENTS.stopClient(client);
+      }
+    }
+  }
+
+  /**
+   * Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address.
+   *
+   * @param protocol interface
+   * @param clientVersion version we are expecting
+   * @param addr remote address
+   * @param ticket ticket
+   * @param conf configuration
+   * @param factory socket factory
+   * @param rpcTimeout timeout for each RPC made through the proxy
+   * @return proxy
+   * @throws java.io.IOException e
+   */
+  public VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, User ticket,
+      Configuration conf, SocketFactory factory, int rpcTimeout)
+  throws IOException {
+    if (User.isSecurityEnabled()) {
+      HBaseSaslRpcServer.init(conf);
+    }
+    VersionedProtocol proxy =
+        (VersionedProtocol) Proxy.newProxyInstance(
+            protocol.getClassLoader(), new Class[] { protocol },
+            new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
+    // verify the server speaks the expected version before handing out the proxy
+    long serverVersion = proxy.getProtocolVersion(protocol.getName(),
+                                                  clientVersion);
+    if (serverVersion != clientVersion) {
+      throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
+                                serverVersion);
+    }
+    return proxy;
+  }
+
+  /**
+   * Stop this proxy and release its invoker's resource
+   * @param proxy the proxy to be stopped
+   */
+  public void stopProxy(VersionedProtocol proxy) {
+    if (proxy!=null) {
+      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+    }
+  }
+
+
+  /** Expert: Make multiple, parallel calls to a set of servers. */
+  public Object[] call(Method method, Object[][] params,
+                       InetSocketAddress[] addrs,
+                       Class<? extends VersionedProtocol> protocol,
+                       User ticket, Configuration conf)
+    throws IOException, InterruptedException {
+
+    Invocation[] invocations = new Invocation[params.length];
+    for (int i = 0; i < params.length; i++)
+      invocations[i] = new Invocation(method, params[i]);
+    SecureClient client = CLIENTS.getClient(conf);
+    try {
+      Writable[] wrappedValues =
+        client.call(invocations, addrs, protocol, ticket);
+
+      if (method.getReturnType() == Void.TYPE) {
+        return null;
+      }
+
+      // unwrap each non-null HbaseObjectWritable into the method's return type
+      Object[] values =
+          (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
+      for (int i = 0; i < values.length; i++)
+        if (wrappedValues[i] != null)
+          values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
+
+      return values;
+    } finally {
+      CLIENTS.stopClient(client);
+    }
+  }
+
+  /** Construct a server for a protocol implementation instance listening on a
+   * port and address, with a secret manager. */
+  public Server getServer(Class<? extends VersionedProtocol> protocol,
+      final Object instance,
+      Class<?>[] ifaces,
+      final String bindAddress, final int port,
+      final int numHandlers,
+      int metaHandlerCount, final boolean verbose, Configuration conf,
+       int highPriorityLevel)
+    throws IOException {
+    Server server = new Server(instance, ifaces, conf, bindAddress, port,
+            numHandlers, metaHandlerCount, verbose,
+            highPriorityLevel);
+    return server;
+  }
+
+  /** An RPC Server. */
+  public static class Server extends SecureServer {
+    private Object instance;
+    private Class<?> implementation;
+    private Class<?>[] ifaces;
+    private boolean verbose;
+
+    /** Return the simple (unqualified) class name, or the input unchanged
+     * when it contains no dots. */
+    private static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length-1];
+    }
+
+    /** Construct an RPC server.
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     * @throws java.io.IOException e
+     */
+    public Server(Object instance, final Class<?>[] ifaces,
+                  Configuration conf, String bindAddress,  int port,
+                  int numHandlers, int metaHandlerCount, boolean verbose,
+                  int highPriorityLevel)
+        throws IOException {
+      super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, conf,
+          classNameBase(instance.getClass().getName()), highPriorityLevel);
+      this.instance = instance;
+      this.implementation = instance.getClass();
+      this.verbose = verbose;
+
+      this.ifaces = ifaces;
+
+      // create metrics for the advertised interfaces this server implements.
+      this.rpcMetrics.createMetrics(this.ifaces);
+    }
+
+    /**
+     * Create the secret manager used to sign and verify authentication
+     * tokens, when the hosted instance is an HBase {@code Server} (so that
+     * a ZooKeeper watcher and server name are available).
+     * @return a new secret manager, or null if one cannot be created here
+     */
+    public AuthenticationTokenSecretManager createSecretManager(){
+      if (instance instanceof org.apache.hadoop.hbase.Server) {
+        org.apache.hadoop.hbase.Server server =
+            (org.apache.hadoop.hbase.Server)instance;
+        Configuration conf = server.getConfiguration();
+        // defaults: rotate signing keys daily, tokens live at most 7 days
+        long keyUpdateInterval =
+            conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
+        long maxAge =
+            conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
+        return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
+            server.getServerName().toString(), keyUpdateInterval, maxAge);
+      }
+      return null;
+    }
+
+    /**
+     * Install the token secret manager (when available) and the
+     * service-level authorization policy before starting the base server
+     * threads.
+     */
+    @Override
+    public void startThreads() {
+      AuthenticationTokenSecretManager mgr = createSecretManager();
+      if (mgr != null) {
+        setSecretManager(mgr);
+        mgr.start();
+      }
+      this.authManager = new ServiceAuthorizationManager();
+      HBasePolicyProvider.init(conf, authManager);
+
+      // continue with base startup
+      super.startThreads();
+    }
+
+    /**
+     * Dispatch one received RPC: reflectively invoke the named method on the
+     * hosted instance, record queue/processing-time metrics, and wrap the
+     * result (or rethrow the underlying cause) for the client.
+     */
+    @Override
+    public Writable call(Class<? extends VersionedProtocol> protocol,
+        Writable param, long receivedTime, MonitoredRPCHandler status)
+    throws IOException {
+      try {
+        Invocation call = (Invocation)param;
+        if(call.getMethodName() == null) {
+          throw new IOException("Could not find requested method, the usual " +
+              "cause is a version mismatch between client and server.");
+        }
+        if (verbose) log("Call: " + call);
+
+        Method method =
+          protocol.getMethod(call.getMethodName(),
+                                   call.getParameterClasses());
+        method.setAccessible(true);
+
+        // the hosted instance must actually implement the requested protocol
+        Object impl = null;
+        if (protocol.isAssignableFrom(this.implementation)) {
+          impl = this.instance;
+        }
+        else {
+          throw new HBaseRPC.UnknownProtocolException(protocol);
+        }
+
+        long startTime = System.currentTimeMillis();
+        Object[] params = call.getParameters();
+        Object value = method.invoke(impl, params);
+        int processingTime = (int) (System.currentTimeMillis() - startTime);
+        int qTime = (int) (startTime-receivedTime);
+        if (TRACELOG.isDebugEnabled()) {
+          TRACELOG.debug("Call #" + CurCall.get().id +
+              "; Served: " + protocol.getSimpleName()+"#"+call.getMethodName() +
+              " queueTime=" + qTime +
+              " processingTime=" + processingTime +
+              " contents=" + Objects.describeQuantity(params));
+        }
+        rpcMetrics.rpcQueueTime.inc(qTime);
+        rpcMetrics.rpcProcessingTime.inc(processingTime);
+        rpcMetrics.inc(call.getMethodName(), processingTime);
+        if (verbose) log("Return: "+value);
+
+        return new HbaseObjectWritable(method.getReturnType(), value);
+      } catch (InvocationTargetException e) {
+        // unwrap the target exception; IOExceptions propagate unchanged
+        Throwable target = e.getTargetException();
+        if (target instanceof IOException) {
+          throw (IOException)target;
+        }
+        IOException ioe = new IOException(target.toString());
+        ioe.setStackTrace(target.getStackTrace());
+        throw ioe;
+      } catch (Throwable e) {
+        if (!(e instanceof IOException)) {
+          LOG.error("Unexpected throwable object ", e);
+        }
+        IOException ioe = new IOException(e.toString());
+        ioe.setStackTrace(e.getStackTrace());
+        throw ioe;
+      }
+    }
+  }
+
+  /** Log {@code value} at INFO level, truncated to 55 characters. */
+  protected static void log(String value) {
+    String v = value;
+    if (v != null && v.length() > 55)
+      v = v.substring(0, 55)+"...";
+    LOG.info(v);
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java?rev=1203512&view=auto
==============================================================================
--- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java (added)
+++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java Fri Nov 18 07:13:03 2011
@@ -0,0 +1,728 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.*;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
+
+/**
+ * An abstract IPC service, supporting SASL authentication of connections,
+ * using GSSAPI for Kerberos authentication or DIGEST-MD5 for authentication
+ * via signed tokens.
+ *
+ * <p>
+ * This is part of the {@link SecureRpcEngine} implementation.
+ * </p>
+ *
+ * @see org.apache.hadoop.hbase.ipc.SecureClient
+ */
+public abstract class SecureServer extends HBaseServer {
+  // Whether service-level connection authorization is enforced
+  // (hadoop.security.authorization); fixed at construction time.
+  private final boolean authorize;
+  // Whether secure authentication is required; mutable only via the
+  // package-private test hooks enableSecurity()/disableSecurity().
+  private boolean isSecurityEnabled;
+
+  /**
+   * The first four bytes of secure RPC connections
+   */
+  public static final ByteBuffer HEADER = ByteBuffer.wrap("srpc".getBytes());
+
+  // 1 : Introduce ping and server does not throw away RPCs
+  // 3 : Introduce the protocol into the RPC connection header
+  // 4 : Introduced SASL security layer
+  public static final byte CURRENT_VERSION = 4;
+
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.SecureServer");
+  // Separate logger name so authentication successes/failures can be routed
+  // to a dedicated security audit log via log4j configuration.
+  private static final Log AUDITLOG =
+    LogFactory.getLog("SecurityLogger.org.apache.hadoop.ipc.SecureServer");
+  private static final String AUTH_FAILED_FOR = "Auth failed for ";
+  private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
+
+  // Validates DIGEST-MD5 token identifiers; may be null when token
+  // authentication is not configured (see setSecretManager()).
+  protected SecretManager<TokenIdentifier> secretManager;
+  // Service-level ACL manager consulted by authorize().
+  // NOTE(review): never assigned in this file's visible code -- confirm it is
+  // initialized (by a subclass or elsewhere) before the first connection.
+  protected ServiceAuthorizationManager authManager;
+
+  /**
+   * A single server-side RPC call.  Extends the base call to serialize its
+   * own response and, when the connection negotiated a SASL QoP of integrity
+   * or privacy, to wrap the serialized response with the SASL security layer
+   * before it is queued for sending.
+   */
+  protected class SecureCall extends HBaseServer.Call {
+    public SecureCall(int id, Writable param, Connection connection,
+        Responder responder) {
+      super(id, param, connection, responder);
+    }
+
+    /**
+     * Serializes this call's result into {@code this.response}.
+     * Wire layout: call id (int), status code (int), then either the
+     * Writable result (on SUCCESS) or the error class name and stringified
+     * error (otherwise).
+     *
+     * @param value the return value: a Writable, a value to wrap in an
+     *   HbaseObjectWritable, or null when only reporting an error
+     * @param status SUCCESS, ERROR or FATAL
+     * @param errorClass exception class name when status is not SUCCESS
+     * @param error stringified exception when status is not SUCCESS
+     */
+    @Override
+    protected synchronized void setResponse(Object value, Status status,
+        String errorClass, String error) {
+      Writable result = null;
+      if (value instanceof Writable) {
+        result = (Writable) value;
+      } else {
+        /* We might have a null value and errors. Avoid creating a
+         * HbaseObjectWritable, because the constructor fails on null. */
+        if (value != null) {
+          result = new HbaseObjectWritable(value);
+        }
+      }
+
+      // Start from the default buffer size unless the result can report its
+      // exact serialized size up front.
+      int size = BUFFER_INITIAL_SIZE;
+      if (result instanceof WritableWithSize) {
+        // get the size hint.
+        WritableWithSize ohint = (WritableWithSize) result;
+        long hint = ohint.getWritableSize() + Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
+        if (hint > Integer.MAX_VALUE) {
+          // oops, new problem.
+          IOException ioe =
+            new IOException("Result buffer size too large: " + hint);
+          errorClass = ioe.getClass().getName();
+          error = StringUtils.stringifyException(ioe);
+        } else {
+          size = (int)hint;
+        }
+      }
+
+      ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
+      DataOutputStream out = new DataOutputStream(buf);
+      try {
+        out.writeInt(this.id);                // write call id
+        out.writeInt(status.state);           // write status
+      } catch (IOException e) {
+        errorClass = e.getClass().getName();
+        error = StringUtils.stringifyException(e);
+      }
+
+      try {
+        // NOTE(review): the id/status ints were already written above, so if
+        // result.write() fails part-way the buffer holds a truncated payload.
+        // A null result with SUCCESS status would also NPE here -- callers
+        // are expected to pass a non-null value on SUCCESS.
+        if (status == Status.SUCCESS) {
+          result.write(out);
+        } else {
+          WritableUtils.writeString(out, errorClass);
+          WritableUtils.writeString(out, error);
+        }
+        // If a SASL security layer is in force, replace the plaintext
+        // payload with the SASL-wrapped token in place.
+        if (((SecureConnection)connection).useWrap) {
+          wrapWithSasl(buf);
+        }
+      } catch (IOException e) {
+        LOG.warn("Error sending response to call: ", e);
+      }
+
+      this.response = buf.getByteBuffer();
+    }
+
+    /**
+     * Replaces the contents of {@code response} with its SASL-wrapped form:
+     * a 4-byte token length followed by the token bytes.  No-op unless the
+     * connection authenticated via SASL.
+     */
+    private void wrapWithSasl(ByteBufferOutputStream response)
+        throws IOException {
+      if (((SecureConnection)connection).useSasl) {
+        // getByteBuffer calls flip()
+        ByteBuffer buf = response.getByteBuffer();
+        byte[] token;
+        // synchronization may be needed since there can be multiple Handler
+        // threads using saslServer to wrap responses.
+        synchronized (((SecureConnection)connection).saslServer) {
+          token = ((SecureConnection)connection).saslServer.wrap(buf.array(),
+              buf.arrayOffset(), buf.remaining());
+        }
+        if (LOG.isDebugEnabled())
+          LOG.debug("Adding saslServer wrapped token of size " + token.length
+              + " as call response.");
+        // clear() rewinds the stream's backing buffer, so the writes below
+        // overwrite the plaintext response from the beginning.
+        buf.clear();
+        DataOutputStream saslOut = new DataOutputStream(response);
+        saslOut.writeInt(token.length);
+        saslOut.write(token, 0, token.length);
+      }
+    }
+  }
+
+  /** Reads calls from a connection and queues them for handling. */
+  public class SecureConnection extends HBaseServer.Connection  {
+    private boolean rpcHeaderRead = false; // if initial rpc header is read
+    private boolean headerRead = false;  //if the connection header that
+                                         //follows version is read.
+    // Buffer for the current length-prefixed message body; null between
+    // messages.
+    private ByteBuffer data;
+    // 4-byte buffer holding the "srpc" preamble first, then message lengths.
+    private ByteBuffer dataLengthBuffer;
+    protected final LinkedList<SecureCall> responseQueue;
+    private int dataLength;
+    private InetAddress addr;
+
+    // True once a non-SIMPLE auth method has been negotiated.
+    boolean useSasl;
+    SaslServer saslServer;
+    private AuthMethod authMethod;
+    private boolean saslContextEstablished;
+    // Set when falling back from SASL to SIMPLE so the client's already-sent
+    // initial SASL message is discarded (see readAndProcess()).
+    private boolean skipInitialSaslHandshake;
+    // 2-byte buffer for the version and auth-method bytes of the preamble.
+    private ByteBuffer rpcHeaderBuffer;
+    private ByteBuffer unwrappedData;
+    private ByteBuffer unwrappedDataLengthBuffer;
+
+    public UserGroupInformation attemptingUser = null; // user name before auth
+
+    // Fake 'call' for failed authorization response
+    private final int AUTHORIZATION_FAILED_CALLID = -1;
+    // Fake 'call' for SASL context setup
+    private static final int SASL_CALLID = -33;
+    private final SecureCall saslCall = new SecureCall(SASL_CALLID, null, this, null);
+
+    // True when the negotiated SASL QoP requires wrapping every message
+    // (integrity or privacy, i.e. anything other than plain "auth").
+    private boolean useWrap = false;
+
+    /**
+     * Creates connection state for a newly accepted channel.
+     * @param channel the accepted socket channel
+     * @param lastContact time of last activity, used for idle disconnection
+     */
+    public SecureConnection(SocketChannel channel, long lastContact) {
+      super(channel, lastContact);
+      // Use the security-aware header so the client can convey its user info.
+      this.header = new SecureConnectionHeader();
+      this.channel = channel;
+      this.data = null;
+      this.dataLengthBuffer = ByteBuffer.allocate(4);
+      this.unwrappedData = null;
+      this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
+      this.socket = channel.socket();
+      this.addr = socket.getInetAddress();
+      this.responseQueue = new LinkedList<SecureCall>();
+    }
+
+    /** @return "host:port" of the remote peer, used in logs and audit lines */
+    @Override
+    public String toString() {
+      return getHostAddress() + ":" + remotePort;
+    }
+
+    /** @return the remote peer's host address as a string */
+    public String getHostAddress() {
+      return hostAddress;
+    }
+
+    /** @return the remote peer's InetAddress */
+    public InetAddress getHostInetAddress() {
+      return addr;
+    }
+
+    /**
+     * Builds the authenticated {@link User} from the SASL authorization id.
+     * For DIGEST (token) auth the id is resolved through the secret manager
+     * and the token identifier is attached to the UGI; otherwise the id is
+     * taken directly as the remote user name.
+     *
+     * @param authorizedId authorization id reported by the SASL server
+     * @return the authenticated user
+     * @throws IOException if the token cannot be resolved or carries no user
+     */
+    private User getAuthorizedUgi(String authorizedId)
+        throws IOException {
+      if (authMethod == AuthMethod.DIGEST) {
+        TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
+            secretManager);
+        UserGroupInformation ugi = tokenId.getUser();
+        if (ugi == null) {
+          throw new AccessControlException(
+              "Can't retrieve username from tokenIdentifier.");
+        }
+        ugi.addTokenIdentifier(tokenId);
+        return User.create(ugi);
+      } else {
+        return User.create(UserGroupInformation.createRemoteUser(authorizedId));
+      }
+    }
+
+    /**
+     * Handles one client SASL token.  Until the SASL context is established
+     * this drives the negotiation: the SaslServer is created lazily on the
+     * first token, and challenge/response tokens are exchanged until
+     * {@code saslServer.isComplete()}.  After that, incoming bytes are RPC
+     * data: processed directly when no security layer was negotiated, or
+     * unwrapped first when one was.
+     *
+     * @param saslToken SASL token (or wrapped RPC data) read from the client
+     * @throws IOException if negotiation or authentication fails
+     * @throws InterruptedException if queuing a decoded call is interrupted
+     */
+    private void saslReadAndProcess(byte[] saslToken) throws IOException,
+        InterruptedException {
+      if (!saslContextEstablished) {
+        byte[] replyToken = null;
+        try {
+          // Lazily create the SaslServer for the negotiated auth method.
+          if (saslServer == null) {
+            switch (authMethod) {
+            case DIGEST:
+              if (secretManager == null) {
+                throw new AccessControlException(
+                    "Server is not configured to do DIGEST authentication.");
+              }
+              saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
+                  .getMechanismName(), null, HBaseSaslRpcServer.SASL_DEFAULT_REALM,
+                  HBaseSaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(
+                      secretManager, this));
+              break;
+            default:
+              // Kerberos: the server principal must be name/host@REALM so we
+              // can hand the service name and hostname to GSSAPI separately.
+              UserGroupInformation current = UserGroupInformation
+                  .getCurrentUser();
+              String fullName = current.getUserName();
+              if (LOG.isDebugEnabled())
+                LOG.debug("Kerberos principal name is " + fullName);
+              final String names[] = HBaseSaslRpcServer.splitKerberosName(fullName);
+              if (names.length != 3) {
+                throw new AccessControlException(
+                    "Kerberos principal name does NOT have the expected "
+                        + "hostname part: " + fullName);
+              }
+              // Create the SaslServer as the logged-in (keytab) user so GSSAPI
+              // can access the server's credentials.
+              current.doAs(new PrivilegedExceptionAction<Object>() {
+                @Override
+                public Object run() throws SaslException {
+                  saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
+                      .getMechanismName(), names[0], names[1],
+                      HBaseSaslRpcServer.SASL_PROPS, new SaslGssCallbackHandler());
+                  return null;
+                }
+              });
+            }
+            if (saslServer == null)
+              throw new AccessControlException(
+                  "Unable to find SASL server implementation for "
+                      + authMethod.getMechanismName());
+            if (LOG.isDebugEnabled())
+              LOG.debug("Created SASL server with mechanism = "
+                  + authMethod.getMechanismName());
+          }
+          if (LOG.isDebugEnabled())
+            LOG.debug("Have read input token of size " + saslToken.length
+                + " for processing by saslServer.evaluateResponse()");
+          replyToken = saslServer.evaluateResponse(saslToken);
+        } catch (IOException e) {
+          // Prefer reporting the root InvalidToken cause to the client so it
+          // can distinguish an expired/bad token from other failures.
+          IOException sendToClient = e;
+          Throwable cause = e;
+          while (cause != null) {
+            if (cause instanceof InvalidToken) {
+              sendToClient = (InvalidToken) cause;
+              break;
+            }
+            cause = cause.getCause();
+          }
+          doSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
+              sendToClient.getLocalizedMessage());
+          rpcMetrics.authenticationFailures.inc();
+          String clientIP = this.toString();
+          // attempting user could be null
+          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
+          throw e;
+        }
+        if (replyToken != null) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("Will send token of size " + replyToken.length
+                + " from saslServer.");
+          doSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
+              null);
+        }
+        if (saslServer.isComplete()) {
+          LOG.debug("SASL server context established. Negotiated QoP is "
+              + saslServer.getNegotiatedProperty(Sasl.QOP));
+          // Any QoP other than plain "auth" requires wrapping every message.
+          String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+          useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
+          ticket = getAuthorizedUgi(saslServer.getAuthorizationID());
+          LOG.debug("SASL server successfully authenticated client: " + ticket);
+          rpcMetrics.authenticationSuccesses.inc();
+          AUDITLOG.trace(AUTH_SUCCESSFUL_FOR + ticket);
+          saslContextEstablished = true;
+        }
+      } else {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Have read input token of size " + saslToken.length
+              + " for processing by saslServer.unwrap()");
+
+        if (!useWrap) {
+          processOneRpc(saslToken);
+        } else {
+          byte[] plaintextData = saslServer.unwrap(saslToken, 0,
+              saslToken.length);
+          processUnwrappedData(plaintextData);
+        }
+      }
+    }
+
+    /**
+     * Sends a SASL negotiation message to the client via the shared
+     * {@code saslCall} pseudo-call (id SASL_CALLID).
+     * @param status outcome of the negotiation step (SUCCESS or ERROR)
+     * @param rv reply token to send, or null
+     * @param errorClass exception class name on error, else null
+     * @param error error message on error, else null
+     */
+    private void doSaslReply(SaslStatus status, Writable rv,
+        String errorClass, String error) throws IOException {
+      saslCall.setResponse(rv,
+          status == SaslStatus.SUCCESS ? Status.SUCCESS : Status.ERROR,
+           errorClass, error);
+      saslCall.responder = responder;
+      saslCall.sendResponseIfReady();
+    }
+
+    /** Releases any SASL server resources; safe to call when none exist. */
+    private void disposeSasl() {
+      if (saslServer == null) {
+        return;
+      }
+      try {
+        saslServer.dispose();
+      } catch (SaslException ignored) {
+        // best-effort cleanup during teardown; nothing useful to do here
+      }
+    }
+
+    /**
+     * Reads from the channel and processes at most one RPC.  The first reads
+     * on a connection consume the 4-byte "srpc" preamble plus the
+     * version/auth-method bytes; subsequent reads parse length-prefixed
+     * messages, which are routed through SASL processing when negotiated.
+     *
+     * @return the byte count from the last channel read; negative means the
+     *   connection hit EOF or is invalid and should be closed
+     * @throws IOException on protocol errors or failed authentication
+     * @throws InterruptedException if queuing a decoded call is interrupted
+     */
+    public int readAndProcess() throws IOException, InterruptedException {
+      while (true) {
+        /* Read at most one RPC. If the header is not read completely yet
+         * then iterate until we read first RPC or until there is no data left.
+         */
+        int count = -1;
+        if (dataLengthBuffer.remaining() > 0) {
+          count = channelRead(channel, dataLengthBuffer);
+          if (count < 0 || dataLengthBuffer.remaining() > 0)
+            return count;
+        }
+
+        if (!rpcHeaderRead) {
+          //Every connection is expected to send the header.
+          if (rpcHeaderBuffer == null) {
+            rpcHeaderBuffer = ByteBuffer.allocate(2);
+          }
+          count = channelRead(channel, rpcHeaderBuffer);
+          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
+            return count;
+          }
+          int version = rpcHeaderBuffer.get(0);
+          byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
+          authMethod = AuthMethod.read(new DataInputStream(
+              new ByteArrayInputStream(method)));
+          dataLengthBuffer.flip();
+          if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
+            //Warning is ok since this is not supposed to happen.
+            LOG.warn("Incorrect header or version mismatch from " +
+                     hostAddress + ":" + remotePort +
+                     " got version " + version +
+                     " expected version " + CURRENT_VERSION);
+            return -1;
+          }
+          dataLengthBuffer.clear();
+          if (authMethod == null) {
+            throw new IOException("Unable to read authentication method");
+          }
+          // When security is on, refuse unauthenticated (SIMPLE) clients with
+          // a FATAL response before failing the connection.
+          if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+            AccessControlException ae = new AccessControlException(
+                "Authentication is required");
+            SecureCall failedCall = new SecureCall(AUTHORIZATION_FAILED_CALLID, null, this,
+                null);
+            failedCall.setResponse(null, Status.FATAL, ae.getClass().getName(),
+                ae.getMessage());
+            responder.doRespond(failedCall);
+            throw ae;
+          }
+          if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+            doSaslReply(SaslStatus.SUCCESS, new IntWritable(
+                HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
+            authMethod = AuthMethod.SIMPLE;
+            // client has already sent the initial Sasl message and we
+            // should ignore it. Both client and server should fall back
+            // to simple auth from now on.
+            skipInitialSaslHandshake = true;
+          }
+          if (authMethod != AuthMethod.SIMPLE) {
+            useSasl = true;
+          }
+
+          rpcHeaderBuffer = null;
+          rpcHeaderRead = true;
+          continue;
+        }
+
+        if (data == null) {
+          dataLengthBuffer.flip();
+          dataLength = dataLengthBuffer.getInt();
+
+          if (dataLength == HBaseClient.PING_CALL_ID) {
+            if(!useWrap) { //covers the !useSasl too
+              dataLengthBuffer.clear();
+              return 0;  //ping message
+            }
+          }
+          if (dataLength < 0) {
+            // A negative length (other than the ping sentinel handled above)
+            // is a protocol error.  Fail the connection explicitly instead of
+            // falling through to ByteBuffer.allocate(dataLength), which would
+            // throw an unchecked IllegalArgumentException.
+            LOG.warn("Unexpected data length " + dataLength + "!! from " +
+                getHostAddress());
+            throw new IOException("Unexpected data length " + dataLength +
+                " from " + getHostAddress());
+          }
+          data = ByteBuffer.allocate(dataLength);
+          incRpcCount();  // Increment the rpc count
+        }
+
+        count = channelRead(channel, data);
+
+        if (data.remaining() == 0) {
+          dataLengthBuffer.clear();
+          data.flip();
+          // Discard the client's initial SASL message when we fell back to
+          // SIMPLE auth above.
+          if (skipInitialSaslHandshake) {
+            data = null;
+            skipInitialSaslHandshake = false;
+            continue;
+          }
+          boolean isHeaderRead = headerRead;
+          if (useSasl) {
+            saslReadAndProcess(data.array());
+          } else {
+            processOneRpc(data.array());
+          }
+          data = null;
+          // Keep reading after the connection header so the first real call
+          // can be picked up in the same invocation.
+          if (!isHeaderRead) {
+            continue;
+          }
+        }
+        return count;
+      }
+    }
+
+    /**
+     * Reads the connection header that follows the version exchange,
+     * resolving the protocol class and the effective user ({@code ticket}).
+     * When SASL was used, a header naming a different protocol user triggers
+     * proxy-user impersonation (not permitted for DIGEST/token auth).
+     *
+     * @param buf serialized SecureConnectionHeader bytes
+     * @throws IOException if the protocol class is unknown or impersonation
+     *   is attempted with token authentication
+     */
+    private void processHeader(byte[] buf) throws IOException {
+      DataInputStream in =
+        new DataInputStream(new ByteArrayInputStream(buf));
+      header.readFields(in);
+      try {
+        String protocolClassName = header.getProtocol();
+        if (protocolClassName != null) {
+          protocol = getProtocolClass(header.getProtocol(), conf);
+        }
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException("Unknown protocol: " + header.getProtocol());
+      }
+
+      User protocolUser = header.getUser();
+      if (!useSasl) {
+        // No SASL: trust the user claimed in the header (SIMPLE auth).
+        ticket = protocolUser;
+        if (ticket != null) {
+          ticket.getUGI().setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
+        }
+      } else {
+        // user is authenticated
+        ticket.getUGI().setAuthenticationMethod(authMethod.authenticationMethod);
+        //Now we check if this is a proxy user case. If the protocol user is
+        //different from the 'user', it is a proxy user scenario. However,
+        //this is not allowed if user authenticated with DIGEST.
+        if ((protocolUser != null)
+            && (!protocolUser.getName().equals(ticket.getName()))) {
+          if (authMethod == AuthMethod.DIGEST) {
+            // Not allowed to doAs if token authentication is used
+            throw new AccessControlException("Authenticated user (" + ticket
+                + ") doesn't match what the client claims to be ("
+                + protocolUser + ")");
+          } else {
+            // Effective user can be different from authenticated user
+            // for simple auth or kerberos auth
+            // The user is the real user. Now we create a proxy user
+            UserGroupInformation realUser = ticket.getUGI();
+            ticket = User.create(
+                UserGroupInformation.createProxyUser(protocolUser.getName(),
+                    realUser));
+            // Now the user is a proxy user, set Authentication method Proxy.
+            ticket.getUGI().setAuthenticationMethod(AuthenticationMethod.PROXY);
+          }
+        }
+      }
+    }
+
+    /**
+     * Processes SASL-unwrapped plaintext, which may contain several complete
+     * RPCs and at most one trailing partial RPC.  Partial length/data bytes
+     * are retained in {@code unwrappedDataLengthBuffer}/{@code unwrappedData}
+     * across calls and completed on the next invocation.
+     *
+     * @param inBuf plaintext produced by {@code saslServer.unwrap()}
+     */
+    private void processUnwrappedData(byte[] inBuf) throws IOException,
+        InterruptedException {
+      ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
+          inBuf));
+      // Read all RPCs contained in the inBuf, even partial ones
+      while (true) {
+        int count = -1;
+        if (unwrappedDataLengthBuffer.remaining() > 0) {
+          count = channelRead(ch, unwrappedDataLengthBuffer);
+          if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
+            return;
+        }
+
+        if (unwrappedData == null) {
+          unwrappedDataLengthBuffer.flip();
+          int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
+
+          if (unwrappedDataLength == HBaseClient.PING_CALL_ID) {
+            if (LOG.isDebugEnabled())
+              LOG.debug("Received ping message");
+            unwrappedDataLengthBuffer.clear();
+            continue; // ping message
+          }
+          unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
+        }
+
+        count = channelRead(ch, unwrappedData);
+        if (count <= 0 || unwrappedData.remaining() > 0)
+          return;
+
+        // At this point the message is complete (the check above already
+        // returned on partial data, so remaining() is 0 here).
+        if (unwrappedData.remaining() == 0) {
+          unwrappedDataLengthBuffer.clear();
+          unwrappedData.flip();
+          processOneRpc(unwrappedData.array());
+          unwrappedData = null;
+        }
+      }
+    }
+
+    /**
+     * Dispatches one complete, unwrapped RPC message.  The first message on a
+     * connection is the connection header and triggers authorization; every
+     * later message is call data.
+     */
+    private void processOneRpc(byte[] buf) throws IOException,
+        InterruptedException {
+      if (!headerRead) {
+        processHeader(buf);
+        headerRead = true;
+        if (!authorizeConnection()) {
+          throw new AccessControlException("Connection from " + this
+              + " for protocol " + header.getProtocol()
+              + " is unauthorized for user " + ticket);
+        }
+        return;
+      }
+      processData(buf);
+    }
+
+    /**
+     * Deserializes one call (id followed by the parameter Writable) and
+     * queues it for the handler threads, routing high-QoS calls to the
+     * priority queue when one is configured.
+     *
+     * @param buf serialized call bytes
+     * @throws InterruptedException if blocked while queuing the call
+     */
+    protected void processData(byte[] buf) throws  IOException, InterruptedException {
+      DataInputStream dis =
+        new DataInputStream(new ByteArrayInputStream(buf));
+      int id = dis.readInt();                    // try to read an id
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(" got #" + id);
+      }
+
+      Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
+      param.readFields(dis);
+
+      SecureCall call = new SecureCall(id, param, this, responder);
+
+      if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
+        priorityCallQueue.put(call);
+      } else {
+        callQueue.put(call);              // queue the call; maybe blocked here
+      }
+    }
+
+    /**
+     * Authorizes the now-identified connection: proxy-user checks when the
+     * effective user differs from the real user (skipped for DIGEST auth),
+     * then service-level ACLs.  On failure a FATAL response is sent back.
+     *
+     * @return true if the connection is authorized, false otherwise
+     */
+    private boolean authorizeConnection() throws IOException {
+      try {
+        // If auth method is DIGEST, the token was obtained by the
+        // real user for the effective user, therefore not required to
+        // authorize real user. doAs is allowed only for simple or kerberos
+        // authentication
+        if (ticket != null && ticket.getUGI().getRealUser() != null
+            && (authMethod != AuthMethod.DIGEST)) {
+          ProxyUsers.authorize(ticket.getUGI(), this.getHostAddress(), conf);
+        }
+        authorize(ticket, header, getHostInetAddress());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Successfully authorized " + header);
+        }
+        rpcMetrics.authorizationSuccesses.inc();
+      } catch (AuthorizationException ae) {
+        LOG.debug("Connection authorization failed: "+ae.getMessage(), ae);
+        rpcMetrics.authorizationFailures.inc();
+        // Tell the client why it was rejected before the connection closes.
+        SecureCall failedCall = new SecureCall(AUTHORIZATION_FAILED_CALLID, null, this,
+            null);
+        failedCall.setResponse(null, Status.FATAL, ae.getClass().getName(),
+            ae.getMessage());
+        responder.doRespond(failedCall);
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * Closes the connection: disposes SASL state, releases buffers, then
+     * shuts down and closes the socket/channel, ignoring errors at each
+     * step since the peer may already be gone.
+     */
+    protected synchronized void close() {
+      disposeSasl();
+      data = null;
+      dataLengthBuffer = null;
+      if (!channel.isOpen())
+        return;
+      try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE
+      if (channel.isOpen()) {
+        try {channel.close();} catch(Exception ignored) {}
+      }
+      try {socket.close();} catch(Exception ignored) {}
+    }
+  }
+
+  /** Constructs a server listening on the named port and address.  Parameters passed must
+   * be of the named class.  The <code>handlerCount</code> determines
+   * the number of handler threads that will be used to process calls.
+   *
+   * @param bindAddress address to bind the listener to
+   * @param port port to listen on
+   * @param paramClass Writable class used for call parameters
+   * @param handlerCount number of normal-priority handler threads
+   * @param priorityHandlerCount number of high-priority handler threads
+   * @param conf configuration; reads hadoop.security.authorization
+   * @param serverName name used for thread naming/metrics
+   * @param highPriorityLevel QoS level above which calls use the priority queue
+   */
+  // NOTE(review): this @SuppressWarnings appears to cover no unchecked
+  // operation in the constructor body -- confirm whether it belongs on
+  // setSecretManager() instead.
+  @SuppressWarnings("unchecked")
+  protected SecureServer(String bindAddress, int port,
+                  Class<? extends Writable> paramClass, int handlerCount,
+                  int priorityHandlerCount, Configuration conf, String serverName,
+                  int highPriorityLevel)
+    throws IOException {
+    super(bindAddress, port, paramClass, handlerCount, priorityHandlerCount,
+        conf, serverName, highPriorityLevel);
+    this.authorize =
+      conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
+    this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+    LOG.debug("security enabled="+isSecurityEnabled);
+
+    // SASL server properties (realm, QoP) only need to be set up when
+    // secure authentication is actually in use.
+    if (isSecurityEnabled) {
+      HBaseSaslRpcServer.init(conf);
+    }
+  }
+
+  /** Creates a {@link SecureConnection} for a newly accepted channel. */
+  @Override
+  protected Connection getConnection(SocketChannel channel, long time) {
+    return new SecureConnection(channel, time);
+  }
+
+  /** @return this server's configuration */
+  Configuration getConf() {
+    return conf;
+  }
+
+  /** For unit testing only; should be called before the server is started. */
+  void disableSecurity() {
+    this.isSecurityEnabled = false;
+  }
+
+  /** For unit testing only; should be called before the server is started. */
+  void enableSecurity() {
+    this.isSecurityEnabled = true;
+  }
+
+  /** Stops the service.  No new calls will be handled after this is called.
+   * Synchronized on this instance; the superclass performs the actual
+   * shutdown. */
+  public synchronized void stop() {
+    super.stop();
+  }
+
+  /** @return the secret manager used to validate digest tokens, or null */
+  public SecretManager<? extends TokenIdentifier> getSecretManager() {
+    return this.secretManager;
+  }
+
+  /**
+   * Sets the secret manager used for DIGEST-MD5 (token) authentication.
+   * @param secretManager manager used to validate client token identifiers
+   */
+  @SuppressWarnings("unchecked")
+  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
+    // Narrowing the wildcard to TokenIdentifier is safe: the manager is only
+    // asked to validate identifiers it issued itself.
+    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
+  }
+
+  /**
+   * Authorize the incoming client connection against the service-level ACLs.
+   * No-op unless hadoop.security.authorization is enabled.
+   *
+   * @param user client user (may be null; passed through as a null UGI)
+   * @param connection incoming connection header naming the protocol
+   * @param addr InetAddress of incoming connection
+   * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
+   */
+  public void authorize(User user,
+                        ConnectionHeader connection,
+                        InetAddress addr
+                        ) throws AuthorizationException {
+    if (authorize) {
+      Class<?> protocol = null;
+      try {
+        protocol = getProtocolClass(connection.getProtocol(), getConf());
+      } catch (ClassNotFoundException cfne) {
+        throw new AuthorizationException("Unknown protocol: " +
+                                         connection.getProtocol());
+      }
+      // NOTE(review): authManager is never assigned in this file, so this
+      // would NPE if authorization is enabled without it being initialized
+      // elsewhere -- confirm the initialization path.
+      authManager.authorize(user != null ? user.getUGI() : null,
+          protocol, getConf(), addr);
+    }
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java?rev=1203512&view=auto
==============================================================================
--- hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java (added)
+++ hbase/branches/0.92/security/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java Fri Nov 18 07:13:03 2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+
+/**
+ * Exception thrown by access-related methods.
+ */
+/**
+ * Thrown when an operation is refused by an access control check.  Extends
+ * {@link DoNotRetryIOException} so clients fail immediately instead of
+ * retrying the request.
+ */
+public class AccessDeniedException extends DoNotRetryIOException {
+  private static final long serialVersionUID = 1913879564363001780L;
+
+  /**
+   * Constructs an exception with the given detail message.
+   * @param s the detail message
+   */
+  public AccessDeniedException(String s) {
+    super(s);
+  }
+
+  /**
+   * Constructs an exception naming the class where access was denied.
+   * @param clazz the class performing the failed access check
+   * @param s the detail message
+   */
+  public AccessDeniedException(Class<?> clazz, String s) {
+    super( "AccessDenied [" + clazz.getName() + "]: " + s);
+  }
+
+  /** Constructs an exception with no detail message. */
+  public AccessDeniedException() {
+    super();
+  }
+}



Mime
View raw message