falcon-commits mailing list archives

From: venkat...@apache.org
Subject: [1/2] FALCON-753 Change the ownership for staging dir to user submitting the feed. Contributed by Venkatesh Seetharam
Date: Thu, 16 Oct 2014 23:27:13 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master 1caadaf20 -> 15b89bc31


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index d5432eb..5c4fea3 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -21,6 +21,7 @@ package org.apache.falcon.workflow.engine;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
@@ -30,6 +31,7 @@ import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.OozieBundleBuilder;
 import org.apache.falcon.oozie.OozieEntityBuilder;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -63,6 +65,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -146,6 +149,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
             for (String clusterName: schedClusters) {
                 Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
+                prepareEntityBuildPath(entity, cluster);
                 Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
                 Properties properties = builder.build(cluster, buildPath);
                 if (properties == null) {
@@ -160,6 +164,29 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+    /**
+     * Prepares the staging and log dirs for this entity with default permissions.
+     *
+     * @param entity  the entity being scheduled
+     * @param cluster the cluster entity to prepare the dirs on
+     * @throws FalconException if the directories cannot be created
+     */
+    private void prepareEntityBuildPath(Entity entity, Cluster cluster) throws FalconException {
+        Path stagingPath = EntityUtil.getBaseStagingPath(cluster, entity);
+        Path logPath = EntityUtil.getLogPath(cluster, entity);
+
+        try {
+            HadoopClientFactory.mkdirsWithDefaultPerms(
+                    HadoopClientFactory.get().createProxiedFileSystem(
+                            ClusterHelper.getConfiguration(cluster)), stagingPath);
+            HadoopClientFactory.mkdirsWithDefaultPerms(
+                    HadoopClientFactory.get().createProxiedFileSystem(
+                            ClusterHelper.getConfiguration(cluster)), logPath);
+        } catch (IOException e) {
+            throw new FalconException("Error preparing staging/log dirs: "
+                    + stagingPath + ", " + logPath, e);
+        }
+    }
+
     @Override
     public void dryRun(Entity entity, String clusterName) throws FalconException {
         OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
@@ -434,7 +461,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
             for (String cluster : clusters) {
                 ProxyOozieClient client = OozieClientFactory.get(cluster);
-                List<String> wfNames = EntityUtil.getWorkflowNames(entity, cluster);
+                List<String> wfNames = EntityUtil.getWorkflowNames(entity);
                 List<WorkflowJob> wfs = getRunningWorkflows(cluster, wfNames);
                 if (wfs != null) {
                     for (WorkflowJob job : wfs) {
@@ -812,6 +839,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+    @SuppressWarnings("MagicConstant")
     protected Map<String, List<CoordinatorAction>> getCoordActions(Entity entity, Date start, Date end,
                                                                    List<LifeCycle> lifeCycles) throws FalconException {
         Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
@@ -1071,6 +1099,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return new Date(1000L * 60 * minute + date.getTime());
     }
 
+    @SuppressWarnings("MagicConstant")
     private Date getCoordLastActionTime(CoordinatorJob coord) {
         if (coord.getNextMaterializedTime() != null) {
             Calendar cal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone()));

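The heart of the server-side change is the prepareEntityBuildPath hook above: the base staging and log dirs are created up front through a FileSystem proxied as the submitting user, so their ownership lands with that user rather than with the falcon service user. The mkdirsWithDefaultPerms helper is not shown in this hunk; a minimal sketch of what it plausibly does, assuming it applies the cluster's configured umask the way a plain "hadoop fs -mkdir" would (only the name comes from this patch, the body below is a guess):

    // Hypothetical sketch; the real helper lands elsewhere in this patch series.
    public static void mkdirsWithDefaultPerms(FileSystem fs, Path path) throws IOException {
        // Directory default is 0777 filtered through fs.permissions.umask-mode.
        FsPermission umask = FsPermission.getUMask(fs.getConf());
        FsPermission dirDefault = FsPermission.createImmutable((short) 0777).applyUMask(umask);
        if (!fs.mkdirs(path, dirDefault)) {
            throw new IOException("mkdir failed for " + path);
        }
    }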
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 68911ce..74cc509 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -51,8 +51,11 @@ import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -92,7 +95,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
     @BeforeClass
     public void setUpDFS() throws Exception {
-        CurrentUser.authenticate("falcon");
+        CurrentUser.authenticate(System.getProperty("user.name"));
 
         srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
         String srcHdfsUrl = srcMiniDFS.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
@@ -240,7 +243,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     }
 
     private COORDINATORAPP getCoordinator(EmbeddedCluster cluster, String appPath) throws Exception {
-        return getCoordinator(cluster.getFileSystem(), new Path(StringUtils.removeStart(appPath, "${nameNode}")));
+        return getCoordinator(cluster.getFileSystem(),
+                new Path(StringUtils.removeStart(appPath, "${nameNode}")));
     }
 
     private String getWorkflowAppPath() {
@@ -270,7 +274,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
         OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(fsReplFeed, Tag.REPLICATION);
 
-        List<Properties> alphaCoords = builder.buildCoords(alphaTrgCluster, new Path("/alpha/falcon/"));
+        List<Properties> alphaCoords = builder.buildCoords(alphaTrgCluster,
+                new Path("/alpha/falcon/"));
         final COORDINATORAPP alphaCoord = getCoordinator(trgMiniDFS,
             alphaCoords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
         Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
@@ -339,9 +344,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     }
 
     public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP workflow, boolean isTable) {
-        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), workflow.getName());
+        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(),
+                workflow.getName());
 
-        boolean preProcess = RuntimeProperties.get().getProperty("feed.late.allowed", "true").equalsIgnoreCase("true");
+        boolean preProcess = RuntimeProperties.get().getProperty("feed.late.allowed", "true").equalsIgnoreCase(
+                "true");
         if (preProcess) {
             assertAction(workflow, "pre-processing", true);
         }
@@ -421,8 +428,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         HashMap<String, String> props = getCoordProperties(coord);
 
-        final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
-        final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
+        final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster,
+                tableFeed);
+        final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster,
+                tableFeed);
 
         // verify the replication param that feed replicator depends on
         Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
@@ -516,18 +525,40 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitions('input', 'hive-export')}");
     }
 
-    @Test
-    public void testRetentionCoords() throws Exception {
-        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
+    @DataProvider(name = "uMaskOptions")
+    private Object[][] createUMaskOptions() {
+        return new Object[][] {
+            {"000"}, // {FsAction.ALL, FsAction.ALL, FsAction.ALL},
+            {"077"}, // {FsAction.ALL, FsAction.NONE, FsAction.NONE}
+            {"027"}, // {FsAction.ALL, FsAction.READ_EXECUTE, FsAction.NONE}
+            {"017"}, // {FsAction.ALL, FsAction.READ_WRITE, FsAction.NONE}
+            {"012"}, // {FsAction.ALL, FsAction.READ_WRITE, FsAction.READ_EXECUTE}
+            {"022"}, // {FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE}
+        };
+    }
+
+    @Test(dataProvider = "uMaskOptions")
+    public void testRetentionCoords(String umask) throws Exception {
+        FileSystem fs = srcMiniDFS.getFileSystem();
+        Configuration conf = fs.getConf();
+        conf.set("fs.permissions.umask-mode", umask);
+
+        // ClusterHelper constructs a new FS Configuration; add the umask to the cluster properties so it ends up in the FS conf
+        setUmaskInFsConf(srcCluster, umask);
+
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed,
+                srcCluster.getName());
         final Calendar instance = Calendar.getInstance();
         instance.roll(Calendar.YEAR, 1);
         cluster.getValidity().setEnd(instance.getTime());
 
         OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(feed, Tag.RETENTION);
-        List<Properties> coords = builder.buildCoords(srcCluster, new Path("/projects/falcon/"));
+        List<Properties> coords = builder.buildCoords(
+                srcCluster, new Path("/projects/falcon/" + umask));
         COORDINATORAPP coord = getCoordinator(srcMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
 
-        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(),
+                "${nameNode}/projects/falcon/" + umask + "/RETENTION");
         Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
         Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
 
@@ -553,16 +584,30 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("feedNames"), feed.getName());
         Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
 
-        assertWorkflowRetries(coord);
-        verifyEntityProperties(feed, srcCluster,
-                WorkflowExecutionContext.EntityOperations.DELETE, props);
-        verifyBrokerProperties(srcCluster, props);
+        assertWorkflowRetries(getWorkflowapp(srcMiniDFS.getFileSystem(), coord));
+
+        try {
+            verifyClusterLocationsUMask(srcCluster, fs);
+            verifyWorkflowUMask(fs, coord, umask);
+        } finally {
+            cleanupWorkflowState(fs, coord);
+            FileSystem.closeAll();
+        }
     }
 
     @Test (dataProvider = "secureOptions")
     public void testRetentionCoordsForTable(String secureOption) throws Exception {
         StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
 
+        final String umask = "000";
+
+        FileSystem fs = trgMiniDFS.getFileSystem();
+        Configuration conf = fs.getConf();
+        conf.set("fs.permissions.umask-mode", umask);
+
+        // ClusterHelper constructs a new FS Configuration; add the umask to the cluster properties so it ends up in the FS conf
+        setUmaskInFsConf(trgCluster, umask);
+
         org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(tableFeed, trgCluster.getName());
         final Calendar instance = Calendar.getInstance();
         instance.roll(Calendar.YEAR, 1);
@@ -604,9 +649,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
                 WorkflowExecutionContext.EntityOperations.DELETE, props);
 
         Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
-        assertHCatCredentials(
-            getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
-            new Path(coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "")).toString());
+        assertHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
+                coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""));
+
+        try {
+            verifyClusterLocationsUMask(trgCluster, fs);
+            verifyWorkflowUMask(fs, coord, umask);
+        } finally {
+            cleanupWorkflowState(fs, coord);
+            FileSystem.closeAll();
+        }
     }
 
     private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
@@ -636,4 +688,53 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
             }
         }
     }
+
+    private void verifyClusterLocationsUMask(Cluster aCluster, FileSystem fs) throws IOException {
+        String stagingLocation = ClusterHelper.getLocation(aCluster, "staging");
+        Path stagingPath = new Path(stagingLocation);
+        if (fs.exists(stagingPath)) {
+            FileStatus fileStatus = fs.getFileStatus(stagingPath);
+            Assert.assertEquals(fileStatus.getPermission().toShort(), 511); // 511 == 0777 (rwxrwxrwx)
+        }
+
+        String workingLocation = ClusterHelper.getLocation(aCluster, "working");
+        Path workingPath = new Path(workingLocation);
+        if (fs.exists(workingPath)) {
+            FileStatus fileStatus = fs.getFileStatus(workingPath);
+            Assert.assertEquals(fileStatus.getPermission().toShort(), 493); // 493 == 0755 (rwxr-xr-x)
+        }
+    }
+
+    private void verifyWorkflowUMask(FileSystem fs, COORDINATORAPP coord,
+                                     String defaultUMask) throws IOException {
+        Assert.assertEquals(fs.getConf().get("fs.permissions.umask-mode"), defaultUMask);
+
+        String appPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        Path wfPath = new Path(appPath);
+        FileStatus[] fileStatuses = fs.listStatus(wfPath);
+        for (FileStatus fileStatus : fileStatuses) {
+            Assert.assertEquals(fileStatus.getOwner(), CurrentUser.getProxyUGI().getShortUserName());
+
+            final FsPermission permission = fileStatus.getPermission();
+            if (!fileStatus.isDirectory()) {
+                Assert.assertEquals(permission.toString(),
+                        HadoopClientFactory.getFileDefaultPermission(fs.getConf()).toString());
+            }
+        }
+    }
+
+    private void cleanupWorkflowState(FileSystem fs, COORDINATORAPP coord) throws Exception {
+        String appPath = coord.getAction().getWorkflow().getAppPath();
+        Path wfPath = new Path(appPath.replace("${nameNode}", ""));
+        fs.delete(wfPath, true);
+    }
+
+    private static void setUmaskInFsConf(Cluster cluster, String umask) {
+        // ClusterHelper constructs a new FS Configuration; add the umask to the cluster properties so it ends up in the FS conf
+        org.apache.falcon.entity.v0.cluster.Property property =
+                new org.apache.falcon.entity.v0.cluster.Property();
+        property.setName("fs.permissions.umask-mode");
+        property.setValue(umask);
+        cluster.getProperties().getProperties().add(property);
+    }
 }

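The umask assertions in the tests above come down to standard Hadoop permission arithmetic: a newly created file gets 0666 masked by fs.permissions.umask-mode, a directory 0777 likewise. A worked example for the "027" case from the data provider, assuming HadoopClientFactory.getFileDefaultPermission composes the 0666 default with the configured umask:

    Configuration conf = new Configuration();
    conf.set("fs.permissions.umask-mode", "027");
    FsPermission umask = FsPermission.getUMask(conf);
    // Files:       0666 & ~027 = 0640 (rw-r-----)
    FsPermission fileDefault = FsPermission.createImmutable((short) 0666).applyUMask(umask);
    // Directories: 0777 & ~027 = 0750 (rwxr-x---)
    FsPermission dirDefault = FsPermission.createImmutable((short) 0777).applyUMask(umask);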
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index e50da2d..23e0535 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -232,13 +232,15 @@ public abstract class AbstractEntityManager {
 
     // Parallel update can get very clumsy if two feeds are updated which
     // are referred by a single process. Sequencing them.
-    public synchronized APIResult update(HttpServletRequest request, String type, String entityName, String colo,
-                                         String effectiveTimeStr) {
+    public synchronized APIResult update(HttpServletRequest request, String type, String entityName,
+                                         String colo, String effectiveTimeStr) {
         checkColo(colo);
         try {
             EntityType entityType = EntityType.valueOf(type.toUpperCase());
             Entity oldEntity = EntityUtil.getEntity(type, entityName);
             Entity newEntity = deserializeEntity(request, entityType);
+            // KLUDGE - Until ACL is mandated, the entity passed in must be decorated for the equals check to pass
+            decorateEntityWithACL(newEntity);
             validate(newEntity);
 
             validateUpdate(oldEntity, newEntity);
@@ -326,6 +328,8 @@ public abstract class AbstractEntityManager {
 
         EntityType entityType = EntityType.valueOf(type.toUpperCase());
         Entity entity = deserializeEntity(request, entityType);
+        // KLUDGE - Until ACL is mandated, the entity passed in must be decorated for the equals check to pass
+        decorateEntityWithACL(entity);
 
         Entity existingEntity = configStore.get(entityType, entity.getName());
         if (existingEntity != null) {
@@ -344,6 +348,50 @@ public abstract class AbstractEntityManager {
         return entity;
     }
 
+    /**
+     * KLUDGE - Until ACL is mandated, the entity passed in should be decorated for the equals check to pass.
+     * The existingEntity in the config store will have the decoration, and the equals check fails
+     * if the entity passed in is not decorated when checking whether the entity already exists.
+     *
+     * @param entity entity
+     */
+    private void decorateEntityWithACL(Entity entity) {
+        if (SecurityUtil.isAuthorizationEnabled() || EntityUtil.getACL(entity) != null) {
+            return; // not necessary to decorate
+        }
+
+        final String proxyUser = CurrentUser.getUser();
+        final String defaultGroupName = CurrentUser.getPrimaryGroupName();
+        switch (entity.getEntityType()) {
+        case CLUSTER:
+            org.apache.falcon.entity.v0.cluster.ACL clusterACL =
+                    new org.apache.falcon.entity.v0.cluster.ACL();
+            clusterACL.setOwner(proxyUser);
+            clusterACL.setGroup(defaultGroupName);
+            ((org.apache.falcon.entity.v0.cluster.Cluster) entity).setACL(clusterACL);
+            break;
+
+        case FEED:
+            org.apache.falcon.entity.v0.feed.ACL feedACL =
+                    new org.apache.falcon.entity.v0.feed.ACL();
+            feedACL.setOwner(proxyUser);
+            feedACL.setGroup(defaultGroupName);
+            ((org.apache.falcon.entity.v0.feed.Feed) entity).setACL(feedACL);
+            break;
+
+        case PROCESS:
+            org.apache.falcon.entity.v0.process.ACL processACL =
+                    new org.apache.falcon.entity.v0.process.ACL();
+            processACL.setOwner(proxyUser);
+            processACL.setGroup(defaultGroupName);
+            ((org.apache.falcon.entity.v0.process.Process) entity).setACL(processACL);
+            break;
+
+        default:
+            break;
+        }
+    }
+
     protected Entity deserializeEntity(HttpServletRequest request, EntityType entityType)
         throws IOException, FalconException {
 
@@ -612,7 +660,7 @@ public abstract class AbstractEntityManager {
     protected boolean isEntityAuthorized(Entity entity) {
         try {
             SecurityUtil.getAuthorizationProvider().authorizeResource("entities", "list",
-                    entity.getEntityType().toString(), entity.getName(), CurrentUser.getProxyUgi());
+                    entity.getEntityType().toString(), entity.getName(), CurrentUser.getProxyUGI());
         } catch (Exception e) {
             LOG.error("Authorization failed for entity=" + entity.getName()
                     + " for user=" + CurrentUser.getUser(), e);

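The decorateEntityWithACL kludge above exists because submit stores an ACL-decorated copy in the config store: a later idempotent submit or update whose payload carries no <ACL/> element would fail the equals check against the stored copy on the ACL alone. Roughly, with a hypothetical feed name:

    Feed stored = (Feed) configStore.get(EntityType.FEED, "clicks"); // decorated at submit time
    Feed incoming = (Feed) deserializeEntity(request, EntityType.FEED); // payload has no <ACL/>
    stored.equals(incoming);         // false, purely because of the missing ACL
    decorateEntityWithACL(incoming); // defaults owner/group from CurrentUser
    stored.equals(incoming);         // now holds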
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
index b61360b..3daa419 100644
--- a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
+++ b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
@@ -67,7 +67,7 @@ public class FalconAuthorizationFilter implements Filter {
             LOG.info("Authorizing user={} against request={}", CurrentUser.getUser(), requestParts);
             authorizationProvider.authorizeResource(requestParts.getResource(),
                     requestParts.getAction(), requestParts.getEntityType(),
-                    requestParts.getEntityName(), CurrentUser.getProxyUgi());
+                    requestParts.getEntityName(), CurrentUser.getProxyUGI());
         }
 
         filterChain.doFilter(request, response);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index 211c8bf..2bf2a98 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -19,7 +19,9 @@ package org.apache.falcon.replication;
 
 import org.apache.commons.cli.*;
 import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -129,9 +131,10 @@ public class FeedReplicator extends Configured implements Tool {
         return listPaths;
     }
 
-    private void executePostProcessing(DistCpOptions options) throws IOException {
+    private void executePostProcessing(DistCpOptions options) throws IOException, FalconException {
         Path targetPath = options.getTargetPath();
-        FileSystem fs = targetPath.getFileSystem(getConf());
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                targetPath.toUri(), getConf());
         List<Path> inPaths = options.getSourcePaths();
         assert inPaths.size() == 1 : "Source paths more than 1 can't be handled";
 

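FeedReplicator's post-processing output is now written through a proxied FileSystem instead of targetPath.getFileSystem(getConf()), so the files it creates belong to the entity owner. A plausible sketch of such a factory method, assuming the usual Hadoop proxy-user pattern (not necessarily Falcon's exact implementation):

    // Needs java.security.PrivilegedExceptionAction and
    // org.apache.hadoop.security.UserGroupInformation.
    public FileSystem createProxiedFileSystem(final URI uri, final Configuration conf)
        throws IOException, InterruptedException {
        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(
                CurrentUser.getUser(), UserGroupInformation.getLoginUser());
        return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
            public FileSystem run() throws IOException {
                return FileSystem.get(uri, conf); // handle bound to the proxied user
            }
        });
    }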
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index d854bdd..4e8e1cd 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
@@ -130,16 +128,13 @@ public class LateDataHandler extends Configured implements Tool {
         return computedMetrics;
     }
 
-    private void persistMetrics(Map<String, Long> metrics, Path file) throws IOException, FalconException {
+    private void persistMetrics(Map<String, Long> metrics,
+                                Path file) throws IOException, FalconException {
         OutputStream out = null;
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), getConf());
+        try {  // created in a map job
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri());
             out = fs.create(file);
 
-            // making sure falcon can read this file
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(file, permission);
-
             for (Map.Entry<String, Long> entry : metrics.entrySet()) {
                 out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
             }
@@ -212,7 +207,7 @@ public class LateDataHandler extends Configured implements Tool {
     }
 
     private long usage(Path inPath, Configuration conf) throws IOException, FalconException {
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(inPath.toUri(), conf);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(inPath.toUri(), conf);
         FileStatus[] fileStatuses = fs.globStatus(inPath);
         if (fileStatuses == null || fileStatuses.length == 0) {
             return 0;
@@ -261,7 +256,7 @@ public class LateDataHandler extends Configured implements Tool {
         throws Exception {
 
         StringBuilder buffer = new StringBuilder();
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), conf);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri(), conf);
         BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(file)));
         String line;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index 459da84..9ee94c5 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -66,6 +66,7 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
                     continue;
                 }
 
+                // Log in the workflow user so the WfEngine is accessed as that user
                 CurrentUser.authenticate(message.getWorkflowUser());
                 String jobStatus = handler.getWfEngine().getWorkflowStatus(
                         message.getClusterName(), message.getWfId());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 80a3b83..9ba632e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -99,7 +99,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
         final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
         Configuration conf = LateRerunHandler.getConfiguration(storageEndpoint);
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(lateLogPath.toUri(), conf);
         if (!fs.exists(lateLogPath)) {
             LOG.warn("Late log file: {} not found", lateLogPath);
             return "";

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index bacc20f..c2cb09e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -37,7 +37,6 @@ import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
 import org.apache.falcon.rerun.policy.RerunPolicyFactory;
 import org.apache.falcon.rerun.queue.DelayedQueue;
-import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.conf.Configuration;
@@ -66,7 +65,6 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
             if (wait == -1) {
                 LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName);
 
-                CurrentUser.authenticate(workflowUser);
                 java.util.Properties properties =
                         this.getWfEngine().getWorkflowProperties(cluster, wfId);
                 String logDir = properties.getProperty("logDir");
@@ -77,7 +75,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                 LOG.info("Going to delete path: {}", lateLogPath);
                 final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
                 Configuration conf = getConfiguration(storageEndpoint);
-                FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+                FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
                 if (fs.exists(lateLogPath)) {
                     boolean deleted = fs.delete(lateLogPath, true);
                     if (deleted) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index f7a4493..67ea181 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -35,6 +35,7 @@ import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.common.FeedDataPath.VARS;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -127,7 +128,8 @@ public class FeedEvictor extends Configured implements Tool {
 
         Path path = new Path(logFile);
         EvictedInstanceSerDe.serializeEvictedInstancePaths(
-                path.getFileSystem(getConf()), path, instancePaths);
+                HadoopClientFactory.get().createProxiedFileSystem(
+                        path.toUri(), getConf()), path, instancePaths);
 
         int len = buffer.length();
         if (len > 0) {
@@ -157,11 +159,10 @@ public class FeedEvictor extends Configured implements Tool {
         }
     }
 
-    private void fileSystemEvictor(String feedPath, String retentionLimit, String timeZone)
-        throws IOException, ELException {
-
+    private void fileSystemEvictor(String feedPath, String retentionLimit,
+                                   String timeZone) throws IOException, ELException, FalconException {
         Path normalizedPath = new Path(feedPath);
-        FileSystem fs = normalizedPath.getFileSystem(getConf());
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(normalizedPath.toUri());
         feedPath = normalizedPath.toUri().getPath();
         LOG.info("Normalized path: {}", feedPath);
 
@@ -526,7 +527,8 @@ public class FeedEvictor extends Configured implements Tool {
             if (isTableExternal) { // nuke the dirs if an external table
                 final String location = partitionToDrop.getLocation();
                 final Path path = new Path(location);
-                deleted = path.getFileSystem(new Configuration()).delete(path, true);
+                deleted = HadoopClientFactory.get()
+                        .createProxiedFileSystem(path.toUri()).delete(path, true);
             }
             if (!isTableExternal || deleted) {
                 // replace ',' with ';' since message producer splits instancePaths string by ','

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
index b1e518d..c67b8fc 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
@@ -127,7 +127,7 @@ public class EmbeddedCluster {
         locs.getLocations().add(location);
         location = new Location();
         location.setName("working");
-        location.setPath("/project/falcon/working");
+        location.setPath("/projects/falcon/working");
         locs.getLocations().add(location);
         clusterEntity.setLocations(locs);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 6694af1..8cd86cc 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -1014,6 +1014,7 @@ public class FalconCLIIT {
         OutputStream out = new FileOutputStream(file);
         props.setProperty("falcon.recipe.processName", context.getProcessName());
         props.setProperty("falcon.recipe.src.cluster.name", context.getClusterName());
+        props.setProperty("falcon.recipe.processEndDate", context.getProcessEndTime());
         props.setProperty("falcon.recipe.inputFeedName", context.getInputFeedName());
         props.setProperty("falcon.recipe.outputFeedName", context.getOutputFeedName());
         props.setProperty("falcon.recipe.workflow.path", TestContext.class.getResource("/fs-workflow.xml").getPath());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
index 96c99c5..cdeba63 100644
--- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.latedata.LateDataHandler;
 import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.FSUtils;
 import org.apache.falcon.util.HiveTestUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -65,6 +66,7 @@ public class LateDataHandlerIT {
 
     @BeforeClass
     public void prepare() throws Exception {
+        CurrentUser.authenticate(TestContext.REMOTE_USER);
         TestContext.cleanupStore();
 
         String filePath = TestContext.overlayParametersOverTemplate(

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index ed70a0b..f7e6bdb 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -109,8 +109,8 @@ public class EntityManagerJerseyIT {
         ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
         context.assertSuccessful(response);
         FileSystem fs = context.getCluster().getFileSystem();
-        assertLibs(fs, new Path("/project/falcon/working/libext/FEED/retention"));
-        assertLibs(fs, new Path("/project/falcon/working/libext/PROCESS"));
+        assertLibs(fs, new Path("/projects/falcon/working/libext/FEED/retention"));
+        assertLibs(fs, new Path("/projects/falcon/working/libext/PROCESS"));
 
         String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Feed feed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(new File(tmpFileName));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index e9545d1..64f98d4 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -32,11 +32,13 @@ import org.apache.falcon.cli.FalconCLI;
 import org.apache.falcon.client.FalconCLIException;
 import org.apache.falcon.client.FalconClient;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.StartupProperties;
@@ -103,6 +105,7 @@ public class TestContext {
 
     protected String clusterName;
     protected String processName;
+    protected String processEndTime;
     protected String inputFeedName;
     protected String outputFeedName;
 
@@ -189,6 +192,10 @@ public class TestContext {
         return processName;
     }
 
+    public String getProcessEndTime() {
+        return processEndTime;
+    }
+
     public String getInputFeedName() {
         return inputFeedName;
     }
@@ -279,6 +286,8 @@ public class TestContext {
             try {
                 cluster = EmbeddedCluster.newCluster(overlay.get("cluster"), true);
                 clusterName = cluster.getCluster().getName();
+                deleteClusterLocations(cluster.getCluster(), cluster.getFileSystem());
+                createClusterLocations(cluster.getCluster(), cluster.getFileSystem());
             } catch (Exception e) {
                 throw new IOException("Unable to setup cluster info", e);
             }
@@ -286,6 +295,37 @@ public class TestContext {
         return submitFileToFalcon(entityType, tmpFile);
     }
 
+    public static void deleteClusterLocations(Cluster clusterEntity,
+                                              FileSystem fs) throws IOException {
+        String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging");
+        Path stagingPath = new Path(stagingLocation);
+        if (fs.exists(stagingPath)) {
+            fs.delete(stagingPath, true);
+        }
+
+        String workingLocation = ClusterHelper.getLocation(clusterEntity, "working");
+        Path workingPath = new Path(workingLocation);
+        if (fs.exists(workingPath)) {
+            fs.delete(workingPath, true);
+        }
+    }
+
+    public static void createClusterLocations(Cluster clusterEntity,
+                                              FileSystem fs) throws IOException {
+        String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging");
+        Path stagingPath = new Path(stagingLocation);
+        if (!fs.exists(stagingPath)) {
+            HadoopClientFactory.mkdirs(fs, stagingPath, HadoopClientFactory.ALL_PERMISSION);
+        }
+
+        String workingLocation = ClusterHelper.getLocation(clusterEntity, "working");
+        Path workingPath = new Path(workingLocation);
+        if (!fs.exists(workingPath)) {
+            HadoopClientFactory
+                    .mkdirs(fs, workingPath, HadoopClientFactory.READ_EXECUTE_PERMISSION);
+        }
+    }
+
     public ClientResponse submitFileToFalcon(EntityType entityType, String tmpFile) throws IOException {
 
         ServletInputStream rawlogStream = getServletInputStream(tmpFile);
@@ -376,6 +416,8 @@ public class TestContext {
         //only feeds with future dates can be scheduled
         Date endDate = new Date(System.currentTimeMillis() + 15 * 60 * 1000);
         overlay.put("feedEndDate", SchemaHelper.formatDateUTC(endDate));
+        processEndTime = SchemaHelper.formatDateUTC(endDate);
+        overlay.put("processEndDate", processEndTime);
         outputFeedName = "out" + time;
         overlay.put("outputFeedName", outputFeedName);
         processName = "p" + time;
@@ -436,6 +478,18 @@ public class TestContext {
         mkdir(fs, new Path(wfParent, "input/2012/04/20/00"));
         Path outPath = new Path(wfParent, "output");
         mkdir(fs, outPath, new FsPermission((short) 511));
+
+        // init cluster locations
+        initClusterLocations(cluster, fs);
+    }
+
+    private static void initClusterLocations(EmbeddedCluster cluster,
+                                             FileSystem fs) throws Exception {
+        String stagingPath = ClusterHelper.getLocation(cluster.getCluster(), "staging");
+        mkdir(fs, new Path(stagingPath), HadoopClientFactory.ALL_PERMISSION);
+
+        String workingPath = ClusterHelper.getLocation(cluster.getCluster(), "working");
+        mkdir(fs, new Path(workingPath), HadoopClientFactory.READ_EXECUTE_PERMISSION);
     }
 
     public static void cleanupStore() throws Exception {

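The helpers above create the staging location world-writable so every entity owner can carve out its own staging subtree, while the working location stays writable only by the falcon service user. The two permission constants are assumed to be defined as follows, consistent with the 511 (0777) and 493 (0755) assertions in OozieFeedWorkflowBuilderTest:

    public static final FsPermission ALL_PERMISSION =          // 0777
            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
    public static final FsPermission READ_EXECUTE_PERMISSION = // 0755
            new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE);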
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
index 6a65bf6..cbbf90a 100644
--- a/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
@@ -20,12 +20,19 @@ package org.apache.falcon.validation;
 
 import com.sun.jersey.api.client.ClientResponse;
 import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.ValidationException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.ACL;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interface;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.resource.TestContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -33,6 +40,7 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
 
@@ -41,13 +49,31 @@ import java.util.Map;
  * interface endpoints are valid.
  */
 public class ClusterEntityValidationIT {
+    private static final FsPermission OWNER_ONLY_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+
     private final TestContext context = new TestContext();
     private Map<String, String> overlay;
 
+    private final ClusterEntityParser parser =
+            (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+    private Cluster cluster;
+    private FileSystem fs;
+
 
     @BeforeClass
     public void setup() throws Exception {
         TestContext.prepare();
+
+        overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(
+                TestContext.CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+        InputStream stream = new FileInputStream(filePath);
+        cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
+        Assert.assertNotNull(cluster);
+
+        fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
     }
 
     /**
@@ -89,15 +115,15 @@ public class ClusterEntityValidationIT {
         overlay = context.getUniqueOverlay();
         String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
         InputStream stream = new FileInputStream(filePath);
-        Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
-        Assert.assertNotNull(cluster);
-        cluster.setColo("default");  // validations will be ignored if not default & tests fail
+        Cluster clusterEntity = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
+        Assert.assertNotNull(clusterEntity);
+        clusterEntity.setColo("default");  // validations will be ignored if not default & tests fail
 
-        Interface anInterface = ClusterHelper.getInterface(cluster, interfacetype);
+        Interface anInterface = ClusterHelper.getInterface(clusterEntity, interfacetype);
         anInterface.setEndpoint(endpoint);
 
         File tmpFile = TestContext.getTempFile();
-        EntityType.CLUSTER.getMarshaller().marshal(cluster, tmpFile);
+        EntityType.CLUSTER.getMarshaller().marshal(clusterEntity, tmpFile);
         ClientResponse response = context.submitFileToFalcon(EntityType.CLUSTER, tmpFile.getAbsolutePath());
         context.assertFailure(response);
     }
@@ -107,20 +133,55 @@ public class ClusterEntityValidationIT {
         overlay = context.getUniqueOverlay();
         String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
         InputStream stream = new FileInputStream(filePath);
-        Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
-        Assert.assertNotNull(cluster);
+        Cluster clusterEntity = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
+        Assert.assertNotNull(clusterEntity);
 
         // Adding ACL with authorization disabled must not hurt
         ACL clusterACL = new ACL();
         clusterACL.setOwner(TestContext.REMOTE_USER);
         clusterACL.setGroup(TestContext.REMOTE_USER);
-        cluster.setACL(clusterACL);
+        clusterEntity.setACL(clusterACL);
 
-        cluster.setColo("default");  // validations will be ignored if not default & tests fail
+        clusterEntity.setColo("default");  // validations will be ignored if not default & tests fail
 
         File tmpFile = TestContext.getTempFile();
-        EntityType.CLUSTER.getMarshaller().marshal(cluster, tmpFile);
+        EntityType.CLUSTER.getMarshaller().marshal(clusterEntity, tmpFile);
         ClientResponse response = context.submitFileToFalcon(EntityType.CLUSTER, tmpFile.getAbsolutePath());
         context.assertSuccessful(response);
     }
+
+    @Test
+    public void testValidateClusterLocations() throws Exception {
+        TestContext.createClusterLocations(cluster, fs);
+        parser.validate(cluster);
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateClusterLocationsThatDontExist() throws Exception {
+        TestContext.deleteClusterLocations(cluster, fs);
+        parser.validate(cluster);
+        Assert.fail("Should have thrown a validation exception");
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateClusterLocationsThatExistWithBadOwner() throws Exception {
+        TestContext.deleteClusterLocations(cluster, fs);
+        createClusterLocationsBadPermissions(cluster);
+        parser.validate(cluster);
+        Assert.fail("Should have thrown a validation exception");
+    }
+
+    private void createClusterLocationsBadPermissions(Cluster clusterEntity) throws IOException {
+        String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging");
+        Path stagingPath = new Path(stagingLocation);
+        if (!fs.exists(stagingPath)) {
+            FileSystem.mkdirs(fs, stagingPath, OWNER_ONLY_PERMISSION);
+        }
+
+        String workingLocation = ClusterHelper.getLocation(clusterEntity, "working");
+        Path workingPath = new Path(workingLocation);
+        if (!fs.exists(workingPath)) {
+            FileSystem.mkdirs(fs, workingPath, OWNER_ONLY_PERMISSION);
+        }
+    }
 }

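The three new tests imply that ClusterEntityParser.validate now verifies the staging and working locations exist and are owned by the expected user. A hedged sketch of the check being exercised; the method name and messages are hypothetical, and the real parser change is elsewhere in this patch series:

    private void validateLocation(Cluster cluster, FileSystem fs, String name)
        throws IOException, ValidationException {
        Path location = new Path(ClusterHelper.getLocation(cluster, name));
        if (!fs.exists(location)) {
            throw new ValidationException("Location does not exist: " + location);
        }
        FileStatus status = fs.getFileStatus(location);
        String falconUser = UserGroupInformation.getLoginUser().getShortUserName();
        if (!status.getOwner().equals(falconUser)) {
            throw new ValidationException("Path " + location + " is not owned by " + falconUser);
        }
    }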
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/resources/cluster-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/cluster-template.xml b/webapp/src/test/resources/cluster-template.xml
index bc17fb9..16b7c8c 100644
--- a/webapp/src/test/resources/cluster-template.xml
+++ b/webapp/src/test/resources/cluster-template.xml
@@ -34,7 +34,7 @@
     <locations>
         <location name="staging" path="/projects/falcon/staging"/>
         <location name="temp" path="/tmp"/>
-        <location name="working" path="/project/falcon/working"/>
+        <location name="working" path="/projects/falcon/working"/>
     </locations>
     <properties>
     </properties>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/resources/process-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/process-template.xml b/webapp/src/test/resources/process-template.xml
index 0add06a..cdce1b9 100644
--- a/webapp/src/test/resources/process-template.xml
+++ b/webapp/src/test/resources/process-template.xml
@@ -22,7 +22,7 @@
     <pipelines>testPipeline,dataReplicationPipeline</pipelines>
     <clusters>
         <cluster name="##src.cluster.name##">
-            <validity end="2012-04-21T00:00Z" start="2012-04-20T00:00Z"/>
+            <validity end="##processEndDate##" start="2012-04-20T00:00Z"/>
         </cluster>
     </clusters>
 

