falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suh...@apache.org
Subject [1/2] FALCON-353 enable dry run feature of oozie for schedule and update. Contributed by Shwetha GS
Date Wed, 23 Jul 2014 09:03:30 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master 954cd9554 -> 14476f9ab


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 38be792..94286ef 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -21,7 +21,6 @@ package org.apache.falcon.workflow.engine;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
-import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
@@ -31,8 +30,11 @@ import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.OozieBundleBuilder;
 import org.apache.falcon.oozie.OozieEntityBuilder;
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.bundle.CONFIGURATION.Property;
+import org.apache.falcon.oozie.bundle.COORDINATOR;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesResult.Instance;
@@ -44,7 +46,7 @@ import org.apache.falcon.update.UpdateHelper;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.BundleJob;
 import org.apache.oozie.client.CoordinatorAction;
@@ -61,7 +63,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -83,7 +84,6 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private static final Logger LOG = LoggerFactory.getLogger(OozieWorkflowEngine.class);
 
-    public static final String ENGINE = "oozie";
     private static final BundleJob MISSING = new NullBundleJob();
 
     private static final List<WorkflowJob.Status> WF_KILL_PRECOND =
@@ -113,6 +113,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private static final String[] BUNDLE_UPDATEABLE_PROPS =
         new String[]{"parallel", "clusters.clusters[\\d+].validity.end", };
 
+    public static final ConfigurationStore STORE = ConfigurationStore.get();
+
     public OozieWorkflowEngine() {
         registerListener(new OozieHouseKeepingService());
     }
@@ -136,29 +138,55 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             if (bundleJob == MISSING) {
                 schedClusters.add(cluster);
             } else {
-                LOG.debug("The entity {} is already scheduled on cluster {}", entity.getName(), cluster);
+                LOG.debug("Entity {} is already scheduled on cluster {}", entity.getName(), cluster);
             }
         }
 
         if (!schedClusters.isEmpty()) {
             OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
             for (String clusterName: schedClusters) {
-                Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
-                LOG.info("Scheduling {} on cluster {}", entity.toShortString(), clusterName);
+                Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
                 Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
                 Properties properties = builder.build(cluster, buildPath);
+                if (properties == null) {
+                    LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(), cluster);
+                    continue;
+                }
+
+                //Do dryRun of coords before schedule as schedule is asynchronous
+                dryRunInternal(cluster, new Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH)));
                 scheduleEntity(clusterName, properties, entity);
-                commitStagingPath(cluster, buildPath);
             }
         }
     }
 
