falcon-commits mailing list archives

From shweth...@apache.org
Subject [2/2] git commit: FALCON-206 Process update for wf changes. Contributed by Shwetha GS
Date Mon, 06 Jan 2014 05:51:43 GMT
FALCON-206 Process update for wf changes. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/2ded6291
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/2ded6291
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/2ded6291

Branch: refs/heads/master
Commit: 2ded629145df0d9c537b3d65d2457c09083dfc9f
Parents: 8b47942
Author: Shwetha GS <shwethags@gmail.com>
Authored: Mon Jan 6 11:21:30 2014 +0530
Committer: Shwetha GS <shwethags@gmail.com>
Committed: Mon Jan 6 11:21:30 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/falcon/entity/ClusterHelper.java |  12 +
 .../org/apache/falcon/entity/EntityUtil.java    |  91 +++++--
 .../org/apache/falcon/update/UpdateHelper.java  | 109 ++++++++-
 common/src/main/resources/log4j.xml             |  14 --
 common/src/main/resources/startup.properties    |   3 +-
 .../falcon/entity/parser/FeedUpdateTest.java    |   3 +
 .../entity/parser/ProcessEntityParserTest.java  |   4 +-
 .../apache/falcon/update/UpdateHelperTest.java  | 100 ++++++--
 .../resources/config/process/process-0.1.xml    |   2 +-
 .../resources/config/process/process-0.2.xml    |   2 +-
 .../resources/config/process/process-table.xml  |   2 +-
 .../workflow/OozieFeedWorkflowBuilder.java      |   3 +-
 .../service/SharedLibraryHostingService.java    |   3 +-
 .../falcon/workflow/OozieWorkflowBuilder.java   |  13 +-
 .../engine/OozieHouseKeepingService.java        |   3 +-
 .../workflow/engine/OozieWorkflowEngine.java    | 238 +++++++++----------
 .../falcon/resource/AbstractEntityManager.java  |  34 ++-
 prism/src/main/resources/log4j.xml              |  14 --
 .../falcon/converter/OozieProcessMapper.java    | 127 +++++++---
 .../workflow/OozieProcessWorkflowBuilder.java   |   3 +-
 .../converter/OozieProcessMapperTest.java       |  24 +-
 .../falcon/replication/CustomReplicator.java    |   2 +-
 .../falcon/replication/FeedReplicator.java      |   6 +-
 .../falcon/replication/FilteredCopyListing.java |   6 +-
 src/bin/service-start.sh                        |   2 +-
 src/conf/log4j.xml                              |  14 --
 src/conf/startup.properties                     |   1 -
 webapp/src/main/resources/log4j.xml             |  14 --
 .../org/apache/falcon/logging/LogMoverIT.java   |   4 +-
 .../falcon/resource/EntityManagerJerseyIT.java  | 104 ++++----
 .../org/apache/falcon/resource/TestContext.java |  21 +-
 .../org/apache/falcon/util/OozieTestUtils.java  |   4 +-
 33 files changed, 611 insertions(+), 373 deletions(-)
----------------------------------------------------------------------
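
For readers skimming the diff: the core change replaces the old md5-derived staging path with timestamped staging directories that are committed by writing a _SUCCESS marker, and an update is now also triggered when the user workflow or lib files change on HDFS. A minimal sketch of the new update decision, using the method names introduced in the diff below (illustrative only, not the literal code):

    // Sketch of the decision in OozieWorkflowEngine.update (see below).
    boolean entityUpdated = UpdateHelper.isEntityUpdated(oldEntity, newEntity, cluster);
    boolean wfUpdated = UpdateHelper.isWorkflowUpdated(cluster, newEntity);
    if (!entityUpdated && !wfUpdated) {
        return; // neither the entity definition nor the user wf/lib changed
    }
    // Only staging dirs containing _SUCCESS count as committed schedules.
    Path staging = EntityUtil.getLastCommittedStagingPath(clusterEntity, oldEntity);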


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 35bd763..87cdf71 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-206 Process update for wf changes. (Shwetha GS)
+
     FALCON-236 Falcon process output events which is optional should have at least one event 
     if defined. (Shaik Idris Ali via Shwetha GS)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index e332aba..38b5c5b 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,10 +18,14 @@
 
 package org.apache.falcon.entity;
 
+import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.cluster.*;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
+
 /**
  * Helper to get end points relating to the cluster.
  */
@@ -43,6 +47,14 @@ public final class ClusterHelper {
         return conf;
     }
 
+    public static FileSystem getFileSystem(Cluster cluster) throws FalconException {
+        try {
+            return FileSystem.get(getConfiguration(cluster));
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
     public static String getOozieUrl(Cluster cluster) {
         return getInterface(cluster, Interfacetype.WORKFLOW).getEndpoint();
     }
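
The new getFileSystem helper wraps both FileSystem.get and its checked IOException, so callers only deal with FalconException. A hedged usage sketch (the cluster lookup mirrors how other code in this patch obtains cluster entities; StagingCheckSketch and stagingExists are made-up names for illustration):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.ClusterHelper;
    import org.apache.falcon.entity.store.ConfigurationStore;
    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    public final class StagingCheckSketch {
        // Illustrative: no try/catch around FileSystem.get at the call site,
        // since ClusterHelper.getFileSystem wraps IOException in FalconException.
        static boolean stagingExists(String clusterName, Path path)
                throws FalconException, IOException {
            Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
            FileSystem fs = ClusterHelper.getFileSystem(cluster);
            return fs.exists(path);
        }
    }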

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index ba80cac..a3ad83d 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -36,8 +36,12 @@ import org.apache.falcon.entity.v0.process.*;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.text.DateFormat;
 import java.text.ParseException;
@@ -53,6 +57,11 @@ public final class EntityUtil {
     private static final long DAY_IN_MS = 86400000L;
     private static final long MONTH_IN_MS = 2592000000L;
 
+    public static final String PROCESS_CHECKSUM_FILE = "checksums";
+    public static final String PROCESS_USER_DIR = "user";
+    public static final String PROCESS_USERLIB_DIR = "userlib";
+    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+
     private EntityUtil() {}
 
     public static <T extends Entity> T getEntity(EntityType type, String entityName) throws FalconException {
@@ -395,15 +404,6 @@ public final class EntityUtil {
         }
     }
 
-    public static String getStagingPath(Entity entity) throws FalconException {
-        try {
-            return "falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName() + "/"
-                    + md5(entity);
-        } catch (Exception e) {
-            throw new FalconException(e);
-        }
-    }
-
     public static WorkflowName getWorkflowName(Tag tag, List<String> suffixes,
                                                Entity entity) {
         WorkflowNameBuilder<Entity> builder = new WorkflowNameBuilder<Entity>(
@@ -537,33 +537,75 @@ public final class EntityUtil {
         }
     }
 
-    public static Path getStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
-        throws FalconException {
+    //Staging path that stores scheduler configs like oozie coord/bundle xmls, parent workflow xml
+    //Each entity update creates a new staging path and creates staging path/_SUCCESS when update is complete
+    //Base staging path is the base path for all staging dirs
+    public static Path getBaseStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
+        return new Path(ClusterHelper.getLocation(cluster, "staging"),
+                "falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName());
+    }
+
+    //Creates new staging path for entity schedule/update
+    public static Path getNewStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
+        return new Path(getBaseStagingPath(cluster, entity), String.valueOf(System.currentTimeMillis()));
+    }
 
+    private static Path getStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Path path)
+        throws FalconException {
         try {
-            return new Path(ClusterHelper.getLocation(cluster, "staging"),
-                    EntityUtil.getStagingPath(entity));
-        } catch (Exception e) {
+            FileSystem fs = ClusterHelper.getFileSystem(cluster);
+            FileStatus latest = null;
+            FileStatus[] files = fs.globStatus(path, new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    if (path.getName().equals("logs")) {
+                        return false;
+                    }
+                    return true;
+                }
+            });
+            if (files == null || files.length == 0) {
+                return null;
+            }
+
+            for (FileStatus file : files) {
+                if (latest == null || latest.getModificationTime() < file.getModificationTime()) {
+                    latest = file;
+                }
+            }
+            return latest.getPath();
+        } catch (IOException e) {
             throw new FalconException(e);
         }
     }
 
+    //Returns latest staging path - includes incomplete staging paths as well (updates are not complete)
+    public static Path getLatestStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
+        throws FalconException {
+        return getStagingPath(cluster, new Path(getBaseStagingPath(cluster, entity), "*"));
+    }
+
+    //Returns latest staging path for which update is complete (latest that contains _SUCCESS)
+    public static Path getLastCommittedStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
+        throws FalconException {
+        Path stagingPath = getStagingPath(cluster, new Path(getBaseStagingPath(cluster, entity),
+                "*/" + SUCCEEDED_FILE_NAME));
+        return stagingPath == null ? null : stagingPath.getParent();
+    }
+
     public static LateProcess getLateProcess(Entity entity)
         throws FalconException {
 
         switch (entity.getEntityType()) {
         case FEED:
-            if (!RuntimeProperties.get()
-                    .getProperty("feed.late.allowed", "true")
-                    .equalsIgnoreCase("true")) {
+            if (!RuntimeProperties.get().getProperty("feed.late.allowed", "true").equalsIgnoreCase("true")) {
                 return null;
             }
 
             LateProcess lateProcess = new LateProcess();
-            lateProcess.setDelay(new Frequency(RuntimeProperties.get()
-                    .getProperty("feed.late.frequency", "hours(3)")));
-            lateProcess.setPolicy(PolicyType.fromValue(RuntimeProperties.get()
-                    .getProperty("feed.late.policy", "exp-backoff")));
+            lateProcess.setDelay(new Frequency(RuntimeProperties.get().getProperty("feed.late.frequency", "hours(3)")));
+            lateProcess.setPolicy(
+                    PolicyType.fromValue(RuntimeProperties.get().getProperty("feed.late.policy", "exp-backoff")));
             LateInput lateInput = new LateInput();
             lateInput.setInput(entity.getName());
             //TODO - Assuming the late workflow is not used
@@ -578,11 +620,8 @@ public final class EntityUtil {
         }
     }
 
-    public static Path getLogPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
-        throws FalconException {
-
-        return new Path(ClusterHelper.getLocation(cluster,
-                "staging"), EntityUtil.getStagingPath(entity) + "/../logs");
+    public static Path getLogPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
+        return new Path(getBaseStagingPath(cluster, entity), "logs");
     }
 
     public static String fromUTCtoURIDate(String utc) throws FalconException {
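
To make the layout described in the comments above concrete: each schedule/update creates <base staging path>/<currentTimeMillis>, the directory counts as committed only once _SUCCESS is written into it, and the logs directory sits alongside the timestamped dirs and is excluded by the PathFilter. A hedged sketch of the two lookups (the directory names are hypothetical):

    // Hypothetical contents of the base staging path for a process "sample":
    //   .../falcon/workflows/process/sample/1388988690000/           <- update in flight
    //   .../falcon/workflows/process/sample/1388902290000/_SUCCESS   <- committed
    //   .../falcon/workflows/process/sample/logs                     <- skipped by the filter
    Path latest = EntityUtil.getLatestStagingPath(cluster, entity);           // newest dir, committed or not
    Path committed = EntityUtil.getLastCommittedStagingPath(cluster, entity); // newest dir with _SUCCESS
    if (committed == null) {
        // no completed schedule/update exists for this entity on this cluster
    }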

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index fc69933..84501de 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -19,9 +19,11 @@
 package org.apache.falcon.update;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
@@ -31,10 +33,17 @@ import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.*;
 
 /**
  * Helper methods to facilitate entity updates.
@@ -52,7 +61,7 @@ public final class UpdateHelper {
 
     private UpdateHelper() {}
 
-    public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, String cluster) throws FalconException {
+    public static boolean isEntityUpdated(Entity oldEntity, Entity newEntity, String cluster) throws FalconException {
         Entity oldView = EntityUtil.getClusterView(oldEntity, cluster);
         Entity newView = EntityUtil.getClusterView(newEntity, cluster);
         switch (oldEntity.getEntityType()) {
@@ -61,14 +70,106 @@ public final class UpdateHelper {
 
         case PROCESS:
             return !EntityUtil.equals(oldView, newView, PROCESS_FIELDS);
+
         default:
         }
         throw new IllegalArgumentException("Unhandled entity type " + oldEntity.getEntityType());
     }
 
+    //Read checksum file
+    private static Map<String, String> readChecksums(FileSystem fs, Path path) throws FalconException {
+        try {
+            Map<String, String> checksums = new HashMap<String, String>();
+            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
+            try {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    String[] parts = line.split("=");
+                    checksums.put(parts[0], parts[1]);
+                }
+            } finally {
+                reader.close();
+            }
+            return checksums;
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    //Checks if the user workflow or lib is updated
+    public static boolean isWorkflowUpdated(String cluster, Entity entity) throws FalconException {
+        if (entity.getEntityType() != EntityType.PROCESS) {
+            return false;
+        }
+
+        try {
+            Process process = (Process) entity;
+            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                    ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+            Path bundlePath = EntityUtil.getLastCommittedStagingPath(clusterEntity, process);
+            if (bundlePath == null) {
+                return true;
+            }
+
+            Path checksum = new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE);
+            FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(clusterEntity));
+            if (!fs.exists(checksum)) {
+                //Update if there is no checksum file(for migration)
+                return true;
+            }
+            Map<String, String> checksums = readChecksums(fs, checksum);
+
+            //Get checksum from user wf/lib
+            Map<String, String> wfPaths = checksumAndCopy(fs, new Path(process.getWorkflow().getPath()), null);
+            if (process.getWorkflow().getLib() != null) {
+                wfPaths.putAll(checksumAndCopy(fs, new Path(process.getWorkflow().getLib()), null));
+            }
+
+            //Update if the user wf/lib is updated i.e., if checksums are different
+            if (!wfPaths.equals(checksums)) {
+                return true;
+            }
+
+            return false;
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    //Recursively traverses each file and tracks checksum
+    //If dest != null, each traversed file is copied to dest
+    public static Map<String, String> checksumAndCopy(FileSystem fs, Path src, Path dest) throws FalconException {
+        try {
+            if (dest != null && !fs.exists(dest) && !fs.mkdirs(dest)) {
+                throw new FalconException("mkdir failed on " + dest);
+            }
+
+            Configuration conf = new Configuration();
+            Map<String, String> paths = new HashMap<String, String>();
+            if (fs.isFile(src)) {
+                paths.put(src.toString(), fs.getFileChecksum(src).toString());
+                if (dest != null) {
+                    Path target = new Path(dest, src.getName());
+                    FileUtil.copy(fs, src, fs, target, false, conf);
+                    LOG.debug("Copied " + src + " to " + target);
+                }
+            } else {
+                FileStatus[] files = fs.listStatus(src);
+                if (files != null) {
+                    for (FileStatus file : files) {
+                        paths.putAll(checksumAndCopy(fs, file.getPath(),
+                                ((dest == null) ? null : new Path(dest, file.getPath().getName()))));
+                    }
+                }
+            }
+            return paths;
+        } catch(IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
     public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, Entity affectedEntity)
         throws FalconException {
-
         if (oldEntity.getEntityType() == EntityType.FEED && affectedEntity.getEntityType() == EntityType.PROCESS) {
             return shouldUpdate((Feed) oldEntity, (Feed) newEntity, (Process) affectedEntity);
         } else {
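
isWorkflowUpdated compares the checksums file stored in the last committed staging path against freshly computed checksums of the user workflow (and lib, if configured); readChecksums expects one path=checksum pair per line, split on '='. A hedged sketch of producing such a file from the map returned by checksumAndCopy (writeChecksums is a hypothetical helper, not part of this patch):

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.util.Map;

    final class ChecksumFileSketch {
        // Hypothetical writer emitting the "path=checksum" lines that
        // UpdateHelper.readChecksums parses.
        static void writeChecksums(FileSystem fs, Path file, Map<String, String> checksums)
                throws IOException {
            FSDataOutputStream out = fs.create(file);
            try {
                for (Map.Entry<String, String> entry : checksums.entrySet()) {
                    out.writeBytes(entry.getKey() + "=" + entry.getValue() + "\n");
                }
            } finally {
                out.close();
            }
        }
    }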

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/common/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/log4j.xml b/common/src/main/resources/log4j.xml
index 734d17c..dd1ef70 100644
--- a/common/src/main/resources/log4j.xml
+++ b/common/src/main/resources/log4j.xml
@@ -45,15 +45,6 @@
         </layout>
     </appender>
 
-    <appender name="TRANSACTIONLOG" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${user.dir}/target/logs/tranlog.log"/>
-        <param name="Append" value="true"/>
-        <param name="Threshold" value="debug"/>
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %x %m%n"/>
-        </layout>
-    </appender>
-
     <appender name="METRIC" class="org.apache.log4j.DailyRollingFileAppender">
         <param name="File" value="${user.dir}/target/logs/metric.log"/>
         <param name="Append" value="true"/>
@@ -73,11 +64,6 @@
         <appender-ref ref="AUDIT"/>
     </logger>
 
-    <logger name="TRANSACTIONLOG">
-        <level value="info"/>
-        <appender-ref ref="TRANSACTIONLOG"/>
-    </logger>
-
     <logger name="METRIC">
         <level value="info"/>
         <appender-ref ref="METRIC"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 5473f5d..3014418 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -23,7 +23,6 @@
 *.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
 *.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
 *.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
-*.journal.impl=org.apache.falcon.transaction.SharedFileSystemJournal
 *.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
 *.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService
 *.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
@@ -40,7 +39,7 @@
                         org.apache.falcon.group.FeedGroupMap,\
                         org.apache.falcon.service.SharedLibraryHostingService
 *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
-*.shared.libs=activemq-core,geronimo-j2ee-management,hadoop-distcp,jms,json-simple,oozie-client,spring-jms,s4fs-0.1.jar
+*.shared.libs=activemq-core,geronimo-j2ee-management,hadoop-distcp,jms,json-simple,oozie-client,spring-jms
 
 ######### Implementation classes #########
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/common/src/test/java/org/apache/falcon/entity/parser/FeedUpdateTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedUpdateTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedUpdateTest.java
index f39f300..3bf0b58 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedUpdateTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedUpdateTest.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -123,6 +124,8 @@ public class FeedUpdateTest extends AbstractTestBase {
         Feed feed = parser.parseAndValidate(this.getClass()
                 .getResourceAsStream(FEED_XML));
         ConfigurationStore.get().publish(EntityType.FEED, feed);
+
+        dfsCluster.getFileSystem().mkdirs(new Path("/falcon/test/workflow"));
         Process process = processParser.parseAndValidate(this.getClass()
                 .getResourceAsStream(PROCESS1_XML));
         ConfigurationStore.get().publish(EntityType.PROCESS, process);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index e656772..520541a 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -37,6 +37,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -76,6 +77,7 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         storeEntity(EntityType.FEED, "imp-click-join1");
         storeEntity(EntityType.FEED, "imp-click-join2");
         storeEntity(EntityType.PROCESS, "sample");
+        dfsCluster.getFileSystem().mkdirs(new Path("/falcon/test/workflow"));
     }
 
     @Test
@@ -111,7 +113,7 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         Assert.assertEquals(process.getTimezone().getID(), "UTC");
 
         Assert.assertEquals(process.getWorkflow().getEngine().name().toLowerCase(), "oozie");
-        Assert.assertEquals(process.getWorkflow().getPath(), "/path/to/workflow");
+        Assert.assertEquals(process.getWorkflow().getPath(), "/falcon/test/workflow");
 
         StringWriter stringWriter = new StringWriter();
         Marshaller marshaller = EntityType.PROCESS.getMarshaller();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index 42bcee0..7718109 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -18,15 +18,19 @@
 
 package org.apache.falcon.update;
 
+import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.parser.EntityParserFactory;
 import org.apache.falcon.entity.parser.FeedEntityParser;
 import org.apache.falcon.entity.parser.ProcessEntityParser;
+import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
@@ -37,12 +41,16 @@ import org.apache.falcon.entity.v0.feed.Properties;
 import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.process.PolicyType;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
 import java.io.InputStream;
 
 /**
@@ -76,33 +84,89 @@ public class UpdateHelperTest extends AbstractTestBase {
         storeEntity(EntityType.FEED, "imp-click-join2");
     }
 
+    private void prepare(Process process) throws IOException, FalconException {
+        FileSystem fs = dfsCluster.getFileSystem();
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, "testCluster");
+        Path staging = EntityUtil.getNewStagingPath(clusterEntity, process);
+        fs.mkdirs(staging);
+        fs.create(new Path(staging, "workflow.xml")).close();
+        fs.create(new Path(staging, "checksums")).close();
+    }
+
+    @Test
+    public void testWorkflowUpdate() throws IOException, FalconException {
+        FileSystem fs = dfsCluster.getFileSystem();
+        Process process = processParser.parseAndValidate(this.getClass().getResourceAsStream(PROCESS_XML));
+        String cluster = "testCluster";
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+        Path staging = EntityUtil.getNewStagingPath(clusterEntity, process);
+        fs.mkdirs(staging);
+        fs.create(new Path(staging, "workflow.xml")).close();
+        fs.create(new Path(staging, EntityUtil.SUCCEEDED_FILE_NAME)).close();
+
+        //Update if there is no checksum file
+        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process));
+
+        //No update if there is no new file
+        fs.create(new Path(staging, "checksums")).close();
+        Assert.assertFalse(UpdateHelper.isWorkflowUpdated(cluster, process));
+
+        //Update if there is new lib
+        Path libpath = new Path("/falcon/test/lib");
+        process.getWorkflow().setLib(libpath.toString());
+        fs.mkdirs(libpath);
+        Path lib = new Path(libpath, "new.jar");
+        fs.create(lib).close();
+        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process));
+
+        //Don't Update if the lib is not updated
+        fs.delete(new Path(staging, "checksums"), true);
+        FSDataOutputStream stream = fs.create(new Path(staging, "checksums"));
+        stream.write((dfsCluster.getConf().get("fs.default.name") + lib.toString() + "="
+                + fs.getFileChecksum(lib).toString() + "\n").getBytes());
+        stream.close();
+        Assert.assertFalse(UpdateHelper.isWorkflowUpdated(cluster, process));
+
+        //Update if the lib is updated
+        fs.delete(lib, true);
+        stream = fs.create(lib);
+        stream.writeChars("some new jar");
+        stream.close();
+        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process));
+
+        //Update if the lib is deleted
+        fs.delete(lib, true);
+        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process));
+    }
+
     @Test
     public void testShouldUpdate2() throws Exception {
         Feed oldFeed = parser.parseAndValidate(this.getClass()
                 .getResourceAsStream(FEED_XML));
         String cluster = "testCluster";
         Feed newFeed = (Feed) oldFeed.copy();
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster));
 
         newFeed.setGroups("newgroups");
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster));
         newFeed.getLateArrival().setCutOff(Frequency.fromString("hours(8)"));
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster));
         newFeed.setFrequency(Frequency.fromString("days(1)"));
-        Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, cluster));
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster));
 
         Process oldProcess = processParser.parseAndValidate(this.getClass().
                 getResourceAsStream(PROCESS_XML));
+        prepare(oldProcess);
         Process newProcess = (Process) oldProcess.copy();
 
         newProcess.getRetry().setPolicy(PolicyType.FINAL);
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldProcess, newProcess, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
         newProcess.getLateProcess().getLateInputs().remove(1);
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldProcess, newProcess, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
         newProcess.getLateProcess().setPolicy(PolicyType.PERIODIC);
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldProcess, newProcess, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
         newProcess.setFrequency(Frequency.fromString("days(1)"));
-        Assert.assertTrue(UpdateHelper.shouldUpdate(oldProcess, newProcess, cluster));
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
     }
 
     @Test
@@ -113,6 +177,8 @@ public class UpdateHelperTest extends AbstractTestBase {
         Feed newFeed = (Feed) oldFeed.copy();
         Process process = processParser.parseAndValidate(this.getClass().
                 getResourceAsStream(PROCESS_XML));
+        prepare(process);
+        Process newProcess = (Process) process.copy();
 
         Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
 
@@ -168,26 +234,32 @@ public class UpdateHelperTest extends AbstractTestBase {
 
         String cluster = "testCluster";
         Feed newTableFeed = (Feed) oldTableFeed.copy();
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldTableFeed, newTableFeed, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster));
 
         newTableFeed.setGroups("newgroups");
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldTableFeed, newTableFeed, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster));
         newTableFeed.setFrequency(Frequency.fromString("days(1)"));
-        Assert.assertTrue(UpdateHelper.shouldUpdate(oldTableFeed, newTableFeed, cluster));
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster));
 
         final CatalogTable table = new CatalogTable();
         table.setUri("catalog:default:clicks-blah#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}");
         newTableFeed.setTable(table);
-        Assert.assertTrue(UpdateHelper.shouldUpdate(oldTableFeed, newTableFeed, cluster));
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster));
 
         inputStream = getClass().getResourceAsStream("/config/process/process-table.xml");
         Process oldProcess = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(inputStream);
+        FileSystem fs = dfsCluster.getFileSystem();
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, "testCluster");
+        Path staging = EntityUtil.getNewStagingPath(clusterEntity, oldProcess);
+        fs.mkdirs(staging);
+        fs.create(new Path(staging, "workflow.xml")).close();
+        fs.create(new Path(staging, "checksums")).close();
         Process newProcess = (Process) oldProcess.copy();
 
         newProcess.getRetry().setPolicy(PolicyType.FINAL);
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldProcess, newProcess, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
         newProcess.setFrequency(Frequency.fromString("days(1)"));
-        Assert.assertTrue(UpdateHelper.shouldUpdate(oldProcess, newProcess, cluster));
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
     }
 
     private static Location getLocation(Feed feed, LocationType type) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/common/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-0.1.xml b/common/src/test/resources/config/process/process-0.1.xml
index bb5cd35..0aa249b 100644
--- a/common/src/test/resources/config/process/process-0.1.xml
+++ b/common/src/test/resources/config/process/process-0.1.xml
@@ -43,7 +43,7 @@
         <property name="name2" value="value2"/>
     </properties>
 
-    <workflow engine="oozie" path="/path/to/workflow"/>
+    <workflow engine="oozie" path="/falcon/test/workflow"/>
 
     <retry policy="periodic" delay="minutes(10)" attempts="3"/>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/common/src/test/resources/config/process/process-0.2.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-0.2.xml b/common/src/test/resources/config/process/process-0.2.xml
index c4cd83e..e1d5113 100644
--- a/common/src/test/resources/config/process/process-0.2.xml
+++ b/common/src/test/resources/config/process/process-0.2.xml
@@ -47,7 +47,7 @@
         <property name="name2" value="value2"/>
     </properties>
 
-    <workflow engine="oozie" path="/path/to/workflow"/>
+    <workflow engine="oozie" path="/falcon/test/workflow"/>
 
     <retry policy="periodic" delay="minutes(10)" attempts="3"/>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/common/src/test/resources/config/process/process-table.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-table.xml b/common/src/test/resources/config/process/process-table.xml
index 1d6a8f0..9408973 100644
--- a/common/src/test/resources/config/process/process-table.xml
+++ b/common/src/test/resources/config/process/process-table.xml
@@ -40,7 +40,7 @@
     </outputs>
 
     <!-- how -->
-    <workflow engine="oozie" path="/path/to/workflow"/>
+    <workflow engine="oozie" path="/falcon/test/workflow"/>
 
     <retry policy="periodic" delay="minutes(10)" attempts="3"/>
 </process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index 7b9095f..5e3a30e 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -22,7 +22,6 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
 import org.apache.falcon.converter.AbstractOozieEntityMapper;
 import org.apache.falcon.converter.OozieFeedMapper;
-import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.v0.EntityType;
@@ -64,7 +63,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
         }
 
         Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, feedCluster.getName());
-        Path bundlePath = new Path(ClusterHelper.getLocation(cluster, "staging"), EntityUtil.getStagingPath(feed));
+        Path bundlePath = EntityUtil.getNewStagingPath(cluster, feed);
         Feed feedClone = (Feed) feed.copy();
         EntityUtil.setStartDate(feedClone, clusterName, startDate);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index c47ec01..37f8cfa 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -89,12 +89,11 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
 
     public static void pushLibsToHDFS(String src, Path target, Cluster cluster, FalconPathFilter pathFilter)
         throws IOException, FalconException {
-        LOG.debug("Copying libs from " + src);
-
         if (StringUtils.isEmpty(src)) {
             return;
         }
 
+        LOG.debug("Copying libs from " + src);
         Configuration conf = ClusterHelper.getConfiguration(cluster);
         conf.setInt("ipc.client.connect.max.retries", 10);
         FileSystem fs = null;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index 1978c53..e5a01ca 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -45,17 +45,13 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
     protected static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
 
     protected Properties createAppProperties(String clusterName, Path bundlePath, String user) throws FalconException {
-
         Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
         Properties properties = new Properties();
         if (cluster.getProperties() != null) {
-            addClusterProperties(properties, cluster.getProperties()
-                    .getProperties());
+            addClusterProperties(properties, cluster.getProperties().getProperties());
         }
-        properties.setProperty(OozieWorkflowEngine.NAME_NODE,
-                ClusterHelper.getStorageUrl(cluster));
-        properties.setProperty(OozieWorkflowEngine.JOB_TRACKER,
-                ClusterHelper.getMREndPoint(cluster));
+        properties.setProperty(OozieWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
+        properties.setProperty(OozieWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
         properties.setProperty(OozieClient.BUNDLE_APP_PATH,
                 "${" + OozieWorkflowEngine.NAME_NODE + "}" + bundlePath.toString());
         properties.setProperty("colo.name", cluster.getColo());
@@ -67,8 +63,7 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
         return properties;
     }
 
-    private void addClusterProperties(Properties properties,
-                                      List<Property> clusterProperties) {
+    private void addClusterProperties(Properties properties, List<Property> clusterProperties) {
         for (Property prop : clusterProperties) {
             properties.setProperty(prop.getName(), prop.getValue());
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
index 7e2f8a4..068e980 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
@@ -52,8 +52,7 @@ public class OozieHouseKeepingService implements WorkflowEngineActionListener {
     public void afterDelete(Entity entity, String clusterName) throws FalconException {
         try {
             Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
-            Path entityPath = new Path(ClusterHelper.getLocation(cluster, "staging"),
-                    EntityUtil.getStagingPath(entity)).getParent();
+            Path entityPath = EntityUtil.getBaseStagingPath(cluster, entity);
             LOG.info("Deleting entity path " + entityPath + " on cluster " + clusterName);
 
             Configuration conf = ClusterHelper.getConfiguration(cluster);
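
Because afterDelete now deletes the base staging path itself, a single recursive delete removes every timestamped staging directory plus the logs directory for the entity. A hedged sketch of the equivalent cleanup (assuming the usual recursive semantics of FileSystem.delete):

    // Illustrative: the base staging path is the parent of all timestamped
    // staging dirs and of logs, so one recursive delete removes them all.
    Path entityPath = EntityUtil.getBaseStagingPath(cluster, entity);
    FileSystem fs = ClusterHelper.getFileSystem(cluster);
    fs.delete(entityPath, true); // throws IOException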

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 1ed1ff7..b0af401 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -21,7 +21,9 @@ package org.apache.falcon.workflow.engine;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.*;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -33,12 +35,15 @@ import org.apache.falcon.update.UpdateHelper;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.workflow.OozieWorkflowBuilder;
 import org.apache.falcon.workflow.WorkflowBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.*;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
 import org.apache.oozie.client.Job.Status;
 import org.apache.oozie.client.rest.RestConstants;
 
+import java.io.IOException;
 import java.util.*;
 import java.util.Map.Entry;
 
@@ -102,10 +107,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         for (Map.Entry<String, BundleJob> entry : bundleMap.entrySet()) {
             String cluster = entry.getKey();
             BundleJob bundleJob = entry.getValue();
-            if (bundleJob == MISSING || bundleJob.getStatus().equals(Job.Status.KILLED)) {
-                if (bundleJob != MISSING) {
-                    LOG.warn("Bundle id: " + bundleJob.getId() + " is in killed state, so allowing schedule");
-                }
+            if (bundleJob == MISSING) {
                 schedClusters.add(cluster);
             } else {
                 LOG.debug("The entity " + entity.getName() + " is already scheduled on cluster " + cluster);
@@ -113,19 +115,28 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
 
         if (!schedClusters.isEmpty()) {
-            WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(
-                    ENGINE, entity);
-            Map<String, Properties> newFlows = builder.newWorkflowSchedule(
-                    entity, schedClusters);
+            WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
+            Map<String, Properties> newFlows = builder.newWorkflowSchedule(entity, schedClusters);
             for (Map.Entry<String, Properties> entry : newFlows.entrySet()) {
                 String cluster = entry.getKey();
-                LOG.info("Scheduling " + entity.toShortString()
-                        + " on cluster " + cluster);
+                LOG.info("Scheduling " + entity.toShortString() + " on cluster " + cluster);
                 scheduleEntity(cluster, entry.getValue(), entity);
+                commitStagingPath(cluster, entry.getValue().getProperty(OozieClient.BUNDLE_APP_PATH));
             }
         }
     }
 
+    private void commitStagingPath(String cluster, String path) throws FalconException {
+        path = StringUtils.removeStart(path, "${nameNode}");
+        FileSystem fs =
+                ClusterHelper.getFileSystem((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, cluster));
+        try {
+            fs.create(new Path(path, EntityUtil.SUCCEEDED_FILE_NAME)).close();
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
     @Override
     public boolean isActive(Entity entity) throws FalconException {
         return isBundleInState(entity, BundleStatus.ACTIVE);
@@ -173,31 +184,16 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return true;
     }
 
-    private BundleJob findBundle(Entity entity, String cluster)
-        throws FalconException {
-
-        String stPath = EntityUtil.getStagingPath(entity);
-        LOG.info("Staging path for entity " + stPath);
-        List<BundleJob> bundles = findBundles(entity, cluster);
-        for (BundleJob job : bundles) {
-            if (job.getAppPath().endsWith(stPath)) {
-                return getBundleInfo(cluster, job.getId());
-            }
-        }
-        return MISSING;
-    }
-
-    private List<BundleJob> findBundles(Entity entity, String cluster)
-        throws FalconException {
-
+    //Return all bundles for the entity in the requested cluster
+    private List<BundleJob> findBundles(Entity entity, String cluster) throws FalconException {
         try {
             OozieClient client = OozieClientFactory.get(cluster);
             List<BundleJob> jobs = client.getBundleJobsInfo(
-                    OozieClient.FILTER_NAME + "="
-                            + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
+                    OozieClient.FILTER_NAME + "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
             if (jobs != null) {
                 List<BundleJob> filteredJobs = new ArrayList<BundleJob>();
                 for (BundleJob job : jobs) {
+                    //Filtering bundles that correspond to deleted entities (endtime is set when an entity is deleted)
                     if (job.getEndTime() == null) {
                         filteredJobs.add(job);
                         LOG.debug("Found bundle " + job.getId());
@@ -211,9 +207,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private Map<String, List<BundleJob>> findBundles(Entity entity)
-        throws FalconException {
-
+    //Return all bundles for the entity for each cluster
+    private Map<String, List<BundleJob>> findBundles(Entity entity) throws FalconException {
         Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
         Map<String, List<BundleJob>> jobMap = new HashMap<String, List<BundleJob>>();
         for (String cluster : clusters) {
@@ -222,29 +217,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return jobMap;
     }
 
-    // During update, a new bundle may not be created if next start time >= end
-    // time
-    // In this case, there will not be a bundle with the latest entity md5
-    // So, pick last created bundle
-    private Map<String, BundleJob> findLatestBundle(Entity entity)
-        throws FalconException {
-
-        Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
-        Map<String, BundleJob> bundleMap = new HashMap<String, BundleJob>();
-        for (Map.Entry<String, List<BundleJob>> entry : bundlesMap.entrySet()) {
-            String cluster = entry.getKey();
-            Date latest = null;
-            bundleMap.put(cluster, MISSING);
-            for (BundleJob job : entry.getValue()) {
-                if (latest == null || latest.before(job.getCreatedTime())) {
-                    bundleMap.put(cluster, job);
-                    latest = job.getCreatedTime();
-                }
-            }
+    //Return latest bundle(last created) for the entity for each cluster
+    private Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException {
+        Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+        Map<String, BundleJob> jobMap = new HashMap<String, BundleJob>();
+        for (String cluster : clusters) {
+            jobMap.put(cluster, findLatestBundle(entity, cluster));
         }
-        return bundleMap;
+        return jobMap;
     }
 
+    //Return latest bundle(last created) for the entity in the requested cluster
     private BundleJob findLatestBundle(Entity entity, String cluster) throws FalconException {
         List<BundleJob> bundles = findBundles(entity, cluster);
         Date latest = null;
@@ -800,20 +783,26 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         });
     }
 
-    private boolean canUpdateBundle(Entity oldEntity, Entity newEntity)
-        throws FalconException {
-        return EntityUtil.equals(oldEntity, newEntity, BUNDLE_UPDATEABLE_PROPS);
+    private boolean canUpdateBundle(Entity oldEntity, Entity newEntity, boolean wfUpdated) throws FalconException {
+        return !wfUpdated && EntityUtil.equals(oldEntity, newEntity, BUNDLE_UPDATEABLE_PROPS);
     }
 
     @Override
     public void update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException {
-        if (!UpdateHelper.shouldUpdate(oldEntity, newEntity, cluster)) {
+        boolean entityUpdated = UpdateHelper.isEntityUpdated(oldEntity, newEntity, cluster);
+        boolean wfUpdated = UpdateHelper.isWorkflowUpdated(cluster, newEntity);
+
+        if (!entityUpdated && !wfUpdated) {
             LOG.debug("Nothing to update for cluster " + cluster);
             return;
         }
 
-        BundleJob bundle = findLatestBundle(oldEntity, cluster);
-        if (bundle != MISSING) {
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+        Path stagingPath = EntityUtil.getLastCommittedStagingPath(clusterEntity, oldEntity);
+
+        if (stagingPath != null) {  //update if entity is scheduled
+            BundleJob bundle = findBundleForStagingPath(cluster, oldEntity, stagingPath);
+            bundle = getBundleInfo(cluster, bundle.getId());
             LOG.info("Updating entity through Workflow Engine" + newEntity.toShortString());
             Date newEndTime = EntityUtil.getEndTime(newEntity, cluster);
             if (newEndTime.before(now())) {
@@ -823,19 +812,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
             LOG.debug("Updating for cluster : " + cluster + ", bundle: " + bundle.getId());
 
-            if (canUpdateBundle(oldEntity, newEntity)) {
+            if (canUpdateBundle(oldEntity, newEntity, wfUpdated)) {
                 // only concurrency and endtime are changed. So, change coords
                 LOG.info("Change operation is adequate! : " + cluster + ", bundle: " + bundle.getId());
-                updateCoords(cluster, bundle.getId(), EntityUtil.getParallel(newEntity),
+                updateCoords(cluster, bundle, EntityUtil.getParallel(newEntity),
                         EntityUtil.getEndTime(newEntity, cluster));
                 return;
             }
 
-            LOG.debug("Going to update ! : " + newEntity.toShortString() + cluster + ", bundle: "
+            LOG.debug("Going to update ! : " + newEntity.toShortString() + " for cluster " + cluster + ", bundle: "
                     + bundle.getId());
             updateInternal(oldEntity, newEntity, cluster, bundle, false);
-            LOG.info("Entity update complete : " + newEntity.toShortString() + cluster + ", bundle: "
-                    + bundle.getId());
+            LOG.info("Entity update complete : " + newEntity.toShortString() + cluster + ", bundle: " + bundle.getId());
         }
 
         //Update affected entities
@@ -874,6 +862,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         LOG.info("Entity update and all dependent entities updated: " + oldEntity.toShortString());
     }
 
+    //Returns bundle whose app path is same as the staging path(argument)
+    private BundleJob findBundleForStagingPath(String cluster, Entity entity, Path stagingPath) throws FalconException {
+        List<BundleJob> bundles = findBundles(entity, cluster);
+        String bundlePath = stagingPath.toUri().getPath();
+        for (BundleJob bundle : bundles) {
+            if (bundle.getAppPath().endsWith(bundlePath)) {
+                return bundle;
+            }
+        }
+        return null;
+    }
+
     private Date now() {
         Calendar cal = Calendar.getInstance();
         cal.set(Calendar.SECOND, 0);
@@ -897,123 +897,104 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return null;
     }
 
-    private void updateCoords(String cluster, String bundleId, int concurrency,
+    private void updateCoords(String cluster, BundleJob bundle, int concurrency,
                               Date endTime) throws FalconException {
         if (endTime.compareTo(now()) <= 0) {
-            throw new FalconException("End time "
-                    + SchemaHelper.formatDateUTC(endTime)
-                    + " can't be in the past");
+            throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past");
         }
 
-        BundleJob bundle = getBundleInfo(cluster, bundleId);
         // change coords
         for (CoordinatorJob coord : bundle.getCoordinators()) {
             LOG.debug("Updating endtime of coord " + coord.getId() + " to "
-                    + SchemaHelper.formatDateUTC(endTime) + " on cluster "
-                    + cluster);
+                    + SchemaHelper.formatDateUTC(endTime) + " on cluster " + cluster);
             Date lastActionTime = getCoordLastActionTime(coord);
             if (lastActionTime == null) { // nothing is materialized
-                LOG.info("Nothing is materialized for this coord: "
-                        + coord.getId());
+                LOG.info("Nothing is materialized for this coord: " + coord.getId());
                 if (endTime.compareTo(coord.getStartTime()) <= 0) {
-                    LOG.info("Setting end time to START TIME "
-                            + SchemaHelper.formatDateUTC(coord.getStartTime()));
-                    change(cluster, coord.getId(), concurrency,
-                            coord.getStartTime(), null);
+                    LOG.info("Setting end time to START TIME " + SchemaHelper.formatDateUTC(coord.getStartTime()));
+                    change(cluster, coord.getId(), concurrency, coord.getStartTime(), null);
                 } else {
-                    LOG.info("Setting end time to START TIME "
-                            + SchemaHelper.formatDateUTC(endTime));
+                    LOG.info("Setting end time to START TIME " + SchemaHelper.formatDateUTC(endTime));
                     change(cluster, coord.getId(), concurrency, endTime, null);
                 }
             } else {
-                LOG.info("Actions have materialized for this coord: "
-                        + coord.getId() + ", last action "
+                LOG.info("Actions have materialized for this coord: " + coord.getId() + ", last action "
                         + SchemaHelper.formatDateUTC(lastActionTime));
                 if (!endTime.after(lastActionTime)) {
                     Date pauseTime = offsetTime(endTime, -1);
                     // set pause time which deletes future actions
-                    LOG.info("Setting pause time on coord : " + coord.getId()
-                            + " to " + SchemaHelper.formatDateUTC(pauseTime));
-                    change(cluster, coord.getId(), concurrency, null,
-                            SchemaHelper.formatDateUTC(pauseTime));
+                    LOG.info("Setting pause time on coord : " + coord.getId() + " to "
+                            + SchemaHelper.formatDateUTC(pauseTime));
+                    change(cluster, coord.getId(), concurrency, null, SchemaHelper.formatDateUTC(pauseTime));
                 }
                 change(cluster, coord.getId(), concurrency, endTime, "");
             }
         }
     }
 
-    private void suspend(String cluster, BundleJob bundle)
-        throws FalconException {
-
-        bundle = getBundleInfo(cluster, bundle.getId());
+    private void suspendCoords(String cluster, BundleJob bundle) throws FalconException {
         for (CoordinatorJob coord : bundle.getCoordinators()) {
             suspend(cluster, coord.getId());
         }
     }
 
-    private void resume(String cluster, BundleJob bundle) throws FalconException {
-        bundle = getBundleInfo(cluster, bundle.getId());
+    private void resumeCoords(String cluster, BundleJob bundle) throws FalconException {
         for (CoordinatorJob coord : bundle.getCoordinators()) {
             resume(cluster, coord.getId());
         }
     }
 
-    private void updateInternal(Entity oldEntity, Entity newEntity,
-                                String cluster, BundleJob bundle, boolean alreadyCreated)
-        throws FalconException {
-
-        OozieWorkflowBuilder<Entity> builder = (OozieWorkflowBuilder<Entity>) WorkflowBuilder
-                .getBuilder(ENGINE, oldEntity);
+    private void updateInternal(Entity oldEntity, Entity newEntity, String cluster, BundleJob oldBundle,
+                                boolean alreadyCreated) throws FalconException {
+        OozieWorkflowBuilder<Entity> builder =
+                (OozieWorkflowBuilder<Entity>) WorkflowBuilder.getBuilder(ENGINE, oldEntity);
 
         // Change end time of coords and schedule new bundle
-        Job.Status oldBundleStatus = bundle.getStatus();
-        suspend(cluster, bundle);
+        Job.Status oldBundleStatus = oldBundle.getStatus();
+        suspendCoords(cluster, oldBundle);
 
-        BundleJob newBundle = findBundle(newEntity, cluster);
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+        Path stagingPath = EntityUtil.getLatestStagingPath(clusterEntity, oldEntity);
+        BundleJob newBundle = findBundleForStagingPath(cluster, oldEntity, stagingPath);
         Date endTime;
-        if (newBundle == MISSING || !alreadyCreated) { // new entity is not
-            // scheduled yet
+        if (oldBundle.getAppPath().endsWith(stagingPath.toUri().getPath()) || newBundle == null || !alreadyCreated) {
+            // new entity is not scheduled yet, create new bundle
             LOG.info("New bundle hasn't been created yet. So will create one");
             endTime = offsetTime(now(), 3);
-            Date newStartTime = builder.getNextStartTime(newEntity, cluster,
-                    endTime);
-            scheduleForUpdate(newEntity, cluster, newStartTime, bundle.getUser());
-            LOG.info("New bundle scheduled successfully "
+            Date newStartTime = builder.getNextStartTime(newEntity, cluster, endTime);
+            newBundle =
+                    getBundleInfo(cluster, scheduleForUpdate(newEntity, cluster, newStartTime, oldBundle.getUser()));
+            LOG.info("New bundle " + newBundle.getId() + " scheduled successfully with start time "
                     + SchemaHelper.formatDateUTC(newStartTime));
         } else {
-            LOG.info("New bundle has already been created. Bundle Id: "
-                    + newBundle.getId() + ", Start: "
-                    + SchemaHelper.formatDateUTC(newBundle.getStartTime())
-                    + ", End: " + newBundle.getEndTime());
+            LOG.info("New bundle has already been created. Bundle Id: " + newBundle.getId() + ", Start: "
+                    + SchemaHelper.formatDateUTC(newBundle.getStartTime()) + ", End: " + newBundle.getEndTime());
             endTime = getMinStartTime(newBundle);
-            LOG.info("Will set old coord end time to "
-                    + SchemaHelper.formatDateUTC(endTime));
+            LOG.info("Will set old coord end time to " + SchemaHelper.formatDateUTC(endTime));
         }
         if (endTime != null) {
-            updateCoords(cluster, bundle.getId(),
-                    EntityUtil.getParallel(oldEntity), endTime);
+            //set endtime for old coords
+            updateCoords(cluster, oldBundle, EntityUtil.getParallel(oldEntity), endTime);
         }
 
-        if (oldBundleStatus != Job.Status.SUSPENDED
-                && oldBundleStatus != Job.Status.PREPSUSPENDED) {
-            resume(cluster, bundle);
+        if (oldBundleStatus != Job.Status.SUSPENDED && oldBundleStatus != Job.Status.PREPSUSPENDED) {
+            resumeCoords(cluster, oldBundle);
         }
+
+        //create _SUCCESS in staging path to mark update as complete (handles roll-forward for updates)
+        commitStagingPath(cluster, newBundle.getAppPath());
     }
 
-    private void scheduleForUpdate(Entity entity, String cluster, Date startDate, String user)
+    private String scheduleForUpdate(Entity entity, String cluster, Date startDate, String user)
         throws FalconException {
-
-        WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE,
-                entity);
-        Properties bundleProps = builder.newWorkflowSchedule(entity, startDate,
-                cluster, user);
-        LOG.info("Scheduling " + entity.toShortString() + " on cluster "
-                + cluster + " with props " + bundleProps);
+        WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
+        Properties bundleProps = builder.newWorkflowSchedule(entity, startDate, cluster, user);
+        LOG.info("Scheduling " + entity.toShortString() + " on cluster " + cluster + " with props " + bundleProps);
         if (bundleProps != null) {
-            scheduleEntity(cluster, bundleProps, entity);
+            return scheduleEntity(cluster, bundleProps, entity);
         } else {
-            LOG.info("No new workflow to be scheduled for this "
-                    + entity.toShortString());
+            LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
+            return null;
         }
     }
 
@@ -1137,8 +1118,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private String scheduleEntity(String cluster, Properties props,
-                                  Entity entity) throws FalconException {
+    private String scheduleEntity(String cluster, Properties props, Entity entity) throws FalconException {
         for (WorkflowEngineActionListener listener : listeners) {
             listener.beforeSchedule(entity, cluster);
         }

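For reference, the change(cluster, coordId, ...) helper used in updateCoords above ultimately drives Oozie's coordinator-change API. A minimal sketch using the Oozie client directly, with a placeholder URL and job id (Falcon routes the same call through its workflow-engine plumbing):

    import org.apache.oozie.client.OozieClient;
    import org.apache.oozie.client.OozieClientException;

    public class CoordChangeSketch {
        public static void main(String[] args) throws OozieClientException {
            OozieClient client = new OozieClient("http://oozie-host:11000/oozie");  // placeholder URL
            // Oozie accepts a semicolon-separated key=value list; concurrency,
            // endtime and pausetime are the supported coordinator change options.
            client.change("0000001-140106000000000-oozie-C",  // placeholder coordinator id
                    "concurrency=4;endtime=2014-02-01T00:00Z");
        }
    }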
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 7ec6cd1..d6f9df8 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -189,7 +189,6 @@ public abstract class AbstractEntityManager {
      * @return APIResult
      */
     public APIResult delete(HttpServletRequest request, String type, String entity, String colo) {
-
         checkColo(colo);
         try {
             EntityType entityType = EntityType.valueOf(type.toUpperCase());
@@ -230,26 +229,25 @@ public abstract class AbstractEntityManager {
             validate(newEntity);
 
             validateUpdate(oldEntity, newEntity);
-            if (!EntityUtil.equals(oldEntity, newEntity)) {
-                configStore.initiateUpdate(newEntity);
-                //Update in workflow engine
-                if (!DeploymentUtil.isPrism()) {
-                    Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity);
-                    Set<String> newClusters = EntityUtil.getClustersDefinedInColos(newEntity);
-                    newClusters.retainAll(oldClusters); //common clusters for update
-                    oldClusters.removeAll(newClusters); //deleted clusters
-
-                    for (String cluster : newClusters) {
-                        getWorkflowEngine().update(oldEntity, newEntity, cluster);
-                    }
-                    for (String cluster : oldClusters) {
-                        getWorkflowEngine().delete(oldEntity, cluster);
-                    }
-                }
+            configStore.initiateUpdate(newEntity);
 
-                configStore.update(entityType, newEntity);
+            //Update in workflow engine
+            if (!DeploymentUtil.isPrism()) {
+                Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity);
+                Set<String> newClusters = EntityUtil.getClustersDefinedInColos(newEntity);
+                newClusters.retainAll(oldClusters); //common clusters for update
+                oldClusters.removeAll(newClusters); //deleted clusters
+
+                for (String cluster : newClusters) {
+                    getWorkflowEngine().update(oldEntity, newEntity, cluster);
+                }
+                for (String cluster : oldClusters) {
+                    getWorkflowEngine().delete(oldEntity, cluster);
+                }
             }
 
+            configStore.update(entityType, newEntity);
+
             return new APIResult(APIResult.Status.SUCCEEDED, entityName + " updated successfully");
         } catch (Throwable e) {
             LOG.error("Updation failed", e);

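The cluster bookkeeping above is plain set arithmetic; a small self-contained illustration with hypothetical colo names:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class ClusterSetSketch {
        public static void main(String[] args) {
            Set<String> oldClusters = new HashSet<String>(Arrays.asList("colo1", "colo2"));
            Set<String> newClusters = new HashSet<String>(Arrays.asList("colo2", "colo3"));

            newClusters.retainAll(oldClusters);  // [colo2]: common clusters -> workflow engine update
            oldClusters.removeAll(newClusters);  // [colo1]: dropped clusters -> workflow engine delete
            System.out.println(newClusters + " updated, " + oldClusters + " deleted");
            // Clusters present only in the new definition (colo3) are not scheduled by update itself.
        }
    }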
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/prism/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/prism/src/main/resources/log4j.xml b/prism/src/main/resources/log4j.xml
index ac1d9e4..7ab9aef 100644
--- a/prism/src/main/resources/log4j.xml
+++ b/prism/src/main/resources/log4j.xml
@@ -45,15 +45,6 @@
         </layout>
     </appender>
 
-    <appender name="TRANSACTIONLOG" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${user.dir}/target/logs/prsim-tranlog.log"/>
-        <param name="Append" value="true"/>
-        <param name="Threshold" value="debug"/>
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %x %m%n"/>
-        </layout>
-    </appender>
-
     <appender name="METRIC" class="org.apache.log4j.DailyRollingFileAppender">
         <param name="File" value="${user.dir}/target/logs/prism-metric.log"/>
         <param name="Append" value="true"/>
@@ -73,11 +64,6 @@
         <appender-ref ref="AUDIT"/>
     </logger>
 
-    <logger name="TRANSACTIONLOG">
-        <level value="info"/>
-        <appender-ref ref="TRANSACTIONLOG"/>
-    </logger>
-
     <logger name="METRIC">
         <level value="info"/>
         <appender-ref ref="METRIC"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 8749f07..b72e243 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -56,10 +56,8 @@ import org.apache.falcon.oozie.workflow.DELETE;
 import org.apache.falcon.oozie.workflow.PIG;
 import org.apache.falcon.oozie.workflow.PREPARE;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
+import org.apache.falcon.update.UpdateHelper;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.xerces.dom.ElementNSImpl;
 import org.w3c.dom.Document;
 
@@ -79,7 +77,6 @@ import java.util.Map;
  * This class maps the Falcon entities into Oozie artifacts.
  */
 public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
-
     private static final String DEFAULT_WF_TEMPLATE = "/config/workflow/process-parent-workflow.xml";
     private static final int THIRTY_MINUTES = 30 * 60 * 1000;
 
@@ -87,14 +84,89 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         super(entity);
     }
 
+    private void mkdir(FileSystem fs, Path path) throws FalconException {
+        try {
+            if (!fs.exists(path) && !fs.mkdirs(path)) {
+                throw new FalconException("mkdir failed for " + path);
+            }
+        } catch (IOException e) {
+            throw new FalconException("mkdir failed for " + path, e);
+        }
+    }
+
     @Override
     protected List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
+        try {
+            FileSystem fs = ClusterHelper.getFileSystem(cluster);
+            Process process = getEntity();
+
+            //Copy user workflow and lib to staging dir
+            Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(process.getWorkflow().getPath()),
+                    new Path(bundlePath, EntityUtil.PROCESS_USER_DIR));
+            if (process.getWorkflow().getLib() != null && fs.exists(new Path(process.getWorkflow().getLib()))) {
+                checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(process.getWorkflow().getLib()),
+                        new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR)));
+            }
+
+            writeChecksums(fs, new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy user workflow/lib", e);
+        }
+
         List<COORDINATORAPP> apps = new ArrayList<COORDINATORAPP>();
         apps.add(createDefaultCoordinator(cluster, bundlePath));
 
         return apps;
     }
 
+    private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
+        try {
+            FSDataOutputStream stream = fs.create(path);
+            try {
+                for (Map.Entry<String, String> entry : checksums.entrySet()) {
+                    stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+                }
+            } finally {
+                stream.close();
+            }
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy user workflow/lib", e);
+        }
+    }
+
+    private Path getUserWorkflowPath(Cluster cluster, Path bundlePath) throws FalconException {
+        try {
+            FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+            Process process = getEntity();
+            Path wfPath = new Path(process.getWorkflow().getPath());
+            if (fs.isFile(wfPath)) {
+                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
+            } else {
+                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR);
+            }
+        } catch (IOException e) {
+            throw new FalconException("Failed to get workflow path", e);
+        }
+    }
+
+    private Path getUserLibPath(Cluster cluster, Path bundlePath) throws FalconException {
+        try {
+            FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+            Process process = getEntity();
+            if (process.getWorkflow().getLib() == null) {
+                return null;
+            }
+            Path libPath = new Path(process.getWorkflow().getLib());
+            if (fs.isFile(libPath)) {
+                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
+            } else {
+                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR);
+            }
+        } catch (IOException e) {
+            throw new FalconException("Failed to get user lib path", e);
+        }
+    }
+
     /**
      * Creates default oozie coordinator.
      *
@@ -423,7 +495,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
     }
 
     protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
-                                  String wfName, Path wfPath) throws FalconException {
+                                  String wfName, Path parentWfPath) throws FalconException {
         WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
         wfApp.setName(wfName);
         try {
@@ -432,31 +504,32 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             throw new FalconException("Failed to add library extensions for the workflow", e);
         }
 
+        String userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent()).toString();
         EngineType engineType = processWorkflow.getEngine();
         for (Object object : wfApp.getDecisionOrForkOrJoin()) {
             if (!(object instanceof ACTION)) {
                 continue;
             }
 
-            String storagePath = getStoragePath(getEntity().getWorkflow().getPath());
             ACTION action = (ACTION) object;
             String actionName = action.getName();
             if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
-                action.getSubWorkflow().setAppPath(storagePath);
+                action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
             } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
-                decoratePIGAction(cluster, process, processWorkflow, storagePath, action.getPig(), wfPath);
+                decoratePIGAction(cluster, process, processWorkflow, action.getPig(), parentWfPath);
             } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
-                decorateHiveAction(cluster, process, processWorkflow, storagePath, action, wfPath);
+                decorateHiveAction(cluster, process, processWorkflow, action, parentWfPath);
             }
         }
 
-        marshal(cluster, wfApp, wfPath);
+        //Create parent workflow
+        marshal(cluster, wfApp, parentWfPath);
     }
 
     private void decoratePIGAction(Cluster cluster, Process process,
-                                   Workflow processWorkflow, String storagePath,
-                                   PIG pigAction, Path wfPath) throws FalconException {
-        pigAction.setScript(storagePath);
+                                   Workflow processWorkflow, PIG pigAction, Path parentWfPath) throws FalconException {
+        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
+        pigAction.setScript("${nameNode}" + userWfPath.toString());
 
         addPrepareDeleteOutputPath(process, pigAction);
 
@@ -469,21 +542,22 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         Storage.TYPE storageType = getStorageType(cluster, process);
         if (Storage.TYPE.TABLE == storageType) {
             // adds hive-site.xml in pig classpath
-            setupHiveConfiguration(cluster, wfPath, ""); // DO NOT ADD PREFIX!!!
+            setupHiveConfiguration(cluster, parentWfPath, ""); // DO NOT ADD PREFIX!!!
             pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
         }
 
-        addArchiveForCustomJars(cluster, processWorkflow, pigAction.getArchive());
+        addArchiveForCustomJars(cluster, processWorkflow, pigAction.getArchive(),
+                getUserLibPath(cluster, parentWfPath.getParent()));
     }
 
-    private void decorateHiveAction(Cluster cluster, Process process,
-                                    Workflow processWorkflow, String storagePath,
-                                    ACTION wfAction, Path wfPath) throws FalconException {
+    private void decorateHiveAction(Cluster cluster, Process process, Workflow processWorkflow, ACTION wfAction,
+                                    Path parentWfPath) throws FalconException {
 
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = unMarshalHiveAction(wfAction);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
 
-        hiveAction.setScript(storagePath);
+        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
+        hiveAction.setScript("${nameNode}" + userWfPath.toString());
 
         addPrepareDeleteOutputPath(process, hiveAction);
 
@@ -493,9 +567,10 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
         propagateProcessProperties(hiveAction, process);
 
-        setupHiveConfiguration(cluster, wfPath, "falcon-");
+        setupHiveConfiguration(cluster, parentWfPath, "falcon-");
 
-        addArchiveForCustomJars(cluster, processWorkflow, hiveAction.getArchive());
+        addArchiveForCustomJars(cluster, processWorkflow, hiveAction.getArchive(),
+                getUserLibPath(cluster, parentWfPath.getParent()));
 
         marshalHiveAction(wfAction, actionJaxbElement);
     }
@@ -682,17 +757,15 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
     }
 
     private void addArchiveForCustomJars(Cluster cluster, Workflow processWorkflow,
-                                         List<String> archiveList) throws FalconException {
-        String processWorkflowLib = processWorkflow.getLib();
-        if (processWorkflowLib == null) {
+                                         List<String> archiveList, Path libPath) throws FalconException {
+        if (libPath == null) {
             return;
         }
 
-        Path libPath = new Path(processWorkflowLib);
         try {
             final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
             if (fs.isFile(libPath)) {  // File, not a Dir
-                archiveList.add(processWorkflowLib);
+                archiveList.add(libPath.toString());
                 return;
             }
 

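The checksum file written by writeChecksums() above holds one key=value pair per line. A hedged reading sketch; this reader is illustrative and not part of the patch (UpdateHelper presumably has its own):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class ChecksumFileSketch {
        // Illustrative reader for the key=value-per-line file written by writeChecksums().
        public static Map<String, String> read(FileSystem fs, Path path) throws IOException {
            Map<String, String> checksums = new HashMap<String, String>();
            BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(path)));
            try {
                String line;
                while ((line = in.readLine()) != null) {
                    int eq = line.indexOf('=');
                    if (eq > 0) {
                        checksums.put(line.substring(0, eq), line.substring(eq + 1));
                    }
                }
            } finally {
                in.close();
            }
            return checksums;
        }
    }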
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 1329733..4e5e8c6 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -22,7 +22,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
 import org.apache.falcon.converter.OozieProcessMapper;
-import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
@@ -109,7 +108,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
         }
 
         Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, processCluster.getName());
-        Path bundlePath = new Path(ClusterHelper.getLocation(cluster, "staging"), EntityUtil.getStagingPath(process));
+        Path bundlePath = EntityUtil.getNewStagingPath(cluster, process);
         Process processClone = (Process) process.copy();
         EntityUtil.setStartDate(processClone, clusterName, startDate);
 

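Pieced together from the paths asserted in the tests below, the per-entity staging layout looks roughly like this (the user/ name comes from the test assertions; the userlib/ name, the checksum file name, and any per-update suffix are assumptions based on the constants referenced above):

    /falcon/staging/workflows/<entity-name>/
        user/        copy of the user workflow (EntityUtil.PROCESS_USER_DIR)
        userlib/     copy of the user lib, if present (EntityUtil.PROCESS_USERLIB_DIR, name assumed)
        <checksums>  key=value file named by EntityUtil.PROCESS_CHECKSUM_FILE (name assumed)
        _SUCCESS     marker committed after a successful update (roll-forward guard)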
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 7d5f4d1..224397e 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -62,10 +62,7 @@ import javax.xml.bind.Unmarshaller;
 import javax.xml.transform.stream.StreamSource;
 import javax.xml.validation.Schema;
 import javax.xml.validation.SchemaFactory;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.URL;
 import java.util.Collections;
 import java.util.HashMap;
@@ -199,6 +196,7 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         Process process = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
         Assert.assertEquals("pig", process.getWorkflow().getEngine().value());
 
+        prepare(process);
         WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
         testParentWorkflow(process, parentWorkflow);
 
@@ -208,7 +206,7 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         Assert.assertEquals("user-pig-job", pigActionNode.getName());
 
         final PIG pigAction = pigActionNode.getPig();
-        Assert.assertEquals("${nameNode}/apps/pig/id.pig", pigAction.getScript());
+        Assert.assertEquals("${nameNode}/falcon/staging/workflows/pig-process/user/id.pig", pigAction.getScript());
         Assert.assertNotNull(pigAction.getPrepare());
         Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
         Assert.assertFalse(pigAction.getParam().isEmpty());
@@ -236,8 +234,9 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         ConfigurationStore.get().publish(EntityType.PROCESS, process);
 
         Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+        prepare(process);
         OozieProcessMapper mapper = new OozieProcessMapper(process);
-        Path bundlePath = new Path("/tmp/seetharam", EntityUtil.getStagingPath(process));
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
         mapper.map(cluster, bundlePath);
         assertTrue(fs.exists(bundlePath));
 
@@ -274,13 +273,20 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = mapper.unMarshalHiveAction(hiveNode);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
 
-        Assert.assertEquals("${nameNode}/apps/hive/script.hql", hiveAction.getScript());
+        Assert.assertEquals("${nameNode}/falcon/staging/workflows/hive-process/user/script.hql",
+                hiveAction.getScript());
         Assert.assertNull(hiveAction.getPrepare());
         Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
         Assert.assertFalse(hiveAction.getParam().isEmpty());
         Assert.assertEquals(11, hiveAction.getParam().size());
     }
 
+    private void prepare(Process process) throws IOException {
+        Path wf = new Path(process.getWorkflow().getPath());
+        fs.mkdirs(wf.getParent());
+        fs.create(wf).close();
+    }
+
     @Test
     public void testProcessMapperForTableStorage() throws Exception {
         URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
@@ -297,7 +303,7 @@ public class OozieProcessMapperTest extends AbstractTestBase {
 
         Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
         OozieProcessMapper mapper = new OozieProcessMapper(process);
-        Path bundlePath = new Path("/", EntityUtil.getStagingPath(process));
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
         mapper.map(cluster, bundlePath);
         assertTrue(fs.exists(bundlePath));
 
@@ -396,7 +402,7 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         throws Exception {
         Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
         OozieProcessMapper mapper = new OozieProcessMapper(process);
-        Path bundlePath = new Path("/", EntityUtil.getStagingPath(process));
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
         mapper.map(cluster, bundlePath);
         assertTrue(fs.exists(bundlePath));
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2ded6291/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java b/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
index ba01c80..0074c1a 100644
--- a/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 
 /**
  * A custom implementation of DistCp that overrides the behavior of CopyListing
- * interface to copy FileOutputCommitter.SUCCEEDED_FILE_NAME last so downstream apps
+ * interface to copy EntityUtil.SUCCEEDED_FILE_NAME last so downstream apps
  * depending on data availability will work correctly.
  */
 public class CustomReplicator extends DistCp {

