falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject git commit: FALCON-630 late data rerun for process broken in trunk. Contributed by Shwetha GS
Date Tue, 26 Aug 2014 06:48:50 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master 5000fbbd6 -> 06f52eca7


FALCON-630 late data rerun for process broken in trunk. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/06f52eca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/06f52eca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/06f52eca

Branch: refs/heads/master
Commit: 06f52eca7b324b779aa88bdaaa12df618a03fdb2
Parents: 5000fbb
Author: Shwetha GS <shwetha.gs@inmobi.com>
Authored: Tue Aug 26 12:17:56 2014 +0530
Committer: Shwetha GS <shwetha.gs@inmobi.com>
Committed: Tue Aug 26 12:18:32 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../falcon/workflow/WorkflowExecutionArgs.java  |  8 +++---
 .../workflow/WorkflowExecutionContext.java      |  8 +++---
 .../metadata/MetadataMappingServiceTest.java    |  4 +--
 .../workflow/WorkflowExecutionContextTest.java  |  4 +--
 .../WorkflowJobEndNotificationServiceTest.java  |  4 +--
 .../falcon/messaging/JMSMessageProducer.java    |  8 +++---
 .../falcon/messaging/FeedProducerTest.java      | 12 ++++-----
 .../messaging/JMSMessageConsumerTest.java       |  4 +--
 .../messaging/JMSMessageProducerTest.java       | 24 +++++++++---------
 .../falcon/messaging/ProcessProducerTest.java   | 12 ++++-----
 .../feed/FeedReplicationCoordinatorBuilder.java | 11 +++++----
 .../feed/FeedRetentionCoordinatorBuilder.java   |  4 +--
 .../ProcessExecutionCoordinatorBuilder.java     | 26 +++++++++++---------
 .../src/main/resources/action/post-process.xml  |  2 +-
 oozie/src/main/resources/action/pre-process.xml |  4 +--
 .../feed/OozieFeedWorkflowBuilderTest.java      |  3 +++
 .../OozieProcessWorkflowBuilderTest.java        | 15 +++++++----
 .../workflow/FalconPostProcessingTest.java      | 12 ++++-----
 .../metadata/LineageMetadataResourceTest.java   |  4 +--
 .../apache/falcon/latedata/LateDataHandler.java | 10 +++++---
 .../falcon/rerun/handler/LateRerunConsumer.java | 22 +++++++++--------
 .../apache/falcon/late/LateDataHandlerIT.java   |  4 +--
 23 files changed, 114 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4e3f919..36ca573 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-630 late data rerun for process broken in trunk. (Shwetha GS)
