falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-2228 Set output names into workflow builder and also maintain the order of the input and ouputs
Date Mon, 02 Jan 2017 09:06:53 GMT
Repository: falcon
Updated Branches:
  refs/heads/master ccc343fa8 -> 9fdfe713f


FALCON-2228 Set output names into workflow builder and also maintain the order of the input
and ouputs

Author: sandeep <sandysmdl@gmail.com>

Reviewers: @pallavi-rao

Closes #329 from sandeepSamudrala/FALCON-2228 and squashes the following commits:

b8a3e3e [sandeep] FALCON-2228 Incorporated review comments. Removed OUTPUT_STORAGE_TYPES
4d00d7f [sandeep] FALCON-2228 Set output names into workflow builder and also maintain the
order of the input and ouputs
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time
with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9fdfe713
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9fdfe713
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9fdfe713

Branch: refs/heads/master
Commit: 9fdfe713f8d72e63fb45aaed93071cbde39b9eec
Parents: ccc343f
Author: sandeep <sandysmdl@gmail.com>
Authored: Mon Jan 2 14:36:43 2017 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Mon Jan 2 14:36:43 2017 +0530

----------------------------------------------------------------------
 .gitignore                                      |  1 +
 .../falcon/workflow/WorkflowExecutionArgs.java  |  3 +-
 .../retention/AgeBasedWorkflowBuilder.java      |  5 ++-
 .../falcon/oozie/ExportWorkflowBuilder.java     |  1 +
 .../falcon/oozie/ImportWorkflowBuilder.java     |  1 +
 .../feed/FeedReplicationCoordinatorBuilder.java |  1 +
 .../feed/FeedRetentionWorkflowBuilder.java      |  5 ++-
 .../NativeOozieProcessWorkflowBuilder.java      | 21 ++++++----
 .../ProcessExecutionCoordinatorBuilder.java     | 18 +++++----
 .../feed/OozieFeedWorkflowBuilderTest.java      | 41 ++++++++++++--------
 .../OozieProcessWorkflowBuilderTest.java        | 18 ++++++---
 .../workflow/FalconPostProcessingTest.java      |  4 +-
 12 files changed, 75 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index b6733d7..8589d94 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,6 +37,7 @@ activemq-data
 #log files
 logs
 *.log
+*.patch
 
 #Falcon UI NPM files
 falcon-ui/dist

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 682b14e..dcf7cb5 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -52,7 +52,7 @@ public enum WorkflowExecutionArgs {
     // workflow execution details
     WORKFLOW_ID("workflowId", "current workflow-id of the instance"),
     RUN_ID("runId", "current run-id of the instance"),
-    STATUS("status", "status of the user workflow isnstance"),
+    STATUS("status", "status of the user workflow instance"),
     WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie", false),
     USER_SUBFLOW_ID("subflowId", "external id of user workflow", false),
     PARENT_ID("parentId", "The parent of the current workflow, typically coord action", false),
@@ -70,6 +70,7 @@ public enum WorkflowExecutionArgs {
     // what outputs
     OUTPUT_FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
     OUTPUT_FEED_PATHS("feedInstancePaths", "comma separated feed instance paths"),
+    OUTPUT_NAMES("feedInstanceNames", "comma separated list of names of outputs", false),
 
     // broker related parameters
     TOPIC_NAME("topicName", "name of the topic to be used to send JMS message", false),

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
index dd0c6d2..f37f897 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
@@ -121,14 +121,15 @@ public final class AgeBasedWorkflowBuilder {
         props.put("frequency", feed.getFrequency().getTimeUnit().name());
         props.put("falconFeedStorageType", storage.getType().name());
         props.put("limit", new AgeBasedDelete().getRetentionLimit(feed, cluster.getName()).toString());
-        props.put("falconInputFeeds", feed.getName());
-        props.put("falconInPaths", OozieBuilderUtils.IGNORE);
+        props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), feed.getName());
+        props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), OozieBuilderUtils.IGNORE);
 
         String feedDataPath = storage.getUriTemplate();
         props.put("feedDataPath",
                 feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
 
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), feed.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), feed.getName());
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OozieBuilderUtils.IGNORE);
 
         return props;

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
index af7431a..7f4ba69 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
@@ -62,6 +62,7 @@ public abstract class ExportWorkflowBuilder extends OozieOrchestrationWorkflowBu
             props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
         }
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), NONE);
+        props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), NONE);
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), NONE);
 
         props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), entity.getName());

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
index 2d93189..fd97fa6 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
@@ -63,6 +63,7 @@ public abstract class ImportWorkflowBuilder extends OozieOrchestrationWorkflowBu
             props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
         }
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), entity.getName());
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
                 String.format("${coord:dataOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME));
         props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), NONE);

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 07d293c..77f2c75 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -262,6 +262,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
 
         // falcon post processing
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), entity.getName());
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), "${coord:dataOut('output')}");
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index fd51ed0..553bf05 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -95,10 +95,11 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
         props.put("limit", feedCluster.getRetention().getLimit().toString());
 
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), entity.getName());
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
 
