falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [2/2] git commit: FALCON-753 Change the ownership for staging dir to user submitting the feed. Contributed by Venkatesh Seetharam
Date Thu, 16 Oct 2014 23:27:14 GMT
FALCON-753 Change the ownership for staging dir to user submitting the feed. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/15b89bc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/15b89bc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/15b89bc3

Branch: refs/heads/master
Commit: 15b89bc31438fd76071585aacefd9a549d99ac38
Parents: 1caadaf
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Thu Oct 16 16:27:23 2014 -0700
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Thu Oct 16 16:27:23 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../falcon/cleanup/AbstractCleanupHandler.java  |  53 ++++---
 .../org/apache/falcon/entity/EntityUtil.java    |  34 +++--
 .../entity/parser/ClusterEntityParser.java      |  98 +++++++------
 .../falcon/entity/parser/EntityParser.java      |   2 +-
 .../falcon/entity/store/ConfigurationStore.java |  28 ++--
 .../falcon/hadoop/HadoopClientFactory.java      |  76 +++++++---
 .../org/apache/falcon/security/CurrentUser.java |  46 +++++-
 .../security/DefaultAuthorizationProvider.java  |  21 +--
 .../workflow/WorkflowExecutionContext.java      |  12 +-
 .../falcon/cleanup/LogCleanupServiceTest.java   |   3 +-
 .../apache/falcon/entity/AbstractTestBase.java  |  49 ++++++-
 .../falcon/hadoop/HadoopClientFactoryTest.java  |   4 +-
 .../apache/falcon/security/CurrentUserTest.java |   2 +-
 docs/src/site/twiki/EntitySpecification.twiki   |   5 +
 docs/src/site/twiki/InstallationSteps.twiki     |   3 +
 docs/src/site/twiki/OnBoarding.twiki            |   4 +
 docs/src/site/twiki/Security.twiki              |  16 ++-
 .../apache/falcon/hadoop/JailedFileSystem.java  |   5 +
 .../falcon/messaging/JMSMessageConsumer.java    |   4 +
 .../falcon/messaging/JMSMessageProducer.java    |   9 +-
 .../org/apache/falcon/logging/JobLogMover.java  |   3 +-
 .../org/apache/falcon/logging/LogProvider.java  |   2 +-
 .../apache/falcon/oozie/OozieBundleBuilder.java |  34 +----
 .../falcon/oozie/OozieCoordinatorBuilder.java   |   3 +-
 .../apache/falcon/oozie/OozieEntityBuilder.java |  22 ++-
 .../OozieOrchestrationWorkflowBuilder.java      |  12 +-
 .../feed/FeedReplicationCoordinatorBuilder.java |   7 +-
 .../oozie/process/ProcessBundleBuilder.java     |   3 +-
 .../ProcessExecutionWorkflowBuilder.java        |   4 +-
 .../service/SharedLibraryHostingService.java    |  60 ++++----
 .../engine/OozieHouseKeepingService.java        |   2 +-
 .../workflow/engine/OozieWorkflowEngine.java    |  31 ++++-
 .../feed/OozieFeedWorkflowBuilderTest.java      | 139 ++++++++++++++++---
 .../falcon/resource/AbstractEntityManager.java  |  54 ++++++-
 .../security/FalconAuthorizationFilter.java     |   2 +-
 .../falcon/replication/FeedReplicator.java      |   7 +-
 .../apache/falcon/latedata/LateDataHandler.java |  17 +--
 .../rerun/handler/AbstractRerunConsumer.java    |   1 +
 .../falcon/rerun/handler/LateRerunConsumer.java |   2 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |   4 +-
 .../apache/falcon/retention/FeedEvictor.java    |  14 +-
 .../falcon/cluster/util/EmbeddedCluster.java    |   2 +-
 .../java/org/apache/falcon/cli/FalconCLIIT.java |   1 +
 .../apache/falcon/late/LateDataHandlerIT.java   |   2 +
 .../falcon/resource/EntityManagerJerseyIT.java  |   4 +-
 .../org/apache/falcon/resource/TestContext.java |  54 +++++++
 .../validation/ClusterEntityValidationIT.java   |  81 +++++++++--
 webapp/src/test/resources/cluster-template.xml  |   2 +-
 webapp/src/test/resources/process-template.xml  |   2 +-
 50 files changed, 744 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d53017f..7af3263 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,8 @@ Apache Falcon (incubating) Change log
 Trunk (Unreleased)
 
   INCOMPATIBLE CHANGES
+   FALCON-753 Change the ownership for staging dir to user submitting the feed
+   (Venkatesh Seetharam)
 
   NEW FEATURES
    FALCON-687 Add hooks for extensions in Audit (Venkatesh Seetharam)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index c315c25..cd088b2 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -22,6 +22,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -29,6 +30,7 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
@@ -72,9 +74,8 @@ public abstract class AbstractCleanupHandler {
                 "log.cleanup.frequency." + timeunit + ".retention", "days(1)");
     }
 
