hbase-commits mailing list archives

From la...@apache.org
Subject svn commit: r1428119 - in /hbase/branches/0.94: security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
Date Thu, 03 Jan 2013 00:14:42 GMT
Author: larsh
Date: Thu Jan  3 00:14:41 2013
New Revision: 1428119

URL: http://svn.apache.org/viewvc?rev=1428119&view=rev
Log:
HBASE-7442 HBase remote CopyTable not working when security enabled (James Kinley)
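
For context (not part of the committed change): the failure mode is a MapReduce job such as CopyTable that writes to a second, security-enabled cluster. Previously, initCredentials() obtained a delegation token only for the job's local cluster, so tasks could not authenticate to the remote one. Below is a minimal sketch of a job that would exercise the fix; the class name, table names, and ZooKeeper hosts are placeholders, and it only approximates what CopyTable sets up.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class RemoteCopyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "copy-to-remote-cluster");
    job.setJarByClass(RemoteCopyExample.class);

    // Read from a table on the local (secure) cluster.
    TableMapReduceUtil.initTableMapperJob("source_table", new Scan(),
        Import.Importer.class, null, null, job);

    // Write to a table on a remote (secure) cluster. The cluster key
    // "quorum:client-port:znode-parent" ends up in
    // TableOutputFormat.QUORUM_ADDRESS, which the patched initCredentials()
    // below inspects in order to obtain a token for the remote cluster too.
    TableMapReduceUtil.initTableReducerJob("dest_table", null, job, null,
        "remote-zk1,remote-zk2,remote-zk3:2181:/hbase", null, null);

    // The init*Job helpers may already call initCredentials(); the explicit
    // call just highlights the entry point this patch changes.
    TableMapReduceUtil.initCredentials(job);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}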

Modified:
    hbase/branches/0.94/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java

Modified: hbase/branches/0.94/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java?rev=1428119&r1=1428118&r2=1428119&view=diff
==============================================================================
--- hbase/branches/0.94/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java (original)
+++ hbase/branches/0.94/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java Thu Jan  3 00:14:41 2013
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
@@ -72,33 +73,31 @@ public class SecureRpcEngine implements 
     super();
   }                                  // no public ctor
 
-  /* Cache a client using its socket factory as the hash key */
+  /* Cache a client using the configured clusterId */
   static private class ClientCache {
-    private Map<SocketFactory, SecureClient> clients =
-      new HashMap<SocketFactory, SecureClient>();
+    private Map<String, SecureClient> clients =
+      new HashMap<String, SecureClient>();
 
     protected ClientCache() {}
 
     /**
-     * Construct & cache an IPC client with the user-provided SocketFactory
-     * if no cached client exists.
+     * Construct & cache an IPC client with the configured
+     * {@link HConstants#CLUSTER_ID} if no cached client exists.
      *
-     * @param conf Configuration
-     * @param factory socket factory
+     * @param conf
+     *          Configuration
+     * @param factory
+     *          socket factory
      * @return an IPC client
      */
     protected synchronized SecureClient getClient(Configuration conf,
         SocketFactory factory) {
-      // Construct & cache client.  The configuration is only used for timeout,
-      // and Clients have connection pools.  So we can either (a) lose some
-      // connection pooling and leak sockets, or (b) use the same timeout for all
-      // configurations.  Since the IPC is usually intended globally, not
-      // per-job, we choose (a).
-      SecureClient client = clients.get(factory);
+      String clusterId = conf.get(HConstants.CLUSTER_ID, "default");
+      SecureClient client = clients.get(clusterId);
       if (client == null) {
         // Make an hbase client instead of hadoop Client.
         client = new SecureClient(HbaseObjectWritable.class, conf, factory);
-        clients.put(factory, client);
+        clients.put(clusterId, client);
       } else {
         client.incCount();
       }
@@ -106,10 +105,11 @@ public class SecureRpcEngine implements 
     }
 
     /**
-     * Construct & cache an IPC client with the default SocketFactory
-     * if no cached client exists.
+     * Construct & cache an IPC client with the configured
+     * {@link HConstants#CLUSTER_ID} if no cached client exists.
      *
-     * @param conf Configuration
+     * @param conf
+     *          Configuration
      * @return an IPC client
      */
     protected synchronized SecureClient getClient(Configuration conf) {
@@ -125,7 +125,7 @@ public class SecureRpcEngine implements 
       synchronized (this) {
         client.decCount();
         if (client.isZeroReference()) {
-          clients.remove(client.getSocketFactory());
+          clients.remove(client.getClusterId());
         }
       }
       if (client.isZeroReference()) {
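
For illustration (not part of the committed change): the hunks above change the client cache key from the SocketFactory to the configured cluster id, so each cluster gets its own SecureClient instance. A standalone sketch of that keyed, reference-counted cache pattern follows; FakeClient and the property name are placeholders, not HBase classes.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ClusterIdClientCache {
  // Placeholder property name standing in for HConstants.CLUSTER_ID.
  static final String CLUSTER_ID_KEY = "hbase.cluster.id";

  static class FakeClient {
    final String clusterId;
    int refCount = 1;
    FakeClient(String clusterId) { this.clusterId = clusterId; }
  }

  private final Map<String, FakeClient> clients = new HashMap<String, FakeClient>();

  // One shared client per cluster id. With the old SocketFactory key, two
  // configurations pointing at different clusters through the default factory
  // would have shared a single client, defeating per-cluster security setup.
  synchronized FakeClient getClient(Properties conf) {
    String clusterId = conf.getProperty(CLUSTER_ID_KEY, "default");
    FakeClient client = clients.get(clusterId);
    if (client == null) {
      client = new FakeClient(clusterId);
      clients.put(clusterId, client);
    } else {
      client.refCount++;
    }
    return client;
  }

  synchronized void stopClient(FakeClient client) {
    if (--client.refCount == 0) {
      clients.remove(client.clusterId);
    }
  }

  public static void main(String[] args) {
    ClusterIdClientCache cache = new ClusterIdClientCache();
    Properties local = new Properties();
    local.setProperty(CLUSTER_ID_KEY, "local-cluster");
    Properties remote = new Properties();
    remote.setProperty(CLUSTER_ID_KEY, "remote-cluster");

    FakeClient a = cache.getClient(local);
    FakeClient b = cache.getClient(remote);
    FakeClient c = cache.getClient(local);
    System.out.println("local and remote share a client? " + (a == b)); // false
    System.out.println("local client reused?             " + (a == c)); // true
  }
}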

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1428119&r1=1428118&r2=1428119&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Thu Jan  3 00:14:41 2013
@@ -1202,4 +1202,11 @@ public class HBaseClient {
              (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
     }
   }
+
+  /**
+   * @return the clusterId
+   */
+  public String getClusterId() {
+    return clusterId;
+  }
 }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1428119&r1=1428118&r2=1428119&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Thu Jan  3 00:14:41 2013
@@ -219,6 +219,19 @@ public class TableMapReduceUtil {
   public static void initCredentials(Job job) throws IOException {
     if (User.isHBaseSecurityEnabled(job.getConfiguration())) {
       try {
+        // init credentials for remote cluster
+        String quorumAddress = job.getConfiguration().get(
+            TableOutputFormat.QUORUM_ADDRESS);
+        if (quorumAddress != null) {
+          String[] parts = ZKUtil.transformClusterKey(quorumAddress);
+          Configuration peerConf = HBaseConfiguration.create(job
+              .getConfiguration());
+          peerConf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
+          peerConf.set("hbase.zookeeper.client.port", parts[1]);
+          peerConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
+          User.getCurrent().obtainAuthTokenForJob(peerConf, job);
+        }
+        
         User.getCurrent().obtainAuthTokenForJob(job.getConfiguration(), job);
       } catch (InterruptedException ie) {
         LOG.info("Interrupted obtaining user authentication token");


