falcon-commits mailing list archives

From suh...@apache.org
Subject [2/2] git commit: FALCON-353 enable dry run feature of oozie for schedule and update. Contributed by Shwetha GS
Date Wed, 23 Jul 2014 09:03:31 GMT
FALCON-353 enable dry run feature of oozie for schedule and update. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/14476f9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/14476f9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/14476f9a

Branch: refs/heads/master
Commit: 14476f9ab30414cbddabb33bf67d5e4270952fda
Parents: 954cd95
Author: Suhas V <suhas.v@inmobi.com>
Authored: Wed Jul 23 14:33:14 2014 +0530
Committer: Suhas V <suhas.v@inmobi.com>
Committed: Wed Jul 23 14:33:14 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/falcon/entity/EntityUtil.java    |  54 ++-
 .../org/apache/falcon/update/UpdateHelper.java  |  49 ++-
 .../workflow/engine/AbstractWorkflowEngine.java |   4 +-
 .../entity/parser/ProcessEntityParserTest.java  |  19 +-
 .../apache/falcon/update/UpdateHelperTest.java  |  87 +++--
 .../resources/config/process/process-0.1.xml    |   2 +-
 .../apache/falcon/oozie/OozieBundleBuilder.java |  82 +++--
 .../falcon/oozie/OozieCoordinatorBuilder.java   |  16 +-
 .../apache/falcon/oozie/OozieEntityBuilder.java |   5 +-
 .../OozieOrchestrationWorkflowBuilder.java      |   6 +-
 .../falcon/oozie/feed/FeedBundleBuilder.java    |   2 +-
 .../feed/FeedReplicationCoordinatorBuilder.java |  17 +-
 .../feed/FeedReplicationWorkflowBuilder.java    |   7 +-
 .../feed/FeedRetentionCoordinatorBuilder.java   |   5 +-
 .../feed/FeedRetentionWorkflowBuilder.java      |   6 +-
 .../oozie/process/ProcessBundleBuilder.java     |  55 ++--
 .../ProcessExecutionCoordinatorBuilder.java     |   4 +-
 .../ProcessExecutionWorkflowBuilder.java        |   6 +-
 .../workflow/engine/OozieWorkflowEngine.java    | 330 ++++++++++---------
 .../feed/OozieFeedWorkflowBuilderTest.java      |  61 +---
 .../falcon/oozie/process/AbstractTestBase.java  |  45 ++-
 .../OozieProcessWorkflowBuilderTest.java        |  57 +---
 .../org/apache/falcon/FalconWebException.java   |  37 +--
 .../falcon/resource/AbstractEntityManager.java  |  22 +-
 .../falcon/resource/EntityManagerJerseyIT.java  |  81 ++---
 .../resource/EntityManagerJerseySmokeIT.java    |   6 +-
 .../org/apache/falcon/resource/TestContext.java |  31 +-
 28 files changed, 578 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 35eaf7e..5882329 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,9 @@ Trunk (Unreleased)
    FALCON-133 Upgrade to slf4j 1.7.5 and use SLF4J logger. (Jean-Baptiste Onofré
    via Shwetha GS)
 
+   FALCON-353 enable dry run feature of oozie for schedule 
+   and update (Shwetha GS via Suhas Vasu)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index a38e553..6f50829 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -41,7 +41,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.text.DateFormat;
@@ -53,6 +56,8 @@ import java.util.*;
  * Helper to get entity object.
  */
 public final class EntityUtil {
+    public static final Logger LOG = LoggerFactory.getLogger(EntityUtil.class);
+
     private static final long MINUTE_IN_MS = 60000L;
     private static final long HOUR_IN_MS = 3600000L;
     private static final long DAY_IN_MS = 86400000L;
@@ -553,7 +558,7 @@ public final class EntityUtil {
     }
 
     //Staging path that stores scheduler configs like oozie coord/bundle xmls, parent workflow xml
-    //Each entity update creates a new staging path and creates staging path/_SUCCESS when update is complete
+    //Each entity update creates a new staging path
     //Base staging path is the base path for all staging dirs
     public static Path getBaseStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
         return new Path(ClusterHelper.getLocation(cluster, "staging"),
@@ -561,16 +566,22 @@ public final class EntityUtil {
     }
 
     //Creates new staging path for entity schedule/update
-    public static Path getNewStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
-        return new Path(getBaseStagingPath(cluster, entity), String.valueOf(System.currentTimeMillis()));
+    //Staging path contains md5 of the cluster view of the entity. This is required to check if update is required
+    public static Path getNewStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
+        throws FalconException {
+        Entity clusterView = getClusterView(entity, cluster.getName());
+        return new Path(getBaseStagingPath(cluster, entity),
+            md5(clusterView) + "_" + String.valueOf(System.currentTimeMillis()));
     }
 
-    private static Path getStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Path path)
+    //Returns all staging paths for the entity
+    public static FileStatus[] getAllStagingPaths(org.apache.falcon.entity.v0.cluster.Cluster cluster,
+        Entity entity)
         throws FalconException {
+        Path basePath = getBaseStagingPath(cluster, entity);
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
         try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
-            FileStatus latest = null;
-            FileStatus[] files = fs.globStatus(path, new PathFilter() {
+            FileStatus[] filesArray = fs.listStatus(basePath, new PathFilter() {
                 @Override
                 public boolean accept(Path path) {
                     if (path.getName().equals("logs")) {
@@ -579,33 +590,16 @@ public final class EntityUtil {
                     return true;
                 }
             });
-            if (files == null || files.length == 0) {
-                return null;
-            }
 
-            for (FileStatus file : files) {
-                if (latest == null || latest.getModificationTime() < file.getModificationTime()) {
-                    latest = file;
-                }
-            }
-            return latest.getPath();
+            return filesArray;
+
+        } catch (FileNotFoundException e) {
+            LOG.info("Staging path doesn't exist, entity is not scheduled", e);
+            //Staging path doesn't exist if entity is not scheduled
         } catch (IOException e) {
             throw new FalconException(e);
         }
-    }
-
-    //Returns latest staging path - includes incomplete staging paths as well (updates are not complete)
-    public static Path getLatestStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
-        throws FalconException {
-        return getStagingPath(cluster, new Path(getBaseStagingPath(cluster, entity), "*"));
-    }
-
-    //Returns latest staging path for which update is complete (latest that contains _SUCCESS)
-    public static Path getLastCommittedStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
-        throws FalconException {
-        Path stagingPath = getStagingPath(cluster, new Path(getBaseStagingPath(cluster, entity),
-                "*/" + SUCCEEDED_FILE_NAME));
-        return stagingPath == null ? null : stagingPath.getParent();
+        return null;
     }
 
     public static LateProcess getLateProcess(Entity entity)
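
[Editor's note] As a quick illustration of the naming scheme introduced above: the staging directory name now encodes the md5 of the entity's cluster view followed by a timestamp, which is what later lets the update path recognize a no-op. A minimal sketch, assuming the md5 string comes from EntityUtil.md5(clusterView); the helper class and method names below are illustrative, not part of the patch:

    import org.apache.hadoop.fs.Path;

    public final class StagingPathSketch {
        private StagingPathSketch() {}

        // Mirrors getNewStagingPath(): "<md5-of-cluster-view>_<currentTimeMillis>"
        public static Path newStagingDir(Path baseStagingPath, String clusterViewMd5) {
            return new Path(baseStagingPath, clusterViewMd5 + "_" + System.currentTimeMillis());
        }

        // Recovers the md5 component the same way UpdateHelper.isEntityUpdated() does,
        // by splitting the directory name on '_'
        public static String md5FromStagingDir(Path stagingDir) {
            return stagingDir.getName().split("_")[0];
        }
    }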

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index 7af77d0..b6e2893 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -61,9 +63,17 @@ public final class UpdateHelper {
 
     private UpdateHelper() {}
 
-    public static boolean isEntityUpdated(Entity oldEntity, Entity newEntity, String cluster) throws FalconException {
+    public static boolean isEntityUpdated(Entity oldEntity, Entity newEntity, String cluster,
+        Path oldStagingPath) throws FalconException {
         Entity oldView = EntityUtil.getClusterView(oldEntity, cluster);
         Entity newView = EntityUtil.getClusterView(newEntity, cluster);
+
+        //staging path contains md5 of the cluster view of entity
+        String[] parts = oldStagingPath.getName().split("_");
+        if (parts[0].equals(EntityUtil.md5(newView))) {
+            return false;
+        }
+
         switch (oldEntity.getEntityType()) {
         case FEED:
             return !EntityUtil.equals(oldView, newView, FEED_FIELDS);
@@ -77,10 +87,10 @@ public final class UpdateHelper {
     }
 
     //Read checksum file
-    private static Map<String, String> readChecksums(FileSystem fs, Path path) throws FalconException {
+    private static Map<String, String> readChecksums(FileSystem fs, Path oldStagingPath) throws FalconException {
         try {
             Map<String, String> checksums = new HashMap<String, String>();
-            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
+            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(oldStagingPath)));
             try {
                 String line;
                 while ((line = reader.readLine()) != null) {
@@ -97,25 +107,24 @@ public final class UpdateHelper {
     }
 
     //Checks if the user workflow or lib is updated
-    public static boolean isWorkflowUpdated(String cluster, Entity entity) throws FalconException {
+    public static boolean isWorkflowUpdated(String cluster, Entity entity, Path bundleAppPath) throws FalconException {
         if (entity.getEntityType() != EntityType.PROCESS) {
             return false;
         }
 
         try {
-            Process process = (Process) entity;
-            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
-                    ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
-            Path bundlePath = EntityUtil.getLastCommittedStagingPath(clusterEntity, process);
-            if (bundlePath == null) {
+            if (bundleAppPath == null) {
                 return true;
             }
 
-            Path checksum = new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE);
+            Process process = (Process) entity;
+            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+            Path checksum = new Path(bundleAppPath, EntityUtil.PROCESS_CHECKSUM_FILE);
             Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
             if (!fs.exists(checksum)) {
-                //Update if there is no checksum file(for migration)
+                //Update if there is no checksum file(for backward compatibility)
                 return true;
             }
             Map<String, String> checksums = readChecksums(fs, checksum);
@@ -184,8 +193,15 @@ public final class UpdateHelper {
             Process affectedProcess = (Process) affectedEntity;
 
             //check if affectedProcess is defined for this cluster
-            if (ProcessHelper.getCluster(affectedProcess, cluster) == null) {
-                LOG.debug("Process {} is not defined for cluster {}", affectedProcess.getName(), cluster);
+            Cluster processCluster = ProcessHelper.getCluster(affectedProcess, cluster);
+            if (processCluster == null) {
+                LOG.debug("Process {} is not defined for cluster {}. Skipping", affectedProcess.getName(), cluster);
+                return false;
+            }
+
+            if (processCluster.getValidity().getEnd().before(new Date())) {
+                LOG.debug("Process {} validity {} is in the past. Skipping...", affectedProcess.getName(),
+                    processCluster.getValidity().getEnd());
                 return false;
             }
 
@@ -199,6 +215,13 @@ public final class UpdateHelper {
                 return true;
             }
 
+            org.apache.falcon.entity.v0.feed.Cluster oldFeedCluster = FeedHelper.getCluster(oldFeed, cluster);
+            org.apache.falcon.entity.v0.feed.Cluster newFeedCluster = FeedHelper.getCluster(newFeed, cluster);
+            if (!oldFeedCluster.getValidity().getStart().equals(newFeedCluster.getValidity().getStart())) {
+                LOG.debug("{}: Start time for cluster {} has changed. Updating...", oldFeed.toShortString(), cluster);
+                return true;
+            }
+
             Storage oldFeedStorage = FeedHelper.createStorage(cluster, oldFeed);
             Storage newFeedStorage = FeedHelper.createStorage(cluster, newFeed);
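
[Editor's note] Tying the EntityUtil and UpdateHelper changes together: getAllStagingPaths() returns every staging directory for the entity, and isEntityUpdated() is handed the staging path whose name carries the md5 of the previously scheduled definition. One plausible way a caller could wire these up; the needsUpdate helper below is hypothetical, not part of the patch:

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.EntityUtil;
    import org.apache.falcon.entity.v0.Entity;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.falcon.update.UpdateHelper;
    import org.apache.hadoop.fs.FileStatus;

    public final class UpdateCheckSketch {
        private UpdateCheckSketch() {}

        // Compare the new definition against the most recently created staging dir;
        // a matching md5 in the dir name means the scheduled definition is already current
        public static boolean needsUpdate(Entity oldEntity, Entity newEntity, Cluster cluster)
            throws FalconException {
            FileStatus[] stagingDirs = EntityUtil.getAllStagingPaths(cluster, oldEntity);
            if (stagingDirs == null || stagingDirs.length == 0) {
                return true;   // nothing scheduled on this cluster yet
            }
            FileStatus latest = stagingDirs[0];
            for (FileStatus dir : stagingDirs) {
                if (dir.getModificationTime() > latest.getModificationTime()) {
                    latest = dir;
                }
            }
            return UpdateHelper.isEntityUpdated(oldEntity, newEntity, cluster.getName(), latest.getPath());
        }
    }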
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index eedd81f..ae158f7 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -61,6 +61,8 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract void reRun(String cluster, String wfId, Properties props) throws FalconException;
 
+    public abstract void dryRun(Entity entity, String clusterName) throws FalconException;
+
     public abstract boolean isActive(Entity entity) throws FalconException;
 
     public abstract boolean isSuspended(Entity entity) throws FalconException;
@@ -86,7 +88,7 @@ public abstract class AbstractWorkflowEngine {
     public abstract InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
                                                       List<LifeCycle> lifeCycles) throws FalconException;
 
-    public abstract Date update(Entity oldEntity, Entity newEntity, String cluster, Date effectiveTime)
+    public abstract String update(Entity oldEntity, Entity newEntity, String cluster, Date effectiveTime)
         throws FalconException;
 
     public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;
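
[Editor's note] The new dryRun(entity, clusterName) hook is what surfaces Oozie's dry-run submission through the workflow engine abstraction. The concrete OozieWorkflowEngine changes are not shown above, but the Oozie client side of such a call would look roughly like the sketch below; the endpoint, bundle path, and user name are placeholders:

    import java.util.Properties;
    import org.apache.oozie.client.OozieClient;
    import org.apache.oozie.client.OozieClientException;

    public final class OozieDryRunSketch {
        public static void main(String[] args) throws OozieClientException {
            OozieClient client = new OozieClient("http://oozie.example.com:11000/oozie");

            Properties conf = client.createConfiguration();
            conf.setProperty(OozieClient.BUNDLE_APP_PATH, "hdfs://namenode:8020/falcon/staging/sample-process");
            conf.setProperty(OozieClient.USER_NAME, "falcon");

            // dryrun() asks the Oozie server to validate the job definition
            // without actually submitting or scheduling it
            String result = client.dryrun(conf);
            System.out.println(result);
        }
    }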

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index 520541a..cd6c713 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -18,15 +18,6 @@
 
 package org.apache.falcon.entity.parser;
 
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
@@ -44,6 +35,14 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Tests for validating process entity parser.
  */
@@ -109,7 +108,7 @@ public class ProcessEntityParserTest extends AbstractTestBase {
 
         Cluster processCluster = process.getClusters().getClusters().get(0);
         Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()), "2011-11-02T00:00Z");
-        Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()), "2011-12-30T00:00Z");
+        Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()), "2091-12-30T00:00Z");
         Assert.assertEquals(process.getTimezone().getID(), "UTC");
 
         Assert.assertEquals(process.getWorkflow().getEngine().name().toLowerCase(), "oozie");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index e532b24..6366aca 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -57,10 +57,9 @@ import java.io.InputStream;
  * Test for Update helper methods.
  */
 public class UpdateHelperTest extends AbstractTestBase {
-    private final FeedEntityParser parser = (FeedEntityParser)
-            EntityParserFactory.getParser(EntityType.FEED);
-    private final ProcessEntityParser processParser = (ProcessEntityParser)
-            EntityParserFactory.getParser(EntityType.PROCESS);
+    private final FeedEntityParser parser = (FeedEntityParser)EntityParserFactory.getParser(EntityType.FEED);
+    private final ProcessEntityParser processParser =
+        (ProcessEntityParser)EntityParserFactory.getParser(EntityType.PROCESS);
 
     @BeforeClass
     public void init() throws Exception {
@@ -115,7 +114,7 @@ public class UpdateHelperTest extends AbstractTestBase {
     }
 
     @Test
-    public void testWorkflowUpdate() throws IOException, FalconException {
+    public void testIsWorkflowUpdated() throws IOException, FalconException {
         FileSystem fs = dfsCluster.getFileSystem();
         Process process = processParser.parseAndValidate(this.getClass().getResourceAsStream(PROCESS_XML));
         String cluster = "testCluster";
@@ -123,14 +122,13 @@ public class UpdateHelperTest extends AbstractTestBase {
         Path staging = EntityUtil.getNewStagingPath(clusterEntity, process);
         fs.mkdirs(staging);
         fs.create(new Path(staging, "workflow.xml")).close();
-        fs.create(new Path(staging, EntityUtil.SUCCEEDED_FILE_NAME)).close();
 
         //Update if there is no checksum file
-        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process));
+        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
 
         //No update if there is no new file
         fs.create(new Path(staging, "checksums")).close();
-        Assert.assertFalse(UpdateHelper.isWorkflowUpdated(cluster, process));
+        Assert.assertFalse(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
 
         //Update if there is new lib
         Path libpath = new Path("/falcon/test/lib");
@@ -138,7 +136,7 @@ public class UpdateHelperTest extends AbstractTestBase {
         fs.mkdirs(libpath);
         Path lib = new Path(libpath, "new.jar");
         fs.create(lib).close();
-        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process));
+        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
 
         //Don't Update if the lib is not updated
         fs.delete(new Path(staging, "checksums"), true);
@@ -146,52 +144,68 @@ public class UpdateHelperTest extends AbstractTestBase {
         stream.write((dfsCluster.getConf().get("fs.default.name") + lib.toString() + "="
                 + fs.getFileChecksum(lib).toString() + "\n").getBytes());
         stream.close();
-        Assert.assertFalse(UpdateHelper.isWorkflowUpdated(cluster, process));
+        Assert.assertFalse(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
 
         //Update if the lib is updated
         fs.delete(lib, true);
         stream = fs.create(lib);
         stream.writeChars("some new jar");
         stream.close();
-        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process));
+        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
 
         //Update if the lib is deleted
         fs.delete(lib, true);
-        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process));
+        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
     }
 
     @Test
-    public void testShouldUpdateProcess() throws Exception {
-        Feed oldFeed = parser.parseAndValidate(this.getClass()
-                .getResourceAsStream(FEED_XML));
+    public void testIsEntityUpdated() throws Exception {
+        Feed oldFeed = parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
         String cluster = "testCluster";
         Feed newFeed = (Feed) oldFeed.copy();
-        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster));
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+
+        Path feedPath = EntityUtil.getNewStagingPath(clusterEntity, oldFeed);
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster, feedPath));
 
         newFeed.setGroups("newgroups");
-        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster, feedPath));
         newFeed.getLateArrival().setCutOff(Frequency.fromString("hours(8)"));
-        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster, feedPath));
         newFeed.setFrequency(Frequency.fromString("days(1)"));
-        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster));
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster, feedPath));
 
-        Process oldProcess = processParser.parseAndValidate(this.getClass().
-                getResourceAsStream(PROCESS_XML));
+        Process oldProcess = processParser.parseAndValidate(this.getClass().getResourceAsStream(PROCESS_XML));
         prepare(oldProcess);
         Process newProcess = (Process) oldProcess.copy();
+        Path procPath = EntityUtil.getNewStagingPath(clusterEntity, oldProcess);
 
         newProcess.getRetry().setPolicy(PolicyType.FINAL);
-        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
         newProcess.getLateProcess().getLateInputs().remove(1);
-        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
         newProcess.getLateProcess().setPolicy(PolicyType.PERIODIC);
-        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
+        newProcess.setFrequency(Frequency.fromString("days(1)"));
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
+
+        //Adding new cluster shouldn't cause update in the old cluster
+        newProcess = (Process) oldProcess.copy();
+        org.apache.falcon.entity.v0.process.Cluster procCluster = new org.apache.falcon.entity.v0.process.Cluster();
+        procCluster.setName("newcluster");
+        procCluster.setValidity(newProcess.getClusters().getClusters().get(0).getValidity());
+        newProcess.getClusters().getClusters().add(procCluster);
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
+
+        //In the case of incomplete update, where new entity is scheduled but still not updated in config store,
+        //another update call shouldn't cause update in workflow engine
         newProcess.setFrequency(Frequency.fromString("days(1)"));
-        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
+        procPath = EntityUtil.getNewStagingPath(clusterEntity, newProcess);
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
     }
 
     @Test
-    public void testShouldUpdateFeed() throws Exception {
+    public void testShouldUpdateAffectedEntities() throws Exception {
         Feed oldFeed = parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
 
         Feed newFeed = (Feed) oldFeed.copy();
@@ -229,9 +243,10 @@ public class UpdateHelperTest extends AbstractTestBase {
         newFeed.getProperties().getProperties().remove(0);
         Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
 
+        //Change in start time should trigger process update as instance time changes
         FeedHelper.getCluster(newFeed, process.getClusters().getClusters().get(0).getName()).getValidity().setStart(
                 SchemaHelper.parseDateUTC("2012-11-01T00:00Z"));
-        Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
+        Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, process, cluster));
 
         FeedHelper.getCluster(newFeed, process.getClusters().getClusters().get(0).getName()).getValidity().
                 setStart(FeedHelper.getCluster(oldFeed,
@@ -246,39 +261,41 @@ public class UpdateHelperTest extends AbstractTestBase {
     }
 
     @Test
-    public void testShouldUpdateTable() throws Exception {
+    public void testIsEntityUpdatedTable() throws Exception {
         InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
         Feed oldTableFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(inputStream);
         getStore().publish(EntityType.FEED, oldTableFeed);
 
         String cluster = "testCluster";
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+        Path feedPath = EntityUtil.getNewStagingPath(clusterEntity, oldTableFeed);
         Feed newTableFeed = (Feed) oldTableFeed.copy();
-        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster, feedPath));
 
         newTableFeed.setGroups("newgroups");
-        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster, feedPath));
         newTableFeed.setFrequency(Frequency.fromString("days(1)"));
-        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster));
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster, feedPath));
 
         final CatalogTable table = new CatalogTable();
         table.setUri("catalog:default:clicks-blah#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}");
         newTableFeed.setTable(table);
-        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster));
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldTableFeed, newTableFeed, cluster, feedPath));
 
         inputStream = getClass().getResourceAsStream("/config/process/process-table.xml");
         Process oldProcess = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(inputStream);
         FileSystem fs = dfsCluster.getFileSystem();
-        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, "testCluster");
         Path staging = EntityUtil.getNewStagingPath(clusterEntity, oldProcess);
         fs.mkdirs(staging);
         fs.create(new Path(staging, "workflow.xml")).close();
         fs.create(new Path(staging, "checksums")).close();
         Process newProcess = (Process) oldProcess.copy();
+        Path procPath = EntityUtil.getNewStagingPath(clusterEntity, oldProcess);
 
         newProcess.getRetry().setPolicy(PolicyType.FINAL);
-        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
         newProcess.setFrequency(Frequency.fromString("days(1)"));
-        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster));
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
     }
 
     private static Location getLocation(Feed feed, LocationType type, String cluster) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/common/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-0.1.xml b/common/src/test/resources/config/process/process-0.1.xml
index 0aa249b..6e27577 100644
--- a/common/src/test/resources/config/process/process-0.1.xml
+++ b/common/src/test/resources/config/process/process-0.1.xml
@@ -19,7 +19,7 @@
 <process name="sample" xmlns="uri:falcon:process:0.1">
     <clusters>
         <cluster name="testCluster">
-            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+            <validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/>
         </cluster>
     </clusters>
     <parallel>1</parallel>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index 2018db2..62d95fa 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.oozie;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -25,10 +26,12 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.bundle.CONFIGURATION;
+import org.apache.falcon.oozie.bundle.CONFIGURATION.Property;
 import org.apache.falcon.oozie.bundle.COORDINATOR;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.OozieUtils;
-import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -37,8 +40,14 @@ import org.apache.oozie.client.OozieClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 /**
@@ -59,7 +68,7 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
             return null;
         }
 
-        List<Properties> coords = doBuild(cluster, buildPath);
+        List<Properties> coords = buildCoords(cluster, buildPath);
         if (coords == null || coords.isEmpty()) {
             return null;
         }
@@ -76,33 +85,37 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
             String coordPath = coordProps.getProperty(OozieEntityBuilder.ENTITY_PATH);
             coord.setName(coordProps.getProperty(OozieEntityBuilder.ENTITY_NAME));
             coord.setAppPath(getStoragePath(coordPath));
+            Properties appProps = createAppProperties(cluster, buildPath);
+            appProps.putAll(coordProps);
+            coord.setConfiguration(getConfig(appProps));
             bundle.getCoordinator().add(coord);
         }
 
-        marshal(cluster, bundle, buildPath); // write the bundle
-        Properties properties = createAppProperties(cluster, buildPath);
-
-        //Add libpath
-        Path libPath = getLibPath(cluster, buildPath);
-        if (libPath != null) {
-            properties.put(OozieClient.LIBPATH, getStoragePath(libPath));
-        }
+        Path marshalPath = marshal(cluster, bundle, buildPath); // write the bundle
+        Properties properties = getProperties(marshalPath, entity.getName());
+        properties.setProperty(OozieClient.BUNDLE_APP_PATH, getStoragePath(buildPath));
+        properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
 
-        properties.putAll(getAdditionalProperties(cluster));
         return properties;
     }
 
-    protected Properties getAdditionalProperties(Cluster cluster) throws FalconException {
-        return new Properties();
-    }
-
     protected abstract Path getLibPath(Cluster cluster, Path buildPath) throws FalconException;
 
+    protected CONFIGURATION getConfig(Properties props) {
+        CONFIGURATION conf = new CONFIGURATION();
+        for (Entry<Object, Object> prop : props.entrySet()) {
+            Property confProp = new Property();
+            confProp.setName((String) prop.getKey());
+            confProp.setValue((String) prop.getValue());
+            conf.getProperty().add(confProp);
+        }
+        return conf;
+    }
+
     protected Properties createAppProperties(Cluster cluster, Path buildPath) throws FalconException {
         Properties properties = getEntityProperties(cluster);
-        properties.setProperty(OozieWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
-        properties.setProperty(OozieWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
-        properties.setProperty(OozieClient.BUNDLE_APP_PATH, getStoragePath(buildPath));
+        properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
+        properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
         properties.setProperty("colo.name", cluster.getColo());
 
         properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
@@ -113,7 +126,12 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
             properties.putAll(getHiveCredentials(cluster));
         }
 
-        LOG.info("Cluster: {}, PROPS: {}", cluster.getName(), properties);
+        //Add libpath
+        Path libPath = getLibPath(cluster, buildPath);
+        if (libPath != null) {
+            properties.put(OozieClient.LIBPATH, getStoragePath(libPath));
+        }
+
         return properties;
     }
 
@@ -134,10 +152,30 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         }
     }
 
-    protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
-        marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
+    protected Path marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
+        return marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
             OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
     }
 
-    protected abstract List<Properties> doBuild(Cluster cluster, Path bundlePath) throws FalconException;
+    //Used by coordinator builders to return multiple coords
+    //TODO Can avoid separate interface that returns list by building at lifecycle level
+    protected abstract List<Properties> buildCoords(Cluster cluster, Path bundlePath) throws FalconException;
+
+    public static BUNDLEAPP unmarshal(Cluster cluster, Path path) throws FalconException {
+        InputStream resourceAsStream = null;
+        try {
+            FileSystem fs =
+                HadoopClientFactory.get().createFileSystem(path.toUri(), ClusterHelper.getConfiguration(cluster));
+            Unmarshaller unmarshaller = OozieUtils.BUNDLE_JAXB_CONTEXT.createUnmarshaller();
+            @SuppressWarnings("unchecked") JAXBElement<BUNDLEAPP> jaxbElement = (JAXBElement<BUNDLEAPP>)
+                unmarshaller.unmarshal(new StreamSource(fs.open(path)), BUNDLEAPP.class);
+            return jaxbElement.getValue();
+        } catch (JAXBException e) {
+            throw new FalconException(e);
+        } catch (IOException e) {
+            throw new FalconException(e);
+        } finally {
+            IOUtils.closeQuietly(resourceAsStream);
+        }
+    }
 }
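
[Editor's note] The bundle builder also gains a static unmarshal(cluster, path) that reads a previously marshalled bundle definition back from HDFS. A hedged usage sketch; the staging path argument is a stand-in, and the real callers live in the engine code not shown above:

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.falcon.oozie.OozieBundleBuilder;
    import org.apache.falcon.oozie.bundle.BUNDLEAPP;
    import org.apache.falcon.oozie.bundle.COORDINATOR;
    import org.apache.hadoop.fs.Path;

    public final class BundleReadBackSketch {
        private BundleReadBackSketch() {}

        public static void listCoordinators(Cluster cluster, Path stagingDir) throws FalconException {
            // bundle.xml is the file that marshal() wrote into the staging directory
            BUNDLEAPP bundle = OozieBundleBuilder.unmarshal(cluster, new Path(stagingDir, "bundle.xml"));
            for (COORDINATOR coord : bundle.getCoordinator()) {
                System.out.println(coord.getName() + " -> " + coord.getAppPath());
            }
        }
    }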

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index 5a29683..5f483f0 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -28,6 +28,8 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.ObjectFactory;
 import org.apache.falcon.oozie.feed.FeedReplicationCoordinatorBuilder;
@@ -97,8 +99,8 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
         return EntityUtil.getWorkflowName(lifecycle, entity).toString();
     }
 
-    protected void marshal(Cluster cluster, COORDINATORAPP coord, Path outPath) throws FalconException {
-        marshal(cluster, new ObjectFactory().createCoordinatorApp(coord),
+    protected Path marshal(Cluster cluster, COORDINATORAPP coord, Path outPath) throws FalconException {
+        return marshal(cluster, new ObjectFactory().createCoordinatorApp(coord),
             OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml"));
     }
 
@@ -144,12 +146,10 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
         return props;
     }
 
-    protected org.apache.falcon.oozie.coordinator.CONFIGURATION getConfig(Properties props) {
-        org.apache.falcon.oozie.coordinator.CONFIGURATION conf
-            = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
+    protected CONFIGURATION getConfig(Properties props) {
+        CONFIGURATION conf = new CONFIGURATION();
         for (Entry<Object, Object> prop : props.entrySet()) {
-            org.apache.falcon.oozie.coordinator.CONFIGURATION.Property confProp
-                = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
+            Property confProp = new Property();
             confProp.setName((String) prop.getKey());
             confProp.setValue((String) prop.getValue());
             conf.getProperty().add(confProp);
@@ -163,7 +163,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
 
     public abstract List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException;
 
-    protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
+    protected COORDINATORAPP unmarshal(String template) throws FalconException {
         InputStream resourceAsStream = null;
         try {
             resourceAsStream = OozieCoordinatorBuilder.class.getResourceAsStream(template);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index bb8dfcc..1238b82 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -125,7 +125,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
     }
 
-    protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
+    protected Path marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
         throws FalconException {
         try {
             Marshaller marshaller = jaxbContext.createMarshaller();
@@ -146,6 +146,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
             }
 
             LOG.info("Marshalled {} to {}", jaxbElement.getDeclaredType(), outPath);
+            return outPath;
         } catch (Exception e) {
             throw new FalconException("Unable to marshall app object", e);
         }
@@ -299,7 +300,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         }
 
         Properties prop = new Properties();
-        prop.setProperty(OozieEntityBuilder.ENTITY_PATH, getStoragePath(path));
+        prop.setProperty(OozieEntityBuilder.ENTITY_PATH, path.toString());
         prop.setProperty(OozieEntityBuilder.ENTITY_NAME, name);
         return prop;
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index ac78297..2ef162b 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -110,12 +110,12 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle " + lifecycle);
     }
 
-    protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
-        marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
+    protected Path marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
+        return marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
             OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
     }
 
-    protected WORKFLOWAPP getWorkflow(String template) throws FalconException {
+    protected WORKFLOWAPP unmarshal(String template) throws FalconException {
         InputStream resourceAsStream = null;
         try {
             resourceAsStream = OozieOrchestrationWorkflowBuilder.class.getResourceAsStream(template);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
index 6917f4e..08a4e5c 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -42,7 +42,7 @@ public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
         return new Path(buildPath, "lib");
     }
 
-    @Override protected List<Properties> doBuild(Cluster cluster, Path buildPath) throws FalconException {
+    @Override protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
         List<Properties> props = new ArrayList<Properties>();
         List<Properties> evictionProps =
             OozieCoordinatorBuilder.get(entity, Tag.RETENTION).buildCoords(cluster, buildPath);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 3226cf2..0b582ef 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -78,7 +78,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
         if (feedCluster.getType() == ClusterType.TARGET) {
             List<Properties> props = new ArrayList<Properties>();
-            OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, Tag.REPLICATION);
             for (org.apache.falcon.entity.v0.feed.Cluster srcFeedCluster : entity.getClusters().getClusters()) {
 
                 if (srcFeedCluster.getType() == ClusterType.SOURCE) {
@@ -86,9 +85,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
                     // workflow is serialized to a specific dir
                     Path coordPath = new Path(buildPath, Tag.REPLICATION.name() + "/" + srcCluster.getName());
 
-                    // Different workflow for each source since hive credentials vary for each cluster
-                    builder.build(cluster, coordPath);
-
                     props.add(doBuild(srcCluster, cluster, coordPath));
                 }
             }
@@ -98,6 +94,11 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     }
 
     private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
+
+        // Different workflow for each source since hive credentials vary for each cluster
+        OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, Tag.REPLICATION);
+        builder.build(trgCluster, buildPath);
+
         long replicationDelayInMillis = getReplicationDelayInMillis(srcCluster);
         Date sourceStartDate = getStartDate(srcCluster, replicationDelayInMillis);
         Date sourceEndDate = getEndDate(srcCluster);
@@ -112,7 +113,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
             return null;
         }
 
-        COORDINATORAPP coord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
+        COORDINATORAPP coord = unmarshal(REPLICATION_COORD_TEMPLATE);
 
         String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(srcCluster.getName()),
             entity).toString();
@@ -134,8 +135,8 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
             srcCluster, trgCluster, buildPath, coordName, sourceStorage, targetStorage);
         coord.setAction(replicationWorkflowAction);
 
-        marshal(trgCluster, coord, buildPath);
-        return getProperties(buildPath, coordName);
+        Path marshalPath = marshal(trgCluster, coord, buildPath);
+        return getProperties(marshalPath, coordName);
     }
 
     private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path buildPath,
@@ -143,7 +144,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         ACTION action = new ACTION();
         WORKFLOW workflow = new WORKFLOW();
 
-        workflow.setAppPath(getStoragePath(buildPath.toString()));
+        workflow.setAppPath(getStoragePath(new Path(buildPath, "workflow.xml")));
         Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
         props.put("srcClusterName", srcCluster.getName());
         props.put("srcClusterColo", srcCluster.getColo());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index 00fab99..2537725 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -44,7 +44,7 @@ public class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBu
     }
 
     @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
-        WORKFLOWAPP workflow = getWorkflow(REPLICATION_WF_TEMPLATE);
+        WORKFLOWAPP workflow = unmarshal(REPLICATION_WF_TEMPLATE);
         Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, buildPath.getName());
         String wfName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
         workflow.setName(wfName);
@@ -57,9 +57,8 @@ public class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBu
             setupHiveCredentials(cluster, srcCluster, workflow);
         }
 
-        marshal(cluster, workflow, buildPath);
-
-        return getProperties(buildPath, wfName);
+        Path marshalPath = marshal(cluster, workflow, buildPath);
+        return getProperties(marshalPath, wfName);
     }
 
     private void setupHiveCredentials(Cluster targetCluster, Cluster sourceCluster, WORKFLOWAPP workflowApp) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index 4393c94..ac38532 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -103,8 +103,7 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
 
         coord.setAction(action);
 
-        marshal(cluster, coord, coordPath);
-
-        return Arrays.asList(getProperties(coordPath, coordName));
+        Path marshalPath = marshal(cluster, coord, coordPath);
+        return Arrays.asList(getProperties(marshalPath, coordName));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index 4a7f96b..eee4fe9 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -40,7 +40,7 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
     }
 
     @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
-        WORKFLOWAPP workflow = getWorkflow(RETENTION_WF_TEMPLATE);
+        WORKFLOWAPP workflow = unmarshal(RETENTION_WF_TEMPLATE);
         String wfName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
         workflow.setName(wfName);
         addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION);
@@ -50,8 +50,8 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
             setupHiveCredentials(cluster, buildPath, workflow);
         }
 
-        marshal(cluster, workflow, buildPath);
-        return getProperties(buildPath, wfName);
+        Path marshalPath = marshal(cluster, workflow, buildPath);
+        return getProperties(marshalPath, wfName);
     }
 
     private void setupHiveCredentials(Cluster cluster, Path wfPath,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index 86cea93..d0b75fc 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -55,39 +55,38 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
         super(entity);
     }
 
-    @Override protected Properties getAdditionalProperties(Cluster cluster) throws FalconException {
+    private Properties getAdditionalProperties(Cluster cluster) throws FalconException {
         Properties properties = new Properties();
+
+        //Properties for optional inputs
         if (entity.getInputs() != null) {
             for (Input in : entity.getInputs().getInputs()) {
                 if (in.isOptional()) {
-                    properties.putAll(getOptionalInputProperties(in, cluster.getName()));
+                    Feed feed = EntityUtil.getEntity(EntityType.FEED, in.getFeed());
+                    org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                        FeedHelper.getCluster(feed, cluster.getName());
+                    String inName = in.getName();
+                    properties.put(inName + ".frequency", String.valueOf(feed.getFrequency().getFrequency()));
+                    properties.put(inName + ".freq_timeunit",
+                        mapToCoordTimeUnit(feed.getFrequency().getTimeUnit()).name());
+                    properties.put(inName + ".timezone", feed.getTimezone().getID());
+                    properties.put(inName + ".end_of_duration", Timeunit.NONE.name());
+                    properties.put(inName + ".initial-instance",
+                        SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
+                    properties.put(inName + ".done-flag", "notused");
+
+                    String locPath = FeedHelper.createStorage(cluster.getName(), feed)
+                        .getUriTemplate(LocationType.DATA).replace('$', '%');
+                    properties.put(inName + ".uri-template", locPath);
+
+                    properties.put(inName + ".start-instance", in.getStart());
+                    properties.put(inName + ".end-instance", in.getEnd());
                 }
             }
         }
         return  properties;
     }
 
-    private Properties getOptionalInputProperties(Input in, String clusterName) throws FalconException {
-        Properties properties = new Properties();
-        Feed feed = EntityUtil.getEntity(EntityType.FEED, in.getFeed());
-        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, clusterName);
-        String inName = in.getName();
-        properties.put(inName + ".frequency", String.valueOf(feed.getFrequency().getFrequency()));
-        properties.put(inName + ".freq_timeunit", mapToCoordTimeUnit(feed.getFrequency().getTimeUnit()).name());
-        properties.put(inName + ".timezone", feed.getTimezone().getID());
-        properties.put(inName + ".end_of_duration", Timeunit.NONE.name());
-        properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
-        properties.put(inName + ".done-flag", "notused");
-
-        String locPath = FeedHelper.createStorage(clusterName, feed)
-            .getUriTemplate(LocationType.DATA).replace('$', '%');
-        properties.put(inName + ".uri-template", locPath);
-
-        properties.put(inName + ".start-instance", in.getStart());
-        properties.put(inName + ".end-instance", in.getEnd());
-        return  properties;
-    }
-
     private Timeunit mapToCoordTimeUnit(TimeUnit tu) {
         switch (tu) {
         case days:
@@ -111,10 +110,16 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
         return ProcessHelper.getUserLibPath(entity, cluster, buildPath);
     }
 
-    @Override protected List<Properties> doBuild(Cluster cluster, Path buildPath) throws FalconException {
+    @Override protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
         copyUserWorkflow(cluster, buildPath);
 
-        return OozieCoordinatorBuilder.get(entity, Tag.DEFAULT).buildCoords(cluster, buildPath);
+        List<Properties> props = OozieCoordinatorBuilder.get(entity, Tag.DEFAULT).buildCoords(cluster, buildPath);
+        if (props != null) {
+            assert props.size() == 1 : "Process should have only 1 coord";
+            props.get(0).putAll(getAdditionalProperties(cluster));
+        }
+
+        return props;
     }
 
     private void copyUserWorkflow(Cluster cluster, Path buildPath) throws FalconException {
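
[Editor's note] To make the optional-input handling above concrete: for a hypothetical optional input named "clicks", backed by an hourly UTC feed whose testCluster validity starts at 2011-11-02T00:00Z, buildCoords() would fold properties roughly like the following into the single process coordinator (values are illustrative only):

    clicks.frequency        = 1
    clicks.freq_timeunit    = HOUR
    clicks.timezone         = UTC
    clicks.end_of_duration  = NONE
    clicks.initial-instance = 2011-11-02T00:00Z
    clicks.done-flag        = notused
    clicks.uri-template     = feed storage URI template for the cluster, with '$' escaped to '%'
    clicks.start-instance   = value of in.getStart() from the process definition
    clicks.end-instance     = value of in.getEnd() from the process definition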

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index c87bc86..e46ae6e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -100,8 +100,8 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
         action.setWorkflow(wf);
         coord.setAction(action);
 
-        marshal(cluster, coord, coordPath);
-        return Arrays.asList(getProperties(coordPath, coordName));
+        Path marshalPath = marshal(cluster, coord, coordPath);
+        return Arrays.asList(getProperties(marshalPath, coordName));
     }
 
     private void initializeCoordAttributes(Cluster cluster, COORDINATORAPP coord, String coordName) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/14476f9a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 0d9abdb..2e3a5c1 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -62,7 +62,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
     }
 
     @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
-        WORKFLOWAPP wfApp = getWorkflow(DEFAULT_WF_TEMPLATE);
+        WORKFLOWAPP wfApp = unmarshal(DEFAULT_WF_TEMPLATE);
         String wfName = EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString();
         wfApp.setName(wfName);
 
@@ -92,8 +92,8 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
         }
 
         //Create parent workflow
-        marshal(cluster, wfApp, buildPath);
-        return getProperties(buildPath, wfName);
+        Path marshalPath = marshal(cluster, wfApp, buildPath);
+        return getProperties(marshalPath, wfName);
     }
 
     protected abstract void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException;

