falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1170 Falcon Native Scheduler - Refactor existing workflow/coord/bundle builder. Contributed by Pallavi Rao
Date Mon, 13 Jul 2015 09:02:08 GMT
Repository: falcon
Updated Branches:
  refs/heads/master b36a82394 -> 9f69ae271


FALCON-1170 Falcon Native Scheduler - Refactor existing workflow/coord/bundle builder. Contributed by Pallavi Rao


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9f69ae27
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9f69ae27
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9f69ae27

Branch: refs/heads/master
Commit: 9f69ae27159436721c3fa1fdc401bb0de0cdca80
Parents: b36a823
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Mon Jul 13 14:13:38 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Mon Jul 13 14:27:03 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/falcon/oozie/OozieBundleBuilder.java |  38 +------
 .../falcon/oozie/OozieCoordinatorBuilder.java   |  70 ++----------
 .../apache/falcon/oozie/OozieEntityBuilder.java |  32 ++++--
 .../OozieOrchestrationWorkflowBuilder.java      | 108 +++++++++++++++++--
 .../feed/FSReplicationWorkflowBuilder.java      |  12 +++
 .../feed/FeedReplicationCoordinatorBuilder.java |  47 ++------
 .../feed/FeedReplicationWorkflowBuilder.java    |  44 +++++++-
 .../feed/FeedRetentionCoordinatorBuilder.java   |  37 ++-----
 .../feed/FeedRetentionWorkflowBuilder.java      |  42 +++++++-
 .../feed/HCatReplicationWorkflowBuilder.java    |   8 ++
 .../ProcessExecutionCoordinatorBuilder.java     |  23 +---
 .../ProcessExecutionWorkflowBuilder.java        |  31 +++++-
 .../java/org/apache/falcon/util/OozieUtils.java |   6 +-
 .../feed/OozieFeedWorkflowBuilderTest.java      |  64 ++++++-----
 .../falcon/oozie/process/AbstractTestBase.java  |  55 +++++++---
 .../OozieProcessWorkflowBuilderTest.java        |  54 +++++-----
 17 files changed, 403 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e844a60..88fdfdd 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
     FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava)
     
   IMPROVEMENTS
+    FALCON-1170 Falcon Native Scheduler - Refactor existing workflow/coord/bundle builder(Pallavi Rao via Ajay Yadava)
+    
     FALCON-1031 Make post processing notifications to user topics optional (Pallavi Rao via Ajay Yadava)
     
     FALCON-1186 Add filtering capability to result of instance summary (Suhas Vasu)

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index 03063f4..8026967 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -19,12 +19,10 @@
 package org.apache.falcon.oozie;
 
 import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
 import org.apache.falcon.oozie.bundle.CONFIGURATION;
@@ -83,9 +81,9 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
             final String coordName = coordProps.getProperty(OozieEntityBuilder.ENTITY_NAME);
             coord.setName(coordName);
             coord.setAppPath(getStoragePath(coordPath));
-            Properties appProps = createAppProperties(cluster, buildPath, coordName);
-            appProps.putAll(coordProps);
-            coord.setConfiguration(getConfig(appProps));
+            coordProps.put(OozieClient.USER_NAME, CurrentUser.getUser());
+            coordProps.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
+            coord.setConfiguration(getConfig(coordProps));
             bundle.getCoordinator().add(coord);
         }
 
@@ -114,35 +112,9 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         return conf;
     }
 
-    protected Properties createAppProperties(Cluster cluster, Path buildPath,
-                                             String coordName) throws FalconException {
-        Properties properties = getEntityProperties(cluster);
-        properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
-        properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
-        properties.setProperty("colo.name", cluster.getColo());
-
-        properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
-        properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
-        properties.setProperty("falcon.libpath",
-                ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/lib");
-
-        if (EntityUtil.isTableStorageType(cluster, entity)) {
-            Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity);
-            if (tag == Tag.REPLICATION) {
-                // todo: kludge send source hcat creds for coord dependency check to pass
-                String srcClusterName = EntityUtil.getWorkflowNameSuffix(coordName, entity);
-                properties.putAll(getHiveCredentials(ClusterHelper.getCluster(srcClusterName)));
-            } else {
-                properties.putAll(getHiveCredentials(cluster));
-            }
-        }
-
-        return properties;
-    }
-
     protected Path marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
         return marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
-            OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
+                OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
     }
 
     //Used by coordinator builders to return multiple coords
@@ -152,7 +124,7 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
     public static BUNDLEAPP unmarshal(Cluster cluster, Path path) throws FalconException {
         try {
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                        path.toUri(), ClusterHelper.getConfiguration(cluster));
+                    path.toUri(), ClusterHelper.getConfiguration(cluster));
             Unmarshaller unmarshaller = OozieUtils.BUNDLE_JAXB_CONTEXT.createUnmarshaller();
             @SuppressWarnings("unchecked") JAXBElement<BUNDLEAPP> jaxbElement =
                     unmarshaller.unmarshal(new StreamSource(fs.open(path)), BUNDLEAPP.class);

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index 92697b0..85f5330 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -21,23 +21,19 @@ package org.apache.falcon.oozie;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
-import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.ExternalId;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.coordinator.CONFIGURATION;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.ObjectFactory;
 import org.apache.falcon.oozie.feed.FeedReplicationCoordinatorBuilder;
 import org.apache.falcon.oozie.feed.FeedRetentionCoordinatorBuilder;
 import org.apache.falcon.oozie.process.ProcessExecutionCoordinatorBuilder;
 import org.apache.falcon.util.OozieUtils;
-import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.OozieClient;
 
@@ -52,9 +48,7 @@ import java.util.Properties;
 public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEntityBuilder<T> {
     protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
     protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
-    protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
-    protected static final String MR_QUEUE_NAME = "queueName";
-    protected static final String MR_JOB_PRIORITY = "jobPriority";
+
 
     protected static final String IGNORE = "IGNORE";
     private static final Object USER_JMS_NOTIFICATION_ENABLED = "userJMSNotificationEnabled";
@@ -111,84 +105,36 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
             OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml"));
     }
 
