falcon-commits mailing list archives

From venkat...@apache.org
Subject [1/3] git commit: FALCON-851 Super user authorization is broken. Contributed by Venkatesh Seetharam
Date Mon, 03 Nov 2014 19:23:38 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master a37899c54 -> 82cea4fb9


FALCON-851 Super user authorization is broken. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/27693c6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/27693c6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/27693c6f

Branch: refs/heads/master
Commit: 27693c6ff59e2fe8e0128e734f2d05ba8d6f9ccc
Parents: a37899c
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Mon Nov 3 10:22:26 2014 -0800
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Mon Nov 3 10:22:26 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/falcon/entity/v0/Entity.java     |  2 +
 .../falcon/cleanup/AbstractCleanupHandler.java  |  8 +-
 .../org/apache/falcon/entity/EntityUtil.java    | 22 +-----
 .../apache/falcon/entity/FileSystemStorage.java |  3 +-
 .../org/apache/falcon/entity/ProcessHelper.java |  6 +-
 .../entity/parser/ClusterEntityParser.java      |  4 +-
 .../falcon/entity/parser/EntityParser.java      | 30 ++++++++
 .../falcon/entity/parser/FeedEntityParser.java  |  1 +
 .../entity/parser/ProcessEntityParser.java      |  5 +-
 .../falcon/entity/store/ConfigurationStore.java |  6 ++
 .../falcon/hadoop/HadoopClientFactory.java      | 79 +++++++++++++++-----
 .../falcon/security/AuthorizationProvider.java  |  8 ++
 .../org/apache/falcon/security/CurrentUser.java |  4 +
 .../security/DefaultAuthorizationProvider.java  | 49 ++----------
 .../org/apache/falcon/update/UpdateHelper.java  | 14 ++--
 .../DefaultAuthorizationProviderTest.java       |  2 +-
 .../org/apache/falcon/logging/LogProvider.java  |  3 +-
 .../apache/falcon/oozie/OozieEntityBuilder.java |  4 +-
 .../OozieOrchestrationWorkflowBuilder.java      |  5 +-
 .../feed/FeedReplicationCoordinatorBuilder.java |  2 +-
 .../oozie/process/ProcessBundleBuilder.java     |  2 +-
 .../ProcessExecutionWorkflowBuilder.java        |  2 +-
 .../engine/OozieHouseKeepingService.java        |  3 +-
 .../workflow/engine/OozieWorkflowEngine.java    | 11 ++-
 .../falcon/resource/AbstractEntityManager.java  |  2 +-
 .../security/FalconAuthorizationFilter.java     |  6 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |  3 +-
 28 files changed, 165 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
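
Most of the hunks below are mechanical: every call site that previously asked
HadoopClientFactory for a proxied FileSystem with only a Configuration now also
hands over the entity's ACL, so the factory can decide whose identity to use on
HDFS. A minimal sketch of the resulting call pattern, assuming a cluster and an
entity are in scope (the wrapper class here is hypothetical; the API names are
the ones this patch introduces):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.ClusterHelper;
    import org.apache.falcon.entity.v0.Entity;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.falcon.hadoop.HadoopClientFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // Hypothetical helper, for illustration only.
    final class AclAwareFsExample {
        static FileSystem fsForEntity(Cluster cluster, Entity entity) throws FalconException {
            Configuration conf = ClusterHelper.getConfiguration(cluster);
            // Before this patch: createProxiedFileSystem(conf), proxied as the caller.
            // After: the ACL rides along so a super user acts as the ACL owner instead.
            return HadoopClientFactory.get().createProxiedFileSystem(conf, entity.getACL());
        }

        private AclAwareFsExample() {
        }
    }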


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f83e4a8..05cc582 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -125,6 +125,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-851 Super user authorization is broken (Venkatesh Seetharam)
+
    FALCON-840 Possible NPE in filteredInstanceSet method of
    AbstractInstanceManager (Balu Vellanki via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/client/src/main/java/org/apache/falcon/entity/v0/Entity.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/entity/v0/Entity.java b/client/src/main/java/org/apache/falcon/entity/v0/Entity.java
index 252e860..7fb271d 100644
--- a/client/src/main/java/org/apache/falcon/entity/v0/Entity.java
+++ b/client/src/main/java/org/apache/falcon/entity/v0/Entity.java
@@ -29,6 +29,8 @@ import java.io.StringWriter;
 public abstract class Entity {
     public abstract String getName();
 
+    public abstract AccessControlList getACL();
+
     public EntityType getEntityType() {
         for (EntityType type : EntityType.values()) {
             if (type.getEntityClass().equals(getClass())) {
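
Entity itself now exposes the ACL, replacing the type-switch helper that is
removed from EntityUtil further down. A hedged sketch of how callers can use
the new accessor (the helper class is hypothetical; the null check mirrors the
one in AbstractCleanupHandler below):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.v0.AccessControlList;
    import org.apache.falcon.entity.v0.Entity;

    // Hypothetical caller, for illustration only.
    final class AclOwnerExample {
        static String aclOwnerOf(Entity entity) throws FalconException {
            AccessControlList acl = entity.getACL();
            if (acl == null) {  // possible when authorization is disabled and no ACL was declared
                throw new FalconException("ACL for entity " + entity.getName() + " is empty");
            }
            return acl.getOwner();
        }

        private AclOwnerExample() {
        }
    }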

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index cd088b2..be300d7 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -30,7 +30,6 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
@@ -95,16 +94,13 @@ public abstract class AbstractCleanupHandler {
     private FileSystem getFileSystemAsEntityOwner(Cluster cluster,
                                                   Entity entity) throws FalconException {
         try {
-            final AccessControlList acl = EntityUtil.getACL(entity);
+            final AccessControlList acl = entity.getACL();
             if (acl == null) {
                 throw new FalconException("ACL for entity " + entity.getName() + " is empty");
             }
 
-            final String proxyUser = acl.getOwner();
-            // user for proxying
-            CurrentUser.authenticate(proxyUser);
             return HadoopClientFactory.get().createProxiedFileSystem(
-                    ClusterHelper.getConfiguration(cluster));
+                    ClusterHelper.getConfiguration(cluster), acl);
         } catch (Exception e) {
             throw new FalconException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 1a10986..bcebb94 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -27,7 +27,6 @@ import org.apache.falcon.Pair;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.WorkflowNameBuilder.WorkflowName;
 import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -584,7 +583,7 @@ public final class EntityUtil {
         throws FalconException {
         Path basePath = getBaseStagingPath(cluster, entity);
         FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster));
+                ClusterHelper.getConfiguration(cluster), entity.getACL());
         try {
             return fs.listStatus(basePath, new PathFilter() {
                 @Override
@@ -752,23 +751,4 @@ public final class EntityUtil {
         }
         return new Pair<Date, Date>(clusterMinStartDate.first, clusterMaxEndDate.first);
     }
-
-    public static AccessControlList getACL(Entity entity) {
-        switch (entity.getEntityType()) {
-        case CLUSTER:
-            return ((org.apache.falcon.entity.v0.cluster.Cluster) entity).getACL();
-
-        case FEED:
-            return ((org.apache.falcon.entity.v0.feed.Feed) entity).getACL();
-
-        case PROCESS:
-            return ((org.apache.falcon.entity.v0.process.Process) entity).getACL();
-
-        default:
-            break;
-        }
-
-        throw new IllegalArgumentException("Unknown entity type: " + entity.getEntityType()
-                + " for: " + entity.getName());
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 012a6e7..953c19e 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -300,7 +300,8 @@ public class FileSystemStorage implements Storage {
                 getLocations(FeedHelper.getCluster(feed, clusterName), feed);
         Location location = getLocation(clusterSpecificLocation, locationType);
         try {
-            FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(getConf());
+            FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(
+                getConf(), feed.getACL());
             Cluster cluster = ClusterHelper.getCluster(clusterName);
             Properties baseProperties = FeedHelper.getClusterProperties(cluster);
             baseProperties.putAll(FeedHelper.getFeedProperties(feed));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index 2565bf6..8073229 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -86,7 +86,8 @@ public final class ProcessHelper {
     public static Path getUserWorkflowPath(Process process, org.apache.falcon.entity.v0.cluster.Cluster cluster,
         Path buildPath) throws FalconException {
         try {
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster), process.getACL());
             Path wfPath = new Path(process.getWorkflow().getPath());
             if (fs.isFile(wfPath)) {
                 return new Path(buildPath.getParent(), EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
@@ -107,7 +108,8 @@ public final class ProcessHelper {
             }
             Path libPath = new Path(userLibPath);
 
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster), process.getACL());
             if (fs.isFile(libPath)) {
                 return new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 7c4b99d..5a7ec17 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -221,6 +221,8 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
             throw new ValidationException("Cluster ACL cannot be empty for:  " + cluster.getName());
         }
 
+        validateACLOwnerAndGroup(clusterACL);
+
         try {
             authorize(cluster.getName(), clusterACL);
         } catch (AuthorizationException e) {
@@ -239,7 +241,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         Configuration conf = ClusterHelper.getConfiguration(cluster);
         FileSystem fs;
         try {
-            fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+            fs = HadoopClientFactory.get().createProxiedFileSystem(conf, cluster.getACL());
         } catch (FalconException e) {
             throw new ValidationException(
                     "Unable to get file system handle for cluster " + cluster.getName(), e);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
index ac58280..e2742a1 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +35,9 @@ import javax.xml.bind.Unmarshaller;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Generic Abstract Entity Parser, the concrete FEED, PROCESS and CLUSTER should extend this parser
@@ -109,6 +113,32 @@ public abstract class EntityParser<T extends Entity> {
     public abstract void validate(T entity) throws FalconException;
 
     /**
+     * Checks if the acl owner is a valid user by fetching the groups for the owner.
+     * Also checks if the acl group is one of the fetched groups for membership.
+     * The only limitation is that a user cannot add a group in ACL that he does not belong to.
+     *
+     * @param acl  entity ACL
+     * @throws org.apache.falcon.entity.parser.ValidationException
+     */
+    protected void validateACLOwnerAndGroup(AccessControlList acl) throws ValidationException {
+        String aclOwner = acl.getOwner();
+        String aclGroup = acl.getGroup();
+
+        try {
+            UserGroupInformation proxyACLUser = UserGroupInformation.createProxyUser(
+                    aclOwner, UserGroupInformation.getLoginUser());
+            Set<String> groups = new HashSet<String>(Arrays.asList(proxyACLUser.getGroupNames()));
+            if (!groups.contains(aclGroup)) {
+                throw new AuthorizationException("Invalid group: " + aclGroup
+                        + " for user: " + aclOwner);
+            }
+        } catch (IOException e) {
+            throw new ValidationException("Invalid acl owner " + aclOwner
+                    + ", does not exist or does not belong to group: " + aclGroup);
+        }
+    }
+
+    /**
      * Validate if the entity owner is the logged-in authenticated user.
      *
      * @param entityName  entity name
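
The check above leans entirely on Hadoop's group resolution: a proxy UGI is
created for the ACL owner against Falcon's login user, and the owner's groups
are consulted for the ACL group. The same lookup, sketched standalone
(standard Hadoop security API; the user and group names are placeholders):

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.hadoop.security.UserGroupInformation;

    // Standalone illustration of the membership check done by validateACLOwnerAndGroup.
    public final class GroupCheckExample {

        public static void main(String[] args) throws IOException {
            String aclOwner = args.length > 0 ? args[0] : "falcon";  // placeholder user
            String aclGroup = args.length > 1 ? args[1] : "users";   // placeholder group

            UserGroupInformation proxyAclUser = UserGroupInformation.createProxyUser(
                    aclOwner, UserGroupInformation.getLoginUser());
            boolean isMember = Arrays.asList(proxyAclUser.getGroupNames()).contains(aclGroup);
            System.out.println(aclOwner + " in group " + aclGroup + "? " + isMember);
        }

        private GroupCheckExample() {
        }
    }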

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 34b764b..a724695 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -410,6 +410,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
         }
 
         final ACL feedACL = feed.getACL();
+        validateACLOwnerAndGroup(feedACL);
         try {
             authorize(feed.getName(), feedACL);
         } catch (AuthorizationException e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 9be4e85..aaaa229 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -122,7 +122,8 @@ public class ProcessEntityParser extends EntityParser<Process> {
         String nameNode = getNameNode(cluster);
         try {
             Configuration configuration = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                configuration, process.getACL());
             if (!fs.exists(new Path(workflowPath))) {
                 throw new ValidationException(
                         "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);
@@ -243,6 +244,8 @@ public class ProcessEntityParser extends EntityParser<Process> {
             throw new ValidationException("Process ACL cannot be empty for:  " + process.getName());
         }
 
+        validateACLOwnerAndGroup(processACL);
+
         try {
             authorize(process.getName(), processACL);
         } catch (AuthorizationException e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 62a3e5b..1c1c325 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity.store;
 
 import org.apache.commons.codec.CharEncoding;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -73,6 +74,11 @@ public final class ConfigurationStore implements FalconService {
         public String getName() {
             return "NULL";
         }
+
+        @Override
+        public AccessControlList getACL() {
+            return null;
+        }
     };
 
     private static final ConfigurationStore STORE = new ConfigurationStore();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index 3011b65..1496268 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -20,6 +20,7 @@ package org.apache.falcon.hadoop;
 
 import org.apache.commons.lang.Validate;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.StartupProperties;
@@ -84,11 +85,19 @@ public final class HadoopClientFactory {
         }
     }
 
+    /**
+     * This method is only used by Falcon internally to talk to the config store on HDFS.
+     *
+     * @param conf configuration.
+     * @return FileSystem created with the provided proxyUser/group.
+     * @throws org.apache.falcon.FalconException
+     *          if the filesystem could not be created.
+     */
     public FileSystem createFalconFileSystem(final Configuration conf)
         throws FalconException {
         Validate.notNull(conf, "configuration cannot be null");
 
-        String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
+        String nameNode = getNameNode(conf);
         try {
             return createFileSystem(UserGroupInformation.getLoginUser(), new URI(nameNode), conf);
         } catch (URISyntaxException e) {
@@ -110,18 +119,20 @@ public final class HadoopClientFactory {
         throws FalconException {
         Validate.notNull(conf, "configuration cannot be null");
 
-        String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
-        try {
-            return createFileSystem(getProxyUGI(), new URI(nameNode), conf);
-        } catch (URISyntaxException e) {
-            throw new FalconException("Exception while getting FileSystem for proxy: "
-                    + CurrentUser.getUser(), e);
-        } catch (IOException e) {
-            throw new FalconException("Exception while getting FileSystem for proxy: "
-                    + CurrentUser.getUser(), e);
-        }
+        return createProxiedFileSystem(conf, null);
     }
 
+    private String getNameNode(Configuration conf) {
+        return conf.get(FS_DEFAULT_NAME_KEY);
+    }
+
+    /**
+     * This method is called from within a workflow execution context.
+     *
+     * @param uri uri
+     * @return file system handle
+     * @throws FalconException
+     */
     public FileSystem createProxiedFileSystem(final URI uri) throws FalconException {
         return createProxiedFileSystem(uri, new Configuration());
     }
@@ -130,22 +141,45 @@ public final class HadoopClientFactory {
                                               final Configuration conf) throws FalconException {
         Validate.notNull(uri, "uri cannot be null");
 
+        return createProxiedFileSystem(uri, conf, null);
+    }
+
+    public FileSystem createProxiedFileSystem(final Configuration conf,
+                                              final AccessControlList acl) throws FalconException {
+        Validate.notNull(conf, "configuration cannot be null");
+
         try {
-            return createFileSystem(getProxyUGI(), uri, conf);
-        } catch (IOException e) {
+            return createProxiedFileSystem(new URI(getNameNode(conf)), conf, acl);
+        } catch (URISyntaxException e) {
             throw new FalconException("Exception while getting FileSystem for proxy: "
                     + CurrentUser.getUser(), e);
         }
     }
 
-    private UserGroupInformation getProxyUGI() throws IOException {
-        try { // get the authenticated user
-            return CurrentUser.getProxyUGI();
-        } catch (Exception ignore) {
-            // ignore since the user authentication might have failed or in oozie
+    // getFileSystemAsEntityOwner
+    public FileSystem createProxiedFileSystem(final URI uri,
+                                              final Configuration conf,
+                                              final AccessControlList acl) throws FalconException {
+        Validate.notNull(uri, "uri cannot be null");
+
+        try {
+            UserGroupInformation proxyUGI = getProxyUGI(acl);
+
+            return createFileSystem(proxyUGI, uri, conf);
+        } catch (IOException e) {
+            throw new FalconException("Exception while getting FileSystem for proxy: "
+                + CurrentUser.getUser(), e);
         }
+    }
+
+    private UserGroupInformation getProxyUGI(AccessControlList acl)
+        throws FalconException, IOException {
 
-        return UserGroupInformation.getCurrentUser();
+        return CurrentUser.isAuthenticated()
+            ? acl != null
+                && SecurityUtil.getAuthorizationProvider().isSuperUser(CurrentUser.getProxyUGI())
+                ? CurrentUser.createProxyUGI(acl.getOwner()) : CurrentUser.getProxyUGI()
+            : UserGroupInformation.getCurrentUser();
     }
 
     /**
@@ -166,7 +200,7 @@ public final class HadoopClientFactory {
 
         String nameNode = uri.getAuthority();
         if (nameNode == null) {
-            nameNode = conf.get(FS_DEFAULT_NAME_KEY);
+            nameNode = getNameNode(conf);
             if (nameNode != null) {
                 try {
                     new URI(nameNode).getAuthority();
@@ -177,6 +211,11 @@ public final class HadoopClientFactory {
         }
 
         try {
+            // prevent falcon impersonating falcon, no need to use doas
+            if (ugi.equals(UserGroupInformation.getLoginUser())) {
+                return FileSystem.get(uri, conf);
+            }
+
             return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                 public FileSystem run() throws Exception {
                     return FileSystem.get(uri, conf);
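
The new getProxyUGI(acl) above packs three cases into one nested ternary. An
equivalent, unrolled reading of the same decision (a paraphrase for
readability, assuming the same imports as HadoopClientFactory; not a proposed
change):

    private UserGroupInformation getProxyUGI(AccessControlList acl)
        throws FalconException, IOException {

        if (!CurrentUser.isAuthenticated()) {
            // No authenticated request context (e.g. invoked from within a workflow):
            // fall back to whatever user the JVM is running as.
            return UserGroupInformation.getCurrentUser();
        }

        if (acl != null
                && SecurityUtil.getAuthorizationProvider().isSuperUser(CurrentUser.getProxyUGI())) {
            // A super user touching an entity owned by someone else:
            // impersonate the ACL owner rather than the super user.
            return CurrentUser.createProxyUGI(acl.getOwner());
        }

        // Ordinary authenticated caller: proxy as that user.
        return CurrentUser.getProxyUGI();
    }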

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
index 3133d91..1b36c4e 100644
--- a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
+++ b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
@@ -28,6 +28,14 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
 public interface AuthorizationProvider {
 
     /**
+     * Check if the authenticated user is a super user.
+     *
+     * @param proxyUgi   proxy ugi for the authenticated user
+     * @return true if super user, else false
+     */
+    boolean isSuperUser(UserGroupInformation proxyUgi);
+
+    /**
      * Determines if the authenticated user is authorized to execute the action on the resource,
      * which is typically a REST resource path.
      * Throws an exception if not authorized.

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index cfea143..3d35630 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -70,6 +70,10 @@ public final class CurrentUser {
         INSTANCE.currentSubject.set(subject);
     }
 
+    public static boolean isAuthenticated() {
+        return getSubject() != null;
+    }
+
     public static Subject getSubject() {
         return INSTANCE.currentSubject.get();
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
index 6b80a1b..a5af2c1 100644
--- a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
+++ b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -82,7 +81,7 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
     private static final String SUPER_USER_GROUP_KEY = FALCON_PREFIX + "superusergroup";
 
     /**
-     * Super ser group.
+     * Super user group.
      */
     private String superUserGroup;
     private Set<String> adminUsers;
@@ -148,9 +147,8 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
      * @param authenticatedUGI UGI
      * @return true if super user else false.
      */
-    protected boolean isSuperUser(UserGroupInformation authenticatedUGI) {
-        final String authenticatedUser = authenticatedUGI.getShortUserName();
-        return SUPER_USER.equals(authenticatedUser)
+    public boolean isSuperUser(UserGroupInformation authenticatedUGI) {
+        return SUPER_USER.equals(authenticatedUGI.getShortUserName())
                 || (!StringUtils.isEmpty(superUserGroup)
                     && isUserInGroup(superUserGroup, authenticatedUGI));
     }
@@ -180,7 +178,6 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
                 authenticatedUser, action, entityName, entityType);
 
         if (isSuperUser(proxyUgi)) {
-            validateACLOwnerAndGroup(acl.getOwner(), acl.getGroup());
             return;
         }
 
@@ -188,31 +185,6 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
     }
 
     /**
-     * Checks if the acl owner is a valid user by fetching the groups for the owner.
-     * Also checks if the acl group is one of the fetched groups for membership.
-     * The only limitation is that a user cannot add a group in ACL that he does not belong to.
-     *
-     * @param aclOwner ACL owner
-     * @param aclGroup ACL group
-     * @throws AuthorizationException
-     */
-    protected void validateACLOwnerAndGroup(String aclOwner,
-                                            String aclGroup) throws AuthorizationException {
-        try {
-            UserGroupInformation proxyACLUser = UserGroupInformation.createProxyUser(
-                    aclOwner, UserGroupInformation.getLoginUser());
-            Set<String> groups = new HashSet<String>(Arrays.asList(proxyACLUser.getGroupNames()));
-            if (!isUserInGroup(aclGroup, groups)) {
-                throw new AuthorizationException("Invalid group: " + aclGroup
-                        + " for user: " + aclOwner);
-            }
-        } catch (IOException e) {
-            throw new AuthorizationException("Invalid acl owner " + aclOwner
-                    + ", does not exist or does not belong to group: " + aclGroup);
-        }
-    }
-
-    /**
      * Validate if the entity owner is the logged-in authenticated user.
      *
      * @param entityName        entity name.
@@ -227,7 +199,7 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
                              String action, String authenticatedUser,
                              UserGroupInformation proxyUgi) throws AuthorizationException {
         if (isUserACLOwner(authenticatedUser, aclOwner)
-                && isUserInGroup(aclGroup, proxyUgi)) {
+                || isUserInGroup(aclGroup, proxyUgi)) {
             return;
         }
 
@@ -262,17 +234,6 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
      */
     protected boolean isUserInGroup(String group, UserGroupInformation proxyUgi) {
         Set<String> groups = getGroupNames(proxyUgi);
-        return isUserInGroup(group, groups);
-    }
-
-    /**
-     * Checks if the user's group matches the entity ACL group.
-     *
-     * @param group    Entity ACL group.
-     * @param groups   set of groups for the authenticated user.
-     * @return true if user groups contains entity acl group.
-     */
-    protected boolean isUserInGroup(String group, Set<String> groups) {
         return groups.contains(group);
     }
 
@@ -321,7 +282,7 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         if (entityName != null) { // lifecycle actions
             Entity entity = getEntity(entityName, entityType);
             authorizeEntity(entity.getName(), entity.getEntityType().name(),
-                    EntityUtil.getACL(entity), action, proxyUgi);
+                    entity.getACL(), action, proxyUgi);
         } else {
             // non lifecycle actions, lifecycle actions with null entity will validate later
             LOG.info("Authorization for action={} will be done in the API", action);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index ac882d5..7782c71 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -108,22 +108,24 @@ public final class UpdateHelper {
     }
 
     //Checks if the user workflow or lib is updated
-    public static boolean isWorkflowUpdated(String cluster, Entity entity, Path bundleAppPath) throws FalconException {
+    public static boolean isWorkflowUpdated(String cluster, Entity entity,
+                                            Path bundleAppPath) throws FalconException {
         if (entity.getEntityType() != EntityType.PROCESS) {
             return false;
         }
 
-        try {
-            if (bundleAppPath == null) {
-                return true;
-            }
+        if (bundleAppPath == null) {
+            return true;
+        }
 
+        try {
             Process process = (Process) entity;
             org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
                 ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
             Path checksum = new Path(bundleAppPath, EntityUtil.PROCESS_CHECKSUM_FILE);
             Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                conf, process.getACL());
             if (!fs.exists(checksum)) {
                 //Update if there is no checksum file(for backward compatibility)
                 return true;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
index 7bb2e0e..65effad 100644
--- a/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
+++ b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
@@ -357,7 +357,7 @@ public class DefaultAuthorizationProviderTest {
         Assert.fail("Bad entity");
     }
 
-    @Test (expectedExceptions = AuthorizationException.class)
+    @Test
     public void testAuthorizeValidatePOSTOperationsGroupBadUser() throws Exception {
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
index 2e5dffb..6844f31 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
@@ -53,7 +53,8 @@ public final class LogProvider {
         try {
             Configuration conf = ClusterHelper.getConfiguration(clusterObj);
             // fs on behalf of the end user.
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                conf, entity.getACL());
             String resolvedRunId = getResolvedRunId(fs, clusterObj, entity, instance, runId);
             // if runId param is not resolved, i.e job is killed or not started or running
             if (resolvedRunId.equals("-")

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index e341fb8..4108839 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -140,7 +140,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
             }
 
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                    outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+                    outPath.toUri(), ClusterHelper.getConfiguration(cluster), entity.getACL());
             OutputStream out = fs.create(outPath);
             try {
                 marshaller.marshal(jaxbElement, out);
@@ -261,7 +261,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
     protected void copySharedLibs(Cluster cluster, Path libPath) throws FalconException {
         try {
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                    libPath.toUri(), ClusterHelper.getConfiguration(cluster));
+                    libPath.toUri(), ClusterHelper.getConfiguration(cluster), entity.getACL());
             SharedLibraryHostingService.pushLibsToHDFS(
                     fs, StartupProperties.get().getProperty("system.lib.location"),
                     libPath, FALCON_JAR_FILTER);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 3a3e26e..f7fed45 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -211,7 +211,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {
         String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
         FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster));
+            ClusterHelper.getConfiguration(cluster), entity.getACL());
         try {
             addExtensionJars(fs, new Path(libext), wf);
             addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf);
@@ -268,7 +268,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
 
         try {
             Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf,
+                entity.getACL());
 
             // create hive conf to stagingDir
             Path confPath = new Path(workflowPath + "/conf");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index c5366dc..c578005 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -281,7 +281,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
                                         Path buildPath) throws FalconException {
         Configuration conf = ClusterHelper.getConfiguration(trgCluster);
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf, entity.getACL());
 
         try {
             // copy import export scripts to stagingDir

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index a38fdf6..3e54bd2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -121,7 +121,7 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
     private void copyUserWorkflow(Cluster cluster, Path buildPath) throws FalconException {
         try {
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                    ClusterHelper.getConfiguration(cluster));
+                ClusterHelper.getConfiguration(cluster), entity.getACL());
 
             //Copy user workflow and lib to staging dir
             Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 2700802..24437fc 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -223,7 +223,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
 
         try {
             final FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                    ClusterHelper.getConfiguration(cluster));
+                ClusterHelper.getConfiguration(cluster), entity.getACL());
             if (fs.isFile(libPath)) {  // File, not a Dir
                 archiveList.add(libPath.toString());
                 return;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
index 54cab51..d9fe8c1 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
@@ -58,7 +58,8 @@ public class OozieHouseKeepingService implements WorkflowEngineActionListener {
             LOG.info("Deleting entity path {} on cluster {}", entityPath, clusterName);
 
             Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf,
+                entity.getACL());
             if (fs.exists(entityPath) && !fs.delete(entityPath, true)) {
                 throw new FalconException("Unable to cleanup entity path: " + entityPath);
             }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 5c4fea3..7032182 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -49,6 +49,7 @@ import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.BundleJob;
 import org.apache.oozie.client.CoordinatorAction;
@@ -176,12 +177,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         Path logPath = EntityUtil.getLogPath(cluster, entity);
 
         try {
-            HadoopClientFactory.mkdirsWithDefaultPerms(
-                    HadoopClientFactory.get().createProxiedFileSystem(
-                            ClusterHelper.getConfiguration(cluster)), stagingPath);
-            HadoopClientFactory.mkdirsWithDefaultPerms(
-                    HadoopClientFactory.get().createProxiedFileSystem(
-                            ClusterHelper.getConfiguration(cluster)), logPath);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster), entity.getACL());
+            HadoopClientFactory.mkdirsWithDefaultPerms(fs, stagingPath);
+            HadoopClientFactory.mkdirsWithDefaultPerms(fs, logPath);
         } catch (IOException e) {
             throw new FalconException("Error preparing base staging dirs: " + stagingPath, e);
         }
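
This hunk also folds two factory calls into one, reusing a single proxied
FileSystem for both directory creations. The same pattern, sketched standalone
(the wrapper class is hypothetical; mkdirsWithDefaultPerms is the existing
HadoopClientFactory helper):

    import java.io.IOException;

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.ClusterHelper;
    import org.apache.falcon.entity.v0.Entity;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.falcon.hadoop.HadoopClientFactory;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical helper, for illustration only.
    final class PrepareDirsExample {
        static void prepareBaseDirs(Cluster cluster, Entity entity,
                                    Path stagingPath, Path logPath)
            throws FalconException, IOException {
            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
                    ClusterHelper.getConfiguration(cluster), entity.getACL());
            HadoopClientFactory.mkdirsWithDefaultPerms(fs, stagingPath);  // one handle,
            HadoopClientFactory.mkdirsWithDefaultPerms(fs, logPath);      // two directories
        }

        private PrepareDirsExample() {
        }
    }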

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index c4c6493..80b2429 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -356,7 +356,7 @@ public abstract class AbstractEntityManager {
      * @param entity entity
      */
     private void decorateEntityWithACL(Entity entity) {
-        if (SecurityUtil.isAuthorizationEnabled() || EntityUtil.getACL(entity) != null) {
+        if (SecurityUtil.isAuthorizationEnabled() || entity.getACL() != null) {
             return; // not necessary to decorate
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
index aff2006..16cc0ac 100644
--- a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
+++ b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
@@ -63,11 +63,11 @@ public class FalconAuthorizationFilter implements Filter {
     public void doFilter(ServletRequest request,
                          ServletResponse response,
                          FilterChain filterChain) throws IOException, ServletException {
-        HttpServletRequest httpRequest = (HttpServletRequest) request;
-        RequestParts requestParts = getUserRequest(httpRequest);
-
         if (isAuthorizationEnabled) {
+            HttpServletRequest httpRequest = (HttpServletRequest) request;
+            RequestParts requestParts = getUserRequest(httpRequest);
             LOG.info("Authorizing user={} against request={}", CurrentUser.getUser(), requestParts);
+
             try {
                 authorizationProvider.authorizeResource(requestParts.getResource(),
                         requestParts.getAction(), requestParts.getEntityType(),

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/27693c6f/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index c2cb09e..6a8017e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -75,7 +75,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                 LOG.info("Going to delete path: {}", lateLogPath);
                 final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
                 Configuration conf = getConfiguration(storageEndpoint);
-                FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+                FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    conf, entity.getACL());
                 if (fs.exists(lateLogPath)) {
                     boolean deleted = fs.delete(lateLogPath, true);
                     if (deleted) {