+
    FALCON-611 Post process arg status is in 'FAILED' state always
    (Shwetha GS via Suhas Vasu)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 92af3e1..514bafe 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -58,11 +58,13 @@ public enum WorkflowExecutionArgs {
 
     // what inputs
     INPUT_FEED_NAMES("falconInputFeeds", "name of the feeds which are used as inputs", false),
-    INPUT_FEED_PATHS("falconInputPaths", "comma separated input feed instance paths", false),
+    INPUT_FEED_PATHS("falconInPaths", "comma separated input feed instance paths", false),
+    INPUT_NAMES("falconInputNames", "name of the inputs", false),
+    INPUT_STORAGE_TYPES("falconInputFeedStorageTypes", "input storage types", false),
 
     // what outputs
-    FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
-    FEED_INSTANCE_PATHS("feedInstancePaths", "comma separated feed instance paths"),
+    OUTPUT_FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
+    OUTPUT_FEED_PATHS("feedInstancePaths", "comma separated feed instance paths"),
 
     // broker related parameters
     TOPIC_NAME("topicName", "name of the topic to be used to send JMS message", false),

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 786e94f..f5bb782 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -80,8 +80,8 @@ public class WorkflowExecutionContext {
         WorkflowExecutionArgs.NOMINAL_TIME,
         WorkflowExecutionArgs.OPERATION,
 
-        WorkflowExecutionArgs.FEED_NAMES,
-        WorkflowExecutionArgs.FEED_INSTANCE_PATHS,
+        WorkflowExecutionArgs.OUTPUT_FEED_NAMES,
+        WorkflowExecutionArgs.OUTPUT_FEED_PATHS,
 
         WorkflowExecutionArgs.WORKFLOW_ID,
         WorkflowExecutionArgs.WORKFLOW_USER,
@@ -177,7 +177,7 @@ public class WorkflowExecutionContext {
     }
 
     public String getOutputFeedNames() {
-        return getValue(WorkflowExecutionArgs.FEED_NAMES);
+        return getValue(WorkflowExecutionArgs.OUTPUT_FEED_NAMES);
     }
 
     public String[] getOutputFeedNamesList() {
@@ -185,7 +185,7 @@ public class WorkflowExecutionContext {
     }
 
     public String getOutputFeedInstancePaths() {
-        return getValue(WorkflowExecutionArgs.FEED_INSTANCE_PATHS);
+        return getValue(WorkflowExecutionArgs.OUTPUT_FEED_PATHS);
     }
 
     public String[] getOutputFeedInstancePathsList() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index b51caf8..2b030fd 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -644,8 +644,8 @@ public class MetadataMappingServiceTest {
             "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
             "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
 
-            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
-            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
 
             "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
             "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
index e97175e..a45633b 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
@@ -273,8 +273,8 @@ public class WorkflowExecutionContextTest {
             "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
             "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
 
-            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
-            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
 
             "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
             "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
index 2fe08e4..9a6ad98 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
@@ -135,8 +135,8 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
             "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
             "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
 
-            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
-            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
 
             "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
             "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 39d6fab..fc31bab 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -205,13 +205,13 @@ public class JMSMessageProducer {
 
             // override default values
             if (context.getEntityType().equalsIgnoreCase("PROCESS")) {
-                change(message, WorkflowExecutionArgs.FEED_NAMES, feedNames[i]);
+                change(message, WorkflowExecutionArgs.OUTPUT_FEED_NAMES, feedNames[i]);
             } else {
-                change(message, WorkflowExecutionArgs.FEED_NAMES,
-                        message.get(WorkflowExecutionArgs.FEED_NAMES.getName()));
+                change(message, WorkflowExecutionArgs.OUTPUT_FEED_NAMES,
+                        message.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()));
             }
 
-            change(message, WorkflowExecutionArgs.FEED_INSTANCE_PATHS, feedPaths[i]);
+            change(message, WorkflowExecutionArgs.OUTPUT_FEED_PATHS, feedPaths[i]);
             messages.add(message);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index 1c10be5..c45ea1e 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -71,8 +71,8 @@ public class FeedProducerTest {
 
         args = new String[] {
             "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), TOPIC_NAME,
-            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs",
-            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "click-logs",
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
             "/click-logs/10/05/05/00/20",
             "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
             "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
@@ -179,7 +179,7 @@ public class FeedProducerTest {
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
                 "/falcon/feed/agg-logs/path1/2010/10/10/20");
 
         for (m = null; m == null;) {
@@ -187,7 +187,7 @@ public class FeedProducerTest {
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
                 "/falcon/feed/agg-logs/path1/2010/10/10/21");
 
         for (m = null; m == null;) {
@@ -195,7 +195,7 @@ public class FeedProducerTest {
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
                 "/falcon/feed/agg-logs/path1/2010/10/10/22");
 
         for (m = null; m == null;) {
@@ -203,7 +203,7 @@ public class FeedProducerTest {
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
                 "/falcon/feed/agg-logs/path1/2010/10/10/23");
 
         connection.close();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index b1f8271..9a4a6f7 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -105,9 +105,9 @@ public class JMSMessageConsumerTest {
         message.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), "cluster1");
         message.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), "process1");
         message.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), "PROCESS");
-        message.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+        message.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
                 "/clicks/hour/00/0" + i);
-        message.put(WorkflowExecutionArgs.FEED_NAMES.getName(), "clicks");
+        message.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "clicks");
         message.put(WorkflowExecutionArgs.LOG_FILE.getName(), "/logfile");
         message.put(WorkflowExecutionArgs.LOG_DIR.getName(), "/tmp/log");
         message.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), "2012-10-10-10-10");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
