flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [2/2] flume git commit: FLUME-2631. End to End authentication in Flume
Date Fri, 06 Mar 2015 07:20:09 GMT
FLUME-2631. End to End authentication in Flume

(Johny Rufus via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1b0f051b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1b0f051b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1b0f051b

Branch: refs/heads/flume-1.6
Commit: 1b0f051b610b5a102c869a9d06254258a3de898f
Parents: fa7ead5
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Mar 5 23:19:13 2015 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Mar 5 23:19:54 2015 -0800

----------------------------------------------------------------------
 flume-ng-auth/pom.xml                           |  88 +++++++
 .../flume/api/SecureRpcClientFactory.java       |  40 ++++
 .../apache/flume/api/SecureThriftRpcClient.java | 113 +++++++++
 .../flume/auth/FlumeAuthenticationUtil.java     |  99 ++++++++
 .../apache/flume/auth/FlumeAuthenticator.java   |  45 ++++
 .../flume/auth/KerberosAuthenticator.java       | 233 +++++++++++++++++++
 .../apache/flume/auth/PrivilegedExecutor.java   |  52 +++++
 .../apache/flume/auth/SecurityException.java    |  40 ++++
 .../apache/flume/auth/SimpleAuthenticator.java  |  88 +++++++
 .../java/org/apache/flume/auth/UGIExecutor.java |  80 +++++++
 .../flume/auth/TestFlumeAuthenticator.java      | 128 ++++++++++
 flume-ng-core/pom.xml                           |   5 +
 .../java/org/apache/flume/sink/ThriftSink.java  |  14 +-
 .../org/apache/flume/source/ThriftSource.java   |  67 +++++-
 flume-ng-dist/pom.xml                           |   4 +
 flume-ng-dist/src/main/assembly/bin.xml         |   1 +
 flume-ng-dist/src/main/assembly/src.xml         |   1 +
 .../api/RpcClientConfigurationConstants.java    |   2 +
 .../org/apache/flume/api/ThriftRpcClient.java   |  30 ++-
 flume-ng-sinks/flume-dataset-sink/pom.xml       |   7 -
 .../org/apache/flume/sink/kite/DatasetSink.java |  39 ++--
 .../apache/flume/sink/kite/KerberosUtil.java    | 187 ---------------
 .../flume/sink/kite/TestKerberosUtil.java       | 121 ----------
 .../apache/flume/sink/hdfs/BucketWriter.java    |  37 +--
 .../apache/flume/sink/hdfs/HDFSEventSink.java   | 229 ++----------------
 .../flume/sink/hdfs/TestBucketWriter.java       |  28 ++-
 .../flume/sink/hdfs/TestHDFSEventSink.java      |   2 +-
 .../org/apache/flume/sink/hbase/HBaseSink.java  |  34 +--
 .../sink/hbase/HBaseSinkSecurityManager.java    | 134 -----------
 pom.xml                                         |   7 +
 30 files changed, 1185 insertions(+), 770 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-auth/pom.xml b/flume-ng-auth/pom.xml
new file mode 100644
index 0000000..292731d
--- /dev/null
+++ b/flume-ng-auth/pom.xml
@@ -0,0 +1,88 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-parent</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.6.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>flume-ng-auth</artifactId>
+  <name>Flume Auth</name>
+  <description>Flume Authentication</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <version>2.3.7</version>
+        <inherited>true</inherited>
+        <extensions>true</extensions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>${hadoop.common.artifact.id}</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <version>${hadoop2.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java
new file mode 100644
index 0000000..c976458
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.api;
+
+import java.util.Properties;
+
+/**
+ * Factory class to construct Flume {@link RPCClient} implementations.
+ */
+public class SecureRpcClientFactory {
+
+  /**
+   * Return a secure {@linkplain org.apache.flume.api.RpcClient} that uses Thrift for communicating with
+   * the next hop.
+   * @param props
+   * @return - An {@linkplain org.apache.flume.api.RpcClient} which uses thrift configured with the
+   * given parameters.
+   */
+  public static RpcClient getThriftInstance(Properties props) {
+    ThriftRpcClient client = new SecureThriftRpcClient();
+    client.configure(props);
+    return client;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java
new file mode 100644
index 0000000..7316e1b
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.api;
+
+import org.apache.flume.FlumeException;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.FlumeAuthenticator;
+import org.apache.flume.auth.PrivilegedExecutor;
+import org.apache.thrift.transport.*;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class SecureThriftRpcClient extends ThriftRpcClient {
+
+  private static final String CLIENT_PRINCIPAL = "client-principal";
+  private static final String CLIENT_KEYTAB = "client-keytab";
+  private static final String SERVER_PRINCIPAL = "server-principal";
+
+  private String serverPrincipal;
+  private FlumeAuthenticator privilegedExecutor;
+
+  @Override
+  protected void configure(Properties properties) throws FlumeException {
+    super.configure(properties);
+    serverPrincipal = properties.getProperty(SERVER_PRINCIPAL);
+    if (serverPrincipal == null || serverPrincipal.isEmpty()) {
+      throw new IllegalArgumentException("Flume in secure mode, but Flume config doesn't "
+              + "specify a server principal to use for Kerberos auth.");
+    }
+    String clientPrincipal = properties.getProperty(CLIENT_PRINCIPAL);
+    String keytab = properties.getProperty(CLIENT_KEYTAB);
+    this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(clientPrincipal, keytab);
+    if(!privilegedExecutor.isAuthenticated()) {
+      throw new FlumeException("Authentication failed in Kerberos mode for " +
+              "principal " + clientPrincipal + " keytab " + keytab);
+    }
+  }
+
+  @Override
+  protected TTransport getTransport(TSocket tsocket) throws Exception {
+    Map<String, String> saslProperties = new HashMap<String, String>();
+    saslProperties.put(Sasl.QOP, "auth");
+    String[] names;
+    try {
+      names = FlumeAuthenticationUtil.splitKerberosName(serverPrincipal);
+    } catch (IOException e) {
+      throw new FlumeException(
+              "Error while trying to resolve Principal name - " + serverPrincipal, e);
+    }
+    return new UgiSaslClientTransport(
+            "GSSAPI", null, names[0], names[1], saslProperties, null, tsocket, privilegedExecutor);
+  }
+
+  /**
+   * This transport wraps the Sasl transports to set up the right UGI context for open().
+   */
+  public static class UgiSaslClientTransport extends TSaslClientTransport {
+    PrivilegedExecutor privilegedExecutor;
+    public UgiSaslClientTransport(String mechanism, String authorizationId,
+                String protocol, String serverName, Map<String, String> props,
+                CallbackHandler cbh, TTransport transport, PrivilegedExecutor privilegedExecutor) throws IOException {
+      super(mechanism, authorizationId, protocol, serverName, props, cbh,
+              transport);
+      this.privilegedExecutor = privilegedExecutor;
+    }
+
+    // open the SASL transport with using the current UserGroupInformation
+    // This is needed to get the current login context stored
+    @Override
+    public void open() throws FlumeException {
+      try {
+        this.privilegedExecutor.execute(
+          new PrivilegedExceptionAction<Void>() {
+            public Void run() throws FlumeException {
+              try {
+                UgiSaslClientTransport.super.open();
+              } catch (TTransportException e) {
+                throw new FlumeException("Failed to open SASL transport", e);
+              }
+              return null;
+            }
+          });
+      } catch (InterruptedException e) {
+        throw new FlumeException(
+                "Interrupted while opening underlying transport", e);
+      } catch (Exception e) {
+        throw new FlumeException("Failed to open SASL transport", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
new file mode 100644
index 0000000..02afc0d
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+
+import javax.security.auth.callback.CallbackHandler;
+import java.io.IOException;
+
+/**
+ * FlumeAuthentication utility class that provides methods to get an
+ * Authenticator. If proper credentials are provided KerberosAuthenticator is
+ * returned which can be used to execute as the authenticated principal ,
+ * or else a SimpleAuthenticator which executes without any authentication
+ */
+public class FlumeAuthenticationUtil {
+
+  private FlumeAuthenticationUtil() {}
+
+  private static KerberosAuthenticator kerbAuthenticator;
+
+  /**
+   * If principal and keytab are null, this method returns a SimpleAuthenticator
+   * which executes without authentication. If valid credentials are
+   * provided KerberosAuthenitcator is returned which can be used to execute as
+   * the authenticated principal. Invalid credentials result in
+   * IllegalArgumentException and Failure to authenticate results in SecurityException
+   *
+   * @param principal
+   * @param keytab
+   * @return FlumeAuthenticator
+   *
+   * @throws org.apache.flume.auth.SecurityException
+   */
+  public synchronized static FlumeAuthenticator getAuthenticator(
+          String principal, String keytab) throws SecurityException {
+
+    if(principal == null && keytab == null) {
+      return SimpleAuthenticator.getSimpleAuthenticator();
+    }
+
+    Preconditions.checkArgument(principal != null,
+            "Principal can not be null when keytab is provided");
+    Preconditions.checkArgument(keytab != null,
+            "Keytab can not be null when Principal is provided");
+
+    if(kerbAuthenticator == null) {
+      kerbAuthenticator = new KerberosAuthenticator();
+    }
+    kerbAuthenticator.authenticate(principal, keytab);
+
+    return kerbAuthenticator;
+  }
+
+  /**
+   * Returns the standard SaslGssCallbackHandler from the hadoop common module
+   *
+   * @return CallbackHandler
+   */
+  public static CallbackHandler getSaslGssCallbackHandler() {
+    return new SaslRpcServer.SaslGssCallbackHandler();
+  }
+
+  /**
+   * Resolves the principal using Hadoop common's SecurityUtil and splits
+   * the kerberos principal into three parts user name, host and kerberos realm
+   *
+   * @param principal
+   * @return String[] of username, hostname and kerberos realm
+   * @throws IOException
+   */
+  public static String[] splitKerberosName(String principal) throws IOException {
+    String resolvedPrinc = SecurityUtil.getServerPrincipal(principal, "");
+    return SaslRpcServer.splitKerberosName(resolvedPrinc);
+  }
+}
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java
new file mode 100644
index 0000000..dbe241d
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+/**
+ * FlumeAuthenticator extends on a PrivilegedExecutor providing capabilities to
+ * proxy as a different user
+ */
+public interface FlumeAuthenticator extends PrivilegedExecutor {
+  /**
+   * Returns the current instance if proxyUsername is null or
+   * returns the proxied Executor if proxyUserName is valid
+   * @param proxyUserName
+   * @return PrivilegedExecutor
+   */
+  public PrivilegedExecutor proxyAs(String proxyUserName);
+
+  /**
+   * Returns true, if the underlying Authenticator was obtained by
+   * successful kerberos authentication
+   * @return boolean
+   */
+  public boolean isAuthenticated();
+
+  /**
+   * For Authenticators backed by credentials, this method refreshes the
+   * credentials periodically
+   */
+  public void startCredentialRefresher();
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
new file mode 100644
index 0000000..3244046
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+
+import com.google.common.base.Preconditions;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+
+/**
+ * A kerberos authenticator, which authenticates using the supplied principal
+ * and keytab and executes with  authenticated privileges
+ */
+class KerberosAuthenticator implements FlumeAuthenticator {
+
+  private static final Logger LOG = LoggerFactory
+          .getLogger(KerberosAuthenticator.class);
+
+  private volatile UserGroupInformation ugi;
+  private volatile PrivilegedExecutor privilegedExecutor;
+  private Map<String, PrivilegedExecutor> proxyCache = new HashMap<String, PrivilegedExecutor>();
+
+
+  @Override
+  public  <T> T execute(PrivilegedAction<T> action) {
+    return privilegedExecutor.execute(action);
+  }
+
+  @Override
+  public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception {
+    return privilegedExecutor.execute(action);
+  }
+
+  @Override
+  public synchronized PrivilegedExecutor proxyAs(String proxyUserName) {
+    if(proxyUserName == null || proxyUserName.isEmpty()) {
+      return this;
+    }
+    if(proxyCache.get(proxyUserName) == null) {
+      UserGroupInformation proxyUgi;
+      proxyUgi = UserGroupInformation.createProxyUser(proxyUserName, ugi);
+      printUGI(proxyUgi);
+      proxyCache.put(proxyUserName, new UGIExecutor(proxyUgi));
+    }
+    return proxyCache.get(proxyUserName);
+  }
+
+  @Override
+  public boolean isAuthenticated() {
+    return true;
+  }
+
+  /**
+   * When valid principal and keytab are provided and if authentication has
+   * not yet been done for this object, this method authenticates the
+   * credentials and populates the ugi. In case of null or invalid credentials
+   * IllegalArgumentException is thrown. In case of failure to authenticate,
+   * SecurityException is thrown. If authentication has already happened on
+   * this KerberosAuthenticator object, then this method checks to see if the current
+   * credentials passed are same as the validated credentials. If not, it throws
+   * an exception as this authenticator can represent only one Principal.
+   *
+   * @param principal
+   * @param keytab
+   */
+  public synchronized void authenticate(String principal, String keytab) {
+    // sanity checking
+
+    Preconditions.checkArgument(principal != null && !principal.isEmpty(),
+            "Invalid Kerberos principal: " + String.valueOf(principal));
+    Preconditions.checkArgument(keytab != null && !keytab.isEmpty(),
+            "Invalid Kerberos keytab: " + String.valueOf(keytab));
+    File keytabFile = new File(keytab);
+    Preconditions.checkArgument(keytabFile.isFile() && keytabFile.canRead(),
+            "Keytab is not a readable file: " + String.valueOf(keytab));
+
+
+    // resolve the requested principal
+    String resolvedPrincipal;
+    try {
+      // resolves _HOST pattern using standard Hadoop search/replace
+      // via DNS lookup when 2nd argument is empty
+      resolvedPrincipal = SecurityUtil.getServerPrincipal(principal, "");
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Host lookup error resolving kerberos principal ("
+              + principal + "). Exception follows.", e);
+    }
+    Preconditions.checkNotNull(resolvedPrincipal,
+            "Resolved Principal must not be null");
+
+
+    // be cruel and unusual when user tries to login as multiple principals
+    // this isn't really valid with a reconfigure but this should be rare
+    // enough to warrant a restart of the agent JVM
+    // TODO: find a way to interrogate the entire current config state,
+    // since we don't have to be unnecessarily protective if they switch all
+    // HDFS sinks to use a different principal all at once.
+
+    Preconditions.checkState(ugi == null || ugi.getUserName().equals(resolvedPrincipal),
+      "Cannot use multiple kerberos principals in the same agent. " +
+      " Must restart agent to use new principal or keytab. " +
+      "Previous = %s, New = %s", ugi, resolvedPrincipal);
+
+
+    // enable the kerberos mode of UGI, before doing anything else
+    if(!UserGroupInformation.isSecurityEnabled()) {
+      Configuration conf = new Configuration(false);
+      conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+      UserGroupInformation.setConfiguration(conf);
+    }
+
+    // We are interested in currently logged in user with kerberos creds
+    UserGroupInformation curUser = null;
+    try {
+      curUser = UserGroupInformation.getLoginUser();
+      if(curUser != null && !curUser.hasKerberosCredentials()) {
+        curUser = null;
+      }
+    } catch (IOException e) {
+      LOG.warn("User unexpectedly had no active login. Continuing with " +
+              "authentication", e);
+    }
+
+    /*
+     *  if ugi is not null,
+     *     if ugi matches currently logged in kerberos user, we are good
+     *     else we are logged out, so relogin our ugi
+     *  else if ugi is null, login and populate state
+     */
+    try {
+      if (ugi != null) {
+        if (curUser != null && curUser.getUserName().equals(ugi.getUserName())) {
+          LOG.debug("Using existing principal login: {}", ugi);
+        } else {
+          LOG.info("Attempting kerberos Re-login as principal ({}) "
+                  , new Object[] { ugi.getUserName() } );
+          ugi.reloginFromKeytab();
+        }
+      } else {
+        LOG.info("Attempting kerberos login as principal ({}) from keytab " +
+                "file ({})", new Object[] { resolvedPrincipal, keytab } );
+        UserGroupInformation.loginUserFromKeytab(resolvedPrincipal, keytab);
+        this.ugi = UserGroupInformation.getLoginUser();
+        this.privilegedExecutor = new UGIExecutor(this.ugi);
+      }
+    } catch (IOException e) {
+      throw new SecurityException("Authentication error while attempting to "
+        + "login as kerberos principal (" + resolvedPrincipal + ") using "
+        + "keytab (" + keytab + "). Exception follows.", e);
+    }
+
+    printUGI(this.ugi);
+  }
+
+  private void printUGI(UserGroupInformation ugi) {
+    if (ugi != null) {
+      // dump login information
+      AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
+      LOG.info("\n{} \nUser: {} \nAuth method: {} \nKeytab: {} \n",
+        new Object[]{ authMethod.equals(AuthenticationMethod.PROXY) ?
+        "Proxy as: " : "Logged as: ", ugi.getUserName(), authMethod,
+        ugi.isFromKeytab() }
+      );
+    }
+  }
+
+  /**
+   * startCredentialRefresher should be used only for long running
+   * methods like Thrift source. For all privileged methods that use a UGI, the
+   * credentials are checked automatically and refreshed before the
+   * privileged method is executed in the UGIExecutor
+   */
+  @Override
+  public void startCredentialRefresher() {
+    int CHECK_TGT_INTERVAL = 120; // seconds
+    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+    scheduler.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          ugi.checkTGTAndReloginFromKeytab();
+        } catch (IOException e) {
+          LOG.warn("Error occured during checkTGTAndReloginFromKeytab() for user " +
+                  ugi.getUserName(), e);
+        }
+      }
+    }, CHECK_TGT_INTERVAL, CHECK_TGT_INTERVAL, TimeUnit.SECONDS);
+  }
+
+  @VisibleForTesting
+  String getUserName() {
+    if(ugi != null) {
+      return ugi.getUserName();
+    } else {
+      return null;
+    }
+  }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java
new file mode 100644
index 0000000..0aa321a
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+
+/**
+ * PrivilegedExecutor provides the ability to execute a PrivilegedAction
+ * or a PrivilegedExceptionAction. Implementors of this class, can chose to execute
+ * in normal mode or secure authenticated mode
+ */
+public interface PrivilegedExecutor {
+  /**
+   * This method is used to execute a privileged action, the implementor can
+   * chose to execute the action using the appropriate privileges
+   *
+   * @param action A PrivilegedExceptionAction to perform as the desired user
+   * @param <T> The return type of the action
+   * @return T the T value returned by action.run()
+   * @throws Exception
+   */
+  public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception;
+
+  /**
+   * This method is used to execute a privileged action, the implementor can
+   * chose to execute the action using the appropriate privileges
+   *
+   * @param action A PrivilegedAction to perform as the desired user
+   * @param <T> The return type of the action
+   * @return T the T value returned by action.run()
+   */
+  public <T> T execute(PrivilegedAction<T> action);
+}
+
+

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java
new file mode 100644
index 0000000..5760481
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+/**
+ * SecurityException thrown in the Flume security module
+ */
+public class SecurityException extends RuntimeException {
+  public SecurityException(String message) {
+    super(message);
+  }
+
+  public SecurityException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SecurityException(Throwable cause) {
+    super(cause);
+  }
+}
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
new file mode 100644
index 0000000..f7b5bea
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A no-op authenticator, which does not authenticate and executes
+ * without any authenticated privileges
+ */
+class SimpleAuthenticator implements FlumeAuthenticator {
+  private SimpleAuthenticator() {}
+
+  private static class SimpleAuthenticatorHolder {
+    public static SimpleAuthenticator authenticator = new SimpleAuthenticator();
+  }
+
+  public static SimpleAuthenticator getSimpleAuthenticator() {
+    return SimpleAuthenticatorHolder.authenticator;
+  }
+
+  private Map<String, PrivilegedExecutor> proxyCache =
+          new HashMap<String, PrivilegedExecutor>();
+
+
+  @Override
+  public <T> T execute(PrivilegedExceptionAction<T> action)
+          throws Exception {
+    return action.run();
+  }
+
+  @Override
+  public <T> T execute(PrivilegedAction<T> action) {
+    return action.run();
+  }
+
+  @Override
+  public synchronized PrivilegedExecutor proxyAs(String proxyUserName) {
+    if(proxyUserName == null || proxyUserName.isEmpty()) {
+      return this;
+    }
+    if(proxyCache.get(proxyUserName) == null) {
+      UserGroupInformation proxyUgi;
+      try {
+        proxyUgi = UserGroupInformation.createProxyUser(proxyUserName,
+                UserGroupInformation.getCurrentUser());
+      } catch (IOException e) {
+        throw new SecurityException("Unable to create proxy User", e);
+      }
+      proxyCache.put(proxyUserName, new UGIExecutor(proxyUgi));
+    }
+    return proxyCache.get(proxyUserName);
+  }
+
+  @Override
+  public boolean isAuthenticated() {
+    return false;
+  }
+
+  @Override
+  public void startCredentialRefresher() {
+    // no-op
+  }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
new file mode 100644
index 0000000..a5aeef2
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+class UGIExecutor implements PrivilegedExecutor {
+  private UserGroupInformation ugi;
+
+  UGIExecutor(UserGroupInformation ugi) {
+    this.ugi = ugi;
+  }
+
+  @Override
+  public <T> T execute(PrivilegedAction<T> action) {
+    ensureValidAuth();
+    return ugi.doAs(action);
+  }
+
+  @Override
+  public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception {
+    ensureValidAuth();
+    try {
+      return ugi.doAs(action);
+    } catch (IOException ex) {
+      throw new SecurityException("Privileged action failed", ex);
+    } catch (InterruptedException ex) {
+      Thread.interrupted();
+      throw new SecurityException(ex);
+    }
+  }
+
+  private void ensureValidAuth() {
+    reloginUGI(ugi);
+    if(ugi.getAuthenticationMethod().equals(AuthenticationMethod.PROXY)) {
+      reloginUGI(ugi.getRealUser());
+    }
+  }
+
+  private void reloginUGI(UserGroupInformation ugi) {
+    try {
+      if(ugi.hasKerberosCredentials()) {
+        ugi.checkTGTAndReloginFromKeytab();
+      }
+    } catch (IOException e) {
+      throw new SecurityException("Error trying to relogin from keytab for user "
+              + ugi.getUserName(), e);
+    }
+  }
+
+  @VisibleForTesting
+  String getUserName() {
+    if(ugi != null) {
+      return ugi.getUserName();
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
new file mode 100644
index 0000000..45ba2b0
--- /dev/null
+++ b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestFlumeAuthenticator {
+
+  private static MiniKdc kdc;
+  private static File workDir;
+  private static File flumeKeytab;
+  private static String flumePrincipal = "flume/localhost";
+  private static File aliceKeytab;
+  private static String alicePrincipal = "alice";
+  private static Properties conf;
+
+  @BeforeClass
+  public static void startMiniKdc() throws Exception {
+    workDir = new File(System.getProperty("test.dir", "target"),
+            TestFlumeAuthenticator.class.getSimpleName());
+    flumeKeytab = new File(workDir, "flume.keytab");
+    aliceKeytab = new File(workDir, "alice.keytab");
+    conf = MiniKdc.createConf();
+
+    kdc = new MiniKdc(conf, workDir);
+    kdc.start();
+
+    kdc.createPrincipal(flumeKeytab, flumePrincipal);
+    flumePrincipal = flumePrincipal + "@" + kdc.getRealm();
+
+    kdc.createPrincipal(aliceKeytab, alicePrincipal);
+    alicePrincipal = alicePrincipal + "@" + kdc.getRealm();
+  }
+
+  @AfterClass
+  public static void stopMiniKdc() {
+    if (kdc != null) {
+      kdc.stop();
+    }
+  }
+
+  @Test
+  public void testNullLogin() throws IOException {
+    String principal = null;
+    String keytab = null;
+
+    FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(
+            principal, keytab);
+    assertFalse(authenticator.isAuthenticated());
+  }
+
+  @Test
+  public void testFlumeLogin() throws IOException {
+    String principal = flumePrincipal;
+    String keytab = flumeKeytab.getAbsolutePath();
+    String expResult = principal;
+
+    FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(
+            principal, keytab);
+    assertTrue(authenticator.isAuthenticated());
+
+    String result = ((KerberosAuthenticator)authenticator).getUserName();
+    assertEquals("Initial login failed", expResult, result);
+
+    authenticator = FlumeAuthenticationUtil.getAuthenticator(
+            principal, keytab);
+    result = ((KerberosAuthenticator)authenticator).getUserName();
+    assertEquals("Re-login failed", expResult, result);
+
+    principal = alicePrincipal;
+    keytab = aliceKeytab.getAbsolutePath();
+    try {
+      authenticator = FlumeAuthenticationUtil.getAuthenticator(
+              principal, keytab);
+      result = ((KerberosAuthenticator)authenticator).getUserName();
+      fail("Login should have failed with a new principal: " + result);
+    } catch (Exception ex) {
+      assertTrue("Login with a new principal failed, but for an unexpected "
+          + "reason: " + ex.getMessage(),
+          ex.getMessage().contains("Cannot use multiple kerberos principals"));
+    }
+  }
+
+  @Test
+  public void testProxyAs() throws IOException {
+    String username = "alice";
+
+    String expResult = username;
+    FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(
+            null, null);
+    String result = ((UGIExecutor)(authenticator.proxyAs(username))).getUserName();
+    assertEquals("Proxy as didn't generate the expected username", expResult, result);
+
+    authenticator = FlumeAuthenticationUtil.getAuthenticator(
+            flumePrincipal, flumeKeytab.getAbsolutePath());
+
+    String login = ((KerberosAuthenticator)authenticator).getUserName();
+    assertEquals("Login succeeded, but the principal doesn't match",
+        flumePrincipal, login);
+
+    result = ((UGIExecutor)(authenticator.proxyAs(username))).getUserName();
+    assertEquals("Proxy as didn't generate the expected username", expResult, result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-core/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index 8992414..fe34c03 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -264,6 +264,11 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-auth</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
index baa60d0..32021d3 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
@@ -23,6 +23,8 @@ import java.util.Properties;
 import org.apache.flume.api.RpcClient;
 import org.apache.flume.api.RpcClientConfigurationConstants;
 import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.api.SecureRpcClientFactory;
+
 /**
  * <p>
  * A {@link org.apache.flume.Sink} implementation that can send events to an RPC server (such as
@@ -102,12 +104,18 @@ import org.apache.flume.api.RpcClientFactory;
 public class ThriftSink extends AbstractRpcSink {
   @Override
   protected RpcClient initializeRpcClient(Properties props) {
-    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
-      RpcClientFactory.ClientType.THRIFT.name());
     // Only one thread is enough, since only one sink thread processes
     // transactions at any given time. Each sink owns its own Rpc client.
     props.setProperty(RpcClientConfigurationConstants
       .CONFIG_CONNECTION_POOL_SIZE, String.valueOf(1));
-    return RpcClientFactory.getInstance(props);
+    boolean enableKerberos =  Boolean.parseBoolean(props.getProperty(
+      RpcClientConfigurationConstants.KERBEROS_KEY, "false"));
+    if(enableKerberos) {
+      return SecureRpcClientFactory.getThriftInstance(props);
+    } else {
+      props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
+              RpcClientFactory.ClientType.THRIFT.name());
+      return RpcClientFactory.getInstance(props);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
index 06bb604..1d8bb33 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -26,6 +26,8 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.FlumeException;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.FlumeAuthenticator;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SourceCounter;
@@ -45,12 +47,16 @@ import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TTransportFactory;
+import org.apache.thrift.transport.TSaslServerTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLServerSocket;
+import javax.security.sasl.Sasl;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -60,10 +66,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.security.PrivilegedAction;
 
 public class ThriftSource extends AbstractSource implements Configurable,
   EventDrivenSource {
@@ -97,6 +106,10 @@ public class ThriftSource extends AbstractSource implements Configurable,
   private static final String KEYMANAGER_TYPE = "keymanager-type";
   private static final String EXCLUDE_PROTOCOLS = "exclude-protocols";
 
+  private static final String KERBEROS_KEY = "kerberos";
+  private static final String AGENT_PRINCIPAL = "agent-principal";
+  private static final String AGENT_KEYTAB = "agent-keytab";
+
   private Integer port;
   private String bindAddress;
   private int maxThreads = 0;
@@ -110,6 +123,9 @@ public class ThriftSource extends AbstractSource implements Configurable,
   private String keyManagerType;
   private final List<String> excludeProtocols = new LinkedList<String>();
   private boolean enableSsl = false;
+  private boolean enableKerberos = false;
+  private String principal;
+  private FlumeAuthenticator flumeAuth;
 
   @Override
   public void configure(Context context) {
@@ -171,6 +187,18 @@ public class ThriftSource extends AbstractSource implements Configurable,
                 "Thrift source configured with invalid keystore: " + keystore, ex);
       }
     }
+
+    principal = context.getString(AGENT_PRINCIPAL);
+    String keytab = context.getString(AGENT_KEYTAB);
+    enableKerberos = context.getBoolean(KERBEROS_KEY, false);
+    this.flumeAuth = FlumeAuthenticationUtil.getAuthenticator(principal, keytab);
+    if(enableKerberos) {
+      if(!flumeAuth.isAuthenticated()) {
+        throw new FlumeException("Authentication failed in Kerberos mode for " +
+                "principal " + principal + " keytab " + keytab);
+      }
+      flumeAuth.startCredentialRefresher();
+    }
   }
 
   @Override
@@ -195,7 +223,15 @@ public class ThriftSource extends AbstractSource implements Configurable,
     servingExecutor.submit(new Runnable() {
       @Override
       public void run() {
-        server.serve();
+        flumeAuth.execute(
+          new PrivilegedAction<Object>() {
+            @Override
+            public Object run() {
+              server.serve();
+              return null;
+            }
+          }
+        );
       }
     });
 
@@ -263,7 +299,7 @@ public class ThriftSource extends AbstractSource implements Configurable,
   }
 
   private TServer getTThreadedSelectorServer() {
-    if(enableSsl) {
+    if(enableSsl || enableKerberos) {
       return null;
     }
     Class<?> serverClass;
@@ -277,6 +313,7 @@ public class ThriftSource extends AbstractSource implements Configurable,
 
       TServerTransport serverTransport = new TNonblockingServerSocket(
               new InetSocketAddress(bindAddress, port));
+
       ExecutorService sourceService;
       ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
               "Flume Thrift IPC Thread %d").build();
@@ -328,14 +365,35 @@ public class ThriftSource extends AbstractSource implements Configurable,
     args.protocolFactory(getProtocolFactory());
 
     //populate the transportFactory
-    args.inputTransportFactory(new TFastFramedTransport.Factory());
-    args.outputTransportFactory(new TFastFramedTransport.Factory());
+    if(enableKerberos) {
+      args.transportFactory(getSASLTransportFactory());
+    } else {
+      args.transportFactory(new TFastFramedTransport.Factory());
+    }
 
     // populate the  Processor
     args.processor(new ThriftSourceProtocol
             .Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
   }
 
+  private TTransportFactory getSASLTransportFactory() {
+    String[] names;
+    try {
+      names = FlumeAuthenticationUtil.splitKerberosName(principal);
+    } catch (IOException e) {
+      throw new FlumeException(
+              "Error while trying to resolve Principal name - " + principal, e);
+    }
+    Map<String, String> saslProperties = new HashMap<String, String>();
+    saslProperties.put(Sasl.QOP, "auth");
+    TSaslServerTransport.Factory saslTransportFactory =
+            new TSaslServerTransport.Factory();
+    saslTransportFactory.addServerDefinition(
+            "GSSAPI", names[0], names[1], saslProperties,
+            FlumeAuthenticationUtil.getSaslGssCallbackHandler());
+    return saslTransportFactory;
+  }
+
   @Override
   public void stop() {
     if(server != null && server.isServing()) {
@@ -402,5 +460,4 @@ public class ThriftSource extends AbstractSource implements Configurable,
       return Status.OK;
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index a083fe2..9f7c4f6 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -202,6 +202,10 @@
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-tools</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-auth</artifactId>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-dist/src/main/assembly/bin.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/src/main/assembly/bin.xml b/flume-ng-dist/src/main/assembly/bin.xml
index 5aa7cc6..a61180d 100644
--- a/flume-ng-dist/src/main/assembly/bin.xml
+++ b/flume-ng-dist/src/main/assembly/bin.xml
@@ -68,6 +68,7 @@
         <exclude>flume-ng-clients/**</exclude>
         <exclude>flume-ng-embedded-agent/**</exclude>
         <exclude>flume-tools/**</exclude>
+        <exclude>flume-ng-auth/**</exclude>
         <exclude>**/target/**</exclude>
         <exclude>**/.classpath</exclude>
         <exclude>**/.project</exclude>

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-dist/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/src/main/assembly/src.xml b/flume-ng-dist/src/main/assembly/src.xml
index b1e79a2..e5f4156 100644
--- a/flume-ng-dist/src/main/assembly/src.xml
+++ b/flume-ng-dist/src/main/assembly/src.xml
@@ -49,6 +49,7 @@
         <include>org.apache.flume:flume-ng-clients</include>
         <include>org.apache.flume:flume-ng-embedded-agent</include>
         <include>org.apache.flume:flume-tools</include>
+        <include>org.apache.flume:flume-ng-auth</include>
       </includes>
 
       <sources>

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index 33a2330..343e07b 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -145,6 +145,8 @@ public final class RpcClientConfigurationConstants {
   public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type";
   public static final String CONFIG_EXCLUDE_PROTOCOLS = "exclude-protocols";
 
+  public static final String KERBEROS_KEY = "kerberos";
+
   /**
    * Configuration constants for the NettyAvroRpcClient
    * NioClientSocketChannelFactory

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
index 4f75a2b..5c4cc41 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
@@ -28,6 +28,7 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.transport.TFastFramedTransport;
 import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +74,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
   public static final String CONFIG_PROTOCOL = "protocol";
   public static final String BINARY_PROTOCOL = "binary";
   public static final String COMPACT_PROTOCOL = "compact";
-  
+
   private int batchSize;
   private long requestTimeout;
   private final Lock stateLock;
@@ -83,7 +84,6 @@ public class ThriftRpcClient extends AbstractRpcClient {
   private ConnectionPoolManager connectionManager;
   private final ExecutorService callTimeoutPool;
   private final AtomicLong threadCounter;
-  private int connectionPoolSize;
   private final Random random = new Random();
   private String protocol;
 
@@ -95,7 +95,6 @@ public class ThriftRpcClient extends AbstractRpcClient {
   private static final String TRUSTMANAGER_TYPE = "trustmanager-type";
   private final List<String> excludeProtocols = new LinkedList<String>();
 
-
   public ThriftRpcClient() {
     stateLock = new ReentrantLock(true);
     connState = State.INIT;
@@ -319,7 +318,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
         requestTimeout =
           RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
       }
-      connectionPoolSize = Integer.parseInt(properties.getProperty(
+      int connectionPoolSize = Integer.parseInt(properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE,
         String.valueOf(RpcClientConfigurationConstants
           .DEFAULT_CONNECTION_POOL_SIZE)));
@@ -352,6 +351,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
           }
         }
       }
+
       connectionManager = new ConnectionPoolManager(connectionPoolSize);
       connState = State.READY;
     } catch (Throwable ex) {
@@ -372,33 +372,41 @@ public class ThriftRpcClient extends AbstractRpcClient {
     INIT, READY, DEAD
   }
 
+  protected TTransport getTransport(TSocket tsocket) throws Exception {
+    return new TFastFramedTransport(tsocket);
+  }
+
   /**
    * Wrapper around a client and transport, so we can clean up when this
    * client gets closed.
    */
   private class ClientWrapper {
     public final ThriftSourceProtocol.Client client;
-    public final TFastFramedTransport transport;
+    public final TTransport transport;
     private final int hashCode;
 
     public ClientWrapper() throws Exception{
       TSocket tsocket;
       if(enableSsl) {
-        // JDK6's factory doesn't appear to pass the protocol onto the Socket properly so we have
-        // to do some magic to make sure that happens. Not an issue in JDK7
-        // Lifted from thrift-0.9.1 to make the SSLContext
-        SSLContext sslContext = createSSLContext(truststore, truststorePassword, trustManagerType, truststoreType);
+        // JDK6's factory doesn't appear to pass the protocol onto the Socket
+        // properly so we have to do some magic to make sure that happens.
+        // Not an issue in JDK7 Lifted from thrift-0.9.1 to make the SSLContext
+        SSLContext sslContext = createSSLContext(truststore, truststorePassword,
+                trustManagerType, truststoreType);
 
         // Create the factory from it
         SSLSocketFactory sslSockFactory = sslContext.getSocketFactory();
 
         // Create the TSocket from that
-        tsocket = createSSLSocket(sslSockFactory, hostname, port, 120000, excludeProtocols);
+        tsocket = createSSLSocket(
+                sslSockFactory, hostname, port, 120000, excludeProtocols);
       } else {
         tsocket = new TSocket(hostname, port);
       }
 
-      transport = new TFastFramedTransport(tsocket);
+
+     transport = getTransport(tsocket);
+
       // The transport is already open for SSL as part of TSSLTransportFactory.getClientSocket
       if(!transport.isOpen()) {
         transport.open();

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-dataset-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/pom.xml b/flume-ng-sinks/flume-dataset-sink/pom.xml
index ad3f603..92f7021 100644
--- a/flume-ng-sinks/flume-dataset-sink/pom.xml
+++ b/flume-ng-sinks/flume-dataset-sink/pom.xml
@@ -150,13 +150,6 @@ limitations under the License.
     </dependency>
 
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minikdc</artifactId>
-      <version>${hadoop2.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index fd9f991..a9f42b8 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flume.sink.kite;
 
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.sink.kite.parser.EntityParserFactory;
 import org.apache.flume.sink.kite.parser.EntityParser;
 import org.apache.flume.sink.kite.policy.FailurePolicy;
@@ -25,8 +27,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+
 import java.net.URI;
-import java.security.PrivilegedExceptionAction;
+import java.security.PrivilegedAction;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.avro.Schema;
@@ -40,7 +43,6 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetIOException;
@@ -72,7 +74,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class);
 
   private Context context = null;
-  private UserGroupInformation login = null;
+  private PrivilegedExecutor privilegedExecutor;
 
   private String datasetName = null;
   private URI datasetUri = null;
@@ -159,15 +161,12 @@ public class DatasetSink extends AbstractSink implements Configurable {
   public void configure(Context context) {
     this.context = context;
 
-    // initialize login credentials
-    this.login = KerberosUtil.login(
-        context.getString(AUTH_PRINCIPAL),
-        context.getString(AUTH_KEYTAB));
-    String effectiveUser
-        = context.getString(AUTH_PROXY_USER);
-    if (effectiveUser != null) {
-      this.login = KerberosUtil.proxyAs(effectiveUser, login);
-    }
+    String principal = context.getString(AUTH_PRINCIPAL);
+    String keytab = context.getString(AUTH_KEYTAB);
+    String effectiveUser = context.getString(AUTH_PROXY_USER);
+
+    this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
+            principal, keytab).proxyAs(effectiveUser);
 
     // Get the dataset URI and name from the context
     String datasetURI = context.getString(CONFIG_KITE_DATASET_URI);
@@ -395,13 +394,15 @@ public class DatasetSink extends AbstractSink implements Configurable {
     // reset the commited flag whenver a new writer is created
     committedBatch = false;
     try {
-      View<GenericRecord> view = KerberosUtil.runPrivileged(login,
-          new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
-            @Override
-            public Dataset<GenericRecord> run() {
-              return Datasets.load(datasetUri);
-            }
-          });
+      View<GenericRecord> view;
+
+      view = privilegedExecutor.execute(
+        new PrivilegedAction<Dataset<GenericRecord>>() {
+          @Override
+          public Dataset<GenericRecord> run() {
+            return Datasets.load(datasetUri);
+          }
+        });
 
       DatasetDescriptor descriptor = view.getDataset().getDescriptor();
       Format format = descriptor.getFormat();

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java
deleted file mode 100644
index c0dbffb..0000000
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import java.io.File;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.kitesdk.data.DatasetException;
-import org.kitesdk.data.DatasetIOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KerberosUtil {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KerberosUtil.class);
-
-  public static class SecurityException extends RuntimeException {
-    private SecurityException(String message) {
-      super(message);
-    }
-
-    private SecurityException(String message, Throwable cause) {
-      super(message, cause);
-    }
-
-    private SecurityException(Throwable cause) {
-      super(cause);
-    }
-  }
-
-  public static UserGroupInformation proxyAs(String username,
-                                             UserGroupInformation login) {
-    Preconditions.checkArgument(username != null && !username.isEmpty(),
-        "Invalid username: " + String.valueOf(username));
-    Preconditions.checkArgument(login != null,
-        "Cannot proxy without an authenticated user");
-
-    // hadoop impersonation works with or without kerberos security
-    return UserGroupInformation.createProxyUser(username, login);
-  }
-
-  /**
-   * Static synchronized method for static Kerberos login. <br/>
-   * Static synchronized due to a thundering herd problem when multiple Sinks
-   * attempt to log in using the same principal at the same time with the
-   * intention of impersonating different users (or even the same user).
-   * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay
-   * attach and it returns:
-   * <blockquote>Request is a replay (34) - PROCESS_TGS</blockquote>
-   * In addition, since the underlying Hadoop APIs we are using for
-   * impersonation are static, we define this method as static as well.
-   *
-   * @param principal
-   *         Fully-qualified principal to use for authentication.
-   * @param keytab
-   *         Location of keytab file containing credentials for principal.
-   * @return Logged-in user
-   * @throws SecurityException
-   *         if login fails.
-   * @throws IllegalArgumentException
-   *         if the principal or the keytab is not usable
-   */
-  public static synchronized UserGroupInformation login(String principal,
-                                                 String keytab) {
-    // If the principal or keytab isn't set, get the current (Linux) user
-    if (principal == null || keytab == null) {
-      try {
-        return UserGroupInformation.getCurrentUser();
-      } catch (IOException ex) {
-        LOG.error("Can't get current user: {}", ex.getMessage());
-        throw new RuntimeException(ex);
-      }
-    }
-
-    // resolve the requested principal, if it is present
-    String finalPrincipal = null;
-    if (principal != null && !principal.isEmpty()) {
-      try {
-        // resolves _HOST pattern using standard Hadoop search/replace
-        // via DNS lookup when 2nd argument is empty
-        finalPrincipal = SecurityUtil.getServerPrincipal(principal, "");
-      } catch (IOException e) {
-        throw new SecurityException(
-            "Failed to resolve Kerberos principal", e);
-      }
-    }
-
-    // check if there is a user already logged in
-    UserGroupInformation currentUser = null;
-    try {
-      currentUser = UserGroupInformation.getLoginUser();
-    } catch (IOException e) {
-      // not a big deal but this shouldn't typically happen because it will
-      // generally fall back to the UNIX user
-      LOG.debug("Unable to get login user before Kerberos auth attempt", e);
-    }
-
-    // if the current user is valid (matches the given principal and has a TGT)
-    // then use it
-    if (currentUser != null && currentUser.hasKerberosCredentials()) {
-      if (finalPrincipal == null ||
-          finalPrincipal.equals(currentUser.getUserName())) {
-        LOG.debug("Using existing login for {}: {}",
-            finalPrincipal, currentUser);
-        return currentUser;
-      } else {
-        // be cruel and unusual when user tries to login as multiple principals
-        // this isn't really valid with a reconfigure but this should be rare
-        // enough to warrant a restart of the agent JVM
-        // TODO: find a way to interrogate the entire current config state,
-        // since we don't have to be unnecessarily protective if they switch all
-        // HDFS sinks to use a different principal all at once.
-        throw new SecurityException(
-            "Cannot use multiple Kerberos principals: " + finalPrincipal +
-                " would replace " + currentUser.getUserName());
-      }
-    }
-
-    // prepare for a new login
-    Preconditions.checkArgument(principal != null && !principal.isEmpty(),
-        "Invalid Kerberos principal: " + String.valueOf(principal));
-    Preconditions.checkNotNull(finalPrincipal,
-        "Resolved principal must not be null");
-    Preconditions.checkArgument(keytab != null && !keytab.isEmpty(),
-        "Invalid Kerberos keytab: " + String.valueOf(keytab));
-    File keytabFile = new File(keytab);
-    Preconditions.checkArgument(keytabFile.isFile() && keytabFile.canRead(),
-        "Keytab is not a readable file: " + String.valueOf(keytab));
-
-    try {
-      // attempt static kerberos login
-      LOG.debug("Logging in as {} with {}", finalPrincipal, keytab);
-      UserGroupInformation.loginUserFromKeytab(principal, keytab);
-      return UserGroupInformation.getLoginUser();
-    } catch (IOException e) {
-      throw new SecurityException("Kerberos login failed", e);
-    }
-  }
-
-  /**
-   * Allow methods to act with the privileges of a login.
-   *
-   * If the login is null, the current privileges will be used.
-   *
-   * @param <T> The return type of the action
-   * @param login UserGroupInformation credentials to use for action
-   * @param action A PrivilegedExceptionAction to perform as another user
-   * @return the T value returned by action.run()
-   */
-  public static <T> T runPrivileged(UserGroupInformation login,
-                                    PrivilegedExceptionAction<T> action) {
-    try {
-      if (login == null) {
-        return action.run();
-      } else {
-        return login.doAs(action);
-      }
-    } catch (IOException ex) {
-      throw new DatasetIOException("Privileged action failed", ex);
-    } catch (InterruptedException ex) {
-      Thread.interrupted();
-      throw new DatasetException(ex);
-    } catch (Exception ex) {
-      throw Throwables.propagate(ex);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java
deleted file mode 100644
index f53ef73..0000000
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.sink.kite;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.minikdc.MiniKdc;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class TestKerberosUtil {
-
-  private static MiniKdc kdc;
-  private static File workDir;
-  private static File flumeKeytab;
-  private static String flumePrincipal = "flume/localhost";
-  private static File aliceKeytab;
-  private static String alicePrincipal = "alice";
-  private static Properties conf;
-
-  @BeforeClass
-  public static void startMiniKdc() throws Exception {
-    URL resource = Thread.currentThread()
-        .getContextClassLoader().getResource("enable-kerberos.xml");
-    Configuration.addDefaultResource("enable-kerberos.xml");
-
-    workDir = new File(System.getProperty("test.dir", "target"),
-        TestKerberosUtil.class.getSimpleName());
-    flumeKeytab = new File(workDir, "flume.keytab");
-    aliceKeytab = new File(workDir, "alice.keytab");
-    conf = MiniKdc.createConf();
-
-    kdc = new MiniKdc(conf, workDir);
-    kdc.start();
-
-    kdc.createPrincipal(flumeKeytab, flumePrincipal);
-    flumePrincipal = flumePrincipal + "@" + kdc.getRealm();
-
-    kdc.createPrincipal(aliceKeytab, alicePrincipal);
-    alicePrincipal = alicePrincipal + "@" + kdc.getRealm();
-  }
-
-  @AfterClass
-  public static void stopMiniKdc() {
-    if (kdc != null) {
-      kdc.stop();
-    }
-  }
-
-  @Test
-  public void testNullLogin() throws IOException {
-    String principal = null;
-    String keytab = null;
-    UserGroupInformation expResult = UserGroupInformation.getCurrentUser();
-    UserGroupInformation result = KerberosUtil.login(principal, keytab);
-    assertEquals(expResult, result);
-  }
-
-  @Test
-  public void testFlumeLogin() throws IOException {
-    String principal = flumePrincipal;
-    String keytab = flumeKeytab.getAbsolutePath();
-    String expResult = principal;
-
-    String result = KerberosUtil.login(principal, keytab).getUserName();
-    assertEquals("Initial login failed", expResult, result);
-
-    result = KerberosUtil.login(principal, keytab).getUserName();
-    assertEquals("Re-login failed", expResult, result);
-
-    principal = alicePrincipal;
-    keytab = aliceKeytab.getAbsolutePath();
-    try {
-      result = KerberosUtil.login(principal, keytab).getUserName();
-      fail("Login should have failed with a new principal: " + result);
-    } catch (KerberosUtil.SecurityException ex) {
-      assertTrue("Login with a new principal failed, but for an unexpected "
-          + "reason: " + ex.getMessage(),
-          ex.getMessage().contains("Cannot use multiple Kerberos principals: "));
-    }
-  }
-
-  @Test
-  public void testProxyAs() throws IOException {
-    String username = "alice";
-
-    UserGroupInformation login = UserGroupInformation.getCurrentUser();
-    String expResult = username;
-    String result = KerberosUtil.proxyAs(username, login).getUserName();
-    assertEquals("Proxy as didn't generate the expected username", expResult, result);
-
-    login = KerberosUtil.login(flumePrincipal, flumeKeytab.getAbsolutePath());
-    assertEquals("Login succeeded, but the principal doesn't match",
-        flumePrincipal, login.getUserName());
-
-    result = KerberosUtil.proxyAs(username, login).getUserName();
-    assertEquals("Proxy as didn't generate the expected username", expResult, result);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 62f4eee..6b97de6 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -38,6 +38,7 @@ import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.SystemClock;
+import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
 import org.apache.hadoop.conf.Configuration;
@@ -45,7 +46,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +75,7 @@ class BucketWriter {
   private final CompressionCodec codeC;
   private final CompressionType compType;
   private final ScheduledExecutorService timedRollerPool;
-  private final UserGroupInformation user;
+  private final PrivilegedExecutor proxyUser;
 
   private final AtomicLong fileExtensionCounter;
 
@@ -120,7 +120,7 @@ class BucketWriter {
     Context context, String filePath, String fileName, String inUsePrefix,
     String inUseSuffix, String fileSuffix, CompressionCodec codeC,
     CompressionType compType, HDFSWriter writer,
-    ScheduledExecutorService timedRollerPool, UserGroupInformation user,
+    ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser,
     SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
     String onCloseCallbackPath, long callTimeout,
     ExecutorService callTimeoutPool, long retryInterval,
@@ -138,7 +138,7 @@ class BucketWriter {
     this.compType = compType;
     this.writer = writer;
     this.timedRollerPool = timedRollerPool;
-    this.user = user;
+    this.proxyUser = proxyUser;
     this.sinkCounter = sinkCounter;
     this.idleTimeout = idleTimeout;
     this.onCloseCallback = onCloseCallback;
@@ -165,33 +165,6 @@ class BucketWriter {
     this.writer = dataWriter;
   }
 
-  /**
-   * Allow methods to act as another user (typically used for HDFS Kerberos)
-   * @param <T>
-   * @param action
-   * @return
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
-      throws IOException, InterruptedException {
-
-    if (user != null) {
-      return user.doAs(action);
-    } else {
-      try {
-        return action.run();
-      } catch (IOException ex) {
-        throw ex;
-      } catch (InterruptedException ex) {
-        throw ex;
-      } catch (RuntimeException ex) {
-        throw ex;
-      } catch (Exception ex) {
-        throw new RuntimeException("Unexpected exception.", ex);
-      }
-    }
-  }
 
   /**
    * Clear the class counters
@@ -700,7 +673,7 @@ class BucketWriter {
     Future<T> future = callTimeoutPool.submit(new Callable<T>() {
       @Override
       public T call() throws Exception {
-        return runPrivileged(new PrivilegedExceptionAction<T>() {
+        return proxyUser.execute(new PrivilegedExceptionAction<T>() {
           @Override
           public T run() throws Exception {
             return callRunner.call();


Mime
View raw message