-    protected Properties createCoordDefaultConfiguration(Cluster cluster,
-                                                         String coordName)  throws FalconException {
+    protected Properties createCoordDefaultConfiguration(String coordName)  throws FalconException {
         Properties props = new Properties();
-        props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName());
-        props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
-        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
         props.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME_EL);
         props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
-        props.put("falconDataOperation", getOperation().name());
-
-        props.put(WorkflowExecutionArgs.LOG_DIR.getName(),
-                getStoragePath(EntityUtil.getLogPath(cluster, entity)));
         props.put(OozieClient.EXTERNAL_ID,
             new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
                 "${coord:nominalTime()}").getId());
-        props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster));
-
-        addLateDataProperties(props);
-        addBrokerProperties(cluster, props);
-
-        props.put(MR_QUEUE_NAME, "default");
-        props.put(MR_JOB_PRIORITY, "NORMAL");
         props.put(USER_JMS_NOTIFICATION_ENABLED, "true");
 
-        //props in entity override the set props.
-        props.putAll(getEntityProperties(entity));
         return props;
     }
 
-    protected abstract WorkflowExecutionContext.EntityOperations getOperation();
-
-    private void addLateDataProperties(Properties props) throws FalconException {
-        if (EntityUtil.getLateProcess(entity) == null
-            || EntityUtil.getLateProcess(entity).getLateInputs() == null
-            || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
-            props.put("shouldRecord", "false");
-        } else {
-            props.put("shouldRecord", "true");
-        }
+    public final Properties build(Cluster cluster, Path buildPath) throws FalconException {
+        throw new IllegalStateException("Not implemented for coordinator!");
     }
 
-    private void addBrokerProperties(Cluster cluster, Properties props) {
-        props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(),
-                ClusterHelper.getMessageBrokerUrl(cluster));
-        props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(),
-                ClusterHelper.getMessageBrokerImplClass(cluster));
-
-        String falconBrokerUrl = StartupProperties.get().getProperty(
-                "broker.url", "tcp://localhost:61616?daemon=true");
-        props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
-
-        String falconBrokerImplClass = StartupProperties.get().getProperty(
-                "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
-        props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
+    public abstract List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException;
 
-        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
-            DEFAULT_BROKER_MSG_TTL.toString());
-        props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
+    protected COORDINATORAPP unmarshal(String template) throws FalconException {
+        return unmarshal(template, OozieUtils.COORD_JAXB_CONTEXT, COORDINATORAPP.class);
     }
 
     protected CONFIGURATION getConfig(Properties props) {
         CONFIGURATION conf = new CONFIGURATION();
         for (Entry<Object, Object> prop : props.entrySet()) {
-            Property confProp = new Property();
+            CONFIGURATION.Property confProp = new CONFIGURATION.Property();
             confProp.setName((String) prop.getKey());
             confProp.setValue((String) prop.getValue());
             conf.getProperty().add(confProp);
         }
         return conf;
     }
-
-    public final Properties build(Cluster cluster, Path buildPath) throws FalconException {
-        throw new IllegalStateException("Not implemented for coordinator!");
-    }
-
-    public abstract List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException;
-
-    protected COORDINATORAPP unmarshal(String template) throws FalconException {
-        return unmarshal(template, OozieUtils.COORD_JAXB_CONTEXT, COORDINATORAPP.class);
-    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index f00290e..9ca0ac1 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
 import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Output;
@@ -36,9 +37,11 @@ import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.service.FalconPathFilter;
 import org.apache.falcon.service.SharedLibraryHostingService;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,6 +73,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
 
     public static final String ENTITY_PATH = "ENTITY_PATH";
     public static final String ENTITY_NAME = "ENTITY_NAME";
+    protected static final String IGNORE = "IGNORE";
 
     private static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
         @Override
@@ -117,12 +121,12 @@ public abstract class OozieEntityBuilder<T extends Entity> {
     }
 
     public static OozieEntityBuilder get(Entity entity) {
-        switch(entity.getEntityType()) {
+        switch (entity.getEntityType()) {
         case FEED:
             return new FeedBundleBuilder((Feed) entity);
 
         case PROCESS:
-            return new ProcessBundleBuilder((Process)entity);
+            return new ProcessBundleBuilder((Process) entity);
 
         default:
         }
@@ -145,6 +149,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
                     outPath.toUri(), ClusterHelper.getConfiguration(cluster));
             OutputStream out = fs.create(outPath);
+
             try {
                 marshaller.marshal(jaxbElement, out);
             } finally {
@@ -158,11 +163,24 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         }
     }
 
+    protected Properties createAppProperties(Cluster cluster, String wfName) throws FalconException {
+        Properties properties = getEntityProperties(cluster);
+        properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
+        properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
+        properties.setProperty("colo.name", cluster.getColo());
+
+        properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
+        properties.setProperty("falcon.libpath",
+                ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath()  + "/lib");
+
+        return properties;
+    }
+
     protected Properties getHiveCredentials(Cluster cluster) {
         String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
         if (metaStoreUrl == null) {
             throw new IllegalStateException(
-                "Registry interface is not defined in cluster: " + cluster.getName());
+                    "Registry interface is not defined in cluster: " + cluster.getName());
         }
 
         Properties hiveCredentials = new Properties();
@@ -173,7 +191,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
 
         if (isSecurityEnabled) {
             String principal = ClusterHelper
-                .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL);
+                    .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL);
             hiveCredentials.put(METASTORE_KERBEROS_PRINCIPAL, principal);
             hiveCredentials.put(METASTORE_USE_THRIFT_SASL, "true");
             hiveCredentials.put("hcat.metastore.principal", principal);
