falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [3/4] incubator-falcon git commit: FALCON-864 Falcon superuser is unable to delete scheduled feed. Contributed by Venkatesh Seetharam
Date Fri, 07 Nov 2014 20:39:24 GMT
FALCON-864 Falcon superuser is unable to delete scheduled feed. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/7cb0666b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/7cb0666b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/7cb0666b

Branch: refs/heads/master
Commit: 7cb0666b2f05118e3c86d70a71f7e27e872703e1
Parents: e9b12f4
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Fri Nov 7 11:31:16 2014 -0800
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Fri Nov 7 11:31:16 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../falcon/cleanup/AbstractCleanupHandler.java  |   4 +-
 .../org/apache/falcon/entity/EntityUtil.java    |   5 +-
 .../apache/falcon/entity/FileSystemStorage.java |   5 +-
 .../org/apache/falcon/entity/ProcessHelper.java |   4 +-
 .../entity/parser/ClusterEntityParser.java      |   5 +-
 .../falcon/entity/parser/EntityParser.java      |   2 +-
 .../falcon/entity/parser/FeedEntityParser.java  |   3 +-
 .../entity/parser/ProcessEntityParser.java      |   5 +-
 .../falcon/hadoop/HadoopClientFactory.java      |  55 +++----
 .../falcon/security/AuthorizationProvider.java  |  27 +++-
 .../org/apache/falcon/security/CurrentUser.java | 153 ++++++++++++-------
 .../security/DefaultAuthorizationProvider.java  | 151 +++++++++++-------
 .../apache/falcon/security/FalconPrincipal.java |  38 -----
 .../org/apache/falcon/update/UpdateHelper.java  |   3 +-
 .../apache/falcon/security/CurrentUserTest.java |  75 ++++++++-
 .../DefaultAuthorizationProviderTest.java       |   3 +-
 .../org/apache/falcon/logging/LogProvider.java  |   3 +-
 .../apache/falcon/oozie/OozieEntityBuilder.java |   4 +-
 .../OozieOrchestrationWorkflowBuilder.java      |   5 +-
 .../feed/FeedReplicationCoordinatorBuilder.java |   2 +-
 .../oozie/process/ProcessBundleBuilder.java     |   2 +-
 .../ProcessExecutionWorkflowBuilder.java        |   2 +-
 .../workflow/engine/OozieClientFactory.java     |  13 +-
 .../engine/OozieHouseKeepingService.java        |   3 +-
 .../workflow/engine/OozieWorkflowEngine.java    |   2 +-
 .../apache/oozie/client/ProxyOozieClient.java   |   5 +-
 .../falcon/resource/AbstractEntityManager.java  |  34 +++--
 .../falcon/security/FalconAuditFilter.java      |  16 +-
 .../security/FalconAuthenticationFilter.java    |   2 +-
 .../security/FalconAuthorizationFilter.java     |  84 ++++++++--
 .../security/FalconAuthorizationFilterTest.java |  14 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |   3 +-
 33 files changed, 459 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e021bec..3f63b2a 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -129,6 +129,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-864 Falcon superuser is unable to delete scheduled feed
