flume-commits mailing list archives

From: hshreedha...@apache.org
Subject: git commit: FLUME-2343. Add Kerberos and user impersonation support to Dataset Sink.
Date: Wed, 12 Mar 2014 18:41:19 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 63d26c19a -> 96b090b51


FLUME-2343. Add Kerberos and user impersonation support to Dataset Sink.

(Ryan Blue via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/96b090b5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/96b090b5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/96b090b5

Branch: refs/heads/trunk
Commit: 96b090b5117c34bba9f5104b47d005fe1c10c775
Parents: 63d26c1
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Wed Mar 12 11:40:04 2014 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Wed Mar 12 11:41:11 2014 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  24 +--
 .../org/apache/flume/sink/kite/DatasetSink.java |  24 ++-
 .../flume/sink/kite/DatasetSinkConstants.java   |   6 +
 .../apache/flume/sink/kite/KerberosUtil.java    | 176 +++++++++++++++++++
 4 files changed, 218 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/96b090b5/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index cedb283..4bcd8a2 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2047,16 +2047,20 @@ Note 2: In some cases, file rolling may occur slightly after the roll interval
 has been exceeded. However, this delay will not exceed 5 seconds. In most
 cases, the delay is negligible.
 
-=====================  =======  ===========================================================
-Property Name          Default  Description
-=====================  =======  ===========================================================
-**channel**            --
-**type**               --       Must be org.apache.flume.sink.kite.DatasetSink
-**kite.repo.uri**      --       URI of the repository to open
-**kite.dataset.name**  --       Name of the Dataset where records will be written
-kite.batchSize         100      Number of records to process in each batch
-kite.rollInterval      30       Maximum wait time (seconds) before data files are released
-=====================  =======  ===========================================================
+=======================  =======  ===========================================================
+Property Name            Default  Description
+=======================  =======  ===========================================================
+**channel**              --
+**type**                 --       Must be org.apache.flume.sink.kite.DatasetSink
+**kite.repo.uri**        --       URI of the repository to open
+**kite.dataset.name**    --       Name of the Dataset where records will be written
+kite.batchSize           100      Number of records to process in each batch
+kite.rollInterval        30       Maximum wait time (seconds) before data files are released
+auth.kerberosPrincipal   --       Kerberos user principal for secure authentication to HDFS
+auth.kerberosKeytab      --       Kerberos keytab location (local FS) for the principal
+auth.proxyUser           --       The effective user for HDFS actions, if different from
+                                  the kerberos principal
+=======================  =======  ===========================================================
 
 Custom Sink
 ~~~~~~~~~~~
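
To make the new properties concrete, a minimal agent configuration for this sink might look like the sketch below. Only the property keys are taken from the table above; the agent, sink, and channel names (a1, k1, c1) and all values are illustrative placeholders.

  a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
  a1.sinks.k1.channel = c1
  a1.sinks.k1.kite.repo.uri = repo:hdfs://namenode:8020/data
  a1.sinks.k1.kite.dataset.name = events
  a1.sinks.k1.kite.batchSize = 100
  a1.sinks.k1.kite.rollInterval = 30
  a1.sinks.k1.auth.kerberosPrincipal = flume/_HOST@EXAMPLE.COM
  a1.sinks.k1.auth.kerberosKeytab = /etc/security/keytabs/flume.keytab
  a1.sinks.k1.auth.proxyUser = etl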

http://git-wip-us.apache.org/repos/asf/flume/blob/96b090b5/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index 1ee0a1f..ed1b8d0 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URL;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -47,6 +48,7 @@ import org.apache.flume.sink.AbstractSink;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetRepositories;
 import org.kitesdk.data.DatasetWriter;
@@ -69,8 +71,10 @@ public class DatasetSink extends AbstractSink implements Configurable {
   private String repositoryURI = null;
   private String datasetName = null;
   private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE;
+
   private Dataset<Object> targetDataset = null;
   private DatasetWriter<Object> writer = null;
+  private UserGroupInformation login = null;
   private SinkCounter counter = null;
 
   // for rolling files at a given interval
@@ -130,14 +134,30 @@ public class DatasetSink extends AbstractSink implements Configurable {
 
   @Override
   public void configure(Context context) {
+    // initialize login credentials
+    this.login = KerberosUtil.login(
+        context.getString(DatasetSinkConstants.AUTH_PRINCIPAL),
+        context.getString(DatasetSinkConstants.AUTH_KEYTAB));
+    String effectiveUser =
+        context.getString(DatasetSinkConstants.AUTH_PROXY_USER);
+    if (effectiveUser != null) {
+      this.login = KerberosUtil.proxyAs(effectiveUser, login);
+    }
+
     this.repositoryURI = context.getString(
         DatasetSinkConstants.CONFIG_KITE_REPO_URI);
     Preconditions.checkNotNull(repositoryURI, "Repository URI is missing");
     this.datasetName = context.getString(
         DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
     Preconditions.checkNotNull(datasetName, "Dataset name is missing");
-    this.targetDataset = DatasetRepositories.open(repositoryURI)
-        .load(datasetName);
+
+    this.targetDataset = KerberosUtil.runPrivileged(login,
+        new PrivilegedExceptionAction<Dataset<Object>>() {
+      @Override
+      public Dataset<Object> run() {
+        return DatasetRepositories.open(repositoryURI).load(datasetName);
+      }
+    });
 
     String formatName = targetDataset.getDescriptor().getFormat().getName();
     Preconditions.checkArgument(allowedFormats().contains(formatName),

http://git-wip-us.apache.org/repos/asf/flume/blob/96b090b5/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
index 13c776e..09dfab6 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
@@ -48,4 +48,10 @@ public class DatasetSinkConstants {
       "flume.avro.schema.literal";
   public static final String AVRO_SCHEMA_URL_HEADER = "flume.avro.schema.url";
 
+  /**
+   * Hadoop authentication settings
+   */
+  public static final String AUTH_PROXY_USER = "auth.proxyUser";
+  public static final String AUTH_PRINCIPAL = "auth.kerberosPrincipal";
+  public static final String AUTH_KEYTAB = "auth.kerberosKeytab";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/96b090b5/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java
new file mode 100644
index 0000000..92ad141
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.kitesdk.data.DatasetException;
+import org.kitesdk.data.DatasetIOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosUtil.class);
+
+  public static class SecurityException extends RuntimeException {
+    private SecurityException(String message) {
+      super(message);
+    }
+
+    private SecurityException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    private SecurityException(Throwable cause) {
+      super(cause);
+    }
+  }
+
+  public static UserGroupInformation proxyAs(String username,
+                                             UserGroupInformation login) {
+    Preconditions.checkArgument(username != null && !username.isEmpty(),
+        "Invalid username: " + String.valueOf(username));
+    Preconditions.checkArgument(login != null,
+        "Cannot proxy without an authenticated user");
+
+    // hadoop impersonation works with or without kerberos security
+    return UserGroupInformation.createProxyUser(username, login);
+  }
+
+  /**
+   * Static synchronized method for static Kerberos login. <br/>
+   * Static synchronized due to a thundering herd problem when multiple Sinks
+   * attempt to log in using the same principal at the same time with the
+   * intention of impersonating different users (or even the same user).
+   * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay
+   * attack and it returns:
+   * <blockquote>Request is a replay (34) - PROCESS_TGS</blockquote>
+   * In addition, since the underlying Hadoop APIs we are using for
+   * impersonation are static, we define this method as static as well.
+   *
+   * @param principal
+   *         Fully-qualified principal to use for authentication.
+   * @param keytab
+   *         Location of keytab file containing credentials for principal.
+   * @return Logged-in user
+   * @throws SecurityException
+   *         if login fails.
+   * @throws IllegalArgumentException
+   *         if the principal or the keytab is not usable
+   */
+  public static synchronized UserGroupInformation login(String principal,
+                                                 String keytab) {
+    // resolve the requested principal, if it is present
+    String finalPrincipal = null;
+    if (principal != null && !principal.isEmpty()) {
+      try {
+        // resolves _HOST pattern using standard Hadoop search/replace
+        // via DNS lookup when 2nd argument is empty
+        finalPrincipal = SecurityUtil.getServerPrincipal(principal, "");
+      } catch (IOException e) {
+        throw new SecurityException(
+            "Failed to resolve Kerberos principal", e);
+      }
+    }
+
+    // check if there is a user already logged in
+    UserGroupInformation currentUser = null;
+    try {
+      currentUser = UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      // not a big deal but this shouldn't typically happen because it will
+      // generally fall back to the UNIX user
+      LOG.debug("Unable to get login user before Kerberos auth attempt", e);
+    }
+
+    // if the current user is valid (matches the given principal) then use it
+    if (currentUser != null) {
+      if (finalPrincipal == null ||
+          finalPrincipal.equals(currentUser.getUserName())) {
+        LOG.debug("Using existing login for {}: {}",
+            finalPrincipal, currentUser);
+        return currentUser;
+      } else {
+        // be cruel and unusual when user tries to login as multiple principals
+        // this isn't really valid with a reconfigure but this should be rare
+        // enough to warrant a restart of the agent JVM
+        // TODO: find a way to interrogate the entire current config state,
+        // since we don't have to be unnecessarily protective if they switch all
+        // HDFS sinks to use a different principal all at once.
+        throw new SecurityException(
+            "Cannot use multiple Kerberos principals: " + finalPrincipal +
+                " would replace " + currentUser.getUserName());
+      }
+    }
+
+    // prepare for a new login
+    Preconditions.checkArgument(principal != null && !principal.isEmpty(),
+        "Invalid Kerberos principal: " + String.valueOf(principal));
+    Preconditions.checkNotNull(finalPrincipal,
+        "Resolved principal must not be null");
+    Preconditions.checkArgument(keytab != null && !keytab.isEmpty(),
+        "Invalid Kerberos keytab: " + String.valueOf(keytab));
+    File keytabFile = new File(keytab);
+    Preconditions.checkArgument(keytabFile.isFile() && keytabFile.canRead(),
+        "Keytab is not a readable file: " + String.valueOf(keytab));
+
+    try {
+      // attempt static kerberos login
+      LOG.debug("Logging in as {} with {}", finalPrincipal, keytab);
+      UserGroupInformation.loginUserFromKeytab(principal, keytab);
+      return UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      throw new SecurityException("Kerberos login failed", e);
+    }
+  }
+
+  /**
+   * Allow methods to act with the privileges of a login.
+   *
+   * If the login is null, the current privileges will be used.
+   *
+   * @param <T> The return type of the action
+   * @param login UserGroupInformation credentials to use for action
+   * @param action A PrivilegedExceptionAction to perform as another user
+   * @return the T value returned by action.run()
+   */
+  public static <T> T runPrivileged(UserGroupInformation login,
+                                    PrivilegedExceptionAction<T> action) {
+    try {
+      if (login == null) {
+        return action.run();
+      } else {
+        return login.doAs(action);
+      }
+    } catch (IOException ex) {
+      throw new DatasetIOException("Privileged action failed", ex);
+    } catch (InterruptedException ex) {
+      Thread.interrupted();
+      throw new DatasetException(ex);
+    } catch (Exception ex) {
+      throw Throwables.propagate(ex);
+    }
+  }
+}
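
As a summary of how the new class is meant to be used, here is a minimal Java sketch (not part of the commit) of the calling pattern that DatasetSink.configure() above follows: log in with a principal and keytab, optionally impersonate a proxy user, and run Kite repository access inside runPrivileged(). The principal, keytab path, proxy user, repository URI, and dataset name are placeholder values.

import java.security.PrivilegedExceptionAction;

import org.apache.flume.sink.kite.KerberosUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetRepositories;

public class DatasetSinkKerberosExample {
  public static void main(String[] args) {
    // 1. Static Kerberos login with the configured principal and keytab
    //    (in the sink these come from auth.kerberosPrincipal / auth.kerberosKeytab)
    UserGroupInformation login = KerberosUtil.login(
        "flume/_HOST@EXAMPLE.COM", "/etc/security/keytabs/flume.keytab");

    // 2. Optionally impersonate the effective user (auth.proxyUser)
    login = KerberosUtil.proxyAs("etl", login);

    // 3. Open the Kite dataset under the login's privileges,
    //    mirroring DatasetSink.configure() above
    final String repositoryURI = "repo:hdfs://namenode:8020/data"; // placeholder
    final String datasetName = "events";                           // placeholder
    Dataset<Object> dataset = KerberosUtil.runPrivileged(login,
        new PrivilegedExceptionAction<Dataset<Object>>() {
          @Override
          public Dataset<Object> run() {
            return DatasetRepositories.open(repositoryURI).load(datasetName);
          }
        });

    System.out.println("Opened dataset: " + dataset.getDescriptor());
  }
}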

