From ga...@apache.org
Subject svn commit: r1203515 [3/4] - in /hbase/trunk: ./ conf/ security/ security/src/ security/src/main/ security/src/main/java/ security/src/main/java/org/ security/src/main/java/org/apache/ security/src/main/java/org/apache/hadoop/ security/src/main/java/or...
Date Fri, 18 Nov 2011 07:34:45 GMT
Added: hbase/trunk/security/src/test/resources/hbase-site.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/test/resources/hbase-site.xml?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/security/src/test/resources/hbase-site.xml (added)
+++ hbase/trunk/security/src/test/resources/hbase-site.xml Fri Nov 18 07:34:43 2011
@@ -0,0 +1,153 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>hbase.regionserver.msginterval</name>
+    <value>1000</value>
+    <description>Interval, in milliseconds, between messages from the
+    RegionServer to the HMaster. Set this value low if you want unit
+    tests to be responsive.
+    </description>
+  </property>
+  <property>
+    <name>hbase.client.pause</name>
+    <value>1000</value>
+    <description>General client pause value.  Used mostly as the time to wait
+    before retrying a failed get, region lookup, etc.</description>
+  </property>
+  <property>
+    <name>hbase.client.retries.number</name>
+    <value>10</value>
+    <description>Maximum retries.  Used as maximum for all retryable
+    operations such as fetching of the root region from root region
+    server, getting a cell's value, starting a row update, etc.
+    Default: 10.
+    </description>
+  </property>
+  <property>
+    <name>hbase.server.thread.wakefrequency</name>
+    <value>1000</value>
+    <description>Time to sleep in between searches for work (in milliseconds).
+    Used as sleep interval by service threads such as META scanner and log roller.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.event.waiting.time</name>
+    <value>50</value>
+    <description>Time to sleep between checks to see if a table event took place.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.handler.count</name>
+    <value>5</value>
+    <description>Count of RPC Server instances spun up on RegionServers.
+    The same property is used by the HMaster for the count of master handlers.
+    Default is 10.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.info.port</name>
+    <value>-1</value>
+    <description>The port for the HBase master web UI.
+    Set to -1 if you do not want the info server to run.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.info.port</name>
+    <value>-1</value>
+    <description>The port for the HBase regionserver web UI.
+    Set to -1 if you do not want the info server to run.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.info.port.auto</name>
+    <value>true</value>
+    <description>Info server auto port bind. Enables automatic port
+    search if hbase.regionserver.info.port is already in use.
+    Enabled for testing to run multiple tests on one machine.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.lease.thread.wakefrequency</name>
+    <value>3000</value>
+    <description>The interval between checks for expired region server leases.
+    This value has been reduced due to the other reduced values above so that
+    the master will notice a dead region server sooner. The default is 15 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.optionalcacheflushinterval</name>
+    <value>1000</value>
+    <description>
+    Amount of time to wait since the last time a region was flushed before
+    invoking an optional cache flush. Default 60,000.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.safemode</name>
+    <value>false</value>
+    <description>
+    Turn on/off safe mode in region server. Always on for production, always off
+    for tests.
+    </description>
+  </property>
+  <property>
+    <name>hbase.hregion.max.filesize</name>
+    <value>67108864</value>
+    <description>
+    Maximum desired file size for an HRegion.  If filesize exceeds
+    value + (value / 2), the HRegion is split in two.  Default: 256M.
+
+    Keep the maximum filesize small so we split more often in tests.
+    </description>
+  </property>
+  <property>
+    <name>hadoop.log.dir</name>
+    <value>${user.dir}/../logs</value>
+  </property>
+  <property>
+    <name>hbase.zookeeper.property.clientPort</name>
+    <value>21818</value>
+    <description>Property from ZooKeeper's config zoo.cfg.
+    The port at which the clients will connect.
+    </description>
+  </property>
+  <property>
+    <name>hbase.defaults.for.version.skip</name>
+    <value>true</value>
+    <description>
+    Set to true to skip the 'hbase.defaults.for.version' check.
+    Setting this to true can be useful in contexts other than
+    a maven-generated build, e.g. when running in an IDE. You'll
+    want to set this boolean to true to avoid seeing the
+    RuntimeException complaint: "hbase-default.xml file seems to
+    be for an old version of HBase (@@@VERSION@@@), this version
+    is X.X.X-SNAPSHOT"
+    </description>
+  </property>
+  <property>
+   <name>hbase.rpc.engine</name>
+   <value>org.apache.hadoop.hbase.ipc.SecureRpcEngine</value>
+  </property>
+</configuration>
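
For reference, a sketch (hypothetical test-side snippet, not part of this commit) of how these overrides get picked up: HBaseConfiguration.create() layers any hbase-site.xml found on the classpath over hbase-default.xml, so tests running against this file see the reduced values.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ShowTestConfig {
      public static void main(String[] args) {
        // Layers classpath hbase-site.xml (the file above) over hbase-default.xml.
        Configuration conf = HBaseConfiguration.create();
        System.out.println(conf.getInt("hbase.regionserver.handler.count", 10)); // 5
        System.out.println(conf.get("hbase.rpc.engine")); // ...ipc.SecureRpcEngine
      }
    }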

Modified: hbase/trunk/src/assembly/all.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/assembly/all.xml?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/assembly/all.xml (original)
+++ hbase/trunk/src/assembly/all.xml Fri Nov 18 07:34:43 2011
@@ -57,8 +57,8 @@
       <directory>target</directory>
       <outputDirectory>/</outputDirectory>
       <includes>
-          <include>hbase-${project.version}.jar</include>
-          <include>hbase-${project.version}-tests.jar</include>
+          <include>${project.build.finalName}.jar</include>
+          <include>${project.build.finalName}-tests.jar</include>
       </includes>
     </fileSet>
     <fileSet>

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HServerAddress.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HServerAddress.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HServerAddress.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HServerAddress.java Fri Nov 18 07:34:43 2011
@@ -121,7 +121,10 @@ public class HServerAddress implements W
 
   /** @return Hostname */
   public String getHostname() {
-    return this.address.getHostName();
+    // Kerberos is case-sensitive and dictates that, where hostnames are
+    // case-insensitive (as in DNS), the lowercase version must be used.
+    // So we lowercase here to interact properly with Kerberos auth.
+    return this.address.getHostName().toLowerCase();
   }
 
   /**
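
A minimal illustration of why the lowercasing matters (hypothetical snippet; the service/host@REALM principal format is the standard Kerberos convention, not code from this commit):

    import java.net.InetSocketAddress;

    public class PrincipalDemo {
      public static void main(String[] args) {
        // DNS may hand back "Host1.Example.COM"; Kerberos treats
        // hbase/Host1.Example.COM@EXAMPLE.COM and hbase/host1.example.com@EXAMPLE.COM
        // as different principals, so the lowercase form must be used.
        InetSocketAddress addr = new InetSocketAddress("Host1.Example.COM", 60020);
        String principal = "hbase/" + addr.getHostName().toLowerCase() + "@EXAMPLE.COM";
        System.out.println(principal); // hbase/host1.example.com@EXAMPLE.COM
      }
    }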

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Nov 18 07:34:43 2011
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.ipc.ExecR
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -137,6 +138,8 @@ public class HConnectionManager {
 
   public static final int MAX_CACHED_HBASE_INSTANCES;
 
+  private static Log LOG = LogFactory.getLog(HConnectionManager.class);
+
   static {
     // We set instances to one more than the value specified for {@link
     // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
@@ -388,6 +391,7 @@ public class HConnectionManager {
         HConstants.HBASE_CLIENT_INSTANCE_ID };
 
     private Map<String, String> properties;
+    private String username;
 
     public HConnectionKey(Configuration conf) {
       Map<String, String> m = new HashMap<String, String>();
@@ -400,12 +404,25 @@ public class HConnectionManager {
         }
       }
       this.properties = Collections.unmodifiableMap(m);
+
+      try {
+        User currentUser = User.getCurrent();
+        if (currentUser != null) {
+          username = currentUser.getName();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Error obtaining current user, skipping username in HConnectionKey",
+            ioe);
+      }
     }
 
     @Override
     public int hashCode() {
       final int prime = 31;
       int result = 1;
+      if (username != null) {
+        result = username.hashCode();
+      }
       for (String property : CONNECTION_PROPERTIES) {
         String value = properties.get(property);
         if (value != null) {
@@ -425,6 +442,11 @@ public class HConnectionManager {
       if (getClass() != obj.getClass())
         return false;
       HConnectionKey that = (HConnectionKey) obj;
+      if (this.username != null && !this.username.equals(that.username)) {
+        return false;
+      } else if (this.username == null && that.username != null) {
+        return false;
+      }
       if (this.properties == null) {
         if (that.properties != null) {
           return false;
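
Folding the username into hashCode() and equals() means connections are now cached per authenticated user: the same Configuration yields distinct cached HConnection instances for different users. The two-branch username check above is the usual null-safe equality pattern; distilled as a standalone sketch (not commit code):

    public final class NullSafe {
      // Equal when both are null, or both non-null and equals() holds --
      // the same outcome as the username branches in HConnectionKey.equals().
      static boolean fieldEquals(Object a, Object b) {
        return (a == null) ? (b == null) : a.equals(b);
      }
    }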

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java Fri Nov 18 07:34:43 2011
@@ -25,26 +25,26 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hbase.security.User;
 
 /**
  * The IPC connection header sent by the client to the server
  * on connection establishment.
  */
 class ConnectionHeader implements Writable {
-  private String protocol;
+  protected String protocol;
 
   public ConnectionHeader() {}
 
   /**
    * Create a new {@link ConnectionHeader} with the given <code>protocol</code>
-   * and {@link UserGroupInformation}.
+   * and {@link User}.
    * @param protocol protocol used for communication between the IPC client
    *                 and the server
-   * @param ugi {@link UserGroupInformation} of the client communicating with
+   * @param user {@link User} of the client communicating with
    *            the server
    */
-  public ConnectionHeader(String protocol, UserGroupInformation ugi) {
+  public ConnectionHeader(String protocol, User user) {
     this.protocol = protocol;
   }
 
@@ -65,7 +65,7 @@ class ConnectionHeader implements Writab
     return protocol;
   }
 
-  public UserGroupInformation getUgi() {
+  public User getUser() {
     return null;
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Fri Nov 18 07:34:43 2011
@@ -45,6 +45,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
@@ -53,9 +54,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
@@ -159,7 +158,7 @@ public class HBaseClient {
   }
 
   /** A call waiting for a value. */
-  private class Call {
+  protected class Call {
     final int id;                                       // call id
     final Writable param;                               // parameter
     Writable value;                               // value, null if error
@@ -210,18 +209,18 @@ public class HBaseClient {
   /** Thread that reads responses and notifies callers.  Each connection owns a
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
-  private class Connection extends Thread {
+  protected class Connection extends Thread {
     private ConnectionHeader header;              // connection header
-    private ConnectionId remoteId;
-    private Socket socket = null;                 // connected socket
-    private DataInputStream in;
-    private DataOutputStream out;
+    protected ConnectionId remoteId;
+    protected Socket socket = null;                 // connected socket
+    protected DataInputStream in;
+    protected DataOutputStream out;
 
     // currently active calls
-    private final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
-    private final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
+    protected final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
+    protected final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
-    private IOException closeException; // close reason
+    protected IOException closeException; // close reason
 
     public Connection(ConnectionId remoteId) throws IOException {
       if (remoteId.getAddress().isUnresolved()) {
@@ -229,7 +228,7 @@ public class HBaseClient {
                                        remoteId.getAddress().getHostName());
       }
       this.remoteId = remoteId;
-      UserGroupInformation ticket = remoteId.getTicket();
+      User ticket = remoteId.getTicket();
       Class<? extends VersionedProtocol> protocol = remoteId.getProtocol();
 
       header = new ConnectionHeader(
@@ -237,12 +236,12 @@ public class HBaseClient {
 
       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
         remoteId.getAddress().toString() +
-        ((ticket==null)?" from an unknown user": (" from " + ticket.getUserName())));
+        ((ticket==null)?" from an unknown user": (" from " + ticket.getName())));
       this.setDaemon(true);
     }
 
     /** Update lastActivity with the current time. */
-    private void touch() {
+    protected void touch() {
       lastActivity.set(System.currentTimeMillis());
     }
 
@@ -265,7 +264,7 @@ public class HBaseClient {
      * reading. If no failure is detected, it retries until at least
      * a byte is read.
      */
-    private class PingInputStream extends FilterInputStream {
+    protected class PingInputStream extends FilterInputStream {
       /* constructor */
       protected PingInputStream(InputStream in) {
         super(in);
@@ -317,40 +316,50 @@ public class HBaseClient {
       }
     }
 
+    protected synchronized void setupConnection() throws IOException {
+      short ioFailures = 0;
+      short timeoutFailures = 0;
+      while (true) {
+        try {
+          this.socket = socketFactory.createSocket();
+          this.socket.setTcpNoDelay(tcpNoDelay);
+          this.socket.setKeepAlive(tcpKeepAlive);
+          // connection time out is 20s
+          NetUtils.connect(this.socket, remoteId.getAddress(),
+              getSocketTimeout(conf));
+          if (remoteId.rpcTimeout > 0) {
+            pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
+          }
+          this.socket.setSoTimeout(pingInterval);
+          return;
+        } catch (SocketTimeoutException toe) {
+          /* The max number of retries is 45,
+           * which amounts to 20s*45 = 15 minutes of retrying.
+           */
+          handleConnectionFailure(timeoutFailures++, maxRetries, toe);
+        } catch (IOException ie) {
+          handleConnectionFailure(ioFailures++, maxRetries, ie);
+        }
+      }
+    }
+
     /** Connect to the server and set up the I/O streams. It then sends
      * a header to the server and starts
      * the connection thread that waits for responses.
      * @throws java.io.IOException e
      */
-    protected synchronized void setupIOstreams() throws IOException {
+    protected synchronized void setupIOstreams()
+        throws IOException, InterruptedException {
+
       if (socket != null || shouldCloseConnection.get()) {
         return;
       }
 
-      short ioFailures = 0;
-      short timeoutFailures = 0;
       try {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Connecting to "+remoteId.getAddress());
-        }
-        while (true) {
-          try {
-            this.socket = socketFactory.createSocket();
-            this.socket.setTcpNoDelay(tcpNoDelay);
-            this.socket.setKeepAlive(tcpKeepAlive);
-            NetUtils.connect(this.socket, remoteId.getAddress(),
-              getSocketTimeout(conf));
-            if (remoteId.rpcTimeout > 0) {
-              pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
-            }
-            this.socket.setSoTimeout(pingInterval);
-            break;
-          } catch (SocketTimeoutException toe) {
-            handleConnectionFailure(timeoutFailures++, maxRetries, toe);
-          } catch (IOException ie) {
-            handleConnectionFailure(ioFailures++, maxRetries, ie);
-          }
+          LOG.debug("Connecting to "+remoteId);
         }
+        setupConnection();
         this.in = new DataInputStream(new BufferedInputStream
             (new PingInputStream(NetUtils.getInputStream(socket))));
         this.out = new DataOutputStream
@@ -370,7 +379,22 @@ public class HBaseClient {
       }
     }
 
-    /* Handle connection failures
+    protected void closeConnection() {
+      // close the current connection
+      if (socket != null) {
+        try {
+          socket.close();
+        } catch (IOException e) {
+          LOG.warn("Not able to close a socket", e);
+        }
+      }
+      // set socket to null so that the next call to setupIOstreams
+      // can start the process of connect all over again.
+      socket = null;
+    }
+
+    /**
+     *  Handle connection failures
      *
      * If the current number of retries is equal to the max number of retries,
      * stop retrying and throw the exception; Otherwise backoff N seconds and
@@ -386,17 +410,8 @@ public class HBaseClient {
      */
     private void handleConnectionFailure(
         int curRetries, int maxRetries, IOException ioe) throws IOException {
-      // close the current connection
-      if (socket != null) { // could be null if the socket creation failed
-        try {
-          socket.close();
-        } catch (IOException e) {
-          LOG.warn("Not able to close a socket", e);
-        }
-      }
-      // set socket to null so that the next call to setupIOstreams
-      // can start the process of connect all over again.
-      socket = null;
+
+      closeConnection();
 
       // throw the exception if the maximum number of retries is reached
       if (curRetries >= maxRetries) {
@@ -435,7 +450,7 @@ public class HBaseClient {
      * Return true if it is time to read a response; false otherwise.
      */
     @SuppressWarnings({"ThrowableInstanceNeverThrown"})
-    private synchronized boolean waitForWork() {
+    protected synchronized boolean waitForWork() {
       if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
         long timeout = maxIdleTime-
               (System.currentTimeMillis()-lastActivity.get());
@@ -541,7 +556,7 @@ public class HBaseClient {
     /* Receive a response.
     * Because there is only one receiver, there is no synchronization on in.
      */
-    private void receiveResponse() {
+    protected void receiveResponse() {
       if (shouldCloseConnection.get()) {
         return;
       }
@@ -598,7 +613,7 @@ public class HBaseClient {
       }
     }
 
-    private synchronized void markClosed(IOException e) {
+    protected synchronized void markClosed(IOException e) {
       if (shouldCloseConnection.compareAndSet(false, true)) {
         closeException = e;
         notifyAll();
@@ -606,7 +621,7 @@ public class HBaseClient {
     }
 
     /** Close the connection. */
-    private synchronized void close() {
+    protected synchronized void close() {
       if (!shouldCloseConnection.get()) {
         LOG.error("The connection is not in the closed state");
         return;
@@ -647,11 +662,11 @@ public class HBaseClient {
     }
 
     /* Cleanup all calls and mark them as done */
-    private void cleanupCalls() {
+    protected void cleanupCalls() {
       cleanupCalls(0);
     }
 
-    private void cleanupCalls(long rpcTimeout) {
+    protected void cleanupCalls(long rpcTimeout) {
       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
       while (itor.hasNext()) {
         Call c = itor.next().getValue();
@@ -687,7 +702,7 @@ public class HBaseClient {
   }
 
   /** Call implementation used for parallel calls. */
-  private class ParallelCall extends Call {
+  protected class ParallelCall extends Call {
     private final ParallelResults results;
     protected final int index;
 
@@ -705,7 +720,7 @@ public class HBaseClient {
   }
 
   /** Result collector for parallel calls. */
-  private static class ParallelResults {
+  protected static class ParallelResults {
     protected final Writable[] values;
     protected int size;
     protected int count;
@@ -778,7 +793,7 @@ public class HBaseClient {
    * @return either a {@link PoolType#RoundRobin} or
    *         {@link PoolType#ThreadLocal}
    */
-  private static PoolType getPoolType(Configuration config) {
+  protected static PoolType getPoolType(Configuration config) {
     return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
         PoolType.RoundRobin, PoolType.ThreadLocal);
   }
@@ -790,7 +805,7 @@ public class HBaseClient {
    * @param config
    * @return the maximum pool size
    */
-  private static int getPoolSize(Configuration config) {
+  protected static int getPoolSize(Configuration config) {
     return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
   }
 
@@ -843,7 +858,7 @@ public class HBaseClient {
   }
 
   public Writable call(Writable param, InetSocketAddress addr,
-                       UserGroupInformation ticket, int rpcTimeout)
+                       User ticket, int rpcTimeout)
                        throws IOException, InterruptedException {
     return call(param, addr, null, ticket, rpcTimeout);
   }
@@ -855,7 +870,7 @@ public class HBaseClient {
    * threw an exception. */
   public Writable call(Writable param, InetSocketAddress addr,
                        Class<? extends VersionedProtocol> protocol,
-                       UserGroupInformation ticket, int rpcTimeout)
+                       User ticket, int rpcTimeout)
       throws InterruptedException, IOException {
     Call call = new Call(param);
     Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
@@ -902,7 +917,7 @@ public class HBaseClient {
    * @return an exception to throw
    */
   @SuppressWarnings({"ThrowableInstanceNeverThrown"})
-  private IOException wrapException(InetSocketAddress addr,
+  protected IOException wrapException(InetSocketAddress addr,
                                          IOException exception) {
     if (exception instanceof ConnectException) {
       //connection refused; include the host:port in the error
@@ -929,7 +944,7 @@ public class HBaseClient {
    * @param addresses socket addresses
    * @return  Writable[]
    * @throws IOException e
-   * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, UserGroupInformation)} instead
+   * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, User)} instead
    */
   @Deprecated
   public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
@@ -943,7 +958,7 @@ public class HBaseClient {
    * contains nulls for calls that timed out or errored.  */
   public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
                          Class<? extends VersionedProtocol> protocol,
-                         UserGroupInformation ticket)
+                         User ticket)
       throws IOException, InterruptedException {
     if (addresses.length == 0) return new Writable[0];
 
@@ -976,12 +991,12 @@ public class HBaseClient {
 
   /* Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given host/port are reused. */
-  private Connection getConnection(InetSocketAddress addr,
+  protected Connection getConnection(InetSocketAddress addr,
                                    Class<? extends VersionedProtocol> protocol,
-                                   UserGroupInformation ticket,
+                                   User ticket,
                                    int rpcTimeout,
                                    Call call)
-                                   throws IOException {
+                                   throws IOException, InterruptedException {
     if (!running.get()) {
       // the client is stopped
       throw new IOException("The client is stopped");
@@ -1014,16 +1029,16 @@ public class HBaseClient {
    * This class holds the address and the user ticket. The client connections
    * to servers are uniquely identified by <remoteAddress, ticket>
    */
-  private static class ConnectionId {
+  protected static class ConnectionId {
     final InetSocketAddress address;
-    final UserGroupInformation ticket;
-    final private int rpcTimeout;
+    final User ticket;
+    final int rpcTimeout;
     Class<? extends VersionedProtocol> protocol;
     private static final int PRIME = 16777619;
 
     ConnectionId(InetSocketAddress address,
         Class<? extends VersionedProtocol> protocol,
-        UserGroupInformation ticket,
+        User ticket,
         int rpcTimeout) {
       this.protocol = protocol;
       this.address = address;
@@ -1039,7 +1054,7 @@ public class HBaseClient {
       return protocol;
     }
 
-    UserGroupInformation getTicket() {
+    User getTicket() {
       return ticket;
     }
 
@@ -1048,8 +1063,8 @@ public class HBaseClient {
      if (obj instanceof ConnectionId) {
        ConnectionId id = (ConnectionId) obj;
        return address.equals(id.address) && protocol == id.protocol &&
-           ticket == id.ticket && rpcTimeout == id.rpcTimeout;
-       //Note : ticket is a ref comparision.
+              ((ticket != null && ticket.equals(id.ticket)) ||
+               (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout;
      }
      return false;
     }
@@ -1058,8 +1073,7 @@ public class HBaseClient {
     public int hashCode() {
       return (address.hashCode() + PRIME * (
                   PRIME * System.identityHashCode(protocol) ^
-                  System.identityHashCode(ticket)
-                )) ^ rpcTimeout;
+             (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
     }
   }
 }
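
The "20s*45 = 15 minutes" comment in setupConnection() works out as follows (standalone arithmetic sketch; 20 seconds is the per-attempt connect timeout noted in the code, 45 the retry cap cited in the comment):

    public class RetryBudget {
      public static void main(String[] args) {
        int connectTimeoutMs = 20 * 1000; // per-attempt connect timeout (20s)
        int maxAttempts = 45;             // retry cap cited in the comment
        long worstCaseMs = (long) connectTimeoutMs * maxAttempts; // 900,000 ms
        System.out.println(worstCaseMs / 60000 + " minutes");     // 15 minutes
      }
    }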

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Fri Nov 18 07:34:43 2011
@@ -26,12 +26,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
-
 import javax.net.SocketFactory;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -78,7 +76,12 @@ public class HBaseRPC {
     super();
   }                                  // no public ctor
 
-  private static final String RPC_ENGINE_PROP = "hbase.rpc.engine";
+  /**
+   * Configuration key for the {@link RpcEngine} implementation to load to
+   * handle connection protocols.  Handlers for individual protocols can be
+   * configured using {@code "hbase.rpc.engine." + protocol.class.name}.
+   */
+  public static final String RPC_ENGINE_PROP = "hbase.rpc.engine";
 
   // cache of RpcEngines by protocol
   private static final Map<Class,RpcEngine> PROTOCOL_ENGINES
@@ -274,8 +277,8 @@ public class HBaseRPC {
   public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
       long clientVersion, InetSocketAddress addr, Configuration conf,
       SocketFactory factory, int rpcTimeout) throws IOException {
-    return getProxy(protocol, clientVersion, addr, null, conf, factory,
-        rpcTimeout);
+    return getProxy(protocol, clientVersion, addr,
+        User.getCurrent(), conf, factory, rpcTimeout);
   }
 
   /**
@@ -294,7 +297,7 @@ public class HBaseRPC {
    */
   public static VersionedProtocol getProxy(
       Class<? extends VersionedProtocol> protocol,
-      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
+      long clientVersion, InetSocketAddress addr, User ticket,
       Configuration conf, SocketFactory factory, int rpcTimeout)
   throws IOException {
     VersionedProtocol proxy =
@@ -349,11 +352,16 @@ public class HBaseRPC {
    * @param conf configuration
    * @return values
    * @throws IOException e
+   * @deprecated Instead of calling statically, use
+   *     {@link HBaseRPC#getProtocolEngine(Class, org.apache.hadoop.conf.Configuration)}
+   *     to obtain an {@link RpcEngine} instance and then use
+   *     {@link RpcEngine#call(java.lang.reflect.Method, Object[][], java.net.InetSocketAddress[], Class, org.apache.hadoop.hbase.security.User, org.apache.hadoop.conf.Configuration)}
    */
+  @Deprecated
   public static Object[] call(Method method, Object[][] params,
       InetSocketAddress[] addrs,
       Class<? extends VersionedProtocol> protocol,
-      UserGroupInformation ticket,
+      User ticket,
       Configuration conf)
     throws IOException, InterruptedException {
     return getProtocolEngine(protocol, conf)
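
Per the new javadoc on RPC_ENGINE_PROP, engine selection is configuration-driven. A sketch of both the global key and the documented per-protocol form (the protocol class named here is just an example):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class EngineConfig {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Global engine, as in the test hbase-site.xml added above.
        conf.set("hbase.rpc.engine",
            "org.apache.hadoop.hbase.ipc.SecureRpcEngine");
        // Per-protocol override, using the documented key pattern
        // "hbase.rpc.engine." + protocol.class.name.
        conf.set("hbase.rpc.engine.org.apache.hadoop.hbase.ipc.HMasterInterface",
            "org.apache.hadoop.hbase.ipc.SecureRpcEngine");
      }
    }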

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Fri Nov 18 07:34:43 2011
@@ -27,8 +27,7 @@ import org.apache.hadoop.metrics.Metrics
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+import org.apache.hadoop.metrics.util.*;
 
 import java.lang.reflect.Method;
 
@@ -46,11 +45,15 @@ import java.lang.reflect.Method;
  */
 public class HBaseRpcMetrics implements Updater {
   public static final String NAME_DELIM = "$";
-  private MetricsRecord metricsRecord;
+  private final MetricsRegistry registry = new MetricsRegistry();
+  private final MetricsRecord metricsRecord;
+  private final RpcServer myServer;
   private static Log LOG = LogFactory.getLog(HBaseRpcMetrics.class);
   private final HBaseRPCStatistics rpcStatistics;
 
-  public HBaseRpcMetrics(String hostName, String port) {
+  public HBaseRpcMetrics(String hostName, String port,
+      final RpcServer server) {
+    myServer = server;
     MetricsContext context = MetricsUtil.getContext("rpc");
     metricsRecord = MetricsUtil.createRecord(context, "metrics");
 
@@ -73,13 +76,29 @@ public class HBaseRpcMetrics implements 
    *  - they can be set directly by calling their set/inc methods
   *  - they can also be read directly - e.g. JMX does this.
    */
-  public final MetricsRegistry registry = new MetricsRegistry();
 
-  public MetricsTimeVaryingRate rpcQueueTime = new MetricsTimeVaryingRate("RpcQueueTime", registry);
-  public MetricsTimeVaryingRate rpcProcessingTime = new MetricsTimeVaryingRate("RpcProcessingTime", registry);
-  public MetricsTimeVaryingRate rpcSlowResponseTime = new MetricsTimeVaryingRate("RpcSlowResponse", registry);
-
-  //public Map <String, MetricsTimeVaryingRate> metricsList = Collections.synchronizedMap(new HashMap<String, MetricsTimeVaryingRate>());
+  public final MetricsTimeVaryingLong receivedBytes =
+         new MetricsTimeVaryingLong("ReceivedBytes", registry);
+  public final MetricsTimeVaryingLong sentBytes =
+         new MetricsTimeVaryingLong("SentBytes", registry);
+  public final MetricsTimeVaryingRate rpcQueueTime =
+          new MetricsTimeVaryingRate("RpcQueueTime", registry);
+  public MetricsTimeVaryingRate rpcProcessingTime =
+          new MetricsTimeVaryingRate("RpcProcessingTime", registry);
+  public final MetricsIntValue numOpenConnections =
+          new MetricsIntValue("NumOpenConnections", registry);
+  public final MetricsIntValue callQueueLen =
+          new MetricsIntValue("callQueueLen", registry);
+  public final MetricsTimeVaryingInt authenticationFailures = 
+          new MetricsTimeVaryingInt("rpcAuthenticationFailures", registry);
+  public final MetricsTimeVaryingInt authenticationSuccesses =
+          new MetricsTimeVaryingInt("rpcAuthenticationSuccesses", registry);
+  public final MetricsTimeVaryingInt authorizationFailures =
+          new MetricsTimeVaryingInt("rpcAuthorizationFailures", registry);
+  public final MetricsTimeVaryingInt authorizationSuccesses =
+         new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry);
+  public MetricsTimeVaryingRate rpcSlowResponseTime =
+      new MetricsTimeVaryingRate("RpcSlowResponse", registry);
 
   private void initMethods(Class<? extends VersionedProtocol> protocol) {
     for (Method m : protocol.getDeclaredMethods()) {
@@ -182,19 +201,15 @@ public class HBaseRpcMetrics implements 
 
   /**
    * Push the metrics to the monitoring subsystem on doUpdate() call.
-   * @param context ctx
    */
-  public void doUpdates(MetricsContext context) {
-    rpcQueueTime.pushMetric(metricsRecord);
-    rpcProcessingTime.pushMetric(metricsRecord);
-
-    synchronized (registry) {
-      // Iterate through the registry to propagate the different rpc metrics.
-
-      for (String metricName : registry.getKeyList() ) {
-        MetricsTimeVaryingRate value = (MetricsTimeVaryingRate) registry.get(metricName);
-
-        value.pushMetric(metricsRecord);
+  public void doUpdates(final MetricsContext context) {
+    synchronized (this) {
+      // ToFix - fix server to use the following two metrics directly so
+      // the metrics do not have to be copied here.
+      numOpenConnections.set(myServer.getNumOpenConnections());
+      callQueueLen.set(myServer.getCallQueueLen());
+      for (MetricsBase m : registry.getMetricsList()) {
+        m.pushMetric(metricsRecord);
       }
     }
     metricsRecord.update();
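
With doUpdates() now iterating the registry, call sites only increment their counters and the push loop publishes everything registered. A hedged sketch of a call site (the enclosing server class and field name are assumed, not shown in this hunk):

    // Bumping the new counters from code that holds an HBaseRpcMetrics instance.
    void onRequest(HBaseRpcMetrics rpcMetrics, int bytesRead) {
      rpcMetrics.authenticationSuccesses.inc(); // e.g. after a successful auth
      rpcMetrics.receivedBytes.inc(bytesRead);  // mirrors channelRead() in HBaseServer
    }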

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri Nov 18 07:34:43 2011
@@ -62,13 +62,12 @@ import org.apache.hadoop.hbase.io.HbaseO
 import org.apache.hadoop.hbase.io.WritableWithSize;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
-import org.apache.hadoop.hbase.ipc.VersionedProtocol;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -200,7 +199,7 @@ public abstract class HBaseServer implem
   protected BlockingQueue<Call> callQueue; // queued calls
   protected BlockingQueue<Call> priorityCallQueue;
 
-  private int highPriorityLevel;  // what level a high priority call is at
+  protected int highPriorityLevel;  // what level a high priority call is at
 
   protected final List<Connection> connectionList =
     Collections.synchronizedList(new LinkedList<Connection>());
@@ -275,7 +274,7 @@ public abstract class HBaseServer implem
       return param.toString() + " from " + connection.toString();
     }
 
-    private synchronized void setResponse(Object value, Status status,
+    protected synchronized void setResponse(Object value, Status status,
         String errorClass, String error) {
       // Avoid overwriting an error value in the response.  This can happen if
       // endDelayThrowing is called by another thread before the actual call
@@ -608,7 +607,7 @@ public abstract class HBaseServer implem
           if (errorHandler != null) {
             if (errorHandler.checkOOME(e)) {
               LOG.info(getName() + ": exiting on OOME");
-              closeCurrentConnection(key);
+              closeCurrentConnection(key, e);
               cleanupConnections(true);
               return;
             }
@@ -617,12 +616,12 @@ public abstract class HBaseServer implem
             // log the event and sleep for a minute and give
             // some thread(s) a chance to finish
             LOG.warn("Out of Memory in server select", e);
-            closeCurrentConnection(key);
+            closeCurrentConnection(key, e);
             cleanupConnections(true);
             try { Thread.sleep(60000); } catch (Exception ignored) {}
       }
         } catch (Exception e) {
-          closeCurrentConnection(key);
+          closeCurrentConnection(key, e);
         }
         cleanupConnections(false);
       }
@@ -644,13 +643,16 @@ public abstract class HBaseServer implem
       }
     }
 
-    private void closeCurrentConnection(SelectionKey key) {
+    private void closeCurrentConnection(SelectionKey key, Throwable e) {
       if (key != null) {
         Connection c = (Connection)key.attachment();
         if (c != null) {
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
+                (e != null ? " on error " + e.getMessage() : ""));
+          }
           closeConnection(c);
+          key.attach(null);
         }
       }
     }
@@ -673,7 +675,7 @@ public abstract class HBaseServer implem
         try {
           reader.startAdd();
           SelectionKey readKey = reader.registerChannel(channel);
-          c = new Connection(channel, System.currentTimeMillis());
+          c = getConnection(channel, System.currentTimeMillis());
           readKey.attach(c);
           synchronized (connectionList) {
             connectionList.add(numConnections, c);
@@ -742,7 +744,7 @@ public abstract class HBaseServer implem
   }
 
   // Sends responses of RPC back to clients.
-  private class Responder extends Thread {
+  protected class Responder extends Thread {
     private final Selector writeSelector;
     private int pending;         // connections waiting to register
 
@@ -816,7 +818,11 @@ public abstract class HBaseServer implem
           }
 
           for(Call call : calls) {
-            doPurge(call, now);
+            try {
+              doPurge(call, now);
+            } catch (IOException e) {
+              LOG.warn("Error in purging old calls " + e);
+            }
           }
         } catch (OutOfMemoryError e) {
           if (errorHandler != null) {
@@ -832,7 +838,7 @@ public abstract class HBaseServer implem
             //
             LOG.warn("Out of Memory in server select", e);
             try { Thread.sleep(60000); } catch (Exception ignored) {}
-      }
+          }
         } catch (Exception e) {
           LOG.warn("Exception in Responder " +
                    StringUtils.stringifyException(e));
@@ -870,7 +876,7 @@ public abstract class HBaseServer implem
     // Remove calls that have been pending in the responseQueue
     // for a long time.
     //
-    private void doPurge(Call call, long now) {
+    private void doPurge(Call call, long now) throws IOException {
       synchronized (call.connection.responseQueue) {
         Iterator<Call> iter = call.connection.responseQueue.listIterator(0);
         while (iter.hasNext()) {
@@ -1019,7 +1025,7 @@ public abstract class HBaseServer implem
   }
 
   /** Reads calls from a connection and queues them for handling. */
-  private class Connection {
+  protected class Connection {
     private boolean versionRead = false; //if initial signature and
                                          //version are read
     private boolean headerRead = false;  //if the connection header that
@@ -1034,11 +1040,11 @@ public abstract class HBaseServer implem
     protected Socket socket;
     // Cache the remote host & port info so that even if the socket is
     // disconnected, we can say where it used to connect to.
-    private String hostAddress;
-    private int remotePort;
+    protected String hostAddress;
+    protected int remotePort;
     ConnectionHeader header = new ConnectionHeader();
     Class<? extends VersionedProtocol> protocol;
-    protected UserGroupInformation ticket = null;
+    protected User ticket = null;
 
     public Connection(SocketChannel channel, long lastContact) {
       this.channel = channel;
@@ -1096,7 +1102,7 @@ public abstract class HBaseServer implem
     }
 
     /* Increment the outstanding RPC count */
-    private void incRpcCount() {
+    protected void incRpcCount() {
       rpcCount++;
     }
 
@@ -1158,7 +1164,7 @@ public abstract class HBaseServer implem
           dataLengthBuffer.clear();
           data.flip();
           if (headerRead) {
-            processData();
+            processData(data.array());
             data = null;
             return count;
           }
@@ -1214,17 +1220,16 @@ public abstract class HBaseServer implem
         throw new IOException("Unknown protocol: " + header.getProtocol());
       }
 
-      ticket = header.getUgi();
+      ticket = header.getUser();
     }
 
-    private void processData() throws  IOException, InterruptedException {
-      byte[] array = data.array();
+    protected void processData(byte[] buf) throws  IOException, InterruptedException {
       DataInputStream dis =
-        new DataInputStream(new ByteArrayInputStream(array));
+        new DataInputStream(new ByteArrayInputStream(buf));
       int id = dis.readInt();                    // try to read an id
 
       if (LOG.isDebugEnabled())
-        LOG.debug(" got call #" + id + ", " + array.length + " bytes");
+        LOG.debug(" got call #" + id + ", " + buf.length + " bytes");
 
       Writable param;
       try {
@@ -1307,6 +1312,15 @@ public abstract class HBaseServer implem
           try {
             if (!started)
               throw new ServerNotRunningYetException("Server is not running yet");
+
+            if (LOG.isDebugEnabled()) {
+              User remoteUser = call.connection.ticket;
+              LOG.debug(getName() + ": call #" + call.id + " executing as "
+                  + (remoteUser == null ? "NULL principal" : remoteUser.getName()));
+            }
+
+            RequestContext.set(call.connection.ticket, getRemoteIp(),
+                call.connection.protocol);
             // make the call
             value = call(call.connection.protocol, call.param, call.timestamp, 
                 status);
@@ -1314,6 +1328,10 @@ public abstract class HBaseServer implem
             LOG.debug(getName()+", call "+call+": error: " + e, e);
             errorClass = e.getClass().getName();
             error = StringUtils.stringifyException(e);
+          } finally {
+            // Must always clear the request context to avoid leaking
+            // credentials between requests.
+            RequestContext.clear();
           }
           CurCall.set(null);
 
@@ -1414,7 +1432,7 @@ public abstract class HBaseServer implem
     listener = new Listener();
     this.port = listener.getAddress().getPort();
     this.rpcMetrics = new HBaseRpcMetrics(serverName,
-                          Integer.toString(this.port));
+                          Integer.toString(this.port), this);
     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
     this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
 
@@ -1428,6 +1446,14 @@ public abstract class HBaseServer implem
   }
 
   /**
+   * Subclasses of HBaseServer can override this to provide their own
+   * Connection implementations.
+   */
+  protected Connection getConnection(SocketChannel channel, long time) {
+    return new Connection(channel, time);
+  }
+
+  /**
    * Setup response for the IPC Call.
    *
    * @param response buffer to serialize the response into
@@ -1606,7 +1632,7 @@ public abstract class HBaseServer implem
   private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
 
   /**
-   * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
+   * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
    * If the amount of data is large, it writes to channel in smaller chunks.
    * This is to avoid jdk from creating many direct buffers as the size of
    * buffer increases. This also minimizes extra copies in NIO layer
@@ -1617,16 +1643,21 @@ public abstract class HBaseServer implem
    * @param buffer buffer to write
    * @return number of bytes written
    * @throws java.io.IOException e
-   * @see WritableByteChannel#write(ByteBuffer)
+   * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
    */
-  protected static int channelWrite(WritableByteChannel channel,
+  protected int channelWrite(WritableByteChannel channel,
                                     ByteBuffer buffer) throws IOException {
-    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+
+    int count =  (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
            channel.write(buffer) : channelIO(null, channel, buffer);
+    if (count > 0) {
+      rpcMetrics.sentBytes.inc(count);
+    }
+    return count;
   }
 
   /**
-   * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
+   * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
    * If the amount of data is large, it writes to channel in smaller chunks.
    * This is to avoid jdk from creating many direct buffers as the size of
   * ByteBuffer increases. There should not be any performance degradation.
@@ -1635,17 +1666,22 @@ public abstract class HBaseServer implem
    * @param buffer buffer to write
    * @return number of bytes written
    * @throws java.io.IOException e
-   * @see ReadableByteChannel#read(ByteBuffer)
+   * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
    */
-  protected static int channelRead(ReadableByteChannel channel,
+  protected int channelRead(ReadableByteChannel channel,
                                    ByteBuffer buffer) throws IOException {
-    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+
+    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
            channel.read(buffer) : channelIO(channel, null, buffer);
+    if (count > 0) {
+      rpcMetrics.receivedBytes.inc(count);
+    }
+    return count;
   }
 
   /**
-   * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}
-   * and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only
+   * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
+   * and {@link #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)}. Only
    * one of readCh or writeCh should be non-null.
    *
    * @param readCh read channel
@@ -1653,8 +1689,8 @@ public abstract class HBaseServer implem
    * @param buf buffer to read or write into/out of
    * @return bytes written
    * @throws java.io.IOException e
-   * @see #channelRead(ReadableByteChannel, ByteBuffer)
-   * @see #channelWrite(WritableByteChannel, ByteBuffer)
+   * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
+   * @see #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)
    */
   private static int channelIO(ReadableByteChannel readCh,
                                WritableByteChannel writeCh,
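
The set()/clear() bracketing in the handler loop above is what makes the RequestContext API (added below) safe to consult from anywhere on a handler thread. A hedged usage sketch:

    import java.net.InetAddress;
    import org.apache.hadoop.hbase.ipc.RequestContext;
    import org.apache.hadoop.hbase.security.User;

    public class WhoIsCalling {
      // Call from code running on an RPC handler thread; outside a request
      // both values are null, per the RequestContext contract.
      static String describeCaller() {
        User caller = RequestContext.getRequestUser();
        InetAddress from = RequestContext.get().getRemoteAddress();
        return (caller == null ? "no user" : caller.getShortName()) + " from " + from;
      }
    }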

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Fri Nov 18 07:34:43 2011
@@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.ClusterSt
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.hbase.security.KerberosInfo;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -37,6 +39,9 @@ import org.apache.hadoop.hbase.util.Pair
  * number in HBaseRPCProtocolVersion
  *
  */
+@KerberosInfo(
+    serverPrincipal = "hbase.master.kerberos.principal")
+@TokenInfo("HBASE_AUTH_TOKEN")
 public interface HMasterInterface extends VersionedProtocol {
   /**
    * This Interfaces' version. Version changes when the Interface changes.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java Fri Nov 18 07:34:43 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.HServerLoad;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.security.KerberosInfo;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 
@@ -30,6 +31,9 @@ import org.apache.hadoop.hbase.ipc.Versi
  * The Master publishes this Interface for RegionServers to register themselves
  * on.
  */
+@KerberosInfo(
+    serverPrincipal = "hbase.master.kerberos.principal",
+    clientPrincipal = "hbase.regionserver.kerberos.principal")
 public interface HMasterRegionInterface extends VersionedProtocol {
   /**
    * This Interfaces' version. Version changes when the Interface changes.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Fri Nov 18 07:34:43 2011
@@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.hbase.security.KerberosInfo;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
@@ -55,6 +57,9 @@ import org.apache.hadoop.hbase.ipc.Versi
  * <p>NOTE: if you change the interface, you must change the RPC version
  * number in HBaseRPCProtocolVersion
  */
+@KerberosInfo(
+    serverPrincipal = "hbase.regionserver.kerberos.principal")
+@TokenInfo("HBASE_AUTH_TOKEN")
 public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortable {
   /**
    * This Interfaces' version. Version changes when the Interface changes.
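
The same two annotations can mark any secured protocol. A sketch with a hypothetical interface name, reusing the principal key and token kind from the interfaces above:

    import org.apache.hadoop.hbase.ipc.VersionedProtocol;
    import org.apache.hadoop.hbase.security.KerberosInfo;
    import org.apache.hadoop.hbase.security.TokenInfo;

    @KerberosInfo(serverPrincipal = "hbase.regionserver.kerberos.principal")
    @TokenInfo("HBASE_AUTH_TOKEN")
    public interface MyCustomProtocol extends VersionedProtocol {
      long VERSION = 1L;
    }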

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,138 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.security.User;
+
+import java.net.InetAddress;
+
+/**
+ * Represents client information (authenticated username, remote address, protocol)
+ * for the currently executing request within an RPC server handler thread.  If
+ * called outside the context of an RPC request, all values will be
+ * <code>null</code>.
+ */
+public class RequestContext {
+  private static ThreadLocal<RequestContext> instance =
+      new ThreadLocal<RequestContext>() {
+        protected RequestContext initialValue() {
+          return new RequestContext(null, null, null);
+        }
+      };
+
+  public static RequestContext get() {
+    return instance.get();
+  }
+
+
+  /**
+   * Returns the user credentials associated with the current RPC request or
+   * <code>null</code> if no credentials were provided.
+   * @return the user for the current request, or <code>null</code> if none was set
+   */
+  public static User getRequestUser() {
+    RequestContext ctx = instance.get();
+    if (ctx != null) {
+      return ctx.getUser();
+    }
+    return null;
+  }
+
+  /**
+   * Returns the username for any user associated with the current RPC
+   * request or <code>null</code> if no user is set.
+   */
+  public static String getRequestUserName() {
+    User user = getRequestUser();
+    if (user != null) {
+      return user.getShortName();
+    }
+    return null;
+  }
+
+  /**
+   * Indicates whether or not the current thread is within scope of executing
+   * an RPC request.
+   */
+  public static boolean isInRequestContext() {
+    RequestContext ctx = instance.get();
+    if (ctx != null) {
+      return ctx.isInRequest();
+    }
+    return false;
+  }
+
+  /**
+   * Initializes the client credentials for the current request.
+   * @param user the authenticated user making the request
+   * @param remoteAddress the client's remote address
+   * @param protocol the protocol interface being invoked
+   */
+  public static void set(User user,
+      InetAddress remoteAddress,
+      Class<? extends VersionedProtocol> protocol) {
+    RequestContext ctx = instance.get();
+    ctx.user = user;
+    ctx.remoteAddress = remoteAddress;
+    ctx.protocol = protocol;
+    ctx.inRequest = true;
+  }
+
+  /**
+   * Clears out the client credentials for a given request.
+   */
+  public static void clear() {
+    RequestContext ctx = instance.get();
+    ctx.user = null;
+    ctx.remoteAddress = null;
+    ctx.protocol = null;
+    ctx.inRequest = false;
+  }
+
+  private User user;
+  private InetAddress remoteAddress;
+  private Class<? extends VersionedProtocol> protocol;
+  // indicates we're within an RPC request invocation
+  private boolean inRequest;
+
+  private RequestContext(User user, InetAddress remoteAddr,
+      Class<? extends VersionedProtocol> protocol) {
+    this.user = user;
+    this.remoteAddress = remoteAddr;
+    this.protocol = protocol;
+  }
+
+  public User getUser() {
+    return user;
+  }
+
+  public InetAddress getRemoteAddress() {
+    return remoteAddress;
+  }
+
+  public Class<? extends VersionedProtocol> getProtocol() {
+    return protocol;
+  }
+
+  public boolean isInRequest() {
+    return inRequest;
+  }
+}

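RequestContext keeps its state in a ThreadLocal, so correctness depends on
the RPC server pairing every set() with a clear() in a finally block:
handler threads are pooled, and a missed clear() would leak the previous
caller's identity into the next request.  A minimal sketch of the intended
contract (the wrapper below is illustrative; the real wiring belongs in the
RPC server's handler loop):

    import java.net.InetAddress;

    import org.apache.hadoop.hbase.ipc.HRegionInterface;
    import org.apache.hadoop.hbase.ipc.RequestContext;
    import org.apache.hadoop.hbase.security.User;

    public class RequestContextUsage {
      /** Runs one dispatched call with the caller's identity attached. */
      void invokeWithContext(User user, InetAddress remoteAddr, Runnable call) {
        RequestContext.set(user, remoteAddr, HRegionInterface.class);
        try {
          // Anything on this thread may now consult
          // RequestContext.getRequestUser() or getRequestUserName().
          call.run();
        } finally {
          RequestContext.clear();  // never leak identity between requests
        }
      }
    }
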
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java Fri Nov 18 07:34:43 2011
@@ -25,7 +25,7 @@ import java.net.InetSocketAddress;
 import javax.net.SocketFactory;
 
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.conf.Configuration;
 
 /** An RPC implementation. */
@@ -34,7 +34,7 @@ interface RpcEngine {
   /** Construct a client-side proxy object. */
   VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
                   long clientVersion, InetSocketAddress addr,
-                  UserGroupInformation ticket, Configuration conf,
+                  User ticket, Configuration conf,
                   SocketFactory factory, int rpcTimeout) throws IOException;
 
   /** Stop this proxy. */
@@ -43,7 +43,7 @@ interface RpcEngine {
   /** Expert: Make multiple, parallel calls to a set of servers. */
   Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
                 Class<? extends VersionedProtocol> protocol,
-                UserGroupInformation ticket, Configuration conf)
+                User ticket, Configuration conf)
     throws IOException, InterruptedException;
 
   /** Construct a server for a protocol implementation instance. */

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Fri Nov 18 07:34:43 2011
@@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.util.Obje
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.conf.*;
 
@@ -124,13 +124,13 @@ class WritableRpcEngine implements RpcEn
   private static class Invoker implements InvocationHandler {
     private Class<? extends VersionedProtocol> protocol;
     private InetSocketAddress address;
-    private UserGroupInformation ticket;
+    private User ticket;
     private HBaseClient client;
     private boolean isClosed = false;
     final private int rpcTimeout;
 
     public Invoker(Class<? extends VersionedProtocol> protocol,
-                   InetSocketAddress address, UserGroupInformation ticket,
+                   InetSocketAddress address, User ticket,
                    Configuration conf, SocketFactory factory, int rpcTimeout) {
       this.protocol = protocol;
       this.address = address;
@@ -171,7 +171,7 @@ class WritableRpcEngine implements RpcEn
    * talking to a server at the named address. */
   public VersionedProtocol getProxy(
       Class<? extends VersionedProtocol> protocol, long clientVersion,
-      InetSocketAddress addr, UserGroupInformation ticket,
+      InetSocketAddress addr, User ticket,
       Configuration conf, SocketFactory factory, int rpcTimeout)
     throws IOException {
 
@@ -205,7 +205,7 @@ class WritableRpcEngine implements RpcEn
   public Object[] call(Method method, Object[][] params,
                        InetSocketAddress[] addrs,
                        Class<? extends VersionedProtocol> protocol,
-                       UserGroupInformation ticket, Configuration conf)
+                       User ticket, Configuration conf)
     throws IOException, InterruptedException {
 
     Invocation[] invocations = new Invocation[params.length];

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java Fri Nov 18 07:34:43 2011
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -90,6 +91,12 @@ public class TableMapReduceUtil {
         e.printStackTrace();
       }
     }
+    try {
+      initCredentials(job);
+    } catch (IOException ioe) {
+      // just spit out the stack trace?  really?
+      ioe.printStackTrace();
+    }
   }
 
   /**
@@ -158,6 +165,18 @@ public class TableMapReduceUtil {
     if (addDependencyJars) {
       addDependencyJars(job);
     }
+    initCredentials(job);
+  }
+
+  public static void initCredentials(JobConf job) throws IOException {
+    if (User.isHBaseSecurityEnabled(job)) {
+      try {
+        User.getCurrent().obtainAuthTokenForJob(job);
+      } catch (InterruptedException ie) {
+        ie.printStackTrace();
+        Thread.currentThread().interrupt(); // restore the interrupt status
+      }
+    }
   }
 
   /**

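With this change the mapred-API setup helpers acquire an HBase
authentication token as a side effect of configuring the job.  A job
assembled by hand, without initTableMapJob(), can request the token
explicitly; a minimal sketch (the wrapper class and method names are
illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
    import org.apache.hadoop.mapred.JobConf;

    public class MapredJobSecuritySetup {
      public static void prepare(JobConf job) throws IOException {
        // A no-op unless hbase.security.authentication is "kerberos";
        // otherwise this stores an HBASE_AUTH_TOKEN in the job's
        // credentials for use by the map and reduce tasks.
        TableMapReduceUtil.initCredentials(job);
      }
    }
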
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Fri Nov 18 07:34:43 2011
@@ -40,9 +40,11 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -133,6 +135,7 @@ public class TableMapReduceUtil {
     if (addDependencyJars) {
       addDependencyJars(job);
     }
+    initCredentials(job);
   }
   
   /**
@@ -211,6 +214,17 @@ public class TableMapReduceUtil {
               outputValueClass, job, addDependencyJars, TableInputFormat.class);
   }
 
+  public static void initCredentials(Job job) throws IOException {
+    if (User.isHBaseSecurityEnabled(job.getConfiguration())) {
+      try {
+        User.getCurrent().obtainAuthTokenForJob(job.getConfiguration(), job);
+      } catch (InterruptedException ie) {
+        LOG.info("Interrupted obtaining user authentication token");
+        Thread.currentThread().interrupt(); // restore the interrupt status
+      }
+    }
+  }
+
   /**
    * Writes the given scan into a Base64 encoded string.
    *
@@ -364,6 +378,8 @@ public class TableMapReduceUtil {
     if (addDependencyJars) {
       addDependencyJars(job);
     }
+
+    initCredentials(job);
   }
 
   /**

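The mapreduce-API variant does the same against
org.apache.hadoop.mapreduce.Job.  A minimal sketch for a hand-rolled job
(the job name and wrapper class are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class MapreduceJobSecuritySetup {
      public static Job createJob(Configuration conf) throws IOException {
        Job job = new Job(conf, "secure-hbase-job");
        // initTableMapperJob()/initTableReducerJob() now call this
        // themselves; a job built by hand must ask for the token explicitly.
        TableMapReduceUtil.initCredentials(job);
        return job;
      }
    }
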
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Fri Nov 18 07:34:43 2011
@@ -244,14 +244,15 @@ implements HMasterInterface, HMasterRegi
     setName(MASTER + "-" + this.serverName.toString());
 
     Replication.decorateMasterConfiguration(this.conf);
-    this.rpcServer.startThreads();
 
     // Hack! Maps DFSClient => Master for logs.  HDFS made this
     // config param for task trackers, but we can piggyback off of it.
     if (this.conf.get("mapred.task.id") == null) {
       this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString());
     }
+
     this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
+    this.rpcServer.startThreads();
     this.metrics = new MasterMetrics(getServerName().toString());
   }
 

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/KerberosInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/KerberosInfo.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/KerberosInfo.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/KerberosInfo.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Indicates Kerberos-related information to be used for authorizing connections
+ * over a given RPC protocol interface.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface KerberosInfo {
+  /** Key for getting server's Kerberos principal name from Configuration */
+  String serverPrincipal();
+  String clientPrincipal() default "";
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/TokenInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/TokenInfo.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/TokenInfo.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/TokenInfo.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Indicates token-related information to be used in authorizing connections
+ * over a given RPC protocol interface.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TokenInfo {
+  /** The type of Token.getKind() to be handled */
+  String value();
+}
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java?rev=1203515&r1=1203514&r2=1203515&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java Fri Nov 18 07:34:43 2011
@@ -33,6 +33,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
+
 import org.apache.commons.logging.Log;
 
 /**
@@ -47,6 +48,9 @@ import org.apache.commons.logging.Log;
  * </p>
  */
 public abstract class User {
+  public static final String HBASE_SECURITY_CONF_KEY =
+      "hbase.security.authentication";
+
   /**
    * Flag to differentiate between API-incompatible changes to
    * {@link org.apache.hadoop.security.UserGroupInformation} between vanilla
@@ -61,8 +65,13 @@ public abstract class User {
     }
   }
   private static Log LOG = LogFactory.getLog(User.class);
+
   protected UserGroupInformation ugi;
 
+  public UserGroupInformation getUGI() {
+    return ugi;
+  }
+
   /**
    * Returns the full user name.  For Kerberos principals this will include
    * the host and realm portions of the principal name.
@@ -73,6 +82,15 @@ public abstract class User {
   }
 
   /**
+   * Returns the list of groups of which this user is a member.  On secure
+   * Hadoop this returns the group information for the user as resolved on the
+   * server.  For 0.20 based Hadoop, the group names are passed from the client.
+   */
+  public String[] getGroupNames() {
+    return ugi.getGroupNames();
+  }
+
+  /**
    * Returns the shortened version of the user name -- the portion that maps
    * to an operating system user name.
    * @return Short name
@@ -90,6 +108,24 @@ public abstract class User {
   public abstract <T> T runAs(PrivilegedExceptionAction<T> action)
       throws IOException, InterruptedException;
 
+  /**
+   * Requests an authentication token for this user and stores it in the
+   * user's credentials.
+   *
+   * @throws IOException
+   */
+  public abstract void obtainAuthTokenForJob(Configuration conf, Job job)
+      throws IOException, InterruptedException;
+
+  /**
+   * Requests an authentication token for this user and stores it in the
+   * user's credentials.
+   *
+   * @throws IOException
+   */
+  public abstract void obtainAuthTokenForJob(JobConf job)
+      throws IOException, InterruptedException;
+
   public String toString() {
     return ugi.toString();
   }
@@ -104,13 +140,29 @@ public abstract class User {
     } else {
       user = new HadoopUser();
     }
-    if (user.ugi == null) {
+    if (user.getUGI() == null) {
       return null;
     }
     return user;
   }
 
   /**
+   * Wraps an underlying {@code UserGroupInformation} instance.
+   * @param ugi The base Hadoop user
+   * @return a new {@code User} wrapping the given <code>ugi</code>, or <code>null</code> if it is <code>null</code>
+   */
+  public static User create(UserGroupInformation ugi) {
+    if (ugi == null) {
+      return null;
+    }
+
+    if (IS_SECURE_HADOOP) {
+      return new SecureHadoopUser(ugi);
+    }
+    return new HadoopUser(ugi);
+  }
+
+  /**
    * Generates a new {@code User} instance specifically for use in test code.
    * @param name the full username
    * @param groups the group names to which the test user will belong
@@ -150,8 +202,8 @@ public abstract class User {
   }
 
   /**
-   * Returns whether or not Kerberos authentication is configured.  For
-   * non-secure Hadoop, this always returns <code>false</code>.
+   * Returns whether or not Kerberos authentication is configured for Hadoop.
+   * For non-secure Hadoop, this always returns <code>false</code>.
    * For secure Hadoop, it will return the value from
    * {@code UserGroupInformation.isSecurityEnabled()}.
    */
@@ -163,6 +215,15 @@ public abstract class User {
     }
   }
 
+  /**
+   * Returns whether or not secure authentication is enabled for HBase
+   * (whether <code>hbase.security.authentication</code> is set to
+   * <code>kerberos</code>).
+   */
+  public static boolean isHBaseSecurityEnabled(Configuration conf) {
+    return "kerberos".equalsIgnoreCase(conf.get(HBASE_SECURITY_CONF_KEY));
+  }
+
   /* Concrete implementations */
 
   /**
@@ -201,7 +262,7 @@ public abstract class User {
 
     @Override
     public String getShortName() {
-      return ugi.getUserName();
+      return ugi != null ? ugi.getUserName() : null;
     }
 
     @Override
@@ -260,6 +321,20 @@ public abstract class User {
       return result;
     }
 
+    @Override
+    public void obtainAuthTokenForJob(Configuration conf, Job job)
+        throws IOException, InterruptedException {
+      // this is a no-op.  token creation is only supported for kerberos
+      // authenticated clients
+    }
+
+    @Override
+    public void obtainAuthTokenForJob(JobConf job)
+        throws IOException, InterruptedException {
+      // this is a no-op.  token creation is only supported for kerberos
+      // authenticated clients
+    }
+
     /** @see User#createUserForTesting(org.apache.hadoop.conf.Configuration, String, String[]) */
     public static User createUserForTesting(Configuration conf,
         String name, String[] groups) {
@@ -311,6 +386,8 @@ public abstract class User {
    * 0.20 and versions 0.21 and above.
    */
   private static class SecureHadoopUser extends User {
+    private String shortName;
+
     private SecureHadoopUser() throws IOException {
       try {
         ugi = (UserGroupInformation) callStatic("getCurrentUser");
@@ -330,8 +407,11 @@ public abstract class User {
 
     @Override
     public String getShortName() {
+      if (shortName != null) return shortName;
+
       try {
-        return (String)call(ugi, "getShortUserName", null, null);
+        shortName = (String)call(ugi, "getShortUserName", null, null);
+        return shortName;
       } catch (RuntimeException re) {
         throw re;
       } catch (Exception e) {
@@ -372,6 +452,55 @@ public abstract class User {
       }
     }
 
+    @Override
+    public void obtainAuthTokenForJob(Configuration conf, Job job)
+        throws IOException, InterruptedException {
+      try {
+        Class c = Class.forName(
+            "org.apache.hadoop.hbase.security.token.TokenUtil");
+        Methods.call(c, null, "obtainTokenForJob",
+            new Class[]{Configuration.class, UserGroupInformation.class,
+                Job.class},
+            new Object[]{conf, ugi, job});
+      } catch (ClassNotFoundException cnfe) {
+        throw new RuntimeException("Failure loading TokenUtil class, "
+            +"is secure RPC available?", cnfe);
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (InterruptedException ie) {
+        throw ie;
+      } catch (RuntimeException re) {
+        throw re;
+      } catch (Exception e) {
+        throw new UndeclaredThrowableException(e,
+            "Unexpected error calling TokenUtil.obtainTokenForJob()");
+      }
+    }
+
+    @Override
+    public void obtainAuthTokenForJob(JobConf job)
+        throws IOException, InterruptedException {
+      try {
+        Class c = Class.forName(
+            "org.apache.hadoop.hbase.security.token.TokenUtil");
+        Methods.call(c, null, "obtainTokenForJob",
+            new Class[]{JobConf.class, UserGroupInformation.class},
+            new Object[]{job, ugi});
+      } catch (ClassNotFoundException cnfe) {
+        throw new RuntimeException("Failure loading TokenUtil class, "
+            +"is secure RPC available?", cnfe);
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (InterruptedException ie) {
+        throw ie;
+      } catch (RuntimeException re) {
+        throw re;
+      } catch (Exception e) {
+        throw new UndeclaredThrowableException(e,
+            "Unexpected error calling TokenUtil.obtainTokenForJob()");
+      }
+    }
+
     /** @see User#createUserForTesting(org.apache.hadoop.conf.Configuration, String, String[]) */
     public static User createUserForTesting(Configuration conf,
         String name, String[] groups) {

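The additions to User separate two independent switches: Kerberos at the
Hadoop layer (the pre-existing isSecurityEnabled(), whose javadoc is
clarified above) and HBase's own hbase.security.authentication key (the new
isHBaseSecurityEnabled()).  A minimal sketch exercising both, assuming
User.getCurrent() may throw IOException (the wrapper class and method names
are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.security.User;

    public class SecuritySwitches {
      public static void report(Configuration conf) throws IOException {
        boolean hadoopSecure = User.isSecurityEnabled();           // Hadoop/Kerberos
        boolean hbaseSecure = User.isHBaseSecurityEnabled(conf);   // HBase key
        User current = User.getCurrent();
        System.out.println(current.getShortName()
            + ": hadoopSecure=" + hadoopSecure
            + ", hbaseSecure=" + hbaseSecure);
      }
    }

As the two implementations above show, obtainAuthTokenForJob() is a no-op
in HadoopUser, while SecureHadoopUser fails fast with a descriptive
RuntimeException when the TokenUtil class from the security build is not on
the classpath.
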
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java?rev=1203515&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java Fri Nov 18 07:34:43 2011
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Handles coordination of a single "leader" instance among many possible
+ * candidates.  The first {@code ZKLeaderManager} to successfully create
+ * the given znode becomes the leader, allowing the instance to continue
+ * with whatever processing must be protected.  Other {@code ZKLeaderManager}
+ * instances will wait to be notified of changes to the leader znode.
+ * If the current leader instance fails, the ephemeral leader znode will
+ * be removed, and all waiting instances will be notified, with the race
+ * to claim the leader znode beginning all over again.
+ */
+public class ZKLeaderManager extends ZooKeeperListener {
+  private static Log LOG = LogFactory.getLog(ZKLeaderManager.class);
+
+  private final AtomicBoolean leaderExists = new AtomicBoolean();
+  private String leaderZNode;
+  private byte[] nodeId;
+  private Stoppable candidate;
+
+  public ZKLeaderManager(ZooKeeperWatcher watcher, String leaderZNode,
+      byte[] identifier, Stoppable candidate) {
+    super(watcher);
+    this.leaderZNode = leaderZNode;
+    this.nodeId = identifier;
+    this.candidate = candidate;
+  }
+
+  public void start() {
+    try {
+      watcher.registerListener(this);
+      String parent = ZKUtil.getParent(leaderZNode);
+      if (ZKUtil.checkExists(watcher, parent) < 0) {
+        ZKUtil.createWithParents(watcher, parent);
+      }
+    } catch (KeeperException ke) {
+      watcher.abort("Unhandled zk exception when starting", ke);
+      candidate.stop("Unhandled zk exception starting up: "+ke.getMessage());
+    }
+  }
+
+  @Override
+  public void nodeCreated(String path) {
+    if (leaderZNode.equals(path) && !candidate.isStopped()) {
+      handleLeaderChange();
+    }
+  }
+
+  @Override
+  public void nodeDeleted(String path) {
+    if (leaderZNode.equals(path) && !candidate.isStopped()) {
+      handleLeaderChange();
+    }
+  }
+
+  private void handleLeaderChange() {
+    try {
+      synchronized(leaderExists) {
+        if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) {
+          LOG.info("Found new leader for znode: "+leaderZNode);
+          leaderExists.set(true);
+        } else {
+          LOG.info("Leader change, but no new leader found");
+          leaderExists.set(false);
+          leaderExists.notifyAll();
+        }
+      }
+    } catch (KeeperException ke) {
+      watcher.abort("ZooKeeper error checking for leader znode", ke);
+      candidate.stop("ZooKeeper error checking for leader: "+ke.getMessage());
+    }
+  }
+
+  /**
+   * Blocks until this instance has claimed the leader ZNode in ZooKeeper
+   */
+  public void waitToBecomeLeader() {
+    while (!candidate.isStopped()) {
+      try {
+        if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) {
+          // claimed the leader znode
+          leaderExists.set(true);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Claimed the leader znode as '"+
+                Bytes.toStringBinary(nodeId)+"'");
+          }
+          return;
+        }
+
+        // if claiming the node failed, there should be another existing node
+        byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode);
+        if (currentId != null && Bytes.equals(currentId, nodeId)) {
+          // claimed with our ID, but we didn't grab it, possibly restarted?
+          LOG.info("Found existing leader with our ID ("+
+              Bytes.toStringBinary(nodeId)+"), removing");
+          ZKUtil.deleteNode(watcher, leaderZNode);
+          leaderExists.set(false);
+        } else {
+          LOG.info("Found existing leader with ID: "+Bytes.toStringBinary(currentId));
+          leaderExists.set(true);
+        }
+      } catch (KeeperException ke) {
+        watcher.abort("Unexpected error from ZK, stopping candidate", ke);
+        candidate.stop("Unexpected error from ZK: "+ke.getMessage());
+        return;
+      }
+
+      // wait for next chance
+      synchronized(leaderExists) {
+        while (leaderExists.get() && !candidate.isStopped()) {
+          try {
+            leaderExists.wait();
+          } catch (InterruptedException ie) {
+            LOG.debug("Interrupted waiting on leader", ie);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Removes the leader znode, if it is currently claimed by this instance.
+   */
+  public void stepDownAsLeader() {
+    try {
+      synchronized(leaderExists) {
+        if (!leaderExists.get()) {
+          return;
+        }
+        byte[] leaderId = ZKUtil.getData(watcher, leaderZNode);
+        if (leaderId != null && Bytes.equals(nodeId, leaderId)) {
+          LOG.info("Stepping down as leader");
+          ZKUtil.deleteNodeFailSilent(watcher, leaderZNode);
+          leaderExists.set(false);
+        } else {
+          LOG.info("Not current leader, no need to step down");
+        }
+      }
+    } catch (KeeperException ke) {
+      watcher.abort("Unhandled zookeeper exception removing leader node", ke);
+      candidate.stop("Unhandled zookeeper exception removing leader node: "
+          + ke.getMessage());
+    }
+  }
+
+  public boolean hasLeader() {
+    return leaderExists.get();
+  }
+}

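ZKLeaderManager is a generic election primitive: whichever candidate
creates the ephemeral leader znode first proceeds, and the rest block until
the znode disappears.  The intended calling pattern, sketched with a
hypothetical znode path and identifier:

    import org.apache.hadoop.hbase.Stoppable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.zookeeper.ZKLeaderManager;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

    public class LeaderCandidate {
      public void runAsPossibleLeader(ZooKeeperWatcher watcher,
          Stoppable service) {
        ZKLeaderManager leader = new ZKLeaderManager(watcher,
            "/hbase/exampleLeader",         // hypothetical leader znode
            Bytes.toBytes("candidate-1"),   // this instance's identifier
            service);
        leader.start();                     // register listener, create parent
        leader.waitToBecomeLeader();        // blocks until we claim the znode
        try {
          // ... work that only one live instance may perform ...
        } finally {
          leader.stepDownAsLeader();        // delete the znode if still ours
        }
      }
    }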