index 34cff77..e4ea22f 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
@@ -72,8 +72,8 @@ public class JMSMessageProducerTest {
         List<String> args = createCommonArgs();
         List<String> newArgs = new ArrayList<String>(Arrays.asList(
                 "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
-                "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs,raw-logs",
-                "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+                "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "click-logs,raw-logs",
+                "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
                 "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20"));
         args.addAll(newArgs);
         List<String[]> messages = new ArrayList<String[]>();
@@ -81,10 +81,10 @@ public class JMSMessageProducerTest {
         testProcessMessageCreator(messages, TOPIC_NAME);
         for (MapMessage m : mapMessages) {
             assertMessage(m);
-            Assert.assertTrue((m.getString(WorkflowExecutionArgs.FEED_NAMES.getName())
+            Assert.assertTrue((m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName())
                     .equals("click-logs,raw-logs")));
             Assert.assertTrue(m
-                    .getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName())
+                    .getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName())
                     .equals("/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20"));
         }
     }
@@ -94,8 +94,8 @@ public class JMSMessageProducerTest {
         List<String> args = createCommonArgs();
         List<String> newArgs = new ArrayList<String>(Arrays.asList(
                 "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
-                "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "null",
-                "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), "null"));
+                "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "null",
+                "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), "null"));
         args.addAll(newArgs);
         List<String[]> messages = new ArrayList<String[]>();
         messages.add(args.toArray(new String[args.size()]));
@@ -103,9 +103,9 @@ public class JMSMessageProducerTest {
         for (MapMessage m : mapMessages) {
             assertMessage(m);
             assertMessage(m);
-            Assert.assertTrue(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()).equals(
+            Assert.assertTrue(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()).equals(
                     "null"));
-            Assert.assertTrue(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName())
+            Assert.assertTrue(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName())
                     .equals("null"));
         }
     }
@@ -116,8 +116,8 @@ public class JMSMessageProducerTest {
         List<String> args = createCommonArgs();
         List<String> newArgs = new ArrayList<String>(Arrays.asList(
                 "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
-                "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "raw-logs",
-                "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+                "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "raw-logs",
+                "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
                 "/raw-logs/10/05/05/00/20"));
         args.addAll(newArgs);
         messages.add(args.toArray(new String[args.size()]));
@@ -125,8 +125,8 @@ public class JMSMessageProducerTest {
         args = createCommonArgs();
         newArgs = new ArrayList<String>(Arrays.asList(
                 "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
-                "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs",
-                "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+                "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "click-logs",
+                "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
                 "/click-logs/10/05/05/00/20"));
         args.addAll(newArgs);
         messages.add(args.toArray(new String[args.size()]));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index ccb47df..80c2701 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -52,8 +52,8 @@ public class ProcessProducerTest {
     public void setup() throws Exception {
         args = new String[] {
             "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
-            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs,raw-logs",
-            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "click-logs,raw-logs",
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
             "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
             "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
             "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
@@ -133,8 +133,8 @@ public class ProcessProducerTest {
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "click-logs");
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "click-logs");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
                 "/click-logs/10/05/05/00/20");
 
         for (m = null; m == null;) {
@@ -142,8 +142,8 @@ public class ProcessProducerTest {
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "raw-logs");
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "raw-logs");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
                 "/raw-logs/10/05/05/00/20");
         connection.close();
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index f0864db..966f90e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -261,16 +261,17 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
                                              String falconFeedStorageType, Properties props) {
         // todo these pairs are the same but used in different context
         // late data handler - should-record action
-        props.put("falconInputFeeds", entity.getName());
-        props.put("falconInPaths", instancePaths);
+        props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), instancePaths);
+        props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), entity.getName());
 
         // storage type for each corresponding feed - in this case only one feed is involved
         // needed to compute usage based on storage type in LateDataHandler