-        props.put("falconInputFeeds", entity.getName());
-        props.put("falconInPaths", IGNORE);
+        props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), IGNORE);
         props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA");
         return props;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java
b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java
index 78e049d..0a31b98 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java
@@ -41,6 +41,7 @@ import org.joda.time.format.DateTimeFormatter;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 
@@ -110,13 +111,16 @@ public class NativeOozieProcessWorkflowBuilder extends OozieProcessWorkflowBuild
         if (entity.getOutputs() == null) {
             props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), NONE);
             props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), NONE);
+            props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), NONE);
             return props;
         }
-        List<String> feedNames = new ArrayList<>();
-        List<String> feedInstancePaths= new ArrayList<>();
+        List<String> falconOutputFeeds = new LinkedList<>();
+        List<String> feedInstancePaths= new LinkedList<>();
+        List<String> falconOutputNames = new LinkedList<>();
         for (Output output : entity.getOutputs().getOutputs()) {
             Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
-            feedNames.add(feed.getName());
+            falconOutputFeeds.add(feed.getName());
+            falconOutputNames.add(output.getName());
             String outputExp = output.getInstance();
             Date outTime = EXPRESSION_HELPER.evaluate(outputExp, Date.class);
             for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters())
{
@@ -139,7 +143,8 @@ public class NativeOozieProcessWorkflowBuilder extends OozieProcessWorkflowBuild
                 }
             }
         }
-        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(feedNames,
","));
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(falconOutputFeeds,
","));
+        props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), StringUtils.join(falconOutputNames,
","));
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(feedInstancePaths,
","));
         return props;
     }
@@ -154,10 +159,10 @@ public class NativeOozieProcessWorkflowBuilder extends OozieProcessWorkflowBuild
             props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), NONE);
             return props;
         }
-        List<String> falconInputFeeds = new ArrayList<>();
-        List<String> falconInputNames = new ArrayList<>();
-        List<String> falconInputPaths = new ArrayList<>();
-        List<String> falconInputFeedStorageTypes = new ArrayList<>();
+        List<String> falconInputFeeds = new LinkedList<>();
+        List<String> falconInputNames = new LinkedList<>();
+        List<String> falconInputPaths = new LinkedList<>();
+        List<String> falconInputFeedStorageTypes = new LinkedList<>();
         for (Input input : entity.getInputs().getInputs()) {
             Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
             Storage storage = FeedHelper.createStorage(clusterObj, feed);

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index 91f4757..a45c9a9 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -52,8 +52,8 @@ import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.hadoop.fs.Path;
 
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 
@@ -150,10 +150,10 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
             return;
         }
 
-        List<String> inputFeeds = new ArrayList<String>();
-        List<String> inputNames = new ArrayList<String>();
-        List<String> inputPaths = new ArrayList<String>();
-        List<String> inputFeedStorageTypes = new ArrayList<String>();
+        List<String> inputFeeds = new LinkedList<>();
+        List<String> inputNames = new LinkedList<>();
+        List<String> inputPaths = new LinkedList<>();
+        List<String> inputFeedStorageTypes = new LinkedList<>();
         for (Input input : entity.getInputs().getInputs()) {
             Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
             Storage storage = FeedHelper.createStorage(cluster, feed);
@@ -254,6 +254,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
         if (entity.getOutputs() == null) {
             props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), NONE);
             props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), NONE);
+            props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), NONE);
             return;
         }
 
@@ -265,8 +266,9 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
             coord.setOutputEvents(new OUTPUTEVENTS());
         }
 
-        List<String> outputFeeds = new ArrayList<String>();
-        List<String> outputPaths = new ArrayList<String>();
+        List<String> outputFeeds = new LinkedList<>();
+        List<String> outputPaths = new LinkedList<>();
+        List<String> falconOutputNames = new LinkedList<>();
         for (Output output : entity.getOutputs().getOutputs()) {
             Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
             Storage storage = FeedHelper.createStorage(cluster, feed);
@@ -282,6 +284,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
 
             String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
             outputFeeds.add(feed.getName());
+            falconOutputNames.add(output.getName());
             outputPaths.add(outputExpr);
 
             if (storage.getType() == Storage.TYPE.FILESYSTEM) {
@@ -295,6 +298,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
 
         // Output feed name and path for parent workflow
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(outputFeeds,
','));
+        props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), StringUtils.join(falconOutputNames,
','));
         props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(outputPaths,
','));
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index d753baf..5418562 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -195,7 +195,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(coord.getTimezone(), "UTC");
 
         HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(),