-    protected FileStatus[] getAllLogs(org.apache.falcon.entity.v0.cluster.Cluster cluster,
+    protected FileStatus[] getAllLogs(FileSystem fs, Cluster cluster,
                                       Entity entity) throws FalconException {
-        FileSystem fs = getFileSystem(cluster);
         FileStatus[] paths;
         try {
             Path logPath = getLogPath(cluster, entity);
@@ -91,27 +92,42 @@ public abstract class AbstractCleanupHandler {
         return new Path(EntityUtil.getLogPath(cluster, entity), getRelativeLogPath());
     }
 
-    protected FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster)
-        throws FalconException {
+    private FileSystem getFileSystemAsEntityOwner(Cluster cluster,
+                                                  Entity entity) throws FalconException {
+        try {
+            final AccessControlList acl = EntityUtil.getACL(entity);
+            if (acl == null) {
+                throw new FalconException("ACL for entity " + entity.getName() + " is empty");
+            }
 
-        return HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+            final String proxyUser = acl.getOwner();
+            // user for proxying
+            CurrentUser.authenticate(proxyUser);
+            return HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
+        } catch (Exception e) {
+            throw new FalconException(e);
+        }
     }
 
     protected void delete(String clusterName, Entity entity, long retention) throws FalconException {
         Cluster currentCluster = STORE.get(EntityType.CLUSTER, clusterName);
-        if (isClusterInCurrentColo(currentCluster.getColo())) {
-            LOG.info("Cleaning up logs for {}: {} in cluster: {} with retention: {}",
-                    entity.getEntityType(), entity.getName(), clusterName, retention);
-            FileStatus[] logs = getAllLogs(currentCluster, entity);
-            deleteInternal(currentCluster, entity, retention, logs);
-        } else {
+        if (!isClusterInCurrentColo(currentCluster.getColo())) {
             LOG.info("Ignoring cleanup for {}: {} in cluster: {} as this does not belong to current colo",
                     entity.getEntityType(), entity.getName(), clusterName);
+            return;
         }
+
+        LOG.info("Cleaning up logs for {}: {} in cluster: {} with retention: {}",
+                entity.getEntityType(), entity.getName(), clusterName, retention);
+
+        FileSystem fs = getFileSystemAsEntityOwner(currentCluster, entity);
+        FileStatus[] logs = getAllLogs(fs, currentCluster, entity);
+        deleteInternal(fs, currentCluster, entity, retention, logs);
     }
 
-    protected void deleteInternal(Cluster cluster, Entity entity,
-                                  long retention, FileStatus[] logs) throws FalconException {
+    private void deleteInternal(FileSystem fs, Cluster cluster, Entity entity,
+                                long retention, FileStatus[] logs) throws FalconException {
         if (logs == null || logs.length == 0) {
             LOG.info("Nothing to delete for cluster: {}, entity: {}", cluster.getName(),
                     entity.getName());
@@ -123,13 +139,10 @@ public abstract class AbstractCleanupHandler {
         for (FileStatus log : logs) {
             if (now - log.getModificationTime() > retention) {
                 try {
-                    boolean isDeleted = getFileSystem(cluster).delete(log.getPath(), true);
-                    if (!isDeleted) {
-                        LOG.error("Unable to delete path: {}", log.getPath());
-                    } else {
-                        LOG.info("Deleted path: {}", log.getPath());
-                    }
-                    deleteParentIfEmpty(getFileSystem(cluster), log.getPath().getParent());
+                    boolean isDeleted = fs.delete(log.getPath(), true);
+                    LOG.error(isDeleted ? "Deleted path: {}" : "Unable to delete path: {}",
+                            log.getPath());
+                    deleteParentIfEmpty(fs, log.getPath().getParent());
                 } catch (IOException e) {
                     throw new FalconException(" Unable to delete log file : "
                             + log.getPath() + " for entity " + entity.getName()

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index b8f2d7d..1a10986 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -27,6 +27,7 @@ import org.apache.falcon.Pair;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.WorkflowNameBuilder.WorkflowName;
 import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -197,6 +198,7 @@ public final class EntityUtil {
     }
 
     public static int getParallel(Feed feed) {
+        // todo - how this this supposed to work?
         return 1;
     }
 
@@ -442,7 +444,7 @@ public final class EntityUtil {
         return builder.getWorkflowTag(workflowName);
     }
 
-    public static List<String> getWorkflowNames(Entity entity, String cluster) {
+    public static List<String> getWorkflowNames(Entity entity) {
         switch(entity.getEntityType()) {
         case FEED:
             return Arrays.asList(getWorkflowName(Tag.RETENTION, entity).toString(),
@@ -581,20 +583,16 @@ public final class EntityUtil {
         Entity entity)
         throws FalconException {
         Path basePath = getBaseStagingPath(cluster, entity);
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster));
         try {
-            FileStatus[] filesArray = fs.listStatus(basePath, new PathFilter() {
+            return fs.listStatus(basePath, new PathFilter() {
                 @Override
                 public boolean accept(Path path) {
-                    if (path.getName().equals("logs")) {
-                        return false;
-                    }
-                    return true;
+                    return !path.getName().equals("logs");
                 }
             });
 
-            return filesArray;
-
         } catch (FileNotFoundException e) {
             LOG.info("Staging path " + basePath + " doesn't exist, entity is not scheduled");
             //Staging path doesn't exist if entity is not scheduled
@@ -755,4 +753,22 @@ public final class EntityUtil {
         return new Pair<Date, Date>(clusterMinStartDate.first, clusterMaxEndDate.first);
     }
 
+    public static AccessControlList getACL(Entity entity) {
+        switch (entity.getEntityType()) {
+        case CLUSTER:
+            return ((org.apache.falcon.entity.v0.cluster.Cluster) entity).getACL();
+
+        case FEED:
+            return ((org.apache.falcon.entity.v0.feed.Feed) entity).getACL();
+
+        case PROCESS:
+            return ((org.apache.falcon.entity.v0.process.Process) entity).getACL();
+
+        default:
+            break;
+        }
+
+        throw new IllegalArgumentException("Unknown entity type: " + entity.getEntityType()
+                + " for: " + entity.getName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index fbbdbcb..7c4b99d 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
@@ -52,7 +52,7 @@ import java.io.IOException;
  */
 public class ClusterEntityParser extends EntityParser<Cluster> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ProcessEntityParser.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterEntityParser.class);
 
     public ClusterEntityParser() {
         super(EntityType.CLUSTER);
@@ -122,9 +122,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
                 conf.set(SecurityUtil.NN_PRINCIPAL, nameNodePrincipal);
             }
 
-            // todo: ideally check if the end user has access using createProxiedFileSystem
-            // hftp won't work and bug is logged at HADOOP-10215
-            HadoopClientFactory.get().createFileSystem(conf);
+            HadoopClientFactory.get().createProxiedFileSystem(conf);
         } catch (FalconException e) {
             throw new ValidationException("Invalid storage server or port: " + storageUrl, e);
         }
@@ -135,7 +133,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         LOG.info("Validating execute interface: {}", executeUrl);
 
         try {
-            HadoopClientFactory.validateJobClient(executeUrl);
+            HadoopClientFactory.get().validateJobClient(executeUrl);
         } catch (IOException e) {
             throw new ValidationException("Invalid Execute server or port: " + executeUrl, e);
         }
@@ -231,55 +229,69 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
     }
 
     /**
-     * Validate the locations on the cluster is owned by falcon.
+     * Validate the locations on the cluster exists with appropriate permissions
+     * for the user to write to this directory.
      *
      * @param cluster cluster entity
      * @throws ValidationException
      */
     private void validateLocations(Cluster cluster) throws ValidationException {
+        Configuration conf = ClusterHelper.getConfiguration(cluster);
+        FileSystem fs;
         try {
-            Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
-            for (Location location : cluster.getLocations().getLocations()) {
-                if (location.getName().equals("temp")) {
-                    continue;
-                }
-
-                try {
-                    Path locationPath = new Path(location.getPath());
-                    if (fs.exists(locationPath)) {
-                        FileStatus fileStatus = fs.getFileStatus(locationPath);
-                        checkPathPermissions(locationPath, fileStatus);
-                        checkPathOwner(locationPath, fileStatus);
-                    }
-                } catch (IOException e) {
-                    throw new ValidationException("Unable to validate the location " + location
-                            + "for cluster.", e);
-                }
-            }
+            fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
         } catch (FalconException e) {
-            throw new ValidationException("Unable to validate the locations for cluster.", e);
+            throw new ValidationException(
+                    "Unable to get file system handle for cluster " + cluster.getName(), e);
         }
-    }
 
-    private void checkPathPermissions(Path locationPath,
-                                      FileStatus fileStatus) throws ValidationException {
-        if (fileStatus.getPermission().getUserAction() != FsAction.ALL) {
-            LOG.error("Path {} doesn't have rwx permissions {}",
-                    locationPath, fileStatus.getPermission());
-            throw new ValidationException("Path " + locationPath
-                    + " doesn't have rwx permissions: " + fileStatus.getPermission());
+        for (Location location : cluster.getLocations().getLocations()) {
+            final String locationName = location.getName();
+            if (locationName.equals("temp")) {
+                continue;
+            }
+
+            try {
+                checkPathOwnerAndPermission(cluster.getName(), location.getPath(), fs,
+                        "staging".equals(locationName)
+                                ? HadoopClientFactory.ALL_PERMISSION
+                                : HadoopClientFactory.READ_EXECUTE_PERMISSION);
+            } catch (IOException e) {
+                throw new ValidationException("Unable to validate the location " + location
+                        + " for cluster " + cluster.getName(), e);
+            }
         }
     }
 
-    private void checkPathOwner(Path locationPath,
-                                FileStatus fileStatus) throws IOException, ValidationException {
-        final String owner = UserGroupInformation.getLoginUser().getShortUserName();
-        if (!fileStatus.getOwner().equals(owner)) {
-            LOG.error("Path {} with owner {} doesn't match the actual path owner {}",
-                    locationPath, owner, fileStatus.getOwner());
-            throw new ValidationException("Path [" + locationPath + "] with owner [" + owner
-                    + "] doesn't match the actual path  owner " + fileStatus.getOwner());
+    private void checkPathOwnerAndPermission(String clusterName, String location, FileSystem fs,
+                                             FsPermission expectedPermission)
+        throws IOException, ValidationException {
+
+        Path locationPath = new Path(location);
+        FileStatus fileStatus = fs.getFileStatus(locationPath);
+        if (!fs.exists(locationPath)) {
+            throw new ValidationException("Location " + location
+                    + " for cluster " + clusterName + " must exist.");
         }
+
+        // falcon owns this path on each cluster
+        final String loginUser = UserGroupInformation.getLoginUser().getShortUserName();
+        final String locationOwner = fileStatus.getOwner();
+        if (!locationOwner.equals(loginUser)) {
+            LOG.error("Location {} has owner {}, should be the process user {}",
+                    locationPath, locationOwner, loginUser);
+            throw new ValidationException("Path [" + locationPath + "] has owner [" + locationOwner
+                    + "], should be the process user " + loginUser);
+        }
+
+        if (fileStatus.getPermission().toShort() != expectedPermission.toShort()) {
+            LOG.error("Location {} has permissions {}, should be {}",
+                    locationPath, fileStatus.getPermission(), expectedPermission);
+            throw new ValidationException("Path " + locationPath + " has permissions: "
+                    + fileStatus.getPermission() + ", should be " + expectedPermission);
+        }
+
+        // try to list to see if the user is able to write to this folder
+        fs.listStatus(locationPath);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
index 8a3f669..ac58280 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
@@ -119,7 +119,7 @@ public abstract class EntityParser<T extends Entity> {
                              AccessControlList acl) throws AuthorizationException {
         try {
             SecurityUtil.getAuthorizationProvider().authorizeEntity(entityName,
-                    getEntityType().name(), acl, "validate", CurrentUser.getProxyUgi());
+                    getEntityType().name(), acl, "validate", CurrentUser.getProxyUGI());
         } catch (FalconException e) {
             throw new AuthorizationException(e);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 25fc21b..62a3e5b 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -58,22 +58,16 @@ public final class ConfigurationStore implements FalconService {
     private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
     private static final String UTF_8 = CharEncoding.UTF_8;
 
-    private static final ConfigurationStore STORE = new ConfigurationStore();
+    private static final FsPermission STORE_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
 
     private Set<ConfigurationChangeListener> listeners = new LinkedHashSet<ConfigurationChangeListener>();
 
     private ThreadLocal<Entity> updatesInProgress = new ThreadLocal<Entity>();
 
-    public static ConfigurationStore get() {
-        return STORE;
-    }
-
     private final Map<EntityType, ConcurrentHashMap<String, Entity>> dictionary
         = new HashMap<EntityType, ConcurrentHashMap<String, Entity>>();
 
-    private final FileSystem fs;
-    private final Path storePath;
-
     private static final Entity NULL = new Entity() {
         @Override
         public String getName() {
@@ -81,6 +75,15 @@ public final class ConfigurationStore implements FalconService {
         }
     };
 
+    private static final ConfigurationStore STORE = new ConfigurationStore();
+
+    public static ConfigurationStore get() {
+        return STORE;
+    }
+
+    private final FileSystem fs;
+    private final Path storePath;
+
     private ConfigurationStore() {
         for (EntityType type : EntityType.values()) {
             dictionary.put(type, new ConcurrentHashMap<String, Entity>());
@@ -98,13 +101,12 @@ public final class ConfigurationStore implements FalconService {
      */
     private FileSystem initializeFileSystem() {
         try {
-            FileSystem fileSystem = HadoopClientFactory.get().createFileSystem(storePath.toUri());
+            FileSystem fileSystem =
+                    HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
             if (!fileSystem.exists(storePath)) {
                 LOG.info("Creating configuration store directory: {}", storePath);
-                fileSystem.mkdirs(storePath);
                 // set permissions so config store dir is owned by falcon alone
-                FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-                fileSystem.setPermission(storePath, permission);
+                HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
             }
 
             return fileSystem;
@@ -331,7 +333,7 @@ public final class ConfigurationStore implements FalconService {
      */
     private void archive(EntityType type, String name) throws IOException {
         Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type);
-        fs.mkdirs(archivePath);
+        HadoopClientFactory.mkdirs(fs, archivePath, STORE_PERMISSION);
         fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"),
                 new Path(archivePath, URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis()));
         LOG.info("Archived configuration {}/{}", type, name);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index ecdbf14..3011b65 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -26,6 +26,9 @@ import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -44,6 +47,11 @@ public final class HadoopClientFactory {
     public static final String MR_JT_ADDRESS_KEY = "mapreduce.jobtracker.address";
     public static final String YARN_RM_ADDRESS_KEY = "yarn.resourcemanager.address";
 
+    public static final FsPermission READ_EXECUTE_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE);
+    public static final FsPermission ALL_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+
     private static final HadoopClientFactory INSTANCE = new HadoopClientFactory();
 
     private HadoopClientFactory() {
@@ -61,7 +69,7 @@ public final class HadoopClientFactory {
      * @throws org.apache.falcon.FalconException
      *          if the filesystem could not be created.
      */
-    public FileSystem createFileSystem(final URI uri) throws FalconException {
+    public FileSystem createFalconFileSystem(final URI uri) throws FalconException {
         Validate.notNull(uri, "uri cannot be null");
 
         try {
@@ -76,7 +84,7 @@ public final class HadoopClientFactory {
         }
     }
 
-    public FileSystem createFileSystem(final Configuration conf)
+    public FileSystem createFalconFileSystem(final Configuration conf)
         throws FalconException {
         Validate.notNull(conf, "configuration cannot be null");
 
@@ -90,17 +98,6 @@ public final class HadoopClientFactory {
         }
     }
 
-    public FileSystem createFileSystem(final URI uri, final Configuration conf)
-        throws FalconException {
-        Validate.notNull(uri, "uri cannot be null");
-
-        try {
-            return createFileSystem(UserGroupInformation.getLoginUser(), uri, conf);
-        } catch (IOException e) {
-            throw new FalconException("Exception while getting FileSystem for: " + uri, e);
-        }
-    }
-
     /**
      * Return a FileSystem created with the authenticated proxy user for the specified conf.
      *
@@ -115,7 +112,7 @@ public final class HadoopClientFactory {
 
         String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
         try {
-            return createFileSystem(CurrentUser.getProxyUgi(), new URI(nameNode), conf);
+            return createFileSystem(getProxyUGI(), new URI(nameNode), conf);
         } catch (URISyntaxException e) {
             throw new FalconException("Exception while getting FileSystem for proxy: "
                     + CurrentUser.getUser(), e);
@@ -125,6 +122,32 @@ public final class HadoopClientFactory {
         }
     }
 
+    public FileSystem createProxiedFileSystem(final URI uri) throws FalconException {
+        return createProxiedFileSystem(uri, new Configuration());
+    }
+
+    public FileSystem createProxiedFileSystem(final URI uri,
+                                              final Configuration conf) throws FalconException {
+        Validate.notNull(uri, "uri cannot be null");
+
+        try {
+            return createFileSystem(getProxyUGI(), uri, conf);
+        } catch (IOException e) {
+            throw new FalconException("Exception while getting FileSystem for proxy: "
+                    + CurrentUser.getUser(), e);
+        }
+    }
+
+    private UserGroupInformation getProxyUGI() throws IOException {
+        try { // get the authenticated user
+            return CurrentUser.getProxyUGI();
+        } catch (Exception ignore) {
+            // ignore since the user authentication might have failed or in oozie
+        }
+
+        return UserGroupInformation.getCurrentUser();
+    }
+
     /**
      * Return a FileSystem created with the provided user for the specified URI.
      *
@@ -136,8 +159,8 @@ public final class HadoopClientFactory {
      *          if the filesystem could not be created.
      */
     @SuppressWarnings("ResultOfMethodCallIgnored")
-    public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri, final Configuration conf)
-        throws FalconException {
+    public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri,
+                                       final Configuration conf) throws FalconException {
         Validate.notNull(ugi, "ugi cannot be null");
         Validate.notNull(conf, "configuration cannot be null");
 
@@ -172,7 +195,7 @@ public final class HadoopClientFactory {
      * @param executeUrl jt url or RM url
      * @throws IOException
      */
-    public static void validateJobClient(String executeUrl) throws IOException {
+    public void validateJobClient(String executeUrl) throws IOException {
         final JobConf jobConf = new JobConf();
         jobConf.set(MR_JT_ADDRESS_KEY, executeUrl);
         jobConf.set(YARN_RM_ADDRESS_KEY, executeUrl);
@@ -190,4 +213,23 @@ public final class HadoopClientFactory {
             throw new IOException("Exception creating job client:" + e.getMessage(), e);
         }
     }
+
+    public static FsPermission getDirDefaultPermission(Configuration conf) {
+        return FsPermission.getDirDefault().applyUMask(FsPermission.getUMask(conf));
+    }
+
+    public static FsPermission getFileDefaultPermission(Configuration conf) {
+        return FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf));
+    }
+
+    public static void mkdirsWithDefaultPerms(FileSystem fs, Path path) throws IOException {
+        mkdirs(fs, path, getDirDefaultPermission(fs.getConf()));
+    }
+
+    public static void mkdirs(FileSystem fs, Path path,
+                              FsPermission permission) throws IOException {
+        if (!FileSystem.mkdirs(fs, path, permission)) {
+            throw new IOException("mkdir failed for " + path);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index b7d2c66..cfea143 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -49,7 +49,7 @@ public final class CurrentUser {
 
     private final ThreadLocal<Subject> currentSubject = new ThreadLocal<Subject>();
 
-    public static void authenticate(String user) {
+    public static void authenticate(final String user) {
         if (user == null || user.isEmpty()) {
             throw new IllegalStateException("Bad user name sent for authentication");
         }
@@ -59,6 +59,13 @@ public final class CurrentUser {
 
         Subject subject = new Subject();
         subject.getPrincipals().add(new FalconPrincipal(user));
+
+        try {  // initialize proxy user
+            createProxyUGI(user);
+        } catch (IOException e) {
+            throw new IllegalStateException("Unable to create a proxy user");
+        }
+
         LOG.info("Logging in {}", user);
         INSTANCE.currentSubject.set(subject);
     }
@@ -93,19 +100,38 @@ public final class CurrentUser {
             new ConcurrentHashMap<String, UserGroupInformation>();
 
     /**
+     * Create a proxy UGI object for the current authenticated user.
+     *
+     * @param proxyUser logged in user
+     * @return UGI object
+     * @throws IOException
+     */
+    public static UserGroupInformation createProxyUGI(String proxyUser) throws IOException {
+        UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
+        if (proxyUgi == null) {
+            // taking care of a race condition, the latest UGI will be discarded
+            proxyUgi = UserGroupInformation.createProxyUser(
+                    proxyUser, UserGroupInformation.getLoginUser());
+            userUgiMap.putIfAbsent(proxyUser, proxyUgi);
+        }
+
+        return proxyUgi;
+    }
+
+    /**
      * Dole out a proxy UGI object for the current authenticated user.
      *
      * @return UGI object
      * @throws java.io.IOException
      */
-    public static UserGroupInformation getProxyUgi() throws IOException {
+    public static UserGroupInformation getProxyUGI() throws IOException {
         String proxyUser = getUser();
 
         UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
         if (proxyUgi == null) {
             // taking care of a race condition, the latest UGI will be discarded
-            proxyUgi = UserGroupInformation
-                    .createProxyUser(proxyUser, UserGroupInformation.getLoginUser());
+            proxyUgi = UserGroupInformation.createProxyUser(
+                    proxyUser, UserGroupInformation.getLoginUser());
             userUgiMap.putIfAbsent(proxyUser, proxyUgi);
         }
 
@@ -113,7 +139,17 @@ public final class CurrentUser {
     }
 
     public static Set<String> getGroupNames() throws IOException {
-        HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUgi().getGroupNames()));
+        HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUGI().getGroupNames()));
         return Collections.unmodifiableSet(s);
     }
+
+    public static String getPrimaryGroupName() {
+        try {
+            return getProxyUGI().getPrimaryGroupName();
+        } catch (IOException ignore) {
+            // ignored
+        }
+
+        return "unknown"; // this can only happen in tests
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
index e90518d..4b7c4a9 100644
--- a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
+++ b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
@@ -25,7 +25,6 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -284,7 +283,7 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         if (entityName != null) { // lifecycle actions
             Entity entity = getEntity(entityName, entityType);
             authorizeEntity(entity.getName(), entity.getEntityType().name(),
-                    getACL(entity), action, proxyUgi);
+                    EntityUtil.getACL(entity), action, proxyUgi);
         } else {
             // non lifecycle actions, lifecycle actions with null entity will validate later
             LOG.info("Authorization for action={} will be done in the API", action);
@@ -300,24 +299,6 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         }
     }
 
-    protected AccessControlList getACL(Entity entity) throws AuthorizationException {
-        switch (entity.getEntityType()) {
-        case CLUSTER:
-            return ((Cluster) entity).getACL();
-
-        case FEED:
-            return ((org.apache.falcon.entity.v0.feed.Feed) entity).getACL();
-
-        case PROCESS:
-            return ((org.apache.falcon.entity.v0.process.Process) entity).getACL();
-
-        default:
-            break;
-        }
-
-        throw new AuthorizationException("Cannot get owner for entity: " + entity.getName());
-    }
-
     protected void authorizeLineageResource(String authenticatedUser, String action) {
         LOG.debug("User {} authorized for action {} ", authenticatedUser, action);
         // todo - do nothing for now, read-only for all

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index f3e643e..ebd4dd4 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -30,8 +30,6 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -285,13 +283,8 @@ public class WorkflowExecutionContext {
         OutputStream out = null;
         Path file = new Path(contextFile);
         try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri());
             out = fs.create(file);
-
-            // making sure falcon can read this file
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(file, permission);
-
             out.write(JSONValue.toJSONString(context).getBytes());
         } catch (IOException e) {
             throw new FalconException("Error serializing context to: " + contextFile,  e);
@@ -315,7 +308,8 @@ public class WorkflowExecutionContext {
     public static WorkflowExecutionContext deSerialize(String contextFile) throws FalconException {
         try {
             Path lineageDataPath = new Path(contextFile); // file has 777 permissions
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(lineageDataPath.toUri());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    lineageDataPath.toUri());
 
             BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(lineageDataPath)));
             return new WorkflowExecutionContext((Map<WorkflowExecutionArgs, String>) JSONValue.parse(in));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
index 6ad742e..8e2e544 100644
--- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -69,7 +70,7 @@ public class LogCleanupServiceTest extends AbstractTestBase {
     @Override
     @BeforeClass
     public void setup() throws Exception {
-        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster", CurrentUser.getUser());
         conf = dfsCluster.getConf();
         fs = dfsCluster.getFileSystem();
         fs.delete(new Path("/"), true);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index 2d41661..a1319b8 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -91,6 +91,9 @@ public class AbstractTestBase {
     }
 
     protected void storeEntity(EntityType type, String name) throws Exception {
+        final String proxyUser = CurrentUser.getUser();
+        final String defaultGroupName = CurrentUser.getPrimaryGroupName();
+
         Unmarshaller unmarshaller = type.getUnmarshaller();
         store = ConfigurationStore.get();
         store.remove(type, name);
@@ -100,12 +103,16 @@ public class AbstractTestBase {
             cluster.setName(name);
             ClusterHelper.getInterface(cluster, Interfacetype.WRITE)
                     .setEndpoint(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
+            decorateACL(proxyUser, defaultGroupName, cluster);
+
             store.publish(type, cluster);
             break;
 
         case FEED:
             Feed feed = (Feed) unmarshaller.unmarshal(this.getClass().getResource(FEED_XML));
             feed.setName(name);
+            decorateACL(proxyUser, defaultGroupName, feed);
+
             store.publish(type, feed);
             break;
 
@@ -117,12 +124,52 @@ public class AbstractTestBase {
             if (!fs.exists(new Path(process.getWorkflow() + "/lib"))) {
                 fs.mkdirs(new Path(process.getWorkflow() + "/lib"));
             }
+
+            decorateACL(proxyUser, defaultGroupName, process);
+
             store.publish(type, process);
             break;
         default:
         }
     }
 
+    private void decorateACL(String proxyUser, String defaultGroupName, Cluster cluster) {
+        if (cluster.getACL() != null) {
+            return;
+        }
+
+        org.apache.falcon.entity.v0.cluster.ACL clusterACL =
+                new org.apache.falcon.entity.v0.cluster.ACL();
+        clusterACL.setOwner(proxyUser);
+        clusterACL.setGroup(defaultGroupName);
+        cluster.setACL(clusterACL);
+    }
+
+    private void decorateACL(String proxyUser, String defaultGroupName, Feed feed) {
+        if (feed.getACL() != null) {
+            return;
+        }
+
+        org.apache.falcon.entity.v0.feed.ACL feedACL =
+                new org.apache.falcon.entity.v0.feed.ACL();
+        feedACL.setOwner(proxyUser);
+        feedACL.setGroup(defaultGroupName);
+        feed.setACL(feedACL);
+    }
+
+    private void decorateACL(String proxyUser, String defaultGroupName,
+                             Process process) {
+        if (process.getACL() != null) {
+            return;
+        }
+
+        org.apache.falcon.entity.v0.process.ACL processACL =
+                new org.apache.falcon.entity.v0.process.ACL();
+        processACL.setOwner(proxyUser);
+        processACL.setGroup(defaultGroupName);
+        process.setACL(processACL);
+    }
+
     public void setup() throws Exception {
         store = ConfigurationStore.get();
         for (EntityType type : EntityType.values()) {
@@ -147,7 +194,7 @@ public class AbstractTestBase {
 
     // assumes there will always be at least one group for a logged in user
     protected String getGroupName() throws IOException {
-        String[] groupNames = CurrentUser.getProxyUgi().getGroupNames();
+        String[] groupNames = CurrentUser.getProxyUGI().getGroupNames();
         System.out.println("groupNames = " + Arrays.asList(groupNames));
         return groupNames[0];
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
index 8f25f57..1cb08ac 100644
--- a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
+++ b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
@@ -64,7 +64,7 @@ public class HadoopClientFactoryTest {
             Configuration conf = embeddedCluster.getConf();
             URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
             Assert.assertNotNull(uri);
-            HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUgi(), uri, conf);
+            HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUGI(), uri, conf);
             Assert.fail("Impersonation should have failed.");
         } catch (Exception e) {
             Assert.assertEquals(e.getCause().getClass(), RemoteException.class);
@@ -99,7 +99,7 @@ public class HadoopClientFactoryTest {
         Assert.assertNotNull(uri);
 
         CurrentUser.authenticate(System.getProperty("user.name"));
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUgi(), uri, conf);
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUGI(), uri, conf);
         Assert.assertNotNull(fs);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
index a1861e1..187d85e 100644
--- a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
+++ b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
@@ -37,7 +37,7 @@ public class CurrentUserTest {
     @Test
     public void testGetProxyUser() throws Exception {
         CurrentUser.authenticate("proxy");
-        UserGroupInformation proxyUgi = CurrentUser.getProxyUgi();
+        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
         Assert.assertNotNull(proxyUgi);
         Assert.assertEquals(proxyUgi.getUserName(), "proxy");
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index a710dfd..b0dfb7f 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -69,6 +69,11 @@ Location has the name and the path, name is the type of locations like staging,
 and path is the hdfs path for each location.
 Falcon would use the location to do intermediate processing of entities in hdfs and hence Falcon
 should have read/write/execute permission on these locations.
+These locations MUST be created prior to submitting a cluster entity to Falcon.
+*staging* must have 777 permissions and the parent dirs must have execute permissions so multiple
+users can write to this location
+*working* must have 755 permissions and the parent dirs must have execute permissions so multiple
+users can read from this location
 
 ---+++ ACL
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index 9cddcbb..88f3226 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -261,6 +261,9 @@ src/bin/package.sh <<hadoop-version>> <<oozie-version>>
 bin/falcon-start
 </verbatim>
 Make sure the hadoop and oozie endpoints are according to your setup in examples/entity/filesystem/standalone-cluster.xml
+The cluster locations,staging and working dirs, MUST be created prior to submitting a cluster entity to Falcon.
+*staging* must have 777 permissions and the parent dirs must have execute permissions
+*working* must have 755 permissions and the parent dirs must have execute permissions
 <verbatim>
 bin/falcon entity -submit -type cluster -file examples/entity/filesystem/standalone-cluster.xml
 </verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/OnBoarding.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/OnBoarding.twiki b/docs/src/site/twiki/OnBoarding.twiki
index 75d0d7e..4f49c5a 100644
--- a/docs/src/site/twiki/OnBoarding.twiki
+++ b/docs/src/site/twiki/OnBoarding.twiki
@@ -16,6 +16,10 @@
 ---+++ Sample Pipeline
 ---++++ Cluster   
 Cluster definition that contains end points for name node, job tracker, oozie and jms server:
+The cluster locations MUST be created prior to submitting a cluster entity to Falcon.
+*staging* must have 777 permissions and the parent dirs must have execute permissions
+*working* must have 755 permissions and the parent dirs must have execute permissions
+
 <verbatim>
 <?xml version="1.0"?>
 <!--

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/Security.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Security.twiki b/docs/src/site/twiki/Security.twiki
index 4e33182..f4db28b 100644
--- a/docs/src/site/twiki/Security.twiki
+++ b/docs/src/site/twiki/Security.twiki
@@ -301,11 +301,17 @@ Falcon should be configured to communicate with Prism over TLS in secure mode. I
 
 ---++ Changes to ownership and permissions of directories managed by Falcon
 
-| *Directory*             | *Location*                                                        | *Owner* | *Permissions* |
-| Configuration Store     | ${config.store.uri}                                               | falcon  | 750           |
-| Oozie coord/bundle XMLs | ${cluster.staging-location}/workflows/{entity}/{entity-name}      | falcon  | 644           |
-| Shared libs             | {cluster.working}/{lib,libext}                                    | falcon  | 755           |
-| App logs                | ${cluster.staging-location}/workflows/{entity}/{entity-name}/logs | falcon  | 777           |
+| *Directory*              | *Location*                                                        | *Owner* | *Permissions* |
+| Configuration Store      | ${config.store.uri}                                               | falcon  | 700           |
+| Cluster Staging Location | ${cluster.staging-location}                                       | falcon  | 777           |
+| Cluster Working Location | ${cluster.working-location}                                       | falcon  | 755           |
+| Shared libs              | {cluster.working}/{lib,libext}                                    | falcon  | 755           |
+| Oozie coord/bundle XMLs  | ${cluster.staging-location}/workflows/{entity}/{entity-name}      | $user   | cluster umask |
+| App logs                 | ${cluster.staging-location}/workflows/{entity}/{entity-name}/logs | $user   | cluster umask |
+
+*Note:* Please note that the cluster staging and working locations MUST be created prior to
+submitting a cluster entity to Falcon. Also, note that the the parent dirs must have execute
+permissions.
 
 
 ---++ Backwards compatibility

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
index 72e390e..7156bbd 100644
--- a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
+++ b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
@@ -142,6 +142,11 @@ public class JailedFileSystem extends FileSystem {
     }
 
     @Override
+    public void setPermission(Path p, FsPermission permission) throws IOException {
+        localFS.setPermission(toLocalPath(p), permission);
+    }
+
+    @Override
     public FileChecksum getFileChecksum(Path f) throws IOException {
         final byte[] md5 = DigestUtils.md5(FileUtils.readFileToByteArray(new File(toLocalPath(f).toString())));
         return new FileChecksum() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 4a0bc2a..300fecf 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -21,6 +21,7 @@ package org.apache.falcon.messaging;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
@@ -99,6 +100,9 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
             WorkflowExecutionContext context = createContext(mapMessage);
             LOG.info("Created context from JMS message {}", context);
 
+            // Login the user so listeners can access FS and WfEngine as this user
+            CurrentUser.authenticate(context.getWorkflowUser());
+
             if (context.hasWorkflowFailed()) {
                 onFailure(context);
             } else if (context.hasWorkflowSucceeded()) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 629e6a5..dece932 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -18,10 +18,10 @@
 
 package org.apache.falcon.messaging;
 
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -35,7 +35,6 @@ import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.Session;
 import javax.jms.Topic;
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -184,7 +183,7 @@ public class JMSMessageProducer {
         String[] feedPaths;
         try {
             feedPaths = getFeedPaths();
-        } catch (IOException e) {
+        } catch (Exception e) {
             LOG.error("Error getting instance paths: ", e);
             throw new RuntimeException(e);
         }
@@ -248,7 +247,7 @@ public class JMSMessageProducer {
         message.put(key.getName(), value);
     }
 
-    private String[] getFeedPaths() throws IOException {
+    private String[] getFeedPaths() throws Exception {
         WorkflowExecutionContext.EntityOperations operation = context.getOperation();
         if (operation == WorkflowExecutionContext.EntityOperations.GENERATE
                 || operation == WorkflowExecutionContext.EntityOperations.REPLICATE) {
@@ -258,7 +257,7 @@ public class JMSMessageProducer {
 
         // else case of feed retention
         Path logFile = new Path(context.getLogFile());
-        FileSystem fs = FileSystem.get(logFile.toUri(), new Configuration());
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(logFile.toUri());
 
         if (!fs.exists(logFile)) {
             // Evictor Failed without deleting a single path

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 1a08ada..4a22ff2 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -20,6 +20,7 @@ package org.apache.falcon.logging;
 
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,7 +72,7 @@ public class JobLogMover {
 
             Path path = new Path(context.getLogDir() + "/"
                     + String.format("%03d", context.getWorkflowRunId()));
-            FileSystem fs = path.getFileSystem(getConf());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri());
 
             if (EntityType.FEED.name().equalsIgnoreCase(context.getEntityType())
                     || notUserWorkflowEngineIsOozie(context.getUserWorkflowEngine())) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
index 4ed8f52..2e5dffb 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
@@ -53,7 +53,7 @@ public final class LogProvider {
         try {
             Configuration conf = ClusterHelper.getConfiguration(clusterObj);
             // fs on behalf of the end user.
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
             String resolvedRunId = getResolvedRunId(fs, clusterObj, entity, instance, runId);
             // if runId param is not resolved, i.e job is killed or not started or running
             if (resolvedRunId.equals("-")

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index 82f7251..957300a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.oozie;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -34,8 +33,6 @@ import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.oozie.client.OozieClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +42,6 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 import javax.xml.transform.stream.StreamSource;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Properties;
@@ -77,8 +73,6 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         bundle.setName(EntityUtil.getWorkflowName(entity).toString());
         // all the properties are set prior to bundle and coordinators creation
 
-        createLogsDir(cluster, buildPath); //create logs dir
-
         for (Properties coordProps : coords) {
             // add the coordinator to the bundle
             COORDINATOR coord = new COORDINATOR();
@@ -133,23 +127,6 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         return properties;
     }
 
-    private void createLogsDir(Cluster cluster, Path buildPath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(buildPath.toUri(),
-                ClusterHelper.getConfiguration(cluster));
-            Path logsDir = new Path(buildPath.getParent(), "logs");
-            if (!fs.mkdirs(logsDir)) {
-                throw new FalconException("Failed to create " + logsDir);
-            }
-
-            // logs are copied with in oozie as the user in Post Processing and hence 777 permissions
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(logsDir, permission);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-    }
-
     protected Path marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
         return marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
             OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
@@ -160,20 +137,17 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
     protected abstract List<Properties> buildCoords(Cluster cluster, Path bundlePath) throws FalconException;
 
     public static BUNDLEAPP unmarshal(Cluster cluster, Path path) throws FalconException {
-        InputStream resourceAsStream = null;
         try {
-            FileSystem fs =
-                HadoopClientFactory.get().createFileSystem(path.toUri(), ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                        path.toUri(), ClusterHelper.getConfiguration(cluster));
             Unmarshaller unmarshaller = OozieUtils.BUNDLE_JAXB_CONTEXT.createUnmarshaller();
-            @SuppressWarnings("unchecked") JAXBElement<BUNDLEAPP> jaxbElement = (JAXBElement<BUNDLEAPP>)
-                unmarshaller.unmarshal(new StreamSource(fs.open(path)), BUNDLEAPP.class);
+            @SuppressWarnings("unchecked") JAXBElement<BUNDLEAPP> jaxbElement =
+                    unmarshaller.unmarshal(new StreamSource(fs.open(path)), BUNDLEAPP.class);
             return jaxbElement.getValue();
         } catch (JAXBException e) {
             throw new FalconException(e);
         } catch (IOException e) {
             throw new FalconException(e);
-        } finally {
-            IOUtils.closeQuietly(resourceAsStream);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index fe2136b..2ceb91e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -120,7 +120,8 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
         props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
         props.put("falconDataOperation", getOperation().name());
 
-        props.put(WorkflowExecutionArgs.LOG_DIR.getName(), getLogDirectory(cluster));
+        props.put(WorkflowExecutionArgs.LOG_DIR.getName(),
+                getStoragePath(EntityUtil.getLogPath(cluster, entity)));
         props.put(OozieClient.EXTERNAL_ID,
             new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
                 "${coord:nominalTime()}").getId());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index 1c3085c..e341fb8 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -127,8 +126,8 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
     }
 
-    protected Path marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
-        throws FalconException {
+    protected Path marshal(Cluster cluster, JAXBElement<?> jaxbElement,
+                           JAXBContext jaxbContext, Path outPath) throws FalconException {
         try {
             Marshaller marshaller = jaxbContext.createMarshaller();
             marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
@@ -140,8 +139,8 @@ public abstract class OozieEntityBuilder<T extends Entity> {
                 LOG.debug(writer.getBuffer().toString());
             }
 
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(
-                outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    outPath.toUri(), ClusterHelper.getConfiguration(cluster));
             OutputStream out = fs.create(outPath);
             try {
                 marshaller.marshal(jaxbElement, out);
@@ -261,8 +260,11 @@ public abstract class OozieEntityBuilder<T extends Entity> {
 
     protected void copySharedLibs(Cluster cluster, Path libPath) throws FalconException {
         try {
-            SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
-                libPath, cluster, FALCON_JAR_FILTER);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    libPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            SharedLibraryHostingService.pushLibsToHDFS(
+                    fs, StartupProperties.get().getProperty("system.lib.location"),
+                    libPath, FALCON_JAR_FILTER);
         } catch (IOException e) {
             throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
         }
@@ -279,16 +281,11 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         return prop;
     }
 
-    protected String getLogDirectory(Cluster cluster) {
-        return getStoragePath(new Path(EntityUtil.getBaseStagingPath(cluster, entity), "logs"));
-    }
-
     protected <T> T unmarshal(String template, JAXBContext context, Class<T> cls) throws FalconException {
         InputStream resourceAsStream = null;
         try {
             resourceAsStream = OozieEntityBuilder.class.getResourceAsStream(template);
             Unmarshaller unmarshaller = context.createUnmarshaller();
-            @SuppressWarnings("unchecked")
             JAXBElement<T> jaxbElement = unmarshaller.unmarshal(new StreamSource(resourceAsStream), cls);
             return jaxbElement.getValue();
         } catch (JAXBException e) {
@@ -310,5 +307,4 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         }
         throw new IllegalArgumentException("Unhandled type " + entity.getEntityType());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 2339284..3a3e26e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -210,7 +210,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
 
     protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {
         String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster));
         try {
             addExtensionJars(fs, new Path(libext), wf);
             addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf);
@@ -261,12 +262,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     }
 
     // creates hive-site.xml configuration in conf dir for the given cluster on the same cluster.
-    protected void createHiveConfiguration(Cluster cluster, Path workflowPath, String prefix) throws FalconException {
+    protected void createHiveConfiguration(Cluster cluster, Path workflowPath,
+                                           String prefix) throws FalconException {
         Configuration hiveConf = getHiveCredentialsAsConf(cluster);
 
         try {
             Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
 
             // create hive conf to stagingDir
             Path confPath = new Path(workflowPath + "/conf");
@@ -277,8 +279,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         }
     }
 
-    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf, String prefix)
-        throws IOException {
+    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
+                                          String prefix) throws IOException {
         OutputStream out = null;
         try {
             out = fs.create(new Path(confPath, prefix + "hive-site.xml"));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 801d733..c5366dc 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -281,7 +281,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
                                         Path buildPath) throws FalconException {
         Configuration conf = ClusterHelper.getConfiguration(trgCluster);
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
 
         try {
             // copy import export scripts to stagingDir
@@ -298,7 +298,8 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         }
     }
 
-    private void copyHiveScript(FileSystem fs, Path scriptPath, String resource) throws IOException {
+    private void copyHiveScript(FileSystem fs, Path scriptPath,
+                                String resource) throws IOException {
         OutputStream out = null;
         InputStream in = null;
         try {
@@ -312,7 +313,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     }
 
     protected void persistHiveConfiguration(FileSystem fs, Path confPath,
-        Cluster cluster, String prefix) throws IOException {
+                                            Cluster cluster, String prefix) throws IOException {
         Configuration hiveConf = getHiveCredentialsAsConf(cluster);
         OutputStream out = null;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index 8f97ffa..a38fdf6 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -120,7 +120,8 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
 
     private void copyUserWorkflow(Cluster cluster, Path buildPath) throws FalconException {
         try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
 
             //Copy user workflow and lib to staging dir
             Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 865beaf..2700802 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -32,6 +32,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CONFIGURATION;
@@ -221,7 +222,8 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
         }
 
         try {
-            final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
+            final FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
             if (fs.isFile(libPath)) {  // File, not a Dir
                 archiveList.add(libPath.toString());
                 return;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index d273d61..9567c5f 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -28,7 +28,6 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -73,40 +72,35 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
         Path lib = new Path(ClusterHelper.getLocation(cluster, "working"), "lib");
         Path libext = new Path(ClusterHelper.getLocation(cluster, "working"), "libext");
         try {
+            FileSystem fs = HadoopClientFactory.get().createFalconFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
+
             Properties properties = StartupProperties.get();
-            pushLibsToHDFS(properties.getProperty("system.lib.location"), lib, cluster, NON_FALCON_JAR_FILTER);
-            pushLibsToHDFS(properties.getProperty("libext.paths"), libext, cluster, null);
-            pushLibsToHDFS(properties.getProperty("libext.feed.paths"),
-                    new Path(libext, EntityType.FEED.name()) , cluster, null);
-            pushLibsToHDFS(properties.getProperty("libext.feed.replication.paths"),
-                    new Path(libext, EntityType.FEED.name() + "/replication"), cluster, null);
-            pushLibsToHDFS(properties.getProperty("libext.feed.retention.paths"),
-                    new Path(libext, EntityType.FEED.name() + "/retention"), cluster, null);
-            pushLibsToHDFS(properties.getProperty("libext.process.paths"),
-                    new Path(libext, EntityType.PROCESS.name()) , cluster, null);
+            pushLibsToHDFS(fs, properties.getProperty("system.lib.location"), lib,
+                    NON_FALCON_JAR_FILTER);
+            pushLibsToHDFS(fs, properties.getProperty("libext.paths"), libext, null);
+            pushLibsToHDFS(fs, properties.getProperty("libext.feed.paths"),
+                    new Path(libext, EntityType.FEED.name()) , null);
+            pushLibsToHDFS(fs, properties.getProperty("libext.feed.replication.paths"),
+                    new Path(libext, EntityType.FEED.name() + "/replication"), null);
+            pushLibsToHDFS(fs, properties.getProperty("libext.feed.retention.paths"),
+                    new Path(libext, EntityType.FEED.name() + "/retention"), null);
+            pushLibsToHDFS(fs, properties.getProperty("libext.process.paths"),
+                    new Path(libext, EntityType.PROCESS.name()) , null);
         } catch (IOException e) {
             throw new FalconException("Failed to copy shared libs to cluster" + cluster.getName(), e);
         }
     }
 
-    public static void pushLibsToHDFS(String src, Path target, Cluster cluster, FalconPathFilter pathFilter)
-        throws IOException, FalconException {
+    @SuppressWarnings("ConstantConditions")
+    public static void pushLibsToHDFS(FileSystem fs, String src, Path target,
+                                      FalconPathFilter pathFilter) throws IOException, FalconException {
         if (StringUtils.isEmpty(src)) {
             return;
         }
 
         LOG.debug("Copying libs from {}", src);
-        FileSystem fs;
-        try {
-            fs = getFileSystem(cluster);
-            fs.getConf().set("dfs.umaskmode", "022");  // drwxr-xr-x
-        } catch (Exception e) {
-            throw new FalconException("Unable to connect to HDFS: "
-                    + ClusterHelper.getStorageUrl(cluster), e);
-        }
-        if (!fs.exists(target) && !fs.mkdirs(target)) {
-            throw new FalconException("mkdir " + target + " failed");
-        }
+        createTargetPath(fs, target);
 
         for(String srcPaths : src.split(",")) {
             File srcFile = new File(srcPaths);
@@ -133,18 +127,20 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
                     }
                 }
                 fs.copyFromLocalFile(false, true, new Path(file.getAbsolutePath()), targetFile);
-                LOG.info("Copied {} to {} in {}", file.getAbsolutePath(), targetFile.toString(), fs.getUri());
+                fs.setPermission(targetFile, HadoopClientFactory.READ_EXECUTE_PERMISSION);
+                LOG.info("Copied {} to {} in {}",
+                        file.getAbsolutePath(), targetFile.toString(), fs.getUri());
             }
         }
     }
 
-    // the dir is owned by Falcon but world-readable
-    private static FileSystem getFileSystem(Cluster cluster)
-        throws FalconException, IOException {
-        Configuration conf = ClusterHelper.getConfiguration(cluster);
-        conf.setInt("ipc.client.connect.max.retries", 10);
-
-        return HadoopClientFactory.get().createFileSystem(conf);
+    private static void createTargetPath(FileSystem fs,
+                                         Path target) throws IOException, FalconException {
+        // the dir and files MUST be readable by all users
+        if (!fs.exists(target)
+                && !FileSystem.mkdirs(fs, target, HadoopClientFactory.READ_EXECUTE_PERMISSION)) {
+            throw new FalconException("mkdir " + target + " failed");
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
index bbed949..54cab51 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
@@ -58,7 +58,7 @@ public class OozieHouseKeepingService implements WorkflowEngineActionListener {
             LOG.info("Deleting entity path {} on cluster {}", entityPath, clusterName);
 
             Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
             if (fs.exists(entityPath) && !fs.delete(entityPath, true)) {
                 throw new FalconException("Unable to cleanup entity path: " + entityPath);
             }


Mime
View raw message