-        props.put("falconInputFeedStorageTypes", falconFeedStorageType);
+        props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), falconFeedStorageType);
 
         // falcon post processing
-        props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), entity.getName());
-        props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), "${coord:dataOut('output')}");
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), "${coord:dataOut('output')}");
     }
 
     private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index 3c74485..c896d5a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -86,8 +86,8 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
 
         props.put("limit", feedCluster.getRetention().getLimit().toString());
 
-        props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), entity.getName());
-        props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), IGNORE);
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
 
         props.put("falconInputFeeds", entity.getName());
         props.put("falconInPaths", IGNORE);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index a33fa62..1fa6758 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -152,12 +152,14 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
 
     private void initializeInputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
         if (entity.getInputs() == null) {
-            props.put("falconInputFeeds", "NONE");
-            props.put("falconInPaths", IGNORE);
+            props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), "NONE");
+            props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), IGNORE);
+            props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), IGNORE);
             return;
         }
 
         List<String> inputFeeds = new ArrayList<String>();
+        List<String> inputNames = new ArrayList<String>();
         List<String> inputPaths = new ArrayList<String>();
         List<String> inputFeedStorageTypes = new ArrayList<String>();
         for (Input input : entity.getInputs().getInputs()) {
@@ -190,21 +192,23 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
 
             inputFeeds.add(feed.getName());
             inputPaths.add(inputExpr);
+            inputNames.add(input.getName());
             inputFeedStorageTypes.add(storage.getType().name());
         }
 
-        propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
+        propagateLateDataProperties(inputFeeds, inputNames, inputPaths, inputFeedStorageTypes, props);
     }
 
-    private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
+    private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputNames, List<String> inputPaths,
         List<String> inputFeedStorageTypes, Properties props) {
         // populate late data handler - should-record action
-        props.put("falconInputFeeds", StringUtils.join(inputFeeds, '#'));
-        props.put("falconInPaths", StringUtils.join(inputPaths, '#'));
+        props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), StringUtils.join(inputFeeds, '#'));
+        props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), StringUtils.join(inputNames, '#'));
+        props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), StringUtils.join(inputPaths, '#'));
 
         // storage type for each corresponding feed sent as a param to LateDataHandler
         // needed to compute usage based on storage type in LateDataHandler
-        props.put("falconInputFeedStorageTypes", StringUtils.join(inputFeedStorageTypes, '#'));
+        props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), StringUtils.join(inputFeedStorageTypes, '#'));
     }
 
     private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
@@ -250,8 +254,8 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
 
     private void initializeOutputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
         if (entity.getOutputs() == null) {
-            props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), "NONE");
-            props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), IGNORE);
+            props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "NONE");
+            props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
             return;
         }
 
@@ -289,8 +293,8 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
         }
 
         // Output feed name and path for parent workflow
-        props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), StringUtils.join(outputFeeds, ','));
-        props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), StringUtils.join(outputPaths, ','));
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(outputFeeds, ','));
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(outputPaths, ','));
     }
 
     private DATAOUT createDataOut(Output output) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
index 2714859..979d4f0 100644
--- a/oozie/src/main/resources/action/post-process.xml
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -80,7 +80,7 @@
         <arg>${wf:user()}</arg>
         <arg>-falconInputFeeds</arg>
         <arg>${falconInputFeeds}</arg>
-        <arg>-falconInputPaths</arg>
+        <arg>-falconInPaths</arg>
         <arg>${falconInPaths}</arg>
         <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
         <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/main/resources/action/pre-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/pre-process.xml b/oozie/src/main/resources/action/pre-process.xml
index 127ab80..070c42b 100644
--- a/oozie/src/main/resources/action/pre-process.xml
+++ b/oozie/src/main/resources/action/pre-process.xml
@@ -39,8 +39,8 @@
         <arg>${logDir}/latedata/${nominalTime}/${srcClusterName}</arg>
         <arg>-paths</arg>
         <arg>${falconInPaths}</arg>