@@ -236,9 +254,9 @@ public abstract class OozieEntityBuilder<T extends Entity> {
 
         //pig and java actions require partition expression as "key1=val1, key2=val2"
         props.put(prefix + "_partitions_pig",
-            "${coord:dataOutPartitions('" + output.getName() + "')}");
+                "${coord:dataOutPartitions('" + output.getName() + "')}");
         props.put(prefix + "_partitions_java",
-            "${coord:dataOutPartitions('" + output.getName() + "')}");
+                "${coord:dataOutPartitions('" + output.getName() + "')}");
 
         //hive requires partition expression as "key1='val1', key2='val2'" (with quotes around values)
         //there is no direct EL expression in oozie
@@ -246,7 +264,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         for (String key : tableStorage.getDatedPartitionKeys()) {
             StringBuilder expr = new StringBuilder();
             expr.append("${coord:dataOutPartitionValue('").append(output.getName()).append("', '").append(key)
-                .append("')}");
+                    .append("')}");
             props.put(prefix + "_dated_partition_value_" + key, expr.toString());
             partitions.add(key + "='" + expr + "'");
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 49f9e07..f8220ec 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -37,6 +37,7 @@ import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder;
 import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder;
 import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.workflow.CREDENTIAL;
 import org.apache.falcon.oozie.workflow.CREDENTIALS;
 import org.apache.falcon.oozie.workflow.END;
@@ -46,17 +47,23 @@ import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import javax.xml.bind.JAXBElement;
+import javax.xml.namespace.QName;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
@@ -79,9 +86,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     private static final String PREPROCESS_TEMPLATE = "/action/pre-process.xml";
 
     public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(
-        new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, }));
+            new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, }));
 
-    private final LifeCycle lifecycle;
+    private LifeCycle lifecycle;
+
+    protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
+    protected static final String MR_QUEUE_NAME = "queueName";
+    protected static final String MR_JOB_PRIORITY = "jobPriority";
 
     public OozieOrchestrationWorkflowBuilder(T entity, LifeCycle lifecycle) {
         super(entity);
@@ -96,6 +107,10 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         return lifecycle.getTag();
     }
 
+    public OozieOrchestrationWorkflowBuilder(T entity) {
+        super(entity);
+    }
+
     public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
         throws FalconException {
         switch (entity.getEntityType()) {
@@ -115,7 +130,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
 
             default:
                 throw new IllegalArgumentException("Unhandled type " + entity.getEntityType()
-                    + ", lifecycle " + lifecycle);
+                       + ", lifecycle " + lifecycle);
             }
 
         case PROCESS:
@@ -192,7 +207,19 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
 
     protected Path marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
         return marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
-            OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+                OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+    }
+
+    protected Path marshal(Cluster cluster, WORKFLOWAPP workflowapp, CONFIGURATION config, Path outPath)
+        throws FalconException {
+        QName workflowQName = new org.apache.falcon.oozie.workflow.ObjectFactory()
+                .createWorkflowApp(workflowapp).getName();
+        JAXBElement<CONFIGURATION> configJaxbElement =
+                new JAXBElement(new QName(workflowQName.getNamespaceURI(), "configuration", workflowQName.getPrefix()),
+                        CONFIGURATION.class, config);
+
+        return marshal(cluster, configJaxbElement, OozieUtils.CONFIG_JAXB_CONTEXT,
+                new Path(outPath, "config-default.xml"));
     }
 
     protected WORKFLOWAPP unmarshal(String template) throws FalconException {
@@ -212,13 +239,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {
         String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext";
         FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-            ClusterHelper.getConfiguration(cluster));
+                ClusterHelper.getConfiguration(cluster));
         try {
             addExtensionJars(fs, new Path(libext), wf);
             addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf);
             if (tag != null) {
                 addExtensionJars(fs, new Path(libext, entity.getEntityType().name() + "/" + tag.name().toLowerCase()),
-                    wf);
+                        wf);
             }
         } catch (IOException e) {
             throw new FalconException(e);
@@ -316,7 +343,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
      * @param cluster     cluster entity
      */
     protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster, String credentialName,
-        Set<String> actions) {
+                                          Set<String> actions) {
         addHCatalogCredentials(workflowApp, cluster, credentialName);
 
         // add credential to each action
@@ -349,7 +376,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
 
         credential.getProperty().add(createProperty("hcat.metastore.uri", metaStoreUrl));
         credential.getProperty().add(createProperty("hcat.metastore.principal",
-            ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL)));
+                ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL)));
 
         return credential;
     }
@@ -366,4 +393,69 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
         action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
     }
