flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1626: Support Hbase security in Hbase sink
Date Fri, 07 Dec 2012 15:53:01 GMT
Updated Branches:
  refs/heads/flume-1.4 9fb173e43 -> bfbab6286


FLUME-1626: Support Hbase security in Hbase sink

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bfbab628
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bfbab628
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bfbab628

Branch: refs/heads/flume-1.4
Commit: bfbab628664b9386ec24833b287f1a62ac45b694
Parents: 9fb173e
Author: Brock Noland <brock@apache.org>
Authored: Fri Dec 7 09:52:19 2012 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Fri Dec 7 09:52:32 2012 -0600

----------------------------------------------------------------------
 flume-ng-sinks/flume-ng-hbase-sink/pom.xml         |    5 +
 .../org/apache/flume/sink/hbase/HBaseSink.java     |   72 +++++++--
 .../hbase/HBaseSinkConfigurationConstants.java     |    4 +
 .../flume/sink/hbase/HBaseSinkSecurityManager.java |  130 +++++++++++++++
 4 files changed, 198 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/bfbab628/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
index 25422e1..5928ecf 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
@@ -109,6 +109,11 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.flume.flume-ng-sinks</groupId>
+      <artifactId>flume-hdfs-sink</artifactId>
+    </dependency>
+
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/flume/blob/bfbab628/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index 021ecd0..835a69e 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -43,6 +43,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import java.security.PrivilegedExceptionAction;
+import org.apache.hadoop.hbase.security.User;
 
 
 /**
@@ -90,6 +92,9 @@ public class HBaseSink extends AbstractSink implements Configurable {
   private HbaseEventSerializer serializer;
   private String eventSerializerType;
   private Context serializerContext;
+  private String kerberosPrincipal;
+  private String kerberosKeytab;
+  private User hbaseUser;
 
   public HBaseSink(){
     this(HBaseConfiguration.create());
@@ -114,18 +119,32 @@ public class HBaseSink extends AbstractSink implements Configurable {
       throw new FlumeException("Could not load table, " + tableName +
           " from HBase", e);
     }
-
     try {
-      if(!table.getTableDescriptor().hasFamily(columnFamily)) {
-        throw new IOException("Table " + tableName +
-            " has no such column family " + Bytes.toString(columnFamily));
+      if (HBaseSinkSecurityManager.isSecurityEnabled(config)) {
+        hbaseUser = HBaseSinkSecurityManager.login(config, null,
+                kerberosPrincipal, kerberosKeytab);
       }
-    } catch (IOException e) {
+    } catch (Exception ex) {
+      throw new FlumeException("Failed to login to HBase using "
+              + "provided credentials.", ex);
+    }
+    try {
+      if (!runPrivileged(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws IOException {
+          return table.getTableDescriptor().hasFamily(columnFamily);
+        }
+      })) {
+        throw new IOException("Table " + tableName
+                + " has no such column family " + Bytes.toString(columnFamily));
+      }
+    } catch (Exception e) {
       //Get getTableDescriptor also throws IOException, so catch the IOException
       //thrown above or by the getTableDescriptor() call.
-      throw new FlumeException("Error getting column family from HBase." +
-          "Please verify that the table "+ tableName +" and Column Family, "
-          + Bytes.toString(columnFamily) + " exists in HBase.", e);
+      throw new FlumeException("Error getting column family from HBase."
+              + "Please verify that the table " + tableName + " and Column Family, "
+              + Bytes.toString(columnFamily) + " exists in HBase, and the"
+              + " current user has permissions to access that table.", e);
     }
 
     super.start();
@@ -176,6 +195,8 @@ public class HBaseSink extends AbstractSink implements Configurable {
       logger.error("Could not instantiate event serializer." , e);
       Throwables.propagate(e);
     }
+    kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB, "");
+    kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL, "");
   }
 
   @Override
@@ -202,13 +223,27 @@ public class HBaseSink extends AbstractSink implements Configurable {
     return status;
   }
 
-  private void putEventsAndCommit(List<Row> actions, List<Increment> incs,
+  private void putEventsAndCommit(final List<Row> actions, final List<Increment> incs,
       Transaction txn) throws EventDeliveryException {
     try {
-      table.batch(actions);
-      for(Increment i: incs){
-        table.increment(i);
-      }
+      runPrivileged(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          table.batch(actions);
+          return null;
+        }
+      });
+
+      runPrivileged(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          for (final Increment i : incs) {
+            table.increment(i);
+          }
+          return null;
+        }
+      });
+
       txn.commit();
       counterGroup.incrementAndGet("transaction.success");
     } catch (Throwable e) {
@@ -235,4 +270,15 @@ public class HBaseSink extends AbstractSink implements Configurable {
       txn.close();
     }
   }
+  private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
+          throws Exception {
+    if(hbaseUser != null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Calling runAs as hbase user: " + hbaseUser.getName());
+      }
+      return hbaseUser.runAs(action);
+    } else {
+      return action.run();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/bfbab628/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index 62f7097..463c9c3 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -48,4 +48,8 @@ public class HBaseSinkConfigurationConstants {
 
   public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;
 
+  public static final String CONFIG_KEYTAB = "kerberosKeytab";
+
+  public static final String CONFIG_PRINCIPAL = "kerberosPrincipal";
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/bfbab628/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java
new file mode 100644
index 0000000..8bf0509
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.hbase;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import org.apache.flume.FlumeException;
+import org.apache.flume.sink.hdfs.KerberosUser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to handle logging into HBase with the credentials passed in.
+ */
+public class HBaseSinkSecurityManager {
+
+  /*
+   * volatile for safe publication. Since this is updated only by
+   * a single thread (configuration) and read later by the sink threads,
+   * this can just be volatile, no need of Atomic reference.
+   */
+  private volatile static KerberosUser loggedInUser;
+  private static final Logger LOG =
+          LoggerFactory.getLogger(HBaseSinkSecurityManager.class);
+
+  /**
+   * Checks if security is enabled for the HBase cluster.
+   *
+   * @return - true if security is enabled on the HBase cluster and
+   * the underlying HDFS cluster.
+   */
+  public static boolean isSecurityEnabled(Configuration conf) {
+    return User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf);
+  }
+
+  /**
+   * Login the user using the configuration, and the hostname specified to use
+   * for logging in.
+   *
+   * @param conf - Configuration to use for logging the user in.
+   * @param hostname - The hostname to use for logging the user in. If no
+   * hostname is specified (null or empty string), the canonical hostname for
+   * the address returned by {@linkplain InetAddress#getLocalHost()} will be
+   * used.
+   * @return The logged in HBase {@linkplain User}.
+   * @throws IOException if login failed, or hostname lookup failed.
+   */
+  public static synchronized User login(Configuration conf, String hostname,
+          String kerberosPrincipal, String kerberosKeytab) throws IOException {
+    if (kerberosPrincipal.isEmpty()) {
+      String msg = "Login failed, since kerberos principal was not specified.";
+      LOG.error(msg);
+      throw new IllegalArgumentException(msg);
+    }
+    if (kerberosKeytab.isEmpty()) {
+      String msg = "Login failed, since kerberos keytab was not specified.";
+      LOG.error(msg);
+      throw new IllegalArgumentException(msg);
+    } else {
+      //If keytab is specified, user should want it take effect.
+      //HDFSEventSink will halt when keytab file is non-exist or unreadable
+      File kfile = new File(kerberosKeytab);
+      if (!(kfile.isFile() && kfile.canRead())) {
+        throw new IllegalArgumentException("The keyTab file: "
+                + kerberosKeytab + " is nonexistent or can't read. "
+                + "Please specify a readable keytab file for Kerberos auth.");
+      }
+    }
+    String principal = kerberosPrincipal;
+    try {
+      // resolves _HOST pattern using standard Hadoop search/replace
+      // via DNS lookup when 2nd argument is empty
+      principal = SecurityUtil.getServerPrincipal(kerberosPrincipal,"");
+    } catch (IOException e) {
+      LOG.error("Host lookup error resolving kerberos principal ("
+              + kerberosPrincipal + "). Exception follows.", e);
+      throw e;
+    }
+    Preconditions.checkNotNull(principal, "Principal must not be null");
+    KerberosUser newUser = new KerberosUser(principal, kerberosKeytab);
+    //The HDFS Sink does not allow login credentials to change.
+    //To be uniform, we will do the same thing here.
+    User hbaseUser = null;
+    boolean loggedIn = false;
+    if (loggedInUser != null) {
+      Preconditions.checkArgument(newUser.equals(loggedInUser),
+              "Cannot switch kerberos credentials during a reconfiguration. "
+              + "Please restart the agent to set the new credentials.");
+      try {
+        hbaseUser = User.create(UserGroupInformation.getLoginUser());
+        loggedIn = true;
+      } catch (IOException ex) {
+        LOG.warn("Previous login does not exist, "
+                + "will authenticate against KDC");
+      }
+    }
+    if (!loggedIn) {
+      if (hostname == null || hostname.isEmpty()) {
+        hostname = InetAddress.getLocalHost().getCanonicalHostName();
+      }
+      User.login(conf, kerberosKeytab, principal, hostname);
+      hbaseUser = User.create(UserGroupInformation.getLoginUser());
+      loggedInUser = newUser;
+      //TODO: Set the loggedInUser to the current user.
+      LOG.info("Logged into HBase as user: " + hbaseUser.getName());
+    }
+    return hbaseUser;
+  }
+}


Mime
View raw message