+   (Venkatesh Seetharam)
+
    FALCON-862 Falcon entity Rest API - filter by tags also returns entities
    that do not have tags (Balu Vellanki via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index be300d7..6fa6c73 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
@@ -99,8 +100,9 @@ public abstract class AbstractCleanupHandler {
                 throw new FalconException("ACL for entity " + entity.getName() + " is empty");
             }
 
+            CurrentUser.authenticate(acl.getOwner()); // proxy user
             return HadoopClientFactory.get().createProxiedFileSystem(
-                    ClusterHelper.getConfiguration(cluster), acl);
+                    ClusterHelper.getConfiguration(cluster));
         } catch (Exception e) {
             throw new FalconException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index bcebb94..59e43fb 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -579,11 +579,10 @@ public final class EntityUtil {
 
     //Returns all staging paths for the entity
     public static FileStatus[] getAllStagingPaths(org.apache.falcon.entity.v0.cluster.Cluster cluster,
-        Entity entity)
-        throws FalconException {
+                                                  Entity entity) throws FalconException {
         Path basePath = getBaseStagingPath(cluster, entity);
         FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster), entity.getACL());
+                ClusterHelper.getConfiguration(cluster));
         try {
             return fs.listStatus(basePath, new PathFilter() {
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 953c19e..76222fc 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -216,7 +216,7 @@ public class FileSystemStorage implements Storage {
     }
 
     public Path getWorkingDir() {
-        return new Path(CurrentUser.getSubject() == null ? "/" : "/user/" + CurrentUser.getUser());
+        return new Path(CurrentUser.isAuthenticated() ? "/user/" + CurrentUser.getUser() : "/");
     }
 
     @Override
@@ -300,8 +300,7 @@ public class FileSystemStorage implements Storage {
                 getLocations(FeedHelper.getCluster(feed, clusterName), feed);
         Location location = getLocation(clusterSpecificLocation, locationType);
         try {
-            FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(
-                getConf(), feed.getACL());
+            FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(getConf());
             Cluster cluster = ClusterHelper.getCluster(clusterName);
             Properties baseProperties = FeedHelper.getClusterProperties(cluster);
             baseProperties.putAll(FeedHelper.getFeedProperties(feed));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index 8073229..174f8f6 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -87,7 +87,7 @@ public final class ProcessHelper {
         Path buildPath) throws FalconException {
         try {
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster), process.getACL());
+                ClusterHelper.getConfiguration(cluster));
             Path wfPath = new Path(process.getWorkflow().getPath());
             if (fs.isFile(wfPath)) {
                 return new Path(buildPath.getParent(), EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
@@ -109,7 +109,7 @@ public final class ProcessHelper {
             Path libPath = new Path(userLibPath);
 
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster), process.getACL());
+                ClusterHelper.getConfiguration(cluster));
             if (fs.isFile(libPath)) {
                 return new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 5a7ec17..b801dec 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -70,6 +70,8 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
             validateScheme(cluster, Interfacetype.REGISTRY);
         }
 
+        validateACL(cluster);
+
         if (!EntityUtil.responsibleFor(cluster.getColo())) {
             return;
         }
@@ -81,7 +83,6 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         validateMessagingInterface(cluster);
         validateRegistryInterface(cluster);
 
-        validateACL(cluster);
         validateLocations(cluster);
     }
 
@@ -241,7 +242,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         Configuration conf = ClusterHelper.getConfiguration(cluster);
         FileSystem fs;
         try {
-            fs = HadoopClientFactory.get().createProxiedFileSystem(conf, cluster.getACL());
+            fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
         } catch (FalconException e) {
             throw new ValidationException(
                     "Unable to get file system handle for cluster " + cluster.getName(), e);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
index e2742a1..05b204d 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
@@ -149,7 +149,7 @@ public abstract class EntityParser<T extends Entity> {
                              AccessControlList acl) throws AuthorizationException {
         try {
             SecurityUtil.getAuthorizationProvider().authorizeEntity(entityName,
-                    getEntityType().name(), acl, "validate", CurrentUser.getProxyUGI());
+                    getEntityType().name(), acl, "submit", CurrentUser.getAuthenticatedUGI());
         } catch (FalconException e) {
             throw new AuthorizationException(e);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index b4d29ee..63f9202 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -72,6 +72,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
             throw new ValidationException("Feed should have at least one cluster");
         }
 
+        validateACL(feed);
         for (Cluster cluster : feed.getClusters().getClusters()) {
             validateEntityExists(EntityType.CLUSTER, cluster.getName());
             validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(),
@@ -487,7 +488,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
 
             if (dataLocation == null) {
                 throw new ValidationException(feed.getName() + " is a FileSystem based feed "
-                        + "but it doesn't contain location type - data in cluster " + cluster.getName().toString());
+                    + "but it doesn't contain location type - data in cluster " + cluster.getName());
             }
 
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index aaaa229..55887ac 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -64,6 +64,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
             process.setTimezone(TimeZone.getTimeZone("UTC"));
         }
 
+        validateACL(process);
         // check if dependent entities exists
         Set<String> clusters = new HashSet<String>();
         for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
@@ -99,7 +100,6 @@ public class ProcessEntityParser extends EntityParser<Process> {
         }
         validateDatasetName(process.getInputs(), process.getOutputs());
         validateLateInputs(process);
-        validateACL(process);
     }
 
     /**
@@ -122,8 +122,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
         String nameNode = getNameNode(cluster);
         try {
             Configuration configuration = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                configuration, process.getACL());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration);
             if (!fs.exists(new Path(workflowPath))) {
                 throw new ValidationException(
                         "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index 1496268..fb954ec 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -20,7 +20,6 @@ package org.apache.falcon.hadoop;
 
 import org.apache.commons.lang.Validate;
 import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.StartupProperties;
@@ -33,6 +32,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
@@ -44,6 +45,8 @@ import java.security.PrivilegedExceptionAction;
  */
 public final class HadoopClientFactory {
 
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopClientFactory.class);
+
     public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
     public static final String MR_JT_ADDRESS_KEY = "mapreduce.jobtracker.address";
     public static final String YARN_RM_ADDRESS_KEY = "yarn.resourcemanager.address";
@@ -119,10 +122,15 @@ public final class HadoopClientFactory {
         throws FalconException {
         Validate.notNull(conf, "configuration cannot be null");
 
-        return createProxiedFileSystem(conf, null);
+        String nameNode = getNameNode(conf);
+        try {
+            return createProxiedFileSystem(new URI(nameNode), conf);
+        } catch (URISyntaxException e) {
+            throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
+        }
     }
 
-    private String getNameNode(Configuration conf) {
+    private static String getNameNode(Configuration conf) {
         return conf.get(FS_DEFAULT_NAME_KEY);
     }
 
@@ -141,47 +149,14 @@ public final class HadoopClientFactory {
                                               final Configuration conf) throws FalconException {
         Validate.notNull(uri, "uri cannot be null");
 
-        return createProxiedFileSystem(uri, conf, null);
-    }
-
-    public FileSystem createProxiedFileSystem(final Configuration conf,
-                                              final AccessControlList acl) throws FalconException {
-        Validate.notNull(conf, "configuration cannot be null");
-
         try {
-            return createProxiedFileSystem(new URI(getNameNode(conf)), conf, acl);
-        } catch (URISyntaxException e) {
-            throw new FalconException("Exception while getting FileSystem for proxy: "
-                    + CurrentUser.getUser(), e);
-        }
-    }
-
-    // getFileSystemAsEntityOwner
-    public FileSystem createProxiedFileSystem(final URI uri,
-                                              final Configuration conf,
-                                              final AccessControlList acl) throws FalconException {
-        Validate.notNull(uri, "uri cannot be null");
-
-        try {
-            UserGroupInformation proxyUGI = getProxyUGI(acl);
-
-            return createFileSystem(proxyUGI, uri, conf);
+            return createFileSystem(CurrentUser.getProxyUGI(), uri, conf);
         } catch (IOException e) {
             throw new FalconException("Exception while getting FileSystem for proxy: "
                 + CurrentUser.getUser(), e);
         }
     }
 
-    private UserGroupInformation getProxyUGI(AccessControlList acl)
-        throws FalconException, IOException {
-
-        return CurrentUser.isAuthenticated()
-            ? acl != null
-                && SecurityUtil.getAuthorizationProvider().isSuperUser(CurrentUser.getProxyUGI())
-                ? CurrentUser.createProxyUGI(acl.getOwner()) : CurrentUser.getProxyUGI()
-            : UserGroupInformation.getCurrentUser();
-    }
-
     /**
      * Return a FileSystem created with the provided user for the specified URI.
      *
@@ -212,10 +187,14 @@ public final class HadoopClientFactory {
 
         try {
             // prevent falcon impersonating falcon, no need to use doas
-            if (ugi.equals(UserGroupInformation.getLoginUser())) {
+            final String proxyUserName = ugi.getShortUserName();
+            if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
+                LOG.info("Creating FS for the login user {}, impersonation not required",
+                    proxyUserName);
                 return FileSystem.get(uri, conf);
             }
 
+            LOG.info("Creating FS impersonating user {}", proxyUserName);
             return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                 public FileSystem run() throws Exception {
                     return FileSystem.get(uri, conf);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
index 1b36c4e..a6f2564 100644
--- a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
+++ b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
@@ -18,10 +18,13 @@
 
 package org.apache.falcon.security;
 
+import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 
+import java.io.IOException;
+
 /**
  * An interface for authorizing user against an entity operation.
  */
@@ -30,10 +33,21 @@ public interface AuthorizationProvider {
     /**
      * Check if the authenticated user is a super user.
      *
-     * @param proxyUgi   proxy ugi for the authenticated user
+     * @param authenticatedUGI   proxy ugi for the authenticated user
      * @return true if sure user, else false
      */
-    boolean isSuperUser(UserGroupInformation proxyUgi);
+    boolean isSuperUser(UserGroupInformation authenticatedUGI);
+
+    /**
+     * Checks if authenticated user can proxy the entity acl owner.
+     *
+     * @param authenticatedUGI  proxy ugi for the authenticated user.
+     * @param aclOwner          entity ACL Owner.
+     * @param aclGroup          entity ACL group.
+     * @throws IOException
+     */
+    boolean shouldProxy(UserGroupInformation authenticatedUGI,
+                        String aclOwner, String aclGroup) throws IOException;
 
     /**
      * Determines if the authenticated user is authorized to execute the action on the resource,
@@ -44,14 +58,15 @@ public interface AuthorizationProvider {
      * @param action     action being authorized on resource and entity if applicable
      * @param entityType entity type in question, not for admin resource
      * @param entityName entity name in question, not for admin resource
-     * @param proxyUgi   proxy ugi for the authenticated user
+     * @param authenticatedUGI   proxy ugi for the authenticated user
      * @throws AuthorizationException
      */
     void authorizeResource(String resource,
                            String action,
                            String entityType,
                            String entityName,
-                           UserGroupInformation proxyUgi) throws AuthorizationException;
+                           UserGroupInformation authenticatedUGI)
+        throws AuthorizationException, EntityNotRegisteredException;
 
     /**
      * Determines if the authenticated user is authorized to execute the action on the entity.
@@ -61,10 +76,10 @@ public interface AuthorizationProvider {
      * @param entityType entity in question, applicable for entities and instance resource
      * @param acl        entity ACL
      * @param action     action being authorized on resource and entity if applicable
-     * @param proxyUgi   proxy ugi for the authenticated user
+     * @param authenticatedUGI   proxy ugi for the authenticated user
      * @throws AuthorizationException
      */
     void authorizeEntity(String entityName, String entityType,
                          AccessControlList acl, String action,
-                         UserGroupInformation proxyUgi) throws AuthorizationException;
+                         UserGroupInformation authenticatedUGI) throws AuthorizationException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index 3d35630..11adccd 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -18,11 +18,11 @@
 
 package org.apache.falcon.security;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.Subject;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,71 +32,114 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 /**
- * Current authenticated user via REST.
- * Also doles out proxied UserGroupInformation. Caches proxied users.
+ * Current authenticated user via REST. Also captures the proxy user from authorized entity
+ * and doles out proxied UserGroupInformation. Caches proxied users.
  */
 public final class CurrentUser {
 
     private static final Logger LOG = LoggerFactory.getLogger(CurrentUser.class);
+    private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
 
-    private static final CurrentUser INSTANCE = new CurrentUser();
+    private final String authenticatedUser;
+    private String proxyUser;
 
-    private CurrentUser() {}
-
-    public static CurrentUser get() {
-        return INSTANCE;
+    private CurrentUser(String authenticatedUser) {
+        this.authenticatedUser = authenticatedUser;
+        this.proxyUser = authenticatedUser;
     }
 
-    private final ThreadLocal<Subject> currentSubject = new ThreadLocal<Subject>();
+    private static final ThreadLocal<CurrentUser> CURRENT_USER = new ThreadLocal<CurrentUser>();
 
+    /**
+     * Captures the authenticated user.
+     *
+     * @param user   authenticated user
+     */
     public static void authenticate(final String user) {
-        if (user == null || user.isEmpty()) {
+        if (StringUtils.isEmpty(user)) {
             throw new IllegalStateException("Bad user name sent for authentication");
         }
-        if (user.equals(getUserInternal())) {
-            return;
-        }
 
-        Subject subject = new Subject();
-        subject.getPrincipals().add(new FalconPrincipal(user));
+        LOG.info("Logging in {}", user);
+        CurrentUser currentUser = new CurrentUser(user);
+        CURRENT_USER.set(currentUser);
+    }
 
-        try {  // initialize proxy user
-            createProxyUGI(user);
-        } catch (IOException e) {
-            throw new IllegalStateException("Unable to create a proxy user");
+    /**
+     * Captures the entity owner if authenticated user is a super user.
+     *
+     * @param aclOwner entity acl owner
+     * @param aclGroup entity acl group
+     * @throws IOException
+     */
+    public static void proxy(final String aclOwner,
+                             final String aclGroup) throws IOException {
+        if (!isAuthenticated() || StringUtils.isEmpty(aclOwner)) {
+            throw new IllegalStateException("Authentication not done or Bad user name");
         }
 
-        LOG.info("Logging in {}", user);
-        INSTANCE.currentSubject.set(subject);
+        CurrentUser user = CURRENT_USER.get();
+        LOG.info("Authenticated user {} is proxying entity owner {}/{}",
+            user.authenticatedUser, aclOwner, aclGroup);
+        AUDIT.info("Authenticated user {} is proxying entity owner {}/{}",
+            user.authenticatedUser, aclOwner, aclGroup);
+        user.proxyUser = aclOwner;
     }
 
-    public static boolean isAuthenticated() {
-        return getSubject() != null;
+    /**
+     * Clears the context.
+     */
+    public static void clear() {
+        CURRENT_USER.remove();
     }
 
-    public static Subject getSubject() {
-        return INSTANCE.currentSubject.get();
+    /**
+     * Checks if the authenticate method is already called.
+     *
+     * @return true if authenticated user is set else false
+     */
+    public static boolean isAuthenticated() {
+        CurrentUser user = CURRENT_USER.get();
+        return user != null && user.authenticatedUser != null;
     }
 
-    public static String getUser() {
-        String user = getUserInternal();
-        if (user == null) {
+    /**
+     * Returns authenticated user.
+     *
+     * @return logged in authenticated user.
+     */
+    public static String getAuthenticatedUser() {
+        CurrentUser user = CURRENT_USER.get();
+        if (user == null || user.authenticatedUser == null) {
             throw new IllegalStateException("No user logged into the system");
         } else {
-            return user;
+            return user.authenticatedUser;
         }
     }
 
-    private static String getUserInternal() {
-        Subject subject = getSubject();
-        if (subject == null) {
-            return null;
+    /**
+     * Dole out a UGI object for the current authenticated user if authenticated
+     * else return current user.
+     *
+     * @return UGI object
+     * @throws java.io.IOException
+     */
+    public static UserGroupInformation getAuthenticatedUGI() throws IOException {
+        return CurrentUser.isAuthenticated()
+            ? createProxyUGI(getAuthenticatedUser()) : UserGroupInformation.getCurrentUser();
+    }
+
+    /**
+     * Returns the proxy user.
+     *
+     * @return proxy user
+     */
+    public static String getUser() {
+        CurrentUser user = CURRENT_USER.get();
+        if (user == null || user.proxyUser == null) {
+            throw new IllegalStateException("No user logged into the system");
         } else {
-            for (FalconPrincipal principal : subject.
-                    getPrincipals(FalconPrincipal.class)) {
-                return principal.getName();
-            }
-            return null;
+            return user.proxyUser;
         }
     }
 
@@ -104,7 +147,7 @@ public final class CurrentUser {
             new ConcurrentHashMap<String, UserGroupInformation>();
 
     /**
-     * Create a proxy UGI object for the current authenticated user.
+     * Create a proxy UGI object for the proxy user.
      *
      * @param proxyUser logged in user
      * @return UGI object
@@ -123,33 +166,39 @@ public final class CurrentUser {
     }
 
     /**
-     * Dole out a proxy UGI object for the current authenticated user.
+     * Dole out a proxy UGI object for the proxy user if a user is authenticated,
+     * else return the current user.
      *
      * @return UGI object
      * @throws java.io.IOException
      */
     public static UserGroupInformation getProxyUGI() throws IOException {
-        String proxyUser = getUser();
-
-        UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
-        if (proxyUgi == null) {
-            // taking care of a race condition, the latest UGI will be discarded
-            proxyUgi = UserGroupInformation.createProxyUser(
-                    proxyUser, UserGroupInformation.getLoginUser());
-            userUgiMap.putIfAbsent(proxyUser, proxyUgi);
-        }
-
-        return proxyUgi;
+        return CurrentUser.isAuthenticated()
+            ? createProxyUGI(getUser()) : UserGroupInformation.getCurrentUser();
     }
 
+    /**
+     * Gets a collection of group names the proxy user belongs to.
+     *
+     * @return group names
+     * @throws IOException
+     */
     public static Set<String> getGroupNames() throws IOException {
         HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUGI().getGroupNames()));
         return Collections.unmodifiableSet(s);
     }
 
+    /**
+     * Returns the primary group name for the proxy user.
+     *
+     * @return primary group name for the proxy user
+     */
     public static String getPrimaryGroupName() {
         try {
-            return getProxyUGI().getPrimaryGroupName();
+            String[] groups = getProxyUGI().getGroupNames();
+            if (groups.length > 0) {
+                return groups[0];
+            }
         } catch (IOException ignore) {
             // ignored
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
index e7895f8..b59718c 100644
--- a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
+++ b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
@@ -21,6 +21,7 @@ package org.apache.falcon.security;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.Validate;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
@@ -31,6 +32,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -104,6 +106,40 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
     }
 
     /**
+     * Determines if the authenticated user is the user who started this process
+     * or belongs to the super user group.
+     *
+     * @param authenticatedUGI UGI
+     * @return true if super user else false.
+     */
+    public boolean isSuperUser(UserGroupInformation authenticatedUGI) {
+        return SUPER_USER.equals(authenticatedUGI.getShortUserName())
+            || (!StringUtils.isEmpty(superUserGroup)
+                    && isUserInGroup(superUserGroup, authenticatedUGI));
+    }
+
+    /**
+     * Checks if authenticated user should proxy the entity acl owner.
+     *
+     * @param authenticatedUGI  proxy ugi for the authenticated user.
+     * @param aclOwner          entity ACL Owner.
+     * @param aclGroup          entity ACL group.
+     * @throws IOException
+     */
+    @Override
+    public boolean shouldProxy(UserGroupInformation authenticatedUGI,
+                               final String aclOwner,
+                               final String aclGroup) throws IOException {
+        Validate.notNull(authenticatedUGI, "User cannot be empty or null");
+        Validate.notEmpty(aclOwner, "User cannot be empty or null");
+        Validate.notEmpty(aclGroup, "Group cannot be empty or null");
+
+        return isSuperUser(authenticatedUGI)
+            || (!isUserACLOwner(authenticatedUGI.getShortUserName(), aclOwner)
+                    && isUserInGroup(aclGroup, authenticatedUGI));
+    }
+
+    /**
      * Determines if the authenticated user is authorized to execute the action on the resource.
      * Throws an exception if not authorized.
      *
@@ -111,45 +147,38 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
      * @param action     action being authorized on resource and entity if applicable
      * @param entityType entity type in question, not for admin resource
      * @param entityName entity name in question, not for admin resource
-     * @param proxyUgi   proxy ugi for the authenticated user
+     * @param authenticatedUGI   proxy ugi for the authenticated user
      * @throws org.apache.hadoop.security.authorize.AuthorizationException
      */
     @Override
     public void authorizeResource(String resource, String action,
                                   String entityType, String entityName,
-                                  UserGroupInformation proxyUgi) throws AuthorizationException {
+                                  UserGroupInformation authenticatedUGI)
+        throws AuthorizationException, EntityNotRegisteredException {
+
         Validate.notEmpty(resource, "Resource cannot be empty or null");
         Validate.isTrue(RESOURCES.contains(resource), "Illegal resource: " + resource);
         Validate.notEmpty(action, "Action cannot be empty or null");
 
-        if (isSuperUser(proxyUgi)) {
-            return;
-        }
+        try {
+            if (isSuperUser(authenticatedUGI)) {
+                return;
+            }
 
-        if ("admin".equals(resource)) {
-            if (!"version".equals(action)) {
-                authorizeAdminResource(proxyUgi, action);
+            if ("admin".equals(resource)) {
+                if (!"version".equals(action)) {
+                    authorizeAdminResource(authenticatedUGI, action);
+                }
+            } else if ("entities".equals(resource) || "instance".equals(resource)) {
+                authorizeEntityResource(authenticatedUGI, entityName, entityType, action);
+            } else if ("metadata".equals(resource)) {
+                authorizeMetadataResource(authenticatedUGI, action);
             }
-        } else if ("entities".equals(resource) || "instance".equals(resource)) {
-            authorizeEntityResource(proxyUgi, entityName, entityType, action);
-        } else if ("metadata".equals(resource)) {
-            authorizeMetadataResource(proxyUgi.getShortUserName(), action);
+        } catch (IOException e) {
+            throw new AuthorizationException(e);
         }
     }
 
-    /**
-     * Determines if the authenticated user is the user who started this process
-     * or belongs to the super user group.
-     *
-     * @param authenticatedUGI UGI
-     * @return true if super user else false.
-     */
-    public boolean isSuperUser(UserGroupInformation authenticatedUGI) {
-        return SUPER_USER.equals(authenticatedUGI.getShortUserName())
-                || (!StringUtils.isEmpty(superUserGroup)
-                    && isUserInGroup(superUserGroup, authenticatedUGI));
-    }
-
     protected Set<String> getGroupNames(UserGroupInformation proxyUgi) {
         HashSet<String> s = new HashSet<String>(Arrays.asList(proxyUgi.getGroupNames()));
         return Collections.unmodifiableSet(s);
@@ -163,22 +192,26 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
      * @param entityType entity in question, applicable for entities and instance resource
      * @param acl        entity ACL
      * @param action     action being authorized on resource and entity if applicable
-     * @param proxyUgi   proxy ugi for the authenticated user
+     * @param authenticatedUGI   proxy ugi for the authenticated user
      * @throws org.apache.hadoop.security.authorize.AuthorizationException
      */
     @Override
-    public void authorizeEntity(String entityName, String entityType,
-                                AccessControlList acl, String action,
-                                UserGroupInformation proxyUgi) throws AuthorizationException {
-        String authenticatedUser = proxyUgi.getShortUserName();
-        LOG.info("Authorizing authenticatedUser={}, action={}, entity={}, type{}",
-                authenticatedUser, action, entityName, entityType);
-
-        if (isSuperUser(proxyUgi)) {
-            return;
-        }
+    public void authorizeEntity(String entityName, String entityType, AccessControlList acl,
+                                String action, UserGroupInformation authenticatedUGI)
+        throws AuthorizationException {
+
+        try {
+            LOG.info("Authorizing authenticatedUser={}, action={}, entity={}, type{}",
+                    authenticatedUGI.getShortUserName(), action, entityName, entityType);
 
-        checkUser(entityName, acl.getOwner(), acl.getGroup(), action, authenticatedUser, proxyUgi);
+            if (isSuperUser(authenticatedUGI)) {
+                return;
+            }
+
+            checkUser(entityName, acl.getOwner(), acl.getGroup(), action, authenticatedUGI);
+        } catch (IOException e) {
+            throw new AuthorizationException(e);
+        }
     }
 
     /**
@@ -188,15 +221,14 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
      * @param aclOwner          entity ACL Owner.
      * @param aclGroup          entity ACL group.
      * @param action            action being authorized on resource and entity if applicable.
-     * @param authenticatedUser authenticated user name.
-     * @param proxyUgi          proxy ugi for the authenticated user.
+     * @param authenticatedUGI          proxy ugi for the authenticated user.
      * @throws AuthorizationException
      */
-    protected void checkUser(String entityName, String aclOwner, String aclGroup,
-                             String action, String authenticatedUser,
-                             UserGroupInformation proxyUgi) throws AuthorizationException {
+    protected void checkUser(String entityName, String aclOwner, String aclGroup, String action,
+                             UserGroupInformation authenticatedUGI) throws AuthorizationException {
+        final String authenticatedUser = authenticatedUGI.getShortUserName();
         if (isUserACLOwner(authenticatedUser, aclOwner)
-                || isUserInGroup(aclGroup, proxyUgi)) {
+                || isUserInGroup(aclGroup, authenticatedUGI)) {
             return;
         }
 
@@ -237,15 +269,15 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
     /**
      * Check if the user has admin privileges.
      *
-     * @param proxyUgi proxy ugi for the authenticated user.
+     * @param authenticatedUGI proxy ugi for the authenticated user.
      * @param action   admin action on the resource.
      * @throws AuthorizationException if the user does not have admin privileges.
      */
-    protected void authorizeAdminResource(UserGroupInformation proxyUgi,
+    protected void authorizeAdminResource(UserGroupInformation authenticatedUGI,
                                           String action) throws AuthorizationException {
-        final String authenticatedUser = proxyUgi.getShortUserName();
+        final String authenticatedUser = authenticatedUGI.getShortUserName();
         LOG.debug("Authorizing user={} for admin, action={}", authenticatedUser, action);
-        if (adminUsers.contains(authenticatedUser) || isUserInAdminGroups(proxyUgi)) {
+        if (adminUsers.contains(authenticatedUser) || isUserInAdminGroups(authenticatedUGI)) {
             return;
         }
 
@@ -268,35 +300,44 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         return isUserGroupInAdmin;
     }
 
-    protected void authorizeEntityResource(UserGroupInformation proxyUgi,
+    protected void authorizeEntityResource(UserGroupInformation authenticatedUGI,
                                            String entityName, String entityType,
-                                           String action) throws AuthorizationException {
+                                           String action)
+        throws AuthorizationException, EntityNotRegisteredException {
+
         Validate.notEmpty(entityType, "Entity type cannot be empty or null");
         LOG.debug("Authorizing authenticatedUser={} against entity/instance action={}, "
                 + "entity name={}, entity type={}",
-                proxyUgi.getShortUserName(), action, entityName, entityType);
+                authenticatedUGI.getShortUserName(), action, entityName, entityType);
 
         if (entityName != null) { // lifecycle actions
             Entity entity = getEntity(entityName, entityType);
             authorizeEntity(entity.getName(), entity.getEntityType().name(),
-                    entity.getACL(), action, proxyUgi);
+                entity.getACL(), action, authenticatedUGI);
         } else {
             // non lifecycle actions, lifecycle actions with null entity will validate later
             LOG.info("Authorization for action={} will be done in the API", action);
         }
     }
 
-    private Entity getEntity(String entityName, String entityType) throws AuthorizationException {
+    private Entity getEntity(String entityName, String entityType)
+        throws EntityNotRegisteredException, AuthorizationException {
+
         try {
             EntityType type = EntityType.valueOf(entityType.toUpperCase());
             return EntityUtil.getEntity(type, entityName);
         } catch (FalconException e) {
-            throw new AuthorizationException(e);
+            if (e instanceof EntityNotRegisteredException) {
+                throw (EntityNotRegisteredException) e;
+            } else {
+                throw new AuthorizationException(e);
+            }
         }
     }
 
-    protected void authorizeMetadataResource(String authenticatedUser, String action) {
-        LOG.debug("User {} authorized for action {} ", authenticatedUser, action);
+    protected void authorizeMetadataResource(UserGroupInformation authenticatedUGI,
+                                             String action) throws AuthorizationException {
+        LOG.debug("User {} authorized for action {} ", authenticatedUGI.getShortUserName(), action);
         // todo - read-only for all metadata but needs to be implemented
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/security/FalconPrincipal.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/FalconPrincipal.java b/common/src/main/java/org/apache/falcon/security/FalconPrincipal.java
deleted file mode 100644
index ab93e1a..0000000
--- a/common/src/main/java/org/apache/falcon/security/FalconPrincipal.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import java.security.Principal;
-
-/**
- * Falcon JAAS principal object.
- */
-public class FalconPrincipal implements Principal {
-
-    private final String user;
-
-    public FalconPrincipal(String user) {
-        this.user = user;
-    }
-
-    @Override
-    public String getName() {
-        return user;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index 7782c71..5a86ae3 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -124,8 +124,7 @@ public final class UpdateHelper {
                 ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
             Path checksum = new Path(bundleAppPath, EntityUtil.PROCESS_CHECKSUM_FILE);
             Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                conf, process.getACL());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
             if (!fs.exists(checksum)) {
                 //Update if there is no checksum file(for backward compatibility)
                 return true;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
index 187d85e..9a3f365 100644
--- a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
+++ b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.falcon.security;
 
+import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
 /**
@@ -27,18 +29,89 @@ import org.testng.annotations.Test;
  */
 public class CurrentUserTest {
 
+    @AfterMethod
+    public void cleanUp() {
+        CurrentUser.clear();
+    }
+
     @Test(threadPoolSize = 10, invocationCount = 10, timeOut = 10000)
     public void testGetUser() throws Exception {
         String id = Long.toString(System.nanoTime());
         CurrentUser.authenticate(id);
+        Assert.assertEquals(CurrentUser.getAuthenticatedUser(), id);
         Assert.assertEquals(CurrentUser.getUser(), id);
     }
 
+    @Test (expectedExceptions = IllegalStateException.class)
+    public void testAuthenticateBadUser() throws Exception {
+        CurrentUser.authenticate("");
+    }
+
+    @Test (expectedExceptions = IllegalStateException.class)
+    public void testGetAuthenticatedUserInvalid() throws Exception {
+        CurrentUser.getAuthenticatedUser();
+    }
+
+    @Test (expectedExceptions = IllegalStateException.class)
+    public void testGetUserInvalid() throws Exception {
+        CurrentUser.getUser();
+    }
+
+    @Test (expectedExceptions = IllegalStateException.class)
+    public void testProxyBadUser() throws Exception {
+        CurrentUser.authenticate("falcon");
+        CurrentUser.proxy("", "");
+    }
+
+    @Test (expectedExceptions = IllegalStateException.class)
+    public void testProxyWithNoAuth() throws Exception {
+        CurrentUser.proxy("falcon", "falcon");
+    }
+
     @Test
-    public void testGetProxyUser() throws Exception {
+    public void testGetProxyUserForAuthenticatedUser() throws Exception {
         CurrentUser.authenticate("proxy");
         UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
         Assert.assertNotNull(proxyUgi);
         Assert.assertEquals(proxyUgi.getUserName(), "proxy");
     }
+
+    @Test
+    public void testProxy() throws Exception {
+        CurrentUser.authenticate("real");
+
+        CurrentUser.proxy(EntityBuilderTestUtil.USER, "users");
+        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
+        Assert.assertNotNull(proxyUgi);
+        Assert.assertEquals(proxyUgi.getUserName(), EntityBuilderTestUtil.USER);
+
+        Assert.assertEquals(CurrentUser.getAuthenticatedUser(), "real");
+        Assert.assertEquals(CurrentUser.getUser(), EntityBuilderTestUtil.USER);
+    }
+
+    @Test
+    public void testProxySameUser() throws Exception {
+        CurrentUser.authenticate("falcon");
+
+        CurrentUser.proxy("falcon", "users");
+        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
+        Assert.assertNotNull(proxyUgi);
+        Assert.assertEquals(proxyUgi.getUserName(), "falcon");
+
+        Assert.assertEquals(CurrentUser.getAuthenticatedUser(), "falcon");
+        Assert.assertEquals(CurrentUser.getUser(), "falcon");
+    }
+
+    @Test
+    public void testSuperUser() throws Exception {
+        CurrentUser.authenticate(EntityBuilderTestUtil.USER);
+        CurrentUser.proxy("proxy", "users");
+
+        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
+        Assert.assertNotNull(proxyUgi);
+        Assert.assertEquals(proxyUgi.getUserName(), "proxy");
+
+        Assert.assertEquals(CurrentUser.getAuthenticatedUser(), EntityBuilderTestUtil.USER);
+        Assert.assertEquals(CurrentUser.getUser(), "proxy");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
index 0a40359..162be12 100644
--- a/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
+++ b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
@@ -20,6 +20,7 @@ package org.apache.falcon.security;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
+import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
@@ -344,7 +345,7 @@ public class DefaultAuthorizationProviderTest {
                 processEntity.getACL(), "submit", proxyUgi);
     }
 
-    @Test (expectedExceptions = AuthorizationException.class)
+    @Test (expectedExceptions = EntityNotRegisteredException.class)
     public void testAuthorizeResourceOperationsBadEntity() throws Exception {
         StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
         StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
index 6844f31..2e5dffb 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
@@ -53,8 +53,7 @@ public final class LogProvider {
         try {
             Configuration conf = ClusterHelper.getConfiguration(clusterObj);
             // fs on behalf of the end user.
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                conf, entity.getACL());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
             String resolvedRunId = getResolvedRunId(fs, clusterObj, entity, instance, runId);
             // if runId param is not resolved, i.e job is killed or not started or running
             if (resolvedRunId.equals("-")

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index 4108839..e341fb8 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -140,7 +140,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
             }
 
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                    outPath.toUri(), ClusterHelper.getConfiguration(cluster), entity.getACL());
+                    outPath.toUri(), ClusterHelper.getConfiguration(cluster));
             OutputStream out = fs.create(outPath);
             try {
                 marshaller.marshal(jaxbElement, out);
@@ -261,7 +261,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
     protected void copySharedLibs(Cluster cluster, Path libPath) throws FalconException {
         try {
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                    libPath.toUri(), ClusterHelper.getConfiguration(cluster), entity.getACL());
+                    libPath.toUri(), ClusterHelper.getConfiguration(cluster));
             SharedLibraryHostingService.pushLibsToHDFS(
                     fs, StartupProperties.get().getProperty("system.lib.location"),
                     libPath, FALCON_JAR_FILTER);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index f7fed45..771295f 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -211,7 +211,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {
         String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
         FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-            ClusterHelper.getConfiguration(cluster), entity.getACL());
+            ClusterHelper.getConfiguration(cluster));
         try {
             addExtensionJars(fs, new Path(libext), wf);
             addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf);
@@ -268,8 +268,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
 
         try {
             Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf,
-                entity.getACL());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
 
             // create hive conf to stagingDir
             Path confPath = new Path(workflowPath + "/conf");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index c578005..c5366dc 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -281,7 +281,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
                                         Path buildPath) throws FalconException {
         Configuration conf = ClusterHelper.getConfiguration(trgCluster);
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf, entity.getACL());
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
 
         try {
             // copy import export scripts to stagingDir

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index 3e54bd2..8691ee5 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -121,7 +121,7 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
     private void copyUserWorkflow(Cluster cluster, Path buildPath) throws FalconException {
         try {
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster), entity.getACL());
+                ClusterHelper.getConfiguration(cluster));
 
             //Copy user workflow and lib to staging dir
             Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 24437fc..d271695 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -223,7 +223,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
 
         try {
             final FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster), entity.getACL());
+                ClusterHelper.getConfiguration(cluster));
             if (fs.isFile(libPath)) {  // File, not a Dir
                 archiveList.add(libPath.toString());
                 return;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
index d598097..622238a 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
@@ -27,8 +27,6 @@ import org.apache.oozie.client.ProxyOozieClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Factory for providing appropriate oozie client.
  */
@@ -37,8 +35,6 @@ public final class OozieClientFactory {
     private static final Logger LOG = LoggerFactory.getLogger(OozieClientFactory.class);
     private static final String LOCAL_OOZIE = "local";
 
-    private static final ConcurrentHashMap<String, ProxyOozieClient> CACHE =
-            new ConcurrentHashMap<String, ProxyOozieClient>();
     private static volatile boolean localInitialized = false;
 
     private OozieClientFactory() {}
@@ -48,13 +44,8 @@ public final class OozieClientFactory {
 
         assert cluster != null : "Cluster cant be null";
         String oozieUrl = ClusterHelper.getOozieUrl(cluster);
-        if (!CACHE.containsKey(oozieUrl)) {
-            ProxyOozieClient ref = getClientRef(oozieUrl);
-            LOG.info("Caching Oozie client object for {}", oozieUrl);
-            CACHE.putIfAbsent(oozieUrl, ref);
-        }
-
-        return CACHE.get(oozieUrl);
+        LOG.info("Creating Oozie client object for {}", oozieUrl);
+        return getClientRef(oozieUrl);
     }
 
     public static ProxyOozieClient get(String clusterName) throws FalconException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
index d9fe8c1..54cab51 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
@@ -58,8 +58,7 @@ public class OozieHouseKeepingService implements WorkflowEngineActionListener {
             LOG.info("Deleting entity path {} on cluster {}", entityPath, clusterName);
 
             Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf,
-                entity.getACL());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
             if (fs.exists(entityPath) && !fs.delete(entityPath, true)) {
                 throw new FalconException("Unable to cleanup entity path: " + entityPath);
             }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 89bebe7..771839a 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -178,7 +178,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
         try {
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster), entity.getACL());
+                ClusterHelper.getConfiguration(cluster));
             HadoopClientFactory.mkdirsWithDefaultPerms(fs, stagingPath);
             HadoopClientFactory.mkdirsWithDefaultPerms(fs, logPath);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
index a7c6960..fcd8ca7 100644
--- a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
@@ -73,9 +73,10 @@ public class ProxyOozieClient extends AuthOozieClient {
         final URL decoratedUrl = decorateUrlWithUser(url);
         LOG.debug("ProxyOozieClient.createConnection: u={}, m={}", url, method);
 
-        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        // Login User "falcon" has the kerberos credentials
+        UserGroupInformation loginUserUGI = UserGroupInformation.getLoginUser();
         try {
-            return currentUser.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
+            return loginUserUGI.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
                 public HttpURLConnection run() throws Exception {
                     HttpURLConnection conn = ProxyOozieClient.super.createConnection(decoratedUrl, method);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 3308d72..ceacb4e 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -342,12 +342,23 @@ public abstract class AbstractEntityManager {
                             + "Can't be submitted again. Try removing before submitting.");
         }
 
+        tryProxy(entity); // proxy before validating since FS/Oozie needs to be proxied
         validate(entity);
         configStore.publish(entityType, entity);
         LOG.info("Submit successful: ({}): {}", type, entity.getName());
         return entity;
     }
 
+    private void tryProxy(Entity entity) throws IOException, FalconException {
+        final String aclOwner = entity.getACL().getOwner();
+        final String aclGroup = entity.getACL().getGroup();
+        if (SecurityUtil.isAuthorizationEnabled()
+                && SecurityUtil.getAuthorizationProvider().shouldProxy(
+                    CurrentUser.getAuthenticatedUGI(), aclOwner, aclGroup)) {
+            CurrentUser.proxy(aclOwner, aclGroup);
+        }
+    }
+
     /**
      * KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass.
      * existingEntity in config store will have teh decoration and equals check fails
@@ -541,13 +552,13 @@ public abstract class AbstractEntityManager {
 
     protected List<Entity> getEntities(String type, String startDate, String endDate, String cluster,
                                        String filterBy, String filterTags, String orderBy, String sortOrder,
-                                       int offset, int resultsPerPage) throws FalconException {
+                                       int offset, int resultsPerPage) throws FalconException, IOException {
         final Map<String, String> filterByFieldsValues = getFilterByFieldsValues(filterBy);
         final List<String> filterByTags = getFilterByTags(filterTags);
 
         EntityType entityType = EntityType.valueOf(type.toUpperCase());
         Collection<String> entityNames = configStore.getEntities(entityType);
-        if (entityNames == null || entityNames.isEmpty()) {
+        if (entityNames.isEmpty()) {
             return Collections.emptyList();
         }
 
@@ -564,9 +575,12 @@ public abstract class AbstractEntityManager {
                 throw FalconWebException.newException(e1, Response.Status.BAD_REQUEST);
             }
 
-            if (filterEntityByDatesAndCluster(entity, startDate, endDate, cluster)) {
+            if (SecurityUtil.isAuthorizationEnabled() && !isEntityAuthorized(entity)
+                || filterEntityByDatesAndCluster(entity, startDate, endDate, cluster)) {
+                // the user who requested list query has no permission to access this entity. Skip this entity
                 continue;
             }
+            tryProxy(entity);
 
             List<String> tags = EntityUtil.getTags(entity);
             List<String> pipelines = EntityUtil.getPipelines(entity);
@@ -658,11 +672,6 @@ public abstract class AbstractEntityManager {
     private boolean filterEntity(Entity entity, String entityStatus,
                                  Map<String, String> filterByFieldsValues, List<String> filterByTags,
                                  List<String> tags, List<String> pipelines) {
-        if (SecurityUtil.isAuthorizationEnabled() && !isEntityAuthorized(entity)) {
-            // the user who requested list query has no permission to access this entity. Skip this entity
-            return true;
-        }
-
         return !((filterByTags.isEmpty() || !filterEntityByTags(filterByTags, tags))
                 && (filterByFieldsValues.isEmpty()
                 || !filterEntityByFields(entity, filterByFieldsValues, entityStatus, pipelines)));
@@ -671,11 +680,12 @@ public abstract class AbstractEntityManager {
 
     protected boolean isEntityAuthorized(Entity entity) {
         try {
-            SecurityUtil.getAuthorizationProvider().authorizeResource("entities", "list",
-                    entity.getEntityType().toString(), entity.getName(), CurrentUser.getProxyUGI());
+            SecurityUtil.getAuthorizationProvider().authorizeEntity(entity.getName(),
+                    entity.getEntityType().toString(), entity.getACL(),
+                    "list", CurrentUser.getAuthenticatedUGI());
         } catch (Exception e) {
-            LOG.error("Authorization failed for entity=" + entity.getName()
-                    + " for user=" + CurrentUser.getUser(), e);
+            LOG.info("Authorization failed for entity=" + entity.getName()
+                + " for user=" + CurrentUser.getUser(), e);
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/prism/src/main/java/org/apache/falcon/security/FalconAuditFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuditFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuditFilter.java
index 9a9b400..c1e54b8 100644
--- a/prism/src/main/java/org/apache/falcon/security/FalconAuditFilter.java
+++ b/prism/src/main/java/org/apache/falcon/security/FalconAuditFilter.java
@@ -69,6 +69,7 @@ public class FalconAuditFilter implements Filter {
             // put the request id into the response so users can trace logs for this request
             ((HttpServletResponse) response).setHeader(REQUEST_ID, requestId);
             NDC.pop();
+            CurrentUser.clear();
         }
     }
 
@@ -85,16 +86,13 @@ public class FalconAuditFilter implements Filter {
     }
 
     private String getUserFromRequest(HttpServletRequest httpRequest) {
-        try {
-            // get the authenticated user
-            return CurrentUser.getUser();
-        } catch (IllegalStateException ignore) {
-            // ignore since the user authentication might have failed
+        if (CurrentUser.isAuthenticated()) {
+            return CurrentUser.getAuthenticatedUser();
+        } else {
+            // look for the user in the request
+            final String userFromRequest = Servlets.getUserFromRequest(httpRequest);
+            return userFromRequest == null ? "UNKNOWN" : userFromRequest;
         }
-
-        // look for the user in the request
-        final String userFromRequest = Servlets.getUserFromRequest(httpRequest);
-        return userFromRequest == null ? "UNKNOWN" : userFromRequest;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java
index fa30f0e..fa13a7d 100644
--- a/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java
+++ b/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java
@@ -178,7 +178,7 @@ public class FalconAuthenticationFilter
                         try {
                             NDC.push(user + ":" + httpRequest.getMethod() + "/" + httpRequest.getPathInfo());
                             CurrentUser.authenticate(user);
-                            LOG.info("Request from user: {}, URL={}", user,
+                            LOG.info("Request from authenticated user: {}, URL={}", user,
                                     Servlets.getRequestURI(httpRequest));
 
                             filterChain.doFilter(servletRequest, servletResponse);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
index 3cdb749..a3b7b9c 100644
--- a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
+++ b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
@@ -19,7 +19,14 @@
 package org.apache.falcon.security;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityNotRegisteredException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +38,7 @@ import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 
 /**
@@ -61,34 +69,37 @@ public class FalconAuthorizationFilter implements Filter {
     public void doFilter(ServletRequest request,
                          ServletResponse response,
                          FilterChain filterChain) throws IOException, ServletException {
-        int errorCode = HttpServletResponse.SC_FORBIDDEN;
-        String errorMessage = null;
-        HttpServletResponse httpResponse = (HttpServletResponse) response;
-
         if (isAuthorizationEnabled) {
             HttpServletRequest httpRequest = (HttpServletRequest) request;
             RequestParts requestParts = getUserRequest(httpRequest);
             LOG.info("Authorizing user={} against request={}", CurrentUser.getUser(), requestParts);
 
             try {
+                final UserGroupInformation authenticatedUGI = CurrentUser.getAuthenticatedUGI();
                 authorizationProvider.authorizeResource(requestParts.getResource(),
                         requestParts.getAction(), requestParts.getEntityType(),
-                        requestParts.getEntityName(), CurrentUser.getProxyUGI());
+                        requestParts.getEntityName(), authenticatedUGI);
+                tryProxy(authenticatedUGI,
+                    requestParts.getEntityType(), requestParts.getEntityName());
+                LOG.info("Authorization succeeded for user={}, proxy={}",
+                    authenticatedUGI.getShortUserName(), CurrentUser.getUser());
             } catch (AuthorizationException e) {
-                errorMessage = e.getMessage();
+                sendError((HttpServletResponse) response,
+                    HttpServletResponse.SC_FORBIDDEN, e.getMessage());
+                return; // do not continue processing
+            } catch (EntityNotRegisteredException e) {
+                sendError((HttpServletResponse) response,
+                    HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
+                return; // do not continue processing
             } catch (IllegalArgumentException e) {
-                errorMessage = e.getMessage();
-                errorCode = HttpServletResponse.SC_BAD_REQUEST;
+                sendError((HttpServletResponse) response,
+                    HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
+                return; // do not continue processing
             }
         }
 
-        if (errorMessage == null) {  // continue processing if there was no exception
-            filterChain.doFilter(request, response);
-        } else {
-            if (!httpResponse.isCommitted()) {
-                httpResponse.sendError(errorCode, errorMessage);
-            }
-        }
+        // continue processing if there was no authorization error
+        filterChain.doFilter(request, response);
     }
 
     @Override
@@ -117,6 +128,49 @@ public class FalconAuthorizationFilter implements Filter {
         }
     }
 
+    private void tryProxy(UserGroupInformation authenticatedUGI,
+                          String entityType, String entityName) throws IOException {
+        if (entityType == null || entityName == null) {
+            return;
+        }
+
+        try {
+            EntityType type = EntityType.valueOf(entityType.toUpperCase());
+            Entity entity = EntityUtil.getEntity(type, entityName);
+            if (entity != null && entity.getACL() != null) {
+                final String aclOwner = entity.getACL().getOwner();
+                final String aclGroup = entity.getACL().getGroup();
+                if (authorizationProvider.shouldProxy(
+                        authenticatedUGI, aclOwner, aclGroup)) {
+                    CurrentUser.proxy(aclOwner, aclGroup);
+                }
+            }
+        } catch (FalconException ignore) {
+            // do nothing
+        }
+    }
+
+    private void sendError(HttpServletResponse httpResponse,
+                           int errorCode, String errorMessage) throws IOException {
+        LOG.error("Authorization failed : {}/{}", errorCode, errorMessage);
+        if (!httpResponse.isCommitted()) { // handle authorization error
+            httpResponse.setStatus(errorCode);
+            httpResponse.setContentType(MediaType.APPLICATION_JSON);
+            httpResponse.getOutputStream().print(getJsonResponse(errorCode, errorMessage));
+        }
+    }
+
+    private String getJsonResponse(int errorCode, String errorMessage) throws IOException {
+        try {
+            JSONObject response = new JSONObject();
+            response.put("errorCode", errorCode);
+            response.put("errorMessage", errorMessage);
+            return response.toString();
+        } catch (JSONException e) {
+            throw new IOException(e);
+        }
+    }
+
     private static class RequestParts {
         private final String resource;
         private final String action;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java b/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java
index 03dc792..54b5859 100644
--- a/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java
+++ b/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java
@@ -35,8 +35,10 @@ import org.testng.annotations.Test;
 import javax.servlet.Filter;
 import javax.servlet.FilterChain;
 import javax.servlet.FilterConfig;
+import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
 
 /**
  * Test for FalconAuthorizationFilter using mock objects.
@@ -89,8 +91,8 @@ public class FalconAuthorizationFilterTest {
             {"/metadata/lineage/vertices/all"},
             {"/metadata/lineage/vertices/_1"},
             {"/metadata/lineage/vertices/properties/_1"},
-            {"metadata/discovery/process_entity/sample-process/relations"},
-            {"metadata/discovery/process_entity/list?cluster=primary-cluster"},
+            {"/metadata/discovery/process_entity/sample-process/relations"},
+            {"/metadata/discovery/process_entity/list?cluster=primary-cluster"},
         };
     }
 
@@ -146,6 +148,14 @@ public class FalconAuthorizationFilterTest {
             Mockito.when(mockRequest.getRequestURI()).thenReturn("/api" + resource);
             Mockito.when(mockRequest.getPathInfo()).thenReturn(resource);
 
+            Mockito.when(mockResponse.getOutputStream()).thenReturn(
+                new ServletOutputStream() {
+                    @Override
+                    public void write(int b) throws IOException {
+                        System.out.print(b);
+                    }
+                });
+
             try {
                 filter.doFilter(mockRequest, mockResponse, mockChain);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7cb0666b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 6a8017e..c2cb09e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -75,8 +75,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                 LOG.info("Going to delete path: {}", lateLogPath);
                 final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
                 Configuration conf = getConfiguration(storageEndpoint);
-                FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                    conf, entity.getACL());
+                FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
                 if (fs.exists(lateLogPath)) {
                     boolean deleted = fs.delete(lateLogPath, true);
                     if (deleted) {


Mime
View raw message