coord);
-        Assert.assertEquals(wfProps.get("feedNames"), lifecycleRetentionFeed.getName());
+        Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
+                lifecycleRetentionFeed.getName());
         Assert.assertTrue(StringUtils.equals(wfProps.get("entityType"), EntityType.FEED.name()));
         Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon");
         Assert.assertEquals(wfProps.get("queueName"), "ageBasedDeleteQueue");
@@ -219,7 +220,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(coord.getTimezone(), "UTC");
 
         HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(),
coord);
-        Assert.assertEquals(wfProps.get("feedNames"), lifecycleLocalRetentionFeed.getName());
+        Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
+                lifecycleLocalRetentionFeed.getName());
         Assert.assertTrue(StringUtils.equals(wfProps.get("entityType"), EntityType.FEED.name()));
         Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon");
         Assert.assertEquals(wfProps.get("queueName"), "local");
@@ -370,15 +372,17 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
 
         // verify the late data params
-        Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()),
feed.getName());
         Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), feed.getName());
-        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
-        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName()),
"${coord:dataIn('input')}");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName()),
pathsWithPartitions);
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()),
+                Storage.TYPE.FILESYSTEM.name());
 
         // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), feed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
feed.getName());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"${coord:dataOut('output')}");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), feed.getName());
 
         // verify workflow params
         Assert.assertEquals(wfProps.get("userWorkflowName"), "replication-policy");
@@ -643,14 +647,15 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget");
 
         // verify the late data params
-        Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
-        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()),
tableFeed.getName());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName()),
"${coord:dataIn('input')}");
         Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), tableFeed.getName());
-        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()),
Storage.TYPE.TABLE.name());
 
         // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
tableFeed.getName());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), tableFeed.getName());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"${coord:dataOut('output')}");
 
         Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
         assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
@@ -788,8 +793,9 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         }
 
         // verify the post processing params
-        Assert.assertEquals(wfProps.get("feedNames"), feed.getName());
-        Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE");
+        Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
feed.getName());
+        Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), feed.getName());
+        Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"IGNORE");
 
         assertWorkflowRetries(getWorkflowapp(srcMiniDFS.getFileSystem(), coord));
 
@@ -850,8 +856,9 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         }
 
         // verify the post processing params
-        Assert.assertEquals(wfProps.get("feedNames"), tableFeed.getName());
-        Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE");
+        Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
tableFeed.getName());
+        Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), tableFeed.getName());
+        Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"IGNORE");
 
         assertWorkflowRetries(coord);
         verifyBrokerProperties(srcCluster, wfProps);

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 05b513e..840b332 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -195,8 +195,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase
{
         HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
         assertEquals(wfProps.get("mapred.job.priority"), "LOW");
         List<Input> inputs = process.getInputs().getInputs();
+        List<Output> outputs = process.getOutputs().getOutputs();
         assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), inputs.get(0).getName()
+ "#" + inputs
             .get(1).getName());
+        assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), outputs.get(0).getName());
 
         verifyEntityProperties(process, cluster,
                 WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
@@ -685,15 +687,19 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase
{
         verifyBrokerProperties(cluster, wfProps);
 
         // verify the late data params
-        Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
-        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()),
+                process.getInputs().getInputs().get(0).getFeed());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName()),
"${coord:dataIn('input')}");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()),
Storage.TYPE.TABLE.name());
         Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()),
             process.getInputs().getInputs().get(0).getName());
 
         // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
-        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
+                process.getOutputs().getOutputs().get(0).getFeed());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"${coord:dataOut('output')}");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()),
+                process.getOutputs().getOutputs().get(0).getName());
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}",
"");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -853,6 +859,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase
{
 
         String[] expected = {
             WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(),
+            WorkflowExecutionArgs.OUTPUT_NAMES.getName(),
             WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
             WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(),
             WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
@@ -893,6 +900,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase
{
 
         Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()),
"clicks");
         Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
"NONE");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), "NONE");
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 4132c3a..8e7804e 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -213,13 +213,13 @@ public class FalconPostProcessingTest {
 
         // Verify user message
         if (checkUserMessage) {
-            verifyMesssage(consumer);
+            verifyMessage(consumer);
         }
 
         connection.close();
     }
 
-    private void verifyMesssage(MessageConsumer consumer) throws JMSException {
+    private void verifyMessage(MessageConsumer consumer) throws JMSException {
 
         String[] actualFeedNames = new String[outputFeedPaths.length];
         String[] actualFeedPaths = new String[outputFeedPaths.length];


Mime
View raw message