Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9E88EDE92 for ; Fri, 7 Dec 2012 15:53:01 +0000 (UTC) Received: (qmail 95980 invoked by uid 500); 7 Dec 2012 15:53:01 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 95956 invoked by uid 500); 7 Dec 2012 15:53:01 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 95949 invoked by uid 99); 7 Dec 2012 15:53:01 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Dec 2012 15:53:01 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 34D3431D158; Fri, 7 Dec 2012 15:53:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: brock@apache.org To: commits@flume.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-1626: Support Hbase security in Hbase sink Message-Id: <20121207155301.34D3431D158@tyr.zones.apache.org> Date: Fri, 7 Dec 2012 15:53:01 +0000 (UTC) Updated Branches: refs/heads/flume-1.4 9fb173e43 -> bfbab6286 FLUME-1626: Support Hbase security in Hbase sink (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bfbab628 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bfbab628 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bfbab628 Branch: refs/heads/flume-1.4 Commit: bfbab628664b9386ec24833b287f1a62ac45b694 Parents: 9fb173e Author: Brock Noland Authored: Fri Dec 7 09:52:19 2012 -0600 Committer: Brock Noland Committed: Fri Dec 7 09:52:32 2012 -0600 ---------------------------------------------------------------------- flume-ng-sinks/flume-ng-hbase-sink/pom.xml | 5 + .../org/apache/flume/sink/hbase/HBaseSink.java | 72 +++++++-- .../hbase/HBaseSinkConfigurationConstants.java | 4 + .../flume/sink/hbase/HBaseSinkSecurityManager.java | 130 +++++++++++++++ 4 files changed, 198 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/bfbab628/flume-ng-sinks/flume-ng-hbase-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml index 25422e1..5928ecf 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml +++ b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml @@ -109,6 +109,11 @@ test + + org.apache.flume.flume-ng-sinks + flume-hdfs-sink + + http://git-wip-us.apache.org/repos/asf/flume/blob/bfbab628/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java index 021ecd0..835a69e 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java @@ -43,6 +43,8 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import java.security.PrivilegedExceptionAction; +import org.apache.hadoop.hbase.security.User; /** @@ -90,6 +92,9 @@ public class HBaseSink extends AbstractSink implements Configurable { private HbaseEventSerializer serializer; private String eventSerializerType; private Context serializerContext; + private String kerberosPrincipal; + private String kerberosKeytab; + private User hbaseUser; public HBaseSink(){ this(HBaseConfiguration.create()); @@ -114,18 +119,32 @@ public class HBaseSink extends AbstractSink implements Configurable { throw new FlumeException("Could not load table, " + tableName + " from HBase", e); } - try { - if(!table.getTableDescriptor().hasFamily(columnFamily)) { - throw new IOException("Table " + tableName + - " has no such column family " + Bytes.toString(columnFamily)); + if (HBaseSinkSecurityManager.isSecurityEnabled(config)) { + hbaseUser = HBaseSinkSecurityManager.login(config, null, + kerberosPrincipal, kerberosKeytab); } - } catch (IOException e) { + } catch (Exception ex) { + throw new FlumeException("Failed to login to HBase using " + + "provided credentials.", ex); + } + try { + if (!runPrivileged(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws IOException { + return table.getTableDescriptor().hasFamily(columnFamily); + } + })) { + throw new IOException("Table " + tableName + + " has no such column family " + Bytes.toString(columnFamily)); + } + } catch (Exception e) { //Get getTableDescriptor also throws IOException, so catch the IOException //thrown above or by the getTableDescriptor() call. - throw new FlumeException("Error getting column family from HBase." + - "Please verify that the table "+ tableName +" and Column Family, " - + Bytes.toString(columnFamily) + " exists in HBase.", e); + throw new FlumeException("Error getting column family from HBase." + + "Please verify that the table " + tableName + " and Column Family, " + + Bytes.toString(columnFamily) + " exists in HBase, and the" + + " current user has permissions to access that table.", e); } super.start(); @@ -176,6 +195,8 @@ public class HBaseSink extends AbstractSink implements Configurable { logger.error("Could not instantiate event serializer." , e); Throwables.propagate(e); } + kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB, ""); + kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL, ""); } @Override @@ -202,13 +223,27 @@ public class HBaseSink extends AbstractSink implements Configurable { return status; } - private void putEventsAndCommit(List actions, List incs, + private void putEventsAndCommit(final List actions, final List incs, Transaction txn) throws EventDeliveryException { try { - table.batch(actions); - for(Increment i: incs){ - table.increment(i); - } + runPrivileged(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + table.batch(actions); + return null; + } + }); + + runPrivileged(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + for (final Increment i : incs) { + table.increment(i); + } + return null; + } + }); + txn.commit(); counterGroup.incrementAndGet("transaction.success"); } catch (Throwable e) { @@ -235,4 +270,15 @@ public class HBaseSink extends AbstractSink implements Configurable { txn.close(); } } + private T runPrivileged(final PrivilegedExceptionAction action) + throws Exception { + if(hbaseUser != null) { + if (logger.isDebugEnabled()) { + logger.debug("Calling runAs as hbase user: " + hbaseUser.getName()); + } + return hbaseUser.runAs(action); + } else { + return action.run(); + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/bfbab628/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java index 62f7097..463c9c3 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java @@ -48,4 +48,8 @@ public class HBaseSinkConfigurationConstants { public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE; + public static final String CONFIG_KEYTAB = "kerberosKeytab"; + + public static final String CONFIG_PRINCIPAL = "kerberosPrincipal"; + } http://git-wip-us.apache.org/repos/asf/flume/blob/bfbab628/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java new file mode 100644 index 0000000..8bf0509 --- /dev/null +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.sink.hbase; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import org.apache.flume.FlumeException; +import org.apache.flume.sink.hdfs.KerberosUser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to handle logging into HBase with the credentials passed in. + */ +public class HBaseSinkSecurityManager { + + /* + * volatile for safe publication. Since this is updated only by + * a single thread (configuration) and read later by the sink threads, + * this can just be volatile, no need of Atomic reference. + */ + private volatile static KerberosUser loggedInUser; + private static final Logger LOG = + LoggerFactory.getLogger(HBaseSinkSecurityManager.class); + + /** + * Checks if security is enabled for the HBase cluster. + * + * @return - true if security is enabled on the HBase cluster and + * the underlying HDFS cluster. + */ + public static boolean isSecurityEnabled(Configuration conf) { + return User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf); + } + + /** + * Login the user using the configuration, and the hostname specified to use + * for logging in. + * + * @param conf - Configuration to use for logging the user in. + * @param hostname - The hostname to use for logging the user in. If no + * hostname is specified (null or empty string), the canonical hostname for + * the address returned by {@linkplain InetAddress#getLocalHost()} will be + * used. + * @return The logged in HBase {@linkplain User}. + * @throws IOException if login failed, or hostname lookup failed. + */ + public static synchronized User login(Configuration conf, String hostname, + String kerberosPrincipal, String kerberosKeytab) throws IOException { + if (kerberosPrincipal.isEmpty()) { + String msg = "Login failed, since kerberos principal was not specified."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + if (kerberosKeytab.isEmpty()) { + String msg = "Login failed, since kerberos keytab was not specified."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } else { + //If keytab is specified, user should want it take effect. + //HDFSEventSink will halt when keytab file is non-exist or unreadable + File kfile = new File(kerberosKeytab); + if (!(kfile.isFile() && kfile.canRead())) { + throw new IllegalArgumentException("The keyTab file: " + + kerberosKeytab + " is nonexistent or can't read. " + + "Please specify a readable keytab file for Kerberos auth."); + } + } + String principal = kerberosPrincipal; + try { + // resolves _HOST pattern using standard Hadoop search/replace + // via DNS lookup when 2nd argument is empty + principal = SecurityUtil.getServerPrincipal(kerberosPrincipal,""); + } catch (IOException e) { + LOG.error("Host lookup error resolving kerberos principal (" + + kerberosPrincipal + "). Exception follows.", e); + throw e; + } + Preconditions.checkNotNull(principal, "Principal must not be null"); + KerberosUser newUser = new KerberosUser(principal, kerberosKeytab); + //The HDFS Sink does not allow login credentials to change. + //To be uniform, we will do the same thing here. + User hbaseUser = null; + boolean loggedIn = false; + if (loggedInUser != null) { + Preconditions.checkArgument(newUser.equals(loggedInUser), + "Cannot switch kerberos credentials during a reconfiguration. " + + "Please restart the agent to set the new credentials."); + try { + hbaseUser = User.create(UserGroupInformation.getLoginUser()); + loggedIn = true; + } catch (IOException ex) { + LOG.warn("Previous login does not exist, " + + "will authenticate against KDC"); + } + } + if (!loggedIn) { + if (hostname == null || hostname.isEmpty()) { + hostname = InetAddress.getLocalHost().getCanonicalHostName(); + } + User.login(conf, kerberosKeytab, principal, hostname); + hbaseUser = User.create(UserGroupInformation.getLoginUser()); + loggedInUser = newUser; + //TODO: Set the loggedInUser to the current user. + LOG.info("Logged into HBase as user: " + hbaseUser.getName()); + } + return hbaseUser; + } +}