-        <arg>-falconInputFeeds</arg>
-        <arg>${falconInputFeeds}</arg>
+        <arg>-falconInputNames</arg>
+        <arg>${falconInputNames}</arg>
         <arg>-falconInputFeedStorageTypes</arg>
         <arg>${falconInputFeedStorageTypes}</arg>
         <capture-output/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index e47895f..3c49353 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -48,6 +48,7 @@ import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -206,6 +207,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         // verify the late data params
         Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), feed.getName());
         Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
         Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
         Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
@@ -443,6 +445,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         // verify the late data params
         Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
         Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), tableFeed.getName());
         Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
 
         // verify the post processing params

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 4daf5d8..b1d7a8a 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -188,6 +188,9 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
 
         HashMap<String, String> props = getCoordProperties(coord);
         assertEquals(props.get("mapred.job.priority"), "LOW");
+        List<Input> inputs = process.getInputs().getInputs();
+        assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), inputs.get(0).getName() + "#" + inputs
+            .get(1).getName());
 
         verifyEntityProperties(process, cluster,
                 WorkflowExecutionContext.EntityOperations.GENERATE, props);
@@ -558,6 +561,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
         Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
         Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+        Assert.assertEquals(props.get("falconInputs"), process.getInputs().getInputs().get(0).getName());
 
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
@@ -688,10 +692,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         verifyBrokerProperties(cluster, props);
 
         String[] expected = {
-            WorkflowExecutionArgs.FEED_NAMES.getName(),
-            WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+            WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(),
+            WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
             WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(),
-            "falconInPaths",
+            WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
+            WorkflowExecutionArgs.INPUT_NAMES.getName(),
             WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
             WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
             WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
@@ -729,7 +734,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         verifyBrokerProperties(cluster, props);
 
         Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks");
-        Assert.assertEquals(props.get(WorkflowExecutionArgs.FEED_NAMES.getName()), "NONE");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "NONE");
     }
 
     @Test
@@ -758,7 +763,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
                 WorkflowExecutionContext.EntityOperations.GENERATE, props);
         verifyBrokerProperties(cluster, props);
 