+
+    public Properties createDefaultConfiguration(Cluster cluster)  throws FalconException {
+        Properties props = new Properties();
+        props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
+        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
+        props.put("falconDataOperation", getOperation().name());
+
+        props.put(WorkflowExecutionArgs.LOG_DIR.getName(),
+                getStoragePath(EntityUtil.getLogPath(cluster, entity)));
+        props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster));
+
+        addLateDataProperties(props);
+        addBrokerProperties(cluster, props);
+
+        props.put(MR_QUEUE_NAME, "default");
+        props.put(MR_JOB_PRIORITY, "NORMAL");
+
+        //props in entity override the set props.
+        props.putAll(getEntityProperties(entity));
+        props.putAll(createAppProperties(cluster, entity.getName()));
+        return props;
+    }
+
+    private void addLateDataProperties(Properties props) throws FalconException {
+        if (EntityUtil.getLateProcess(entity) == null
+                || EntityUtil.getLateProcess(entity).getLateInputs() == null
+                || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
+            props.put("shouldRecord", "false");
+        } else {
+            props.put("shouldRecord", "true");
+        }
+    }
+
+    private void addBrokerProperties(Cluster cluster, Properties props) {
+        props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(),
+                ClusterHelper.getMessageBrokerUrl(cluster));
+        props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(),
+                ClusterHelper.getMessageBrokerImplClass(cluster));
+
+        String falconBrokerUrl = StartupProperties.get().getProperty(
+                "broker.url", "tcp://localhost:61616?daemon=true");
+        props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
+
+        String falconBrokerImplClass = StartupProperties.get().getProperty(
+                "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+        props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
+
+        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+                DEFAULT_BROKER_MSG_TTL.toString());
+        props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
+    }
+
+    protected abstract WorkflowExecutionContext.EntityOperations getOperation();
+
+    protected CONFIGURATION getConfig(Properties props) {
+        CONFIGURATION conf = new CONFIGURATION();
+        for (Map.Entry<Object, Object> prop : props.entrySet()) {
+            CONFIGURATION.Property confProp = new CONFIGURATION.Property();
+            confProp.setName((String) prop.getKey());
+            confProp.setValue((String) prop.getValue());
+            conf.getProperty().add(confProp);
+        }
+        return conf;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index 1d97204..0381e59 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -27,6 +27,7 @@ import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 
 import java.util.Arrays;
+import java.util.Properties;
 
 /**
  * Builds replication workflow for filesystem based feed.
@@ -71,4 +72,15 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
         decorateWorkflow(workflow, wfName, start);
         return workflow;
     }
+
+    protected Properties getWorkflowProperties(Feed feed) throws FalconException {
+        Properties props = super.getWorkflowProperties(feed);
+        if (entity.getAvailabilityFlag() == null) {
+            props.put("availabilityFlag", "NA");
+        } else {
+            props.put("availabilityFlag", entity.getAvailabilityFlag());
+        }
+
+        return props;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index de6f373..f5cc2c3 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -45,7 +45,6 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.oozie.coordinator.ACTION;
-import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.conf.Configuration;
@@ -73,8 +72,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
 
     private static final String PARALLEL = "parallel";
     private static final String TIMEOUT = "timeout";
-    private static final String MR_MAX_MAPS = "maxMaps";
-    private static final String MR_MAP_BANDWIDTH = "mapBandwidth";
     private static final String ORDER = "order";
 
     public FeedReplicationCoordinatorBuilder(Feed entity) {
@@ -101,18 +98,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         return null;
     }
 
-    @Override
-    protected WorkflowExecutionContext.EntityOperations getOperation() {
-        return WorkflowExecutionContext.EntityOperations.REPLICATE;
-    }
-
     private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
-
-        // Different workflow for each source since hive credentials vary for each cluster
-        OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, trgCluster,
-            Tag.REPLICATION);
-        Properties wfProps = builder.build(trgCluster, buildPath);
-
         long replicationDelayInMillis = getReplicationDelayInMillis(srcCluster);
         Date sourceStartDate = getStartDate(srcCluster, replicationDelayInMillis);
         Date sourceEndDate = getEndDate(srcCluster);
@@ -127,6 +113,11 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
             return null;
         }
 
+        // Different workflow for each source since hive credentials vary for each cluster
+        OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, trgCluster,
+                Tag.REPLICATION);
+        Properties wfProps = builder.build(trgCluster, buildPath);
+
         COORDINATORAPP coord = unmarshal(REPLICATION_COORD_TEMPLATE);
 
         String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(srcCluster.getName()),
@@ -155,24 +146,18 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     }
 
     private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path buildPath,
-        String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException {
+             String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException {
         ACTION action = new ACTION();
         WORKFLOW workflow = new WORKFLOW();
 
         workflow.setAppPath(getStoragePath(buildPath));
-        Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
+        Properties props = createCoordDefaultConfiguration(wfName);
         // Override CLUSTER_NAME property to include both source and target cluster pair
         String clusterProperty = trgCluster.getName()
                 + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
         props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
         props.put("srcClusterName", srcCluster.getName());
         props.put("srcClusterColo", srcCluster.getColo());
-        if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
-            props.put(MR_MAX_MAPS, getDefaultMaxMaps());
-        }
-        if (props.get(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
-            props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
-        }
 
         // the storage type is uniform across source and target feeds for replication
         props.put("falconFeedStorageType", sourceStorage.getType().name());
@@ -183,12 +168,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
             instancePaths = pathsWithPartitions;
 
             propagateFileSystemCopyProperties(pathsWithPartitions, props);
-
-            if (entity.getAvailabilityFlag() == null) {
-                props.put("availabilityFlag", "NA");
-            } else {
-                props.put("availabilityFlag", entity.getAvailabilityFlag());
-            }
         } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
             instancePaths = "${coord:dataIn('input')}";
             final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
@@ -197,25 +176,17 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
             propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
             propagateTableCopyProperties(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, props);
             setupHiveConfiguration(srcCluster, trgCluster, buildPath);
-            props.put("availabilityFlag", "NA");
         }
 
         propagateLateDataProperties(instancePaths, sourceStorage.getType().name(), props);
-        props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
-
+        // Add the custom properties set in feed. Else, dryrun won't catch any missing props.
+        props.putAll(getEntityProperties(entity));
         workflow.setConfiguration(getConfig(props));
         action.setWorkflow(workflow);
 
         return action;
     }
 
-    private String getDefaultMaxMaps() {
-        return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
-    }
-
-    private String getDefaultMapBandwidth() {
-        return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100");
-    }
 
     private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster) throws FalconException {
         String srcPart = FeedHelper.normalizePartitionExpression(

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index aa936ad..fb41b96 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -23,6 +23,7 @@ import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -31,6 +32,8 @@ import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.Path;
 
 import java.util.Properties;
@@ -41,6 +44,8 @@ import java.util.Properties;
 public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
     protected static final String REPLICATION_ACTION_TEMPLATE = "/action/feed/replication-action.xml";
     protected static final String REPLICATION_ACTION_NAME = "replication";
+    private static final String MR_MAX_MAPS = "maxMaps";
+    private static final String MR_MAP_BANDWIDTH = "mapBandwidth";
 
     public FeedReplicationWorkflowBuilder(Feed entity) {
         super(entity, LifeCycle.REPLICATION);
@@ -56,8 +61,32 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
         addLibExtensionsToWorkflow(cluster, workflow, Tag.REPLICATION);
 
         marshal(cluster, workflow, buildPath);
-        return getProperties(buildPath, wfName);
+        Properties props = getProperties(buildPath, wfName);
+        props.putAll(createDefaultConfiguration(cluster));
+        if (EntityUtil.isTableStorageType(cluster, entity)) {
+            // todo: kludge send source hcat creds for coord dependency check to pass
+            props.putAll(getHiveCredentials(srcCluster));
+            props.putAll(getHiveCredentials(cluster));
+        }
+        props.putAll(getWorkflowProperties(entity));
+        props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
+        // Write out the config to config-default.xml
+        marshal(cluster, workflow, getConfig(props), buildPath);
+        return props;
     }
+
+    protected Properties getWorkflowProperties(Feed feed) throws FalconException {
+        Properties props = FeedHelper.getFeedProperties(feed);
+        if (props.getProperty(MR_MAX_MAPS) == null) { // set default if user has not overridden
+            props.put(MR_MAX_MAPS, getDefaultMaxMaps());
+        }
+        if (props.getProperty(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
+            props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
+        }
+
+        return props;
+    }
+
     protected ACTION addHDFSServersConfig(ACTION action, Cluster sourceCluster, Cluster targetCluster) {
         if (isSecurityEnabled) {
             // this is to ensure that the delegation tokens are checked out for both clusters
@@ -70,4 +99,17 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
         return action;
     }
     protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException;
+
+    @Override
+    protected WorkflowExecutionContext.EntityOperations getOperation() {
+        return WorkflowExecutionContext.EntityOperations.REPLICATE;
+    }
+
+    private String getDefaultMaxMaps() {
+        return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
+    }
+
+    private String getDefaultMapBandwidth() {
+        return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100");
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index c896d5a..ce9ef9a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -22,7 +22,6 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -33,8 +32,6 @@ import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.coordinator.ACTION;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.Path;
 
 import java.util.Arrays;
@@ -73,32 +70,15 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
         }
 
         Path coordPath = getBuildPath(buildPath);
-        Properties props = createCoordDefaultConfiguration(cluster, coordName);
-        props.put("timeZone", entity.getTimezone().getID());
-        props.put("frequency", entity.getFrequency().getTimeUnit().name());
-
-        final Storage storage = FeedHelper.createStorage(cluster, entity);
-        props.put("falconFeedStorageType", storage.getType().name());
-
-        String feedDataPath = storage.getUriTemplate();
-        props.put("feedDataPath",
-            feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
-
-        props.put("limit", feedCluster.getRetention().getLimit().toString());
-
-        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
-        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
-
-        props.put("falconInputFeeds", entity.getName());
-        props.put("falconInPaths", IGNORE);
-
-        props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
+        Properties props = createCoordDefaultConfiguration(coordName);
 
         WORKFLOW workflow = new WORKFLOW();
-        Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.RETENTION).build(cluster,
+        Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.RETENTION).build(cluster,
             coordPath);
-        workflow.setAppPath(getStoragePath(wfProp.getProperty(OozieEntityBuilder.ENTITY_PATH)));
-        props.putAll(wfProp);
+        workflow.setAppPath(getStoragePath(wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+        props.putAll(getProperties(coordPath, coordName));
+        // Add the custom properties set in feed. Else, dryrun won't catch any missing props.
+        props.putAll(getEntityProperties(entity));
         workflow.setConfiguration(getConfig(props));
         ACTION action = new ACTION();
         action.setWorkflow(workflow);
@@ -108,9 +88,4 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
         Path marshalPath = marshal(cluster, coord, coordPath);
         return Arrays.asList(getProperties(marshalPath, coordName));
     }
-
-    @Override
-    protected WorkflowExecutionContext.EntityOperations getOperation() {
-        return WorkflowExecutionContext.EntityOperations.DELETE;
-    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index 51e081f..b56f0dd 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -22,11 +22,15 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.Path;
 
 import java.util.Properties;
@@ -64,20 +68,47 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
         decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
         addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION);
 
+        Properties props = getProperties(buildPath, wfName);
+        props.putAll(getWorkflowProperties(cluster));
+        props.putAll(createDefaultConfiguration(cluster));
+        props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
+
         if (EntityUtil.isTableStorageType(cluster, entity)) {
             setupHiveCredentials(cluster, buildPath, workflow);
+            // todo: kludge send source hcat creds for coord dependency check to pass
+            props.putAll(getHiveCredentials(cluster));
         }
 
         marshal(cluster, workflow, buildPath);
-        Properties props = getProperties(buildPath, wfName);
-        props.putAll(getWorkflowProperties());
+
+        // Write out the config to config-default.xml
+        marshal(cluster, workflow, getConfig(props), buildPath);
         return props;
     }
 
-    private Properties getWorkflowProperties() {
+    private Properties getWorkflowProperties(Cluster cluster) throws FalconException {
         Properties props = new Properties();
         props.setProperty("srcClusterName", "NA");
         props.setProperty("availabilityFlag", "NA");
+
+        props.put("timeZone", entity.getTimezone().getID());
+        props.put("frequency", entity.getFrequency().getTimeUnit().name());
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        final Storage storage = FeedHelper.createStorage(cluster, entity);
+        props.put("falconFeedStorageType", storage.getType().name());
+
+        String feedDataPath = storage.getUriTemplate();
+        props.put("feedDataPath",
+                feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+
+        props.put("limit", feedCluster.getRetention().getLimit().toString());
+
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
+
+        props.put("falconInputFeeds", entity.getName());
+        props.put("falconInPaths", IGNORE);
         return props;
     }
 
@@ -110,4 +141,9 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
             }
         }
     }
+
+    @Override
+    protected WorkflowExecutionContext.EntityOperations getOperation() {
+        return WorkflowExecutionContext.EntityOperations.DELETE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 72bbca4..347ddaf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -27,6 +27,7 @@ import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 
 import java.util.Arrays;
+import java.util.Properties;
 
 /**
  * Builds replication workflow for hcat based feed.
@@ -135,4 +136,11 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
             }
         }
     }
+
+    protected Properties getWorkflowProperties(Feed feed) throws FalconException {
+        Properties props = super.getWorkflowProperties(feed);
+        props.put("availabilityFlag", "NA");
+
+        return props;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index 60f9fe1..d6d42e1 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -36,7 +36,6 @@ import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.oozie.OozieCoordinatorBuilder;
 import org.apache.falcon.oozie.OozieEntityBuilder;
@@ -51,7 +50,6 @@ import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
@@ -82,37 +80,31 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
         coord.setControls(controls);
 
         // Configuration
-        Properties props = createCoordDefaultConfiguration(cluster, coordName);
+        Properties props = createCoordDefaultConfiguration(coordName);
 
         initializeInputPaths(cluster, coord, props); // inputs
         initializeOutputPaths(cluster, coord, props);  // outputs
 
-        Workflow processWorkflow = entity.getWorkflow();
-        propagateUserWorkflowProperties(processWorkflow, props);
-
         // create parent wf
         Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT).build(cluster,
             coordPath);
 
         WORKFLOW wf = new WORKFLOW();
         wf.setAppPath(getStoragePath(wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH)));
-        props.putAll(wfProps);
+        // Add the custom properties set in feed. Else, dryrun won't catch any missing props.
+        props.putAll(getEntityProperties(entity));
         wf.setConfiguration(getConfig(props));
 
         // set coord action to parent wf
         org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
         action.setWorkflow(wf);
+
         coord.setAction(action);
 
         Path marshalPath = marshal(cluster, coord, coordPath);
         return Arrays.asList(getProperties(marshalPath, coordName));
     }
 
-    @Override
-    protected WorkflowExecutionContext.EntityOperations getOperation() {
-        return WorkflowExecutionContext.EntityOperations.GENERATE;
-    }
-
     private void initializeCoordAttributes(Cluster cluster, COORDINATORAPP coord, String coordName) {
         coord.setName(coordName);
         org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity,
@@ -351,12 +343,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
-    private void propagateUserWorkflowProperties(Workflow processWorkflow, Properties props) {
-        props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
-            processWorkflow.getName(), entity.getName()));
-        props.put("userWorkflowVersion", processWorkflow.getVersion());
-        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
-    }
+
 
     protected void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage, Properties props) {
         String prefix = "falcon_" + input.getName();

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 8b18ecc..ac436ca 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -33,15 +34,18 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.oozie.client.OozieClient;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -102,8 +106,21 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
         }
 
         marshal(cluster, wfApp, buildPath);
-        Properties props = getProperties(buildPath, wfName);
+        Properties props =  createDefaultConfiguration(cluster);
+        props.putAll(getProperties(buildPath, wfName));
         props.putAll(getWorkflowProperties());
+        props.setProperty(OozieClient.APP_PATH, buildPath.toString());
+
+        //Add libpath
+        Path libPath = new Path(buildPath, "lib");
+        copySharedLibs(cluster, libPath);
+        props.put(OozieClient.LIBPATH, libPath.toString());
+
+        Workflow processWorkflow = ((Process)(entity)).getWorkflow();
+        propagateUserWorkflowProperties(processWorkflow, props);
+
+        // Write out the config to config-default.xml
+        marshal(cluster, wfApp, getConfig(props), buildPath);
 
         return props;
     }
@@ -251,4 +268,16 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
             throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
         }
     }
+
+    private void propagateUserWorkflowProperties(Workflow processWorkflow, Properties props) {
+        props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
+                processWorkflow.getName(), entity.getName()));
+        props.put("userWorkflowVersion", processWorkflow.getVersion());
+        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+    }
+
+    @Override
+    protected WorkflowExecutionContext.EntityOperations getOperation() {
+        return WorkflowExecutionContext.EntityOperations.GENERATE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
index 0ae229c..149a7e6 100644
--- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
+++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
@@ -20,6 +20,7 @@ package org.apache.falcon.util;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.hive.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.xerces.dom.ElementNSImpl;
@@ -43,6 +44,7 @@ public final class OozieUtils {
     public static final JAXBContext ACTION_JAXB_CONTEXT;
     public static final JAXBContext COORD_JAXB_CONTEXT;
     public static final JAXBContext BUNDLE_JAXB_CONTEXT;
+    public static final JAXBContext CONFIG_JAXB_CONTEXT;
     protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
 
     static {
@@ -51,6 +53,7 @@ public final class OozieUtils {
             ACTION_JAXB_CONTEXT = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.ACTION.class);
             COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
             BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
+            CONFIG_JAXB_CONTEXT = JAXBContext.newInstance(CONFIGURATION.class);
             HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
                 org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
         } catch (JAXBException e) {
@@ -72,7 +75,7 @@ public final class OozieUtils {
     }
 
     @SuppressWarnings("unchecked")
-    public static  JAXBElement<ACTION> unMarshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction) {
+    public static JAXBElement<ACTION> unMarshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction) {
         try {
             Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
             unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
@@ -94,5 +97,4 @@ public final class OozieUtils {
             throw new RuntimeException("Unable to marshall hive action.", e);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index b223447..2f7787d 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -201,9 +201,12 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         HashMap<String, String> props = getCoordProperties(coord);
 
-        verifyEntityProperties(feed, trgCluster, srcCluster,
+        verifyEntityProperties(trgCluster, srcCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
-        verifyBrokerProperties(trgCluster, props);
+
+        HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
+        verifyEntityProperties(feed, trgCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps);
+        verifyBrokerProperties(trgCluster, wfProps);
 
         // verify the replication param that feed replicator depends on
         String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
@@ -226,15 +229,15 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
 
         // verify workflow params
-        Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
-        Assert.assertEquals(props.get("userWorkflowVersion"), "0.6");
-        Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
+        Assert.assertEquals(wfProps.get("userWorkflowName"), "replication-policy");
+        Assert.assertEquals(wfProps.get("userWorkflowVersion"), "0.6");
+        Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon");
 
         // verify default params
-        Assert.assertEquals(props.get("queueName"), "default");
-        Assert.assertEquals(props.get("jobPriority"), "NORMAL");
-        Assert.assertEquals(props.get("maxMaps"), "5");
-        Assert.assertEquals(props.get("mapBandwidth"), "100");
+        Assert.assertEquals(wfProps.get("queueName"), "default");
+        Assert.assertEquals(wfProps.get("jobPriority"), "NORMAL");
+        Assert.assertEquals(wfProps.get("maxMaps"), "5");
+        Assert.assertEquals(wfProps.get("mapBandwidth"), "100");
 
         assertLibExtensions(coord, "replication");
         WORKFLOWAPP wf = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
@@ -340,12 +343,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
         Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
         Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
-        Assert.assertEquals(props.get("maxMaps"), "33");
-        Assert.assertEquals(props.get("mapBandwidth"), "2");
 
-        verifyEntityProperties(aFeed, aCluster, srcCluster,
+        verifyEntityProperties(aCluster, srcCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
-        verifyBrokerProperties(trgCluster, props);
+
+        HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
+        verifyEntityProperties(aFeed, aCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps);
+        verifyBrokerProperties(aCluster, wfProps);
+
+        Assert.assertEquals(wfProps.get("maxMaps"), "33");
+        Assert.assertEquals(wfProps.get("mapBandwidth"), "2");
     }
 
     public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP workflow, boolean isTable) {
@@ -484,9 +491,9 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
                 wfPath.toString());
 
-        verifyEntityProperties(tableFeed, trgCluster, srcCluster,
-                WorkflowExecutionContext.EntityOperations.REPLICATE, props);
-        verifyBrokerProperties(trgCluster, props);
+        HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
+        verifyEntityProperties(tableFeed, trgCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps);
+        verifyBrokerProperties(trgCluster, wfProps);
     }
 
     private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
@@ -592,10 +599,13 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         HashMap<String, String> props = getCoordProperties(coord);
 
-        String feedDataPath = props.get("feedDataPath");
-        String storageType = props.get("falconFeedStorageType");
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
+
+        String feedDataPath = wfProps.get("feedDataPath");
+        String storageType = wfProps.get("falconFeedStorageType");
 
         // verify the param that feed evictor depends on
+
         Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
 
         final Storage storage = FeedHelper.createStorage(cluster, feed);
@@ -609,8 +619,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         }
 
         // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), feed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+        Assert.assertEquals(wfProps.get("feedNames"), feed.getName());
+        Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE");
 
         assertWorkflowRetries(getWorkflowapp(srcMiniDFS.getFileSystem(), coord));
 
@@ -651,8 +661,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         HashMap<String, String> props = getCoordProperties(coord);
 
-        String feedDataPath = props.get("feedDataPath");
-        String storageType = props.get("falconFeedStorageType");
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
+
+        String feedDataPath = wfProps.get("feedDataPath");
+        String storageType = wfProps.get("falconFeedStorageType");
 
         // verify the param that feed evictor depends on
         Assert.assertEquals(storageType, Storage.TYPE.TABLE.name());
@@ -668,13 +680,13 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         }
 
         // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+        Assert.assertEquals(wfProps.get("feedNames"), tableFeed.getName());
+        Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE");
 
         assertWorkflowRetries(coord);
-        verifyBrokerProperties(srcCluster, props);
+        verifyBrokerProperties(srcCluster, wfProps);
         verifyEntityProperties(tableFeed, trgCluster,
-                WorkflowExecutionContext.EntityOperations.DELETE, props);
+                WorkflowExecutionContext.EntityOperations.DELETE, wfProps);
 
         Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
         assertHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index 6488682..ce76594 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -31,10 +31,11 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
@@ -46,6 +47,9 @@ import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBElement;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
 import javax.xml.transform.stream.StreamSource;
 import javax.xml.validation.Schema;
 import javax.xml.validation.SchemaFactory;
@@ -116,7 +120,7 @@ public class AbstractTestBase {
         Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-coordinator-0.3.xsd"));
         unmarshaller.setSchema(schema);
         JAXBElement<COORDINATORAPP> jaxbBundle = unmarshaller.unmarshal(
-            new StreamSource(new ByteArrayInputStream(coordStr.trim().getBytes())), COORDINATORAPP.class);
+                new StreamSource(new ByteArrayInputStream(coordStr.trim().getBytes())), COORDINATORAPP.class);
         return jaxbBundle.getValue();
     }
 
@@ -128,7 +132,7 @@ public class AbstractTestBase {
         Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-bundle-0.1.xsd"));
         unmarshaller.setSchema(schema);
         JAXBElement<BUNDLEAPP> jaxbBundle = unmarshaller.unmarshal(
-            new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), BUNDLEAPP.class);
+                new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), BUNDLEAPP.class);
         return jaxbBundle.getValue();
     }
 
@@ -153,7 +157,7 @@ public class AbstractTestBase {
     }
 
     protected void assertLibExtensions(FileSystem fs, COORDINATORAPP coord, EntityType type,
-        String lifecycle) throws Exception {
+                                       String lifecycle) throws Exception {
         WORKFLOWAPP wf = getWorkflowapp(fs, coord);
         List<Object> actions = wf.getDecisionOrForkOrJoin();
         String lifeCyclePath = lifecycle == null ? "" : "/" + lifecycle;
@@ -172,7 +176,7 @@ public class AbstractTestBase {
             }
             if (files != null) {
                 Assert.assertTrue(files.get(files.size() - 1).endsWith(
-                    "/projects/falcon/working/libext/" + type.name() + lifeCyclePath + "/ext.jar"));
+                        "/projects/falcon/working/libext/" + type.name() + lifeCyclePath + "/ext.jar"));
             }
         }
     }
@@ -209,36 +213,57 @@ public class AbstractTestBase {
 
     protected HashMap<String, String> getCoordProperties(COORDINATORAPP coord) {
         HashMap<String, String> props = new HashMap<String, String>();
-        for (CONFIGURATION.Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+        for (org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
+                : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
             props.put(prop.getName(), prop.getValue());
         }
         return props;
     }
 
-    protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster,
+    protected HashMap<String, String> getWorkflowProperties(FileSystem fs, COORDINATORAPP coord)
+        throws JAXBException, IOException, XMLStreamException {
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        StreamSource xml = new StreamSource(fs.open(new Path(wfPath + "/config-default.xml")));
+        XMLInputFactory xif = XMLInputFactory.newFactory();
+        XMLStreamReader xsr = xif.createXMLStreamReader(xml);
+        JAXBContext jaxbContext = OozieUtils.CONFIG_JAXB_CONTEXT;
+        CONFIGURATION jaxbConfig =  ((JAXBElement<CONFIGURATION>) jaxbContext.createUnmarshaller().
+                unmarshal(xsr, CONFIGURATION.class)).getValue();
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (CONFIGURATION.Property prop : jaxbConfig.getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+        return props;
+    }
+
+    protected void verifyEntityProperties(Cluster cluster, Cluster srcCluster,
                                           WorkflowExecutionContext.EntityOperations operation,
                                           HashMap<String, String> props) throws Exception {
-        Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
-                entity.getName());
-        Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
-                entity.getEntityType().name());
         if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) {
             Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()),
                     cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName());
         } else {
             Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
         }
-        Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
-        Assert.assertEquals(props.get("falconDataOperation"), operation.name());
     }
 
     protected void verifyEntityProperties(Entity entity, Cluster cluster,
                                           WorkflowExecutionContext.EntityOperations operation,
                                           HashMap<String, String> props) throws Exception {
-        verifyEntityProperties(entity, cluster, null, operation, props);
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
+                entity.getName());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
+                entity.getEntityType().name());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
+        Assert.assertEquals(props.get("falconDataOperation"), operation.name());
+        Assert.assertTrue(props.containsKey(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName()));
+        Assert.assertTrue(props.containsKey(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName()));
+        Assert.assertTrue(props.containsKey(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName()));
     }
 
-    private String getLogPath(Cluster cluster, Entity entity) {
+    protected String getLogPath(Cluster cluster, Entity entity) {
         Path logPath = EntityUtil.getLogPath(cluster, entity);
         return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 4e5c3f0..3aaf304 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -188,14 +188,15 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
                 FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
 
         HashMap<String, String> props = getCoordProperties(coord);
-        assertEquals(props.get("mapred.job.priority"), "LOW");
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
+        assertEquals(wfProps.get("mapred.job.priority"), "LOW");
         List<Input> inputs = process.getInputs().getInputs();
         assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), inputs.get(0).getName() + "#" + inputs
             .get(1).getName());
 
         verifyEntityProperties(process, cluster,
-                WorkflowExecutionContext.EntityOperations.GENERATE, props);
-        verifyBrokerProperties(cluster, props);
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
 
         assertLibExtensions(fs, coord, EntityType.PROCESS, null);
     }
@@ -285,10 +286,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
         HashMap<String, String> props = getCoordProperties(coord);
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
 
         verifyEntityProperties(process, cluster,
-                WorkflowExecutionContext.EntityOperations.GENERATE, props);
-        verifyBrokerProperties(cluster, props);
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
 
         // verify table and hive props
         Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
@@ -347,11 +349,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
-        HashMap<String, String> props = getCoordProperties(coord);
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
 
         verifyEntityProperties(process, cluster,
-                WorkflowExecutionContext.EntityOperations.GENERATE, props);
-        verifyBrokerProperties(cluster, props);
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -401,11 +403,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
-        HashMap<String, String> props = getCoordProperties(coord);
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
 
         verifyEntityProperties(process, cluster,
-                WorkflowExecutionContext.EntityOperations.GENERATE, props);
-        verifyBrokerProperties(cluster, props);
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -451,10 +453,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
-        HashMap<String, String> props = getCoordProperties(coord);
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
         verifyEntityProperties(process, cluster,
-                WorkflowExecutionContext.EntityOperations.GENERATE, props);
-        verifyBrokerProperties(cluster, props);
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -550,9 +552,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
                 Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
             }
         }
+
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
         verifyEntityProperties(process, cluster,
-                WorkflowExecutionContext.EntityOperations.GENERATE, props);
-        verifyBrokerProperties(cluster, props);
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
 
         // verify the late data params
         Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
@@ -684,9 +688,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
         HashMap<String, String> props = getCoordProperties(coord);
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
         verifyEntityProperties(processEntity, cluster,
-                WorkflowExecutionContext.EntityOperations.GENERATE, props);
-        verifyBrokerProperties(cluster, props);
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
 
         String[] expected = {
             WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(),
@@ -694,9 +699,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
             WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(),
             WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
             WorkflowExecutionArgs.INPUT_NAMES.getName(),
-            WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
-            WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
-            WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
         };
 
         for (String property : expected) {
@@ -726,9 +728,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
         HashMap<String, String> props = getCoordProperties(coord);
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
         verifyEntityProperties(processEntity, cluster,
-                WorkflowExecutionContext.EntityOperations.GENERATE, props);
-        verifyBrokerProperties(cluster, props);
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
 
         Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks");
         Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "IGNORE");
@@ -756,9 +759,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
         HashMap<String, String> props = getCoordProperties(coord);
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
         verifyEntityProperties(processEntity, cluster,
-                WorkflowExecutionContext.EntityOperations.GENERATE, props);
-        verifyBrokerProperties(cluster, props);
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
 
         Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "impressions");
         Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "NONE");


Mime
View raw message