-    private void commitStagingPath(Cluster cluster, Path path) throws FalconException {
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
-        try {
-            fs.create(new Path(path, EntityUtil.SUCCEEDED_FILE_NAME)).close();
-        } catch (IOException e) {
-            throw new FalconException(e);
+    @Override
+    public void dryRun(Entity entity, String clusterName) throws FalconException {
+        OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
+        Path buildPath = new Path("/tmp", "falcon" + entity.getName() + System.currentTimeMillis());
+        Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
+        Properties props = builder.build(cluster, buildPath);
+        if (props != null) {
+            dryRunInternal(cluster, new Path(props.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+        }
+    }
+
+
+    private void dryRunInternal(Cluster cluster, Path buildPath) throws FalconException {
+        BUNDLEAPP bundle = OozieBundleBuilder.unmarshal(cluster, buildPath);
+        ProxyOozieClient client = OozieClientFactory.get(cluster.getName());
+        for (COORDINATOR coord : bundle.getCoordinator()) {
+            Properties props = new Properties();
+            props.setProperty(OozieClient.COORDINATOR_APP_PATH, coord.getAppPath());
+            for (Property prop : coord.getConfiguration().getProperty()) {
+                props.setProperty(prop.getName(), prop.getValue());
+            }
+            try {
+                LOG.info("dryRun with properties {}", props);
+                client.dryrun(props);
+            } catch (OozieClientException e) {
+                throw new FalconException(e);
+            }
         }
     }
 
@@ -210,25 +238,35 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     //Return all bundles for the entity in the requested cluster
     private List<BundleJob> findBundles(Entity entity, String clusterName) throws FalconException {
+        Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
+        FileStatus[] stgPaths = EntityUtil.getAllStagingPaths(cluster, entity);
+        List<BundleJob> filteredJobs = new ArrayList<BundleJob>();
+        if (stgPaths == null) {
+            return filteredJobs;
+        }
+
+        List<String> appPaths = new ArrayList<String>();
+        for (FileStatus file : stgPaths) {
+            appPaths.add(file.getPath().toUri().getPath());
+        }
 
         try {
-            List<BundleJob> jobs = OozieClientFactory.get(clusterName).getBundleJobsInfo(OozieClient.FILTER_NAME
+            List<BundleJob> jobs = OozieClientFactory.get(cluster.getName()).getBundleJobsInfo(OozieClient.FILTER_NAME
                 + "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
             if (jobs != null) {
-                List<BundleJob> filteredJobs = new ArrayList<BundleJob>();
                 for (BundleJob job : jobs) {
-                    //Filtering bundles that correspond to deleted entities(endtime is set when an entity is deleted)
-                    if (job.getEndTime() == null) {
-                        filteredJobs.add(job);
-                        LOG.debug("Found bundle {}", job.getId());
+                    if (appPaths.contains(new Path(job.getAppPath()).toUri().getPath())) {
+                        //Load bundle as coord info is not returned in getBundleJobsInfo()
+                        BundleJob bundle = getBundleInfo(clusterName, job.getId());
+                        filteredJobs.add(bundle);
+                        LOG.debug("Found bundle {} with app path {}", job.getId(), job.getAppPath());
                     }
                 }
-                return filteredJobs;
             }
-            return new ArrayList<BundleJob>();
         } catch (OozieClientException e) {
             throw new FalconException(e);
         }
+        return filteredJobs;
     }
 
     //Return all bundles for the entity for each cluster
@@ -254,15 +292,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     //Return latest bundle(last created) for the entity in the requested cluster
     private BundleJob findLatestBundle(Entity entity, String cluster) throws FalconException {
         List<BundleJob> bundles = findBundles(entity, cluster);
-        Date latest = null;
-        BundleJob bundle = MISSING;
-        for (BundleJob job : bundles) {
-            if (latest == null || latest.before(job.getCreatedTime())) {
-                bundle = job;
-                latest = job.getCreatedTime();
-            }
+        if (bundles == null || bundles.isEmpty()) {
+            return MISSING;
         }
-        return bundle;
+
+        return Collections.max(bundles, new Comparator<BundleJob>() {
+            @Override public int compare(BundleJob o1, BundleJob o2) {
+                return o1.getCreatedTime().compareTo(o2.getCreatedTime());
+            }
+        });
     }
 
     @Override
@@ -299,13 +337,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private String doBundleAction(Entity entity, BundleAction action, String cluster) throws FalconException {
-
         List<BundleJob> jobs = findBundles(entity, cluster);
-        if (jobs.isEmpty()) {
-            LOG.warn("No active job found for {}", entity.getName());
-            return "FAILED";
-        }
-
         beforeAction(entity, action, cluster);
         for (BundleJob job : jobs) {
             switch (action) {
@@ -332,9 +364,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
             default:
             }
-            afterAction(entity, action, cluster);
         }
-
+        afterAction(entity, action, cluster);
         return "SUCCESS";
     }
 
@@ -930,20 +961,21 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public Date update(Entity oldEntity, Entity newEntity, String cluster, Date effectiveTime) throws FalconException {
-        boolean entityUpdated = UpdateHelper.isEntityUpdated(oldEntity, newEntity, cluster);
-        boolean wfUpdated = UpdateHelper.isWorkflowUpdated(cluster, newEntity);
+    public String update(Entity oldEntity, Entity newEntity, String cluster, Date effectiveTime)
+        throws FalconException {
+        BundleJob bundle = findLatestBundle(oldEntity, cluster);
 
-        if (!entityUpdated && !wfUpdated) {
-            LOG.debug("Nothing to update for cluster {}", cluster);
-            return null;
+        boolean entityUpdated = false;
+        boolean wfUpdated = false;
+        if (bundle != MISSING) {
+            entityUpdated = UpdateHelper.isEntityUpdated(oldEntity, newEntity, cluster, new Path(bundle.getAppPath()));
+            wfUpdated = UpdateHelper.isWorkflowUpdated(cluster, newEntity, new Path(bundle.getAppPath()));
         }
 
         Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
-        Path stagingPath = EntityUtil.getLastCommittedStagingPath(clusterEntity, oldEntity);
-        if (stagingPath != null) {  //update if entity is scheduled
-            BundleJob bundle = findBundleForStagingPath(cluster, oldEntity, stagingPath);
-            bundle = getBundleInfo(cluster, bundle.getId());
+        StringBuilder result = new StringBuilder();
+        //entity is scheduled before and either entity or workflow is updated
+        if (bundle != MISSING && (entityUpdated || wfUpdated)) {
             LOG.info("Updating entity through Workflow Engine {}", newEntity.toShortString());
             Date newEndTime = EntityUtil.getEndTime(newEntity, cluster);
             if (newEndTime.before(now())) {
@@ -958,58 +990,76 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 LOG.info("Change operation is adequate! : {}, bundle: {}", cluster, bundle.getId());
                 updateCoords(cluster, bundle, EntityUtil.getParallel(newEntity),
                     EntityUtil.getEndTime(newEntity, cluster));
-                return newEndTime;
+                return getUpdateString(newEntity, new Date(), bundle, bundle);
             }
 
-            LOG.debug("Going to update! : {} for cluster {}, bundle: {}",
-                    newEntity.toShortString(), cluster, bundle.getId());
-            effectiveTime = updateInternal(oldEntity, newEntity, clusterEntity, bundle, false, effectiveTime,
-                CurrentUser.getUser());
-            LOG.info("Entity update complete: {} for cluster {}, bundle: {}",
-                    newEntity.toShortString(), cluster, bundle.getId());
+            LOG.debug("Going to update! : {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster, bundle
+                .getId());
+            result.append(updateInternal(oldEntity, newEntity, clusterEntity, bundle, effectiveTime,
+                CurrentUser.getUser())).append("\n");
+            LOG.info("Entity update complete: {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster,
+                bundle.getId());
+        }
+
+        result.append(updateDependents(clusterEntity, oldEntity, newEntity, effectiveTime));
+        return result.toString();
+    }
+
+    private String getUpdateString(Entity entity, Date date, BundleJob oldBundle, BundleJob newBundle) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(entity.toShortString()).append("/Effective Time: ").append(SchemaHelper.formatDateUTC(date));
+        builder.append(". Old workflow id: ");
+        List<String> coords = new ArrayList<String>();
+        for (CoordinatorJob coord : oldBundle.getCoordinators()) {
+            coords.add(coord.getId());
+        }
+        builder.append(StringUtils.join(coords, ','));
+
+        if (newBundle != null) {
+            builder.append(". New workflow id: ");
+            coords.clear();
+            for (CoordinatorJob coord : newBundle.getCoordinators()) {
+                coords.add(coord.getId());
+            }
+            if (coords.isEmpty()) {
+                builder.append(newBundle.getId());
+            } else {
+                builder.append(StringUtils.join(coords, ','));
+            }
         }
+        return  builder.toString();
+    }
 
+    private String updateDependents(Cluster cluster, Entity oldEntity, Entity newEntity,
+        Date effectiveTime) throws FalconException {
         //Update affected entities
         Set<Entity> affectedEntities = EntityGraph.get().getDependents(oldEntity);
+        StringBuilder result = new StringBuilder();
         for (Entity affectedEntity : affectedEntities) {
             if (affectedEntity.getEntityType() != EntityType.PROCESS) {
                 continue;
             }
 
-            LOG.info("Dependent entities need to be updated {}", affectedEntity.toShortString());
-            if (!UpdateHelper.shouldUpdate(oldEntity, newEntity, affectedEntity, cluster)) {
+            LOG.info("Dependent entity {} need to be updated", affectedEntity.toShortString());
+            if (!UpdateHelper.shouldUpdate(oldEntity, newEntity, affectedEntity, cluster.getName())) {
                 continue;
             }
 
-            BundleJob affectedProcBundle = findLatestBundle(affectedEntity, cluster);
+            BundleJob affectedProcBundle = findLatestBundle(affectedEntity, cluster.getName());
             if (affectedProcBundle == MISSING) {
+                LOG.info("Dependent entity {} is not scheduled", affectedEntity.getName());
                 continue;
             }
 
             LOG.info("Triggering update for {}, {}", cluster, affectedProcBundle.getId());
 
-            Date depEndTime = updateInternal(affectedEntity, affectedEntity, clusterEntity, affectedProcBundle,
-                false, effectiveTime, affectedProcBundle.getUser());
-            if (effectiveTime == null || effectiveTime.after(depEndTime)) {
-                effectiveTime = depEndTime;
-            }
+            result.append(updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle,
+                effectiveTime, affectedProcBundle.getUser())).append("\n");
             LOG.info("Entity update complete: {} for cluster {}, bundle: {}",
-                    affectedEntity.toShortString(), cluster, affectedProcBundle.getId());
+                affectedEntity.toShortString(), cluster, affectedProcBundle.getId());
         }
-        LOG.info("Entity update and all dependent entities updated: {}", oldEntity.toShortString());
-        return effectiveTime;
-    }
-
-    //Returns bundle whose app path is same as the staging path(argument)
-    private BundleJob findBundleForStagingPath(String cluster, Entity entity, Path stagingPath) throws FalconException {
-        List<BundleJob> bundles = findBundles(entity, cluster);
-        String bundlePath = stagingPath.toUri().getPath();
-        for (BundleJob bundle : bundles) {
-            if (bundle.getAppPath().endsWith(bundlePath)) {
-                return bundle;
-            }
-        }
-        return null;
+        LOG.info("All dependent entities updated for: {}", oldEntity.toShortString());
+        return result.toString();
     }
 
     private Date now() {
@@ -1039,6 +1089,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past");
         }
 
+        if (bundle.getCoordinators() == null || bundle.getCoordinators().isEmpty()) {
+            throw new FalconException("Invalid state. Oozie coords are still not created. Try again later");
+        }
+
         // change coords
         for (CoordinatorJob coord : bundle.getCoordinators()) {
             LOG.debug("Updating endtime of coord {} to {} on cluster {}",
@@ -1068,83 +1122,48 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private void suspendCoords(String cluster, BundleJob bundle) throws FalconException {
-        for (CoordinatorJob coord : bundle.getCoordinators()) {
-            suspend(cluster, coord.getId());
-        }
-    }
+    private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle,
+        Date inEffectiveTime, String user) throws FalconException {
+        String clusterName = cluster.getName();
 
-    private void resumeCoords(String cluster, BundleJob bundle) throws FalconException {
-        for (CoordinatorJob coord : bundle.getCoordinators()) {
-            resume(cluster, coord.getId());
-        }
-    }
+        Date effectiveTime = getEffectiveTime(cluster, newEntity, inEffectiveTime);
+        LOG.info("Effective time " + effectiveTime);
 
-    private Date updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle,
-        boolean alreadyCreated, Date inEffectiveTime, String user) throws FalconException {
-        Job.Status oldBundleStatus = oldBundle.getStatus();
-        String clusterName = cluster.getName();
+        //Validate that new entity can be scheduled
+        dryRunForUpdate(cluster, newEntity, effectiveTime);
 
-        //Suspend coords as bundle suspend doesn't suspend coords synchronously
-        suspendCoords(clusterName, oldBundle);
-
-        Path stagingPath = EntityUtil.getLatestStagingPath(cluster, oldEntity);
-        //find last scheduled bundle
-        BundleJob latestBundle = findBundleForStagingPath(clusterName, oldEntity, stagingPath);
-        Date effectiveTime;
-        if (oldBundle.getAppPath().endsWith(stagingPath.toUri().getPath())
-                || latestBundle == null || !alreadyCreated) {
-            // new entity is not scheduled yet, create new bundle
-            LOG.info("New bundle hasn't been created yet. So will create one");
-
-            //pick effective time as now() + 3 min to handle any time diff between falcon and oozie
-            //oozie rejects changes with endtime < now
-            effectiveTime = offsetTime(now(), 3);
-            if (inEffectiveTime != null && inEffectiveTime.after(effectiveTime)) {
-                //If the user has specified effective time and is valid,
-                // pick user specified effective time
-                effectiveTime = inEffectiveTime;
-            }
+        //Set end times for old coords
+        updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime);
 
-            //pick start time for new bundle which is after effectiveTime
-            effectiveTime = EntityUtil.getNextStartTime(newEntity, cluster, effectiveTime);
+        //schedule new entity
+        String newJobId = scheduleForUpdate(newEntity, cluster, effectiveTime, user);
+        BundleJob newBundle = getBundleInfo(clusterName, newJobId);
 
-            //schedule new bundle
-            String newBundleId = scheduleForUpdate(newEntity, cluster, effectiveTime, user);
-            //newBundleId and latestBundle will be null if effectiveTime = process end time
-            if (newBundleId != null) {
-                latestBundle = getBundleInfo(clusterName, newBundleId);
-                LOG.info("New bundle {} scheduled successfully with start time {}",
-                        newBundleId, SchemaHelper.formatDateUTC(effectiveTime));
-            }
-        } else {
-            LOG.info("New bundle has already been created. Bundle Id: {}, Start: {}, End: {}",
-                    latestBundle.getId(),
-                    SchemaHelper.formatDateUTC(latestBundle.getStartTime()),
-                    latestBundle.getEndTime());
+        return getUpdateString(newEntity, effectiveTime, oldBundle, newBundle);
+    }
 
-            //pick effectiveTime from already created bundle
-            effectiveTime = getMinStartTime(latestBundle);
-            LOG.info("Will set old coord end time to {}",
-                    SchemaHelper.formatDateUTC(effectiveTime));
-        }
-        if (effectiveTime != null) {
-            //set endtime for old coords
-            updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime);
+    private Date getEffectiveTime(Cluster cluster, Entity newEntity, Date inEffectiveTime) {
+        //pick effective time as now() + 3 min to handle any time diff between falcon and oozie
+        //oozie rejects changes with endtime < now
+        Date effectiveTime = offsetTime(now(), 3);
+        if (inEffectiveTime != null && inEffectiveTime.after(effectiveTime)) {
+            //If the user has specified effective time and is valid,
+            // pick user specified effective time
+            effectiveTime = inEffectiveTime;
         }
 
-        if (oldBundleStatus != Job.Status.SUSPENDED
-                && oldBundleStatus != Job.Status.PREPSUSPENDED) {
-            //resume coords
-            resumeCoords(clusterName, oldBundle);
-        }
+        //pick start time for new bundle which is after effectiveTime
+        return EntityUtil.getNextStartTime(newEntity, cluster, effectiveTime);
+    }
 
-        //latestBundle will be null if effectiveTime = process end time
-        if (latestBundle != null) {
-            //create _SUCCESS in staging path to mark update is complete(to handle roll-forward for updates)
-            commitStagingPath(cluster, new Path(latestBundle.getAppPath()));
+    private void dryRunForUpdate(Cluster cluster, Entity entity, Date startTime) throws FalconException {
+        Entity clone = entity.copy();
+        EntityUtil.setStartDate(clone, cluster.getName(), startTime);
+        try {
+            dryRun(clone, cluster.getName());
+        } catch (FalconException e) {
+            throw new FalconException("The new entity " + entity.toShortString() + " can't be scheduled", e);
         }
-        return effectiveTime;
     }
 
     private String scheduleForUpdate(Entity entity, Cluster cluster, Date startDate, String user)
@@ -1159,7 +1178,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             OozieEntityBuilder builder = OozieEntityBuilder.get(clone);
             Properties properties = builder.build(cluster, buildPath);
             if (properties != null) {
-                LOG.info("Scheduling {} on cluster {} with props {}", entity.toShortString(), cluster, properties);
+                LOG.info("Scheduling {} on cluster {} with props {}", entity.toShortString(), cluster.getName(),
+                    properties);
                 return scheduleEntity(cluster.getName(), properties, entity);
             } else {
                 LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
@@ -1176,18 +1196,6 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private Date getMinStartTime(BundleJob bundle) {
-        Date startTime = null;
-        if (bundle.getCoordinators() != null) {
-            for (CoordinatorJob coord : bundle.getCoordinators()) {
-                if (startTime == null || startTime.after(coord.getStartTime())) {
-                    startTime = coord.getStartTime();
-                }
-            }
-        }
-        return startTime;
-    }
-
     private BundleJob getBundleInfo(String cluster, String bundleId) throws FalconException {
         try {
             return OozieClientFactory.get(cluster).getBundleJobInfo(bundleId);
@@ -1299,8 +1307,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private String run(String cluster, Properties props) throws FalconException {
         try {
+            LOG.info("Scheduling on cluster {} with properties {}", cluster, props);
+            props.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
             String jobId = OozieClientFactory.get(cluster).run(props);
-            LOG.info("Submitted {} on cluster {} with properties: {}", jobId, cluster, props);
+            LOG.info("Submitted {} on cluster {}", jobId, cluster);
             return jobId;
         } catch (OozieClientException e) {
             LOG.error("Unable to schedule workflows", e);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 542634d..c99c36c 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -57,8 +57,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
 import javax.xml.bind.JAXBException;
 import java.io.IOException;
 import java.util.Calendar;
@@ -228,22 +226,26 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("mapBandwidthKB"), "102400");
 
         assertLibExtensions(coord, "replication");
-        WORKFLOWAPP wf = getWorkflowapp(coord);
+        WORKFLOWAPP wf = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
         assertWorkflowRetries(wf);
 
         Assert.assertFalse(Storage.TYPE.TABLE == FeedHelper.getStorageType(feed, trgCluster));
     }
 
+    private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
+        assertLibExtensions(trgMiniDFS.getFileSystem(), coord, EntityType.FEED, lifecycle);
+    }
+
     private COORDINATORAPP getCoordinator(EmbeddedCluster cluster, String appPath) throws Exception {
         return getCoordinator(cluster.getFileSystem(), new Path(StringUtils.removeStart(appPath, "${nameNode}")));
     }
 
     private String getWorkflowAppPath() {
-        return "${nameNode}/projects/falcon/REPLICATION/" + srcCluster.getName();
+        return "${nameNode}/projects/falcon/REPLICATION/" + srcCluster.getName() + "/workflow.xml";
     }
 
     private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
-        assertWorkflowRetries(getWorkflowapp(coord));
+        assertWorkflowRetries(getWorkflowapp(trgMiniDFS.getFileSystem(), coord));
     }
 
     private void assertWorkflowRetries(WORKFLOWAPP wf) throws JAXBException, IOException {
@@ -261,37 +263,6 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         }
     }
 
-    private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
-        WORKFLOWAPP wf = getWorkflowapp(coord);
-        List<Object> actions = wf.getDecisionOrForkOrJoin();
-        for (Object obj : actions) {
-            if (!(obj instanceof ACTION)) {
-                continue;
-            }
-            ACTION action = (ACTION) obj;
-            List<String> files = null;
-            if (action.getJava() != null) {
-                files = action.getJava().getFile();
-            } else if (action.getPig() != null) {
-                files = action.getPig().getFile();
-            } else if (action.getMapReduce() != null) {
-                files = action.getMapReduce().getFile();
-            }
-            if (files != null) {
-                Assert.assertTrue(files.get(files.size() - 1).endsWith("/projects/falcon/working/libext/FEED/"
-                        + lifecycle + "/ext.jar"));
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private WORKFLOWAPP getWorkflowapp(COORDINATORAPP coord) throws JAXBException, IOException {
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
-        return ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
-                trgMiniDFS.getFileSystem().open(new Path(wfPath, "workflow.xml")))).getValue();
-    }
-
     @Test
     public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
         OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(fsReplFeed, Tag.REPLICATION);
@@ -340,7 +311,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Date endDate = feedCluster.getValidity().getEnd();
         Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
 
-        WORKFLOWAPP workflow = getWorkflowapp(coord);
+        WORKFLOWAPP workflow = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
         assertWorkflowDefinition(fsReplFeed, workflow);
 
         List<Object> actions = workflow.getDecisionOrForkOrJoin();
@@ -438,7 +409,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals("${now(0,-40)}", outEventInstance);
 
         // assert FS staging area
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        Path wfPath = new Path(coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "")).getParent();
         final FileSystem fs = trgMiniDFS.getFileSystem();
         Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts")));
         Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
@@ -486,8 +457,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
 
         Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
-        assertReplicationHCatCredentials(getWorkflowapp(coord),
-                coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""));
+        assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord), wfPath.toString());
     }
 
     private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
@@ -555,7 +525,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         List<Properties> coords = builder.buildCoords(srcCluster, new Path("/projects/falcon/"));
         COORDINATORAPP coord = getCoordinator(srcMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
 
-        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(),
+            "${nameNode}/projects/falcon/RETENTION/workflow.xml");
         Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
         Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
 
@@ -601,7 +572,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         List<Properties> coords = builder.buildCoords(trgCluster, new Path("/projects/falcon/"));
         COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
 
-        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(),
+            "${nameNode}/projects/falcon/RETENTION/workflow.xml");
         Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + tableFeed.getName());
         Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
 
@@ -634,8 +606,9 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         assertWorkflowRetries(coord);
 
         Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
-        assertHCatCredentials(getWorkflowapp(coord),
-                coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""));
+        assertHCatCredentials(
+            getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
+            new Path(coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "")).getParent().toString());
     }
 
     private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index 54a2ea7..cc0c419 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -30,19 +30,25 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 import javax.xml.transform.stream.StreamSource;
 import javax.xml.validation.Schema;
 import javax.xml.validation.SchemaFactory;
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Collection;
+import java.util.List;
 
 /**
  * Base for falcon unit tests involving configuration store.
@@ -96,7 +102,7 @@ public class AbstractTestBase {
     }
 
     protected COORDINATORAPP getCoordinator(FileSystem fs, Path path) throws Exception {
-        String coordStr = readFile(fs, new Path(path, "coordinator.xml"));
+        String coordStr = readFile(fs, path);
 
         Unmarshaller unmarshaller = JAXBContext.newInstance(COORDINATORAPP.class).createUnmarshaller();
         SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
@@ -138,4 +144,41 @@ public class AbstractTestBase {
             }
         }
     }
+
+    protected void assertLibExtensions(FileSystem fs, COORDINATORAPP coord, EntityType type,
+        String lifecycle) throws Exception {
+        WORKFLOWAPP wf = getWorkflowapp(fs, coord);
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        String lifeCyclePath = lifecycle == null ? "" : "/" + lifecycle;
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            List<String> files = null;
+            if (action.getJava() != null) {
+                files = action.getJava().getFile();
+            } else if (action.getPig() != null) {
+                files = action.getPig().getFile();
+            } else if (action.getMapReduce() != null) {
+                files = action.getMapReduce().getFile();
+            }
+            if (files != null) {
+                Assert.assertTrue(files.get(files.size() - 1).endsWith(
+                    "/projects/falcon/working/libext/" + type.name() + lifeCyclePath + "/ext.jar"));
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected WORKFLOWAPP getWorkflowapp(FileSystem fs, COORDINATORAPP coord) throws JAXBException, IOException {
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        return getWorkflowapp(fs, new Path(wfPath));
+    }
+
+    @SuppressWarnings("unchecked")
+    protected WORKFLOWAPP getWorkflowapp(FileSystem fs, Path path) throws JAXBException, IOException {
+        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+        return ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(fs.open(path))).getValue();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 5ceea75..3655af9 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -65,11 +65,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBElement;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
@@ -197,7 +193,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         assertEquals(props.get("mapred.job.priority"), "LOW");
         Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
-        assertLibExtensions(coord);
+        assertLibExtensions(fs, coord, EntityType.PROCESS, null);
     }
 
     private String getLogPath(Process process) {
@@ -309,7 +305,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath));
         testParentWorkflow(process, parentWorkflow);
 
         List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
@@ -367,7 +363,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath));
         testParentWorkflow(process, parentWorkflow);
 
         List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
@@ -425,7 +421,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath));
         testParentWorkflow(process, parentWorkflow);
 
         List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
@@ -479,7 +475,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath));
         testParentWorkflow(process, parentWorkflow);
 
         List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
@@ -501,7 +497,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
     }
 
     private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
-        Path hiveConfPath = new Path(wfPath, "conf/hive-site.xml");
+        Path hiveConfPath = new Path(new Path(wfPath).getParent(), "conf/hive-site.xml");
         Assert.assertTrue(fs.exists(hiveConfPath));
 
         if (SecurityUtil.isSecurityEnabled()) {
@@ -590,7 +586,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath));
 
         Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
         assertHCatCredentials(parentWorkflow, wfPath);
@@ -638,33 +634,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals("1.0.0", processWorkflow.getVersion());
     }
 
-    @SuppressWarnings("unchecked")
-    private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
-        WORKFLOWAPP wf = ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
-                fs.open(new Path(wfPath, "workflow.xml")))).getValue();
-        List<Object> actions = wf.getDecisionOrForkOrJoin();
-        for (Object obj : actions) {
-            if (!(obj instanceof ACTION)) {
-                continue;
-            }
-            ACTION action = (ACTION) obj;
-            List<String> files = null;
-            if (action.getJava() != null) {
-                files = action.getJava().getFile();
-            } else if (action.getPig() != null) {
-                files = action.getPig().getFile();
-            } else if (action.getMapReduce() != null) {
-                files = action.getMapReduce().getFile();
-            }
-            if (files != null) {
-                Assert.assertTrue(files.get(files.size() - 1)
-                        .endsWith("/projects/falcon/working/libext/PROCESS/ext.jar"));
-            }
-        }
-    }
-
     private WORKFLOWAPP initializeProcessMapper(Process process, String throttle, String timeout)
         throws Exception {
         OozieEntityBuilder builder = OozieEntityBuilder.get(process);
@@ -685,7 +654,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         assertEquals(coord.getControls().getTimeout(), timeout);
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        return getParentWorkflow(new Path(wfPath));
+        return getWorkflowapp(fs, new Path(wfPath));
     }
 
     public void testParentWorkflow(Process process, WORKFLOWAPP parentWorkflow) {
@@ -708,16 +677,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryInterval());
     }
 
-    @SuppressWarnings("unchecked")
-    private WORKFLOWAPP getParentWorkflow(Path path) throws Exception {
-        String workflow = readFile(fs, new Path(path, "workflow.xml"));
-
-        JAXBContext wfAppContext = JAXBContext.newInstance(WORKFLOWAPP.class);
-        Unmarshaller unmarshaller = wfAppContext.createUnmarshaller();
-        return ((JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
-                new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())))).getValue();
-    }
-
     @AfterMethod
     public void cleanup() throws Exception {
         cleanupStore();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/prism/src/main/java/org/apache/falcon/FalconWebException.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/FalconWebException.java b/prism/src/main/java/org/apache/falcon/FalconWebException.java
index d552c07..736a8b5 100644
--- a/prism/src/main/java/org/apache/falcon/FalconWebException.java
+++ b/prism/src/main/java/org/apache/falcon/FalconWebException.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
@@ -28,6 +27,8 @@ import org.slf4j.LoggerFactory;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 
 /**
  * Exception for REST APIs.
@@ -36,34 +37,28 @@ public class FalconWebException extends WebApplicationException {
 
     private static final Logger LOG = LoggerFactory.getLogger(FalconWebException.class);
 
-    public static FalconWebException newException(Throwable e,
-                                                  Response.Status status) {
-        LOG.error("Failure reason", e);
-        return newException(e.getMessage() + "\n" + getAddnInfo(e), status);
+    public static FalconWebException newException(Throwable e, Response.Status status) {
+        return newException(getMessage(e), status);
     }
 
     public static FalconWebException newInstanceException(Throwable e, Response.Status status) {
-        LOG.error("Failure reason", e);
-        return newInstanceException(e.getMessage() + "\n" + getAddnInfo(e), status);
+        return newInstanceException(getMessage(e), status);
     }
 
     public static FalconWebException newInstanceSummaryException(Throwable e, Response.Status status) {
-        LOG.error("Failure reason", e);
-        String message = e.getMessage() + "\n" + getAddnInfo(e);
+        String message = getMessage(e);
         LOG.error("Action failed: {}\nError: {}", status, message);
         APIResult result = new InstancesSummaryResult(APIResult.Status.FAILED, message);
         return new FalconWebException(Response.status(status).entity(result).type(MediaType.TEXT_XML_TYPE).build());
     }
 
-    public static FalconWebException newException(APIResult result,
-                                                  Response.Status status) {
+    public static FalconWebException newException(APIResult result, Response.Status status) {
         LOG.error("Action failed: {}\nError: {}", status, result.getMessage());
         return new FalconWebException(Response.status(status).
                 entity(result).type(MediaType.TEXT_XML_TYPE).build());
     }
 
-    public static FalconWebException newException(String message,
-                                                  Response.Status status) {
+    public static FalconWebException newException(String message, Response.Status status) {
         LOG.error("Action failed: {}\nError: {}", status, message);
         APIResult result = new APIResult(APIResult.Status.FAILED, message);
         return new FalconWebException(Response.status(status).
@@ -77,19 +72,9 @@ public class FalconWebException extends WebApplicationException {
     }
 
     private static String getMessage(Throwable e) {
-        if (StringUtils.isEmpty(e.getMessage())) {
-            return e.getClass().getName();
-        }
-        return e.getMessage();
-    }
-
-    private static String getAddnInfo(Throwable e) {
-        String addnInfo = "";
-        Throwable cause = e.getCause();
-        if (cause != null && cause.getMessage() != null && !getMessage(e).contains(cause.getMessage())) {
-            addnInfo = cause.getMessage();
-        }
-        return addnInfo;
+        StringWriter errors = new StringWriter();
+        e.printStackTrace(new PrintWriter(errors));
+        return errors.toString();
     }
 
     public FalconWebException(Response response) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index c135470..3d9078e 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -171,6 +171,18 @@ public abstract class AbstractEntityManager {
             EntityType entityType = EntityType.valueOf(type.toUpperCase());
             Entity entity = deserializeEntity(request, entityType);
             validate(entity);
+
+            //Validate that the entity can be scheduled in the cluster
+            if (entity.getEntityType().isSchedulable()) {
+                Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+                for (String cluster : clusters) {
+                    try {
+                        getWorkflowEngine().dryRun(entity, cluster);
+                    } catch (FalconException e) {
+                        throw new FalconException("dryRun failed on cluster " + cluster, e);
+                    }
+                }
+            }
             return new APIResult(APIResult.Status.SUCCEEDED,
                     "Validated successfully (" + entityType + ") " + entity.getName());
         } catch (Throwable e) {
@@ -231,9 +243,9 @@ public abstract class AbstractEntityManager {
             validateUpdate(oldEntity, newEntity);
             configStore.initiateUpdate(newEntity);
 
-            List<String> effectiveTimes = new ArrayList<String>();
             Date effectiveTime =
                 StringUtils.isEmpty(effectiveTimeStr) ? null : EntityUtil.parseDateUTC(effectiveTimeStr);
+            StringBuilder result = new StringBuilder("Updated successfully");
             //Update in workflow engine
             if (!DeploymentUtil.isPrism()) {
                 Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity);
@@ -243,10 +255,7 @@ public abstract class AbstractEntityManager {
 
                 for (String cluster : newClusters) {
                     Date myEffectiveTime = validateEffectiveTime(newEntity, cluster, effectiveTime);
-                    Date effectiveEndTime = getWorkflowEngine().update(oldEntity, newEntity, cluster, myEffectiveTime);
-                    if (effectiveEndTime != null) {
-                        effectiveTimes.add("(" + cluster + ", " + SchemaHelper.formatDateUTC(effectiveEndTime) + ")");
-                    }
+                    result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster, myEffectiveTime));
                 }
                 for (String cluster : oldClusters) {
                     getWorkflowEngine().delete(oldEntity, cluster);
@@ -255,8 +264,7 @@ public abstract class AbstractEntityManager {
 
             configStore.update(entityType, newEntity);
 
-            return new APIResult(APIResult.Status.SUCCEEDED, entityName + " updated successfully"
-                    + (effectiveTimes.isEmpty() ? "" : " with effect from " + effectiveTimes));
+            return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
         } catch (Throwable e) {
             LOG.error("Update failed", e);
             throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index e214392..6be5e25 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -19,7 +19,6 @@ package org.apache.falcon.resource;
 
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
-import org.apache.falcon.client.FalconClient;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -32,7 +31,6 @@ import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Property;
 import org.apache.falcon.entity.v0.process.Validity;
-import org.apache.falcon.resource.EntityList.EntityElement;
 import org.apache.falcon.util.BuildProperties;
 import org.apache.falcon.util.DeploymentProperties;
 import org.apache.falcon.util.OozieTestUtils;
@@ -64,6 +62,7 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -128,11 +127,7 @@ public class EntityManagerJerseyIT {
         context.assertSuccessful(response);
     }
 
-    private void update(TestContext context, Entity entity) throws Exception {
-        update(context, entity, null);
-    }
-
-    private void update(TestContext context, Entity entity, Date endTime) throws Exception {
+    private ClientResponse update(TestContext context, Entity entity, Date endTime) throws Exception {
         File tmpFile = TestContext.getTempFile();
         entity.getEntityType().getMarshaller().marshal(entity, tmpFile);
         WebResource resource = context.service.path("api/entities/update/"
@@ -140,11 +135,9 @@ public class EntityManagerJerseyIT {
         if (endTime != null) {
             resource = resource.queryParam("effective", SchemaHelper.formatDateUTC(endTime));
         }
-        ClientResponse response = resource
-                .header("Cookie", context.getAuthenticationToken())
+        return resource.header("Cookie", context.getAuthenticationToken())
                 .accept(MediaType.TEXT_XML)
                 .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
-        context.assertSuccessful(response);
     }
 
     @Test
@@ -167,7 +160,8 @@ public class EntityManagerJerseyIT {
 
         //change output feed path and update feed as another user
         feed.getLocations().getLocations().get(0).setPath("/falcon/test/output2/${YEAR}/${MONTH}/${DAY}");
-        update(context, feed);
+        ClientResponse response = update(context, feed, null);
+        context.assertSuccessful(response);
 
         bundles = OozieTestUtils.getBundles(context);
         Assert.assertEquals(bundles.size(), 2);
@@ -211,55 +205,45 @@ public class EntityManagerJerseyIT {
         File tmpFile = TestContext.getTempFile();
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
         context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
-        OozieTestUtils.waitForWorkflowStart(context, context.processName);
     }
 
-    public void testProcessDeleteAndSchedule() throws Exception {
-        //Submit process with invalid property so that coord submit fails and bundle goes to failed state
+    public void testDryRun() throws Exception {
+        //Schedule of invalid process should fail because of dryRun
         TestContext context = newContext();
         Map<String, String> overlay = context.getUniqueOverlay();
         String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
         Property prop = new Property();
         prop.setName("newProp");
-        prop.setValue("${formatTim()}");
+        prop.setValue("${instanceTim()}");  //invalid property
         process.getProperties().getProperties().add(prop);
         File tmpFile = TestContext.getTempFile();
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
-        OozieTestUtils.waitForBundleStart(context, Status.FAILED, Status.KILLED);
-
-        FalconClient client = new FalconClient(TestContext.BASE_URL);
-        EntityList deps = client.getDependency(EntityType.PROCESS.name(), process.getName());
-        for (EntityElement dep : deps.getElements()) {
-            if (dep.name.equals(process.getInputs().getInputs().get(0).getName())) {
-                Assert.assertEquals("Input", dep.tag);
-            } else if (dep.name.equals(process.getOutputs().getOutputs().get(0).getName())) {
-                Assert.assertEquals("Output", dep.tag);
-            }
-        }
 
-        //Delete and re-submit the process with correct workflow
-        ClientResponse clientResponse = context.service
-                .path("api/entities/delete/process/" + context.processName)
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .delete(ClientResponse.class);
-        context.assertSuccessful(clientResponse);
+        ClientResponse response = context.validate(tmpFile.getAbsolutePath(), overlay, EntityType.PROCESS);
+        context.assertFailure(response);
+
+        context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, false);
 
-        process.getWorkflow().setPath("/falcon/test/workflow");
+        //Fix the process and then submitAndSchedule should succeed
+        Iterator<Property> itr = process.getProperties().getProperties().iterator();
+        while (itr.hasNext()) {
+            Property myProp = itr.next();
+            if (myProp.getName().equals("newProp")) {
+                itr.remove();
+            }
+        }
         tmpFile = TestContext.getTempFile();
+        process.setName("process" + System.currentTimeMillis());
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        clientResponse = context.service.path("api/entities/submitAndSchedule/process")
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .type(MediaType.TEXT_XML)
-                .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
-        context.assertSuccessful(clientResponse);
+        response = context.submitAndSchedule(tmpFile.getAbsolutePath(), overlay, EntityType.PROCESS);
+        context.assertSuccessful(response);
 
-        //Assert that new schedule creates new bundle
-        List<BundleJob> bundles = OozieTestUtils.getBundles(context);
-        Assert.assertEquals(bundles.size(), 2);
+        //Update with invalid property should fail again
+        process.getProperties().getProperties().add(prop);
+        updateEndtime(process);
+        response = update(context, process, null);
+        context.assertFailure(response);
     }
 
     @Test
@@ -278,7 +262,8 @@ public class EntityManagerJerseyIT {
         //update process should create new bundle
         Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
         updateEndtime(process);
-        update(context, process);
+        ClientResponse response = update(context, process, null);
+        context.assertSuccessful(response);
         bundles = OozieTestUtils.getBundles(context);
         Assert.assertEquals(bundles.size(), 2);
     }
@@ -313,7 +298,8 @@ public class EntityManagerJerseyIT {
 
         updateEndtime(process);
         Date endTime = getEndTime();
-        update(context, process, endTime);
+        response = update(context, process, endTime);
+        context.assertSuccessful(response);
 
         //Assert that update creates new bundle and old coord is running
         bundles = OozieTestUtils.getBundles(context);
@@ -345,7 +331,8 @@ public class EntityManagerJerseyIT {
         Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
 
         updateEndtime(process);
-        update(context, process);
+        ClientResponse response = update(context, process, null);
+        context.assertSuccessful(response);
 
         //Assert that update does not create new bundle
         List<BundleJob> bundles = OozieTestUtils.getBundles(context);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java
index dceb2f2..d4a1d8a 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java
@@ -23,7 +23,6 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Property;
 import org.apache.falcon.util.OozieTestUtils;
 import org.apache.oozie.client.BundleJob;
-import org.apache.oozie.client.Job.Status;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -65,19 +64,18 @@ public class EntityManagerJerseySmokeIT {
 
     @Test (dependsOnMethods = "testFeedSchedule")
     public void testProcessDeleteAndSchedule() throws Exception {
-        //Submit process with invalid property so that coord submit fails and bundle goes to failed state
+        //Schedule process, delete and then submitAndSchedule again should create new bundle
         TestContext context = newContext();
         Map<String, String> overlay = context.getUniqueOverlay();
         String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
         Property prop = new Property();
         prop.setName("newProp");
-        prop.setValue("${formatTim()}");
+        prop.setValue("${instanceTime()}");
         process.getProperties().getProperties().add(prop);
         File tmpFile = TestContext.getTempFile();
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
         context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
-        OozieTestUtils.waitForBundleStart(context, Status.FAILED);
 
         //Delete and re-submit the process with correct workflow
         ClientResponse clientResponse = context.service

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index bc79609..9752518 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -192,6 +192,10 @@ public class TestContext {
     }
 
     public void scheduleProcess(String processTemplate, Map<String, String> overlay) throws Exception {
+        scheduleProcess(processTemplate, overlay, true);
+    }
+
+    public void scheduleProcess(String processTemplate, Map<String, String> overlay, boolean succeed) throws Exception {
         ClientResponse response = submitToFalcon(CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
         assertSuccessful(response);
 
@@ -201,15 +205,12 @@ public class TestContext {
         response = submitToFalcon(FEED_TEMPLATE2, overlay, EntityType.FEED);
         assertSuccessful(response);
 
-        response = submitToFalcon(processTemplate, overlay, EntityType.PROCESS);
-        assertSuccessful(response);
-
-        ClientResponse clientRepsonse = this.service
-                .path("api/entities/schedule/process/" + processName)
-                .header("Cookie", getAuthenticationToken())
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
-        assertSuccessful(clientRepsonse);
+        response = submitAndSchedule(processTemplate, overlay, EntityType.PROCESS);
+        if (succeed) {
+            assertSuccessful(response);
+        } else {
+            assertFailure(response);
+        }
     }
 
     public void scheduleProcess() throws Exception {
@@ -249,6 +250,18 @@ public class TestContext {
                 .post(ClientResponse.class, rawlogStream);
     }
 
+    public ClientResponse validate(String template, Map<String, String> overlay, EntityType entityType)
+        throws Exception {
+        String tmpFile = overlayParametersOverTemplate(template, overlay);
+        ServletInputStream rawlogStream = getServletInputStream(tmpFile);
+
+        return this.service.path("api/entities/validate/" + entityType.name().toLowerCase())
+            .header("Cookie", getAuthenticationToken())
+            .accept(MediaType.TEXT_XML)
+            .type(MediaType.TEXT_XML)
+            .post(ClientResponse.class, rawlogStream);
+    }
+
     public ClientResponse submitToFalcon(String template, Map<String, String> overlay, EntityType entityType)
         throws IOException {
         String tmpFile = overlayParametersOverTemplate(template, overlay);


Mime
View raw message