-        Assert.assertEquals(props.get(WorkflowExecutionArgs.FEED_NAMES.getName()), "impressions");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "impressions");
         Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "NONE");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 91559a5..201b682 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -46,8 +46,8 @@ public class FalconPostProcessingTest {
     public void setup() throws Exception {
         args = new String[]{
             "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
-            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "out-click-logs,out-raw-logs",
-            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "out-click-logs,out-raw-logs",
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
             "/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20",
             "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
             "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
@@ -131,13 +131,13 @@ public class FalconPostProcessingTest {
 
         assertMessage(m);
         if (topic.equals(FALCON_TOPIC_NAME)) {
-            Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()),
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
                     "out-click-logs,out-raw-logs");
-            Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
                     "/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20");
         } else {
-            Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "out-click-logs");
-            Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "out-click-logs");
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
                     "/out-click-logs/10/05/05/00/20");
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index d9e9f86..9bef7f5 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -542,8 +542,8 @@ public class LineageMetadataResourceTest {
             "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
             "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
 
-            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
-            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
 
             "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
             "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 75de12e..d854bdd 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -72,12 +73,12 @@ public class LateDataHandler extends Configured implements Tool {
         opt.setRequired(true);
         options.addOption(opt);
 
-        opt = new Option("falconInputFeeds", true,
+        opt = new Option(WorkflowExecutionArgs.INPUT_NAMES.getName(), true,
                 "Input feed names, further separated by #");
         opt.setRequired(true);
         options.addOption(opt);
 
-        opt = new Option("falconInputFeedStorageTypes", true,
+        opt = new Option(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), true,
                 "Feed storage types corresponding to Input feed names, separated by #");
         opt.setRequired(true);
         options.addOption(opt);
@@ -94,9 +95,10 @@ public class LateDataHandler extends Configured implements Tool {
             return 0;
         }
 
-        String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split("#");
+        String[] inputFeeds = getOptionValue(command, WorkflowExecutionArgs.INPUT_NAMES.getName()).split("#");
         String[] pathGroups = pathStr.split("#");
-        String[] inputFeedStorageTypes = getOptionValue(command, "falconInputFeedStorageTypes").split("#");
+        String[] inputFeedStorageTypes =
+            getOptionValue(command, WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()).split("#");
 
         Map<String, Long> metrics = computeMetrics(inputFeeds, pathGroups, inputFeedStorageTypes);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 16e340e..80a3b83 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -26,6 +26,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.latedata.LateDataHandler;
 import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -87,11 +88,12 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
         LateDataHandler late = new LateDataHandler();
         Properties properties = handler.getWfEngine().getWorkflowProperties(
                 message.getClusterName(), message.getWfId());
-        String falconInputFeeds = properties.getProperty("falconInputFeeds");
-        String falconInPaths = properties.getProperty("falconInPaths");
-        String falconInputFeedStorageTypes = properties.getProperty("falconInputFeedStorageTypes");
-        String logDir = properties.getProperty("logDir");
-        String nominalTime = properties.getProperty("nominalTime");
+        String falconInputs = properties.getProperty(WorkflowExecutionArgs.INPUT_NAMES.getName());
+        String falconInPaths = properties.getProperty(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName());
+        String falconInputFeedStorageTypes =
+            properties.getProperty(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName());
+        String logDir = properties.getProperty(WorkflowExecutionArgs.LOG_DIR.getName());
+        String nominalTime = properties.getProperty(WorkflowExecutionArgs.NOMINAL_TIME.getName());
         String srcClusterName = properties.getProperty("srcClusterName");
         Path lateLogPath = handler.getLateLogPath(logDir, nominalTime, srcClusterName);
 
@@ -104,22 +106,22 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
         }
 
         String[] pathGroups = falconInPaths.split("#");
-        String[] inputFeeds = falconInputFeeds.split("#");
+        String[] inputs = falconInputs.split("#");
         String[] inputFeedStorageTypes = falconInputFeedStorageTypes.split("#");
 
         Map<String, Long> computedMetrics = new LinkedHashMap<String, Long>();
         Entity entity = EntityUtil.getEntity(message.getEntityType(), message.getEntityName());
         if (EntityUtil.getLateProcess(entity) != null) {
-            List<String> lateFeed = new ArrayList<String>();
+            List<String> lateInput = new ArrayList<String>();
             for (LateInput li : EntityUtil.getLateProcess(entity).getLateInputs()) {
-                lateFeed.add(li.getInput());
+                lateInput.add(li.getInput());
             }
 
             for (int index = 0; index < pathGroups.length; index++) {
-                if (lateFeed.contains(inputFeeds[index])) {
+                if (lateInput.contains(inputs[index])) {
                     long computedMetric = late.computeStorageMetric(
                             pathGroups[index], inputFeedStorageTypes[index], conf);
-                    computedMetrics.put(inputFeeds[index], computedMetric);
+                    computedMetrics.put(inputs[index], computedMetric);
                 }
             }
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
index ab60307..96c99c5 100644
--- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
@@ -118,7 +118,7 @@ public class LateDataHandlerIT {
         String[] args = {
             "-out", lateDataDir,
             "-paths", feedUriTemplate,
-            "-falconInputFeeds", "foo",
+            "-falconInputNames", "foo",
             "-falconInputFeedStorageTypes", "TABLE",
         };
 
@@ -158,7 +158,7 @@ public class LateDataHandlerIT {
         String[] args = {
             "-out", lateDataDir,
             "-paths", feedUriTemplate,
-            "-falconInputFeeds", "foo",
+            "-falconInputNames", "foo",
             "-falconInputFeedStorageTypes", "TABLE",
         };
 


Mime
View raw message