falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [1/2] FALCON-481 Simplify process parent workflow. Contributed by Shwetha GS
Date Thu, 31 Jul 2014 10:14:14 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master ed100c8ac -> 3bb5a62af


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/process/pig-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/process/pig-action.xml b/oozie/src/main/resources/action/process/pig-action.xml
new file mode 100644
index 0000000..65b548c
--- /dev/null
+++ b/oozie/src/main/resources/action/process/pig-action.xml
@@ -0,0 +1,40 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<action name='user-action' xmlns="uri:oozie:workflow:0.3"> <!-- Oozie pig action that runs the user's Pig script for a process; NOTE(review): this is a standalone fragment, presumably inserted into a generated workflow, confirm against the builder -->
+    <pig>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name> <!-- job priority applied via the oozie.launcher. prefix, i.e. to the launcher job of this action -->
+                <value>${jobPriority}</value>
+            </property>
+            <property>
+                <name>oozie.action.sharelib.for.pig</name> <!-- sharelibs loaded for this action: pig plus hcatalog, so scripts can use HCatalog -->
+                <value>pig,hcatalog</value>
+            </property>
+        </configuration>
+        <script>#USER_WF_PATH#</script> <!-- placeholder token, presumably replaced with the user's Pig script path before deployment; TODO confirm where the substitution happens -->
+    </pig>
+    <ok to="succeeded-post-processing"/> <!-- neither transition target is defined in this fragment; the enclosing workflow must presumably provide nodes with these names -->
+    <error to="failed-post-processing"/>
+</action>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/workflow/falcon-table-export.hql
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/falcon-table-export.hql b/oozie/src/main/resources/workflow/falcon-table-export.hql
deleted file mode 100644
index 37fd1b7..0000000
--- a/oozie/src/main/resources/workflow/falcon-table-export.hql
+++ /dev/null
@@ -1,18 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
---
-export table ${falconSourceDatabase}.${falconSourceTable} partition ${falconSourcePartition} to '${falconSourceStagingDir}';

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/workflow/falcon-table-import.hql
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/falcon-table-import.hql b/oozie/src/main/resources/workflow/falcon-table-import.hql
deleted file mode 100644
index 653d580..0000000
--- a/oozie/src/main/resources/workflow/falcon-table-import.hql
+++ /dev/null
@@ -1,20 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
---
-use ${falconTargetDatabase};
-alter table ${falconTargetTable} drop if exists partition ${falconTargetPartition};
-import table ${falconTargetTable} partition ${falconTargetPartition} from '${falconTargetStagingDir}';

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/process-parent-workflow.xml b/oozie/src/main/resources/workflow/process-parent-workflow.xml
deleted file mode 100644
index 4a2495c..0000000
--- a/oozie/src/main/resources/workflow/process-parent-workflow.xml
+++ /dev/null
@@ -1,278 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-process-parent-workflow'>
-    <start to='should-record'/>
-    <decision name='should-record'>
-        <switch>
-            <case to="recordsize">
-                ${shouldRecord=="true"}
-            </case>
-            <default to="user-workflow"/>
-        </switch>
-    </decision>
-    <action name='recordsize'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-                <!-- HCatalog jars -->
-                <property>
-                    <name>oozie.action.sharelib.for.java</name>
-                    <value>hcatalog</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
-            <arg>-out</arg>
-            <arg>${logDir}/latedata/${nominalTime}</arg>
-            <arg>-paths</arg>
-            <arg>${falconInPaths}</arg>
-            <arg>-falconInputFeeds</arg>
-            <arg>${falconInputFeeds}</arg>
-            <arg>-falconInputFeedStorageTypes</arg>
-            <arg>${falconInputFeedStorageTypes}</arg>
-            <capture-output/>
-        </java>
-        <ok to="user-workflow"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <decision name='user-workflow'>
-        <switch>
-            <case to="user-oozie-workflow">
-                ${userWorkflowEngine=="oozie"}
-            </case>
-            <case to="user-pig-job">
-                ${userWorkflowEngine=="pig"}
-            </case>
-            <case to="user-hive-job">
-                ${userWorkflowEngine=="hive"}
-            </case>
-            <default to="user-oozie-workflow"/>
-        </switch>
-    </decision>
-    <action name='user-pig-job'>
-        <pig>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-                <property>
-                    <name>oozie.action.sharelib.for.pig</name>
-                    <value>pig,hcatalog</value>
-                </property>
-            </configuration>
-            <script>#USER_WF_PATH#</script>
-        </pig>
-        <ok to="succeeded-post-processing"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <action name="user-hive-job">
-        <hive xmlns="uri:oozie:hive-action:0.2">
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <script>#USER_WF_PATH#</script>
-        </hive>
-        <ok to="succeeded-post-processing"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <action name='user-oozie-workflow'>
-        <sub-workflow>
-            <app-path>#USER_WF_PATH#</app-path>
-            <propagate-configuration/>
-        </sub-workflow>
-        <ok to="succeeded-post-processing"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <action name='succeeded-post-processing'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
-            <arg>-cluster</arg>
-            <arg>${cluster}</arg>
-            <arg>-entityType</arg>
-            <arg>${entityType}</arg>
-            <arg>-entityName</arg>
-            <arg>${entityName}</arg>
-            <arg>-nominalTime</arg>
-            <arg>${nominalTime}</arg>
-            <arg>-operation</arg>
-            <arg>GENERATE</arg>
-            <arg>-workflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-runId</arg>
-            <arg>${wf:run()}</arg>
-            <arg>-status</arg>
-            <arg>SUCCEEDED</arg>
-            <arg>-timeStamp</arg>
-            <arg>${timeStamp}</arg>
-            <arg>-brokerImplClass</arg>
-            <arg>${wf:conf("broker.impl.class")}</arg>
-            <arg>-brokerUrl</arg>
-            <arg>${wf:conf("broker.url")}</arg>
-            <arg>-userBrokerImplClass</arg>
-            <arg>${userBrokerImplClass}</arg>
-            <arg>-userBrokerUrl</arg>
-            <arg>${userBrokerUrl}</arg>
-            <arg>-brokerTTL</arg>
-            <arg>${wf:conf("broker.ttlInMins")}</arg>
-            <arg>-feedNames</arg>
-            <arg>${feedNames}</arg>
-            <arg>-feedInstancePaths</arg>
-            <arg>${feedInstancePaths}</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
-            <arg>-workflowEngineUrl</arg>
-            <arg>${workflowEngineUrl}</arg>
-            <arg>-subflowId</arg>
-            <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-oozie-workflow" : ""}</arg>
-            <arg>-userWorkflowEngine</arg>
-            <arg>${userWorkflowEngine}</arg>
-            <arg>-userWorkflowName</arg>
-            <arg>${userWorkflowName}</arg>
-            <arg>-userWorkflowVersion</arg>
-            <arg>${userWorkflowVersion}</arg>
-            <arg>-logDir</arg>
-            <arg>${logDir}/job-${nominalTime}/</arg>
-            <arg>-workflowUser</arg>
-            <arg>${wf:user()}</arg>
-            <arg>-falconInputFeeds</arg>
-            <arg>${falconInputFeeds}</arg>
-            <arg>-falconInputPaths</arg>
-            <arg>${falconInPaths}</arg>
-            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
-            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
-            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
-            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
-            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
-            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
-        </java>
-        <ok to="end"/>
-        <error to="fail"/>
-    </action>
-    <action name='failed-post-processing'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
-            <arg>-cluster</arg>
-            <arg>${cluster}</arg>
-            <arg>-entityType</arg>
-            <arg>${entityType}</arg>
-            <arg>-entityName</arg>
-            <arg>${entityName}</arg>
-            <arg>-nominalTime</arg>
-            <arg>${nominalTime}</arg>
-            <arg>-operation</arg>
-            <arg>GENERATE</arg>
-            <arg>-workflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-runId</arg>
-            <arg>${wf:run()}</arg>
-            <arg>-status</arg>
-            <arg>FAILED</arg>
-            <arg>-timeStamp</arg>
-            <arg>${timeStamp}</arg>
-            <arg>-brokerImplClass</arg>
-            <arg>${wf:conf("broker.impl.class")}</arg>
-            <arg>-brokerUrl</arg>
-            <arg>${wf:conf("broker.url")}</arg>
-            <arg>-userBrokerImplClass</arg>
-            <arg>${userBrokerImplClass}</arg>
-            <arg>-userBrokerUrl</arg>
-            <arg>${userBrokerUrl}</arg>
-            <arg>-brokerTTL</arg>
-            <arg>${wf:conf("broker.ttlInMins")}</arg>
-            <arg>-feedNames</arg>
-            <arg>${feedNames}</arg>
-            <arg>-feedInstancePaths</arg>
-            <arg>${feedInstancePaths}</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
-            <arg>-workflowEngineUrl</arg>
-            <arg>${workflowEngineUrl}</arg>
-            <arg>-subflowId</arg>
-            <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-oozie-workflow" : ""}</arg>
-            <arg>-userWorkflowEngine</arg>
-            <arg>${userWorkflowEngine}</arg>
-            <arg>-logDir</arg>
-            <arg>${logDir}/job-${nominalTime}/</arg>
-            <arg>-workflowUser</arg>
-            <arg>${wf:user()}</arg>
-            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
-            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
-            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
-            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
-            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
-            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
-        </java>
-        <ok to="fail"/>
-        <error to="fail"/>
-    </action>
-    <kill name="fail">
-        <message>
-            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-        </message>
-    </kill>
-    <end name='end'/>
-</workflow-app>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/replication-workflow.xml b/oozie/src/main/resources/workflow/replication-workflow.xml
deleted file mode 100644
index 0748acf..0000000
--- a/oozie/src/main/resources/workflow/replication-workflow.xml
+++ /dev/null
@@ -1,330 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-feed-parent-workflow'>
-    <start to='should-record'/>
-    <decision name='should-record'>
-        <switch>
-            <case to="recordsize">
-                ${shouldRecord=="true"}
-            </case>
-            <default to="replication-decision"/>
-        </switch>
-    </decision>
-    <action name='recordsize'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-                <!-- HCatalog jars -->
-                <property>
-                    <name>oozie.action.sharelib.for.java</name>
-                    <value>hcatalog</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
-            <arg>-out</arg>
-            <arg>${logDir}/latedata/${nominalTime}/${srcClusterName}</arg>
-            <arg>-paths</arg>
-            <arg>${falconInPaths}</arg>
-            <arg>-falconInputFeeds</arg>
-            <arg>${falconInputFeeds}</arg>
-            <arg>-falconInputFeedStorageTypes</arg>
-            <arg>${falconInputFeedStorageTypes}</arg>
-            <capture-output/>
-        </java>
-        <ok to="replication-decision"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <decision name="replication-decision">
-        <switch>
-            <case to="table-export">
-                ${falconFeedStorageType == "TABLE"}
-            </case>
-            <default to="replication"/>
-        </switch>
-    </decision>
-    <!-- Table Replication - Export data and metadata to HDFS Staging from Source Hive -->
-    <action name="table-export">
-        <hive xmlns="uri:oozie:hive-action:0.2">
-            <job-tracker>${falconSourceJobTracker}</job-tracker>
-            <name-node>${falconSourceNameNode}</name-node>
-            <prepare>
-                <delete path="${distcpSourcePaths}"/>
-            </prepare>
-            <job-xml>${wf:appPath()}/conf/falcon-source-hive-site.xml</job-xml>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <script>${wf:appPath()}/scripts/falcon-table-export.hql</script>
-            <param>falconSourceDatabase=${falconSourceDatabase}</param>
-            <param>falconSourceTable=${falconSourceTable}</param>
-            <param>falconSourcePartition=${falconSourcePartition}</param>
-            <param>falconSourceStagingDir=${distcpSourcePaths}</param>
-        </hive>
-        <ok to="replication"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <!-- Replication action -->
-    <action name="replication">
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property> <!-- hadoop 2 parameter -->
-                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
-                    <value>true</value>
-                </property>
-                <property> <!-- hadoop 1 parameter -->
-                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
-                    <value>true</value>
-                </property>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
-            <arg>-Dfalcon.include.path=${sourceRelativePaths}</arg>
-            <arg>-Dmapred.job.queue.name=${queueName}</arg>
-            <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-maxMaps</arg>
-            <arg>${maxMaps}</arg>
-            <arg>-mapBandwidthKB</arg>
-            <arg>${mapBandwidthKB}</arg>
-            <arg>-sourcePaths</arg>
-            <arg>${distcpSourcePaths}</arg>
-            <arg>-targetPath</arg>
-            <arg>${distcpTargetPaths}</arg>
-            <arg>-falconFeedStorageType</arg>
-            <arg>${falconFeedStorageType}</arg>
-            <file>${wf:conf("falcon.libpath")}/hadoop-distcp.jar</file>
-        </java>
-        <ok to="post-replication-decision"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <decision name="post-replication-decision">
-        <switch>
-            <case to="table-import">
-                ${falconFeedStorageType == "TABLE"}
-            </case>
-            <default to="succeeded-post-processing"/>
-        </switch>
-    </decision>
-    <!-- Table Replication - Import data and metadata from HDFS Staging into Target Hive -->
-    <action name="table-import">
-        <hive xmlns="uri:oozie:hive-action:0.2">
-            <job-tracker>${falconTargetJobTracker}</job-tracker>
-            <name-node>${falconTargetNameNode}</name-node>
-            <job-xml>${wf:appPath()}/conf/falcon-target-hive-site.xml</job-xml>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <script>${wf:appPath()}/scripts/falcon-table-import.hql</script>
-            <param>falconTargetDatabase=${falconTargetDatabase}</param>
-            <param>falconTargetTable=${falconTargetTable}</param>
-            <param>falconTargetPartition=${falconTargetPartition}</param>
-            <param>falconTargetStagingDir=${distcpTargetPaths}</param>
-        </hive>
-        <ok to="cleanup-table-staging-dir"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <action name="cleanup-table-staging-dir">
-        <fs>
-            <delete path="${distcpSourcePaths}"/>
-            <delete path="${distcpTargetPaths}"/>
-        </fs>
-        <ok to="succeeded-post-processing"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <action name='succeeded-post-processing'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
-            <arg>-cluster</arg>
-            <arg>${cluster}</arg>
-            <arg>-entityType</arg>
-            <arg>${entityType}</arg>
-            <arg>-entityName</arg>
-            <arg>${entityName}</arg>
-            <arg>-nominalTime</arg>
-            <arg>${nominalTime}</arg>
-            <arg>-operation</arg>
-            <arg>REPLICATE</arg>
-            <arg>-workflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-runId</arg>
-            <arg>${wf:run()}</arg>
-            <arg>-status</arg>
-            <arg>SUCCEEDED</arg>
-            <arg>-timeStamp</arg>
-            <arg>${timeStamp}</arg>
-            <arg>-brokerImplClass</arg>
-            <arg>${wf:conf("broker.impl.class")}</arg>
-            <arg>-brokerUrl</arg>
-            <arg>${wf:conf("broker.url")}</arg>
-            <arg>-userBrokerImplClass</arg>
-            <arg>${userBrokerImplClass}</arg>
-            <arg>-userBrokerUrl</arg>
-            <arg>${userBrokerUrl}</arg>
-            <arg>-brokerTTL</arg>
-            <arg>${wf:conf("broker.ttlInMins")}</arg>
-            <arg>-feedNames</arg>
-            <arg>${feedNames}</arg>
-            <arg>-feedInstancePaths</arg>
-            <arg>${feedInstancePaths}</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}-${srcClusterName}.csv</arg>
-            <arg>-workflowEngineUrl</arg>
-            <arg>${workflowEngineUrl}</arg>
-            <arg>-userWorkflowName</arg>
-            <arg>${userWorkflowName}</arg>
-            <arg>-userWorkflowVersion</arg>
-            <arg>${userWorkflowVersion}</arg>
-            <arg>-userWorkflowEngine</arg>
-            <arg>${userWorkflowEngine}</arg>
-            <arg>-subflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-logDir</arg>
-            <arg>${logDir}/job-${nominalTime}/${srcClusterName}/</arg>
-            <arg>-workflowUser</arg>
-            <arg>${wf:user()}</arg>
-            <arg>-falconInputFeeds</arg>
-            <arg>${falconInputFeeds}</arg>
-            <arg>-falconInputPaths</arg>
-            <arg>${falconInPaths}</arg>
-            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
-            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
-            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
-            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
-            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
-            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
-        </java>
-        <ok to="end"/>
-        <error to="fail"/>
-    </action>
-    <action name='failed-post-processing'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
-            <arg>-cluster</arg>
-            <arg>${cluster}</arg>
-            <arg>-entityType</arg>
-            <arg>${entityType}</arg>
-            <arg>-entityName</arg>
-            <arg>${entityName}</arg>
-            <arg>-nominalTime</arg>
-            <arg>${nominalTime}</arg>
-            <arg>-operation</arg>
-            <arg>REPLICATE</arg>
-            <arg>-workflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-runId</arg>
-            <arg>${wf:run()}</arg>
-            <arg>-status</arg>
-            <arg>FAILED</arg>
-            <arg>-timeStamp</arg>
-            <arg>${timeStamp}</arg>
-            <arg>-brokerImplClass</arg>
-            <arg>${wf:conf("broker.impl.class")}</arg>
-            <arg>-brokerUrl</arg>
-            <arg>${wf:conf("broker.url")}</arg>
-            <arg>-userBrokerImplClass</arg>
-            <arg>${userBrokerImplClass}</arg>
-            <arg>-userBrokerUrl</arg>
-            <arg>${userBrokerUrl}</arg>
-            <arg>-brokerTTL</arg>
-            <arg>${wf:conf("broker.ttlInMins")}</arg>
-            <arg>-feedNames</arg>
-            <arg>${feedNames}</arg>
-            <arg>-feedInstancePaths</arg>
-            <arg>${feedInstancePaths}</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}-${srcClusterName}.csv</arg>
-            <arg>-workflowEngineUrl</arg>
-            <arg>${workflowEngineUrl}</arg>
-            <arg>-subflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-logDir</arg>
-            <arg>${logDir}/job-${nominalTime}/${srcClusterName}/</arg>
-            <arg>-workflowUser</arg>
-            <arg>${wf:user()}</arg>
-            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
-            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
-            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
-            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
-            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
-            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
-        </java>
-        <ok to="fail"/>
-        <error to="fail"/>
-    </action>
-    <kill name="fail">
-        <message>
-            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-        </message>
-    </kill>
-    <end name='end'/>
-</workflow-app>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/workflow/retention-workflow.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/retention-workflow.xml b/oozie/src/main/resources/workflow/retention-workflow.xml
deleted file mode 100644
index 5138865..0000000
--- a/oozie/src/main/resources/workflow/retention-workflow.xml
+++ /dev/null
@@ -1,208 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-feed-parent-workflow'>
-    <start to='eviction'/>
-    <action name="eviction">
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-                <!-- HCatalog jars -->
-                <property>
-                    <name>oozie.action.sharelib.for.java</name>
-                    <value>hcatalog</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.retention.FeedEvictor</main-class>
-            <arg>-feedBasePath</arg>
-            <arg>${feedDataPath}</arg>
-            <arg>-falconFeedStorageType</arg>
-            <arg>${falconFeedStorageType}</arg>
-            <arg>-retentionType</arg>
-            <arg>instance</arg>
-            <arg>-retentionLimit</arg>
-            <arg>${limit}</arg>
-            <arg>-timeZone</arg>
-            <arg>${timeZone}</arg>
-            <arg>-frequency</arg>
-            <arg>${frequency}</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
-        </java>
-        <ok to="succeeded-post-processing"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <action name='succeeded-post-processing'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
-            <arg>-cluster</arg>
-            <arg>${cluster}</arg>
-            <arg>-entityType</arg>
-            <arg>${entityType}</arg>
-            <arg>-entityName</arg>
-            <arg>${entityName}</arg>
-            <arg>-nominalTime</arg>
-            <arg>${nominalTime}</arg>
-            <arg>-operation</arg>
-            <arg>DELETE</arg>
-            <arg>-workflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-runId</arg>
-            <arg>${wf:run()}</arg>
-            <arg>-status</arg>
-            <arg>SUCCEEDED</arg>
-            <arg>-timeStamp</arg>
-            <arg>${timeStamp}</arg>
-            <arg>-brokerImplClass</arg>
-            <arg>${wf:conf("broker.impl.class")}</arg>
-            <arg>-brokerUrl</arg>
-            <arg>${wf:conf("broker.url")}</arg>
-            <arg>-userBrokerImplClass</arg>
-            <arg>${userBrokerImplClass}</arg>
-            <arg>-userBrokerUrl</arg>
-            <arg>${userBrokerUrl}</arg>
-            <arg>-brokerTTL</arg>
-            <arg>${wf:conf("broker.ttlInMins")}</arg>
-            <arg>-feedNames</arg>
-            <arg>${feedNames}</arg>
-            <arg>-feedInstancePaths</arg>
-            <arg>${feedInstancePaths}</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
-            <arg>-workflowEngineUrl</arg>
-            <arg>${workflowEngineUrl}</arg>
-            <arg>-subflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-logDir</arg>
-            <arg>${logDir}/job-${nominalTime}/</arg>
-            <arg>-workflowUser</arg>
-            <arg>${wf:user()}</arg>
-            <arg>-userWorkflowName</arg>
-            <arg>${userWorkflowName}</arg>
-            <arg>-userWorkflowVersion</arg>
-            <arg>${userWorkflowVersion}</arg>
-            <arg>-userWorkflowEngine</arg>
-            <arg>${userWorkflowEngine}</arg>
-            <arg>-falconInputFeeds</arg>
-            <arg>${falconInputFeeds}</arg>
-            <arg>-falconInputPaths</arg>
-            <arg>${falconInPaths}</arg>
-            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
-            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
-            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
-            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
-            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
-            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
-        </java>
-        <ok to="end"/>
-        <error to="fail"/>
-    </action>
-    <action name='failed-post-processing'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
-            <arg>-cluster</arg>
-            <arg>${cluster}</arg>
-            <arg>-entityType</arg>
-            <arg>${entityType}</arg>
-            <arg>-entityName</arg>
-            <arg>${entityName}</arg>
-            <arg>-nominalTime</arg>
-            <arg>${nominalTime}</arg>
-            <arg>-operation</arg>
-            <arg>DELETE</arg>
-            <arg>-workflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-runId</arg>
-            <arg>${wf:run()}</arg>
-            <arg>-status</arg>
-            <arg>FAILED</arg>
-            <arg>-timeStamp</arg>
-            <arg>${timeStamp}</arg>
-            <arg>-brokerImplClass</arg>
-            <arg>${wf:conf("broker.impl.class")}</arg>
-            <arg>-brokerUrl</arg>
-            <arg>${wf:conf("broker.url")}</arg>
-            <arg>-userBrokerImplClass</arg>
-            <arg>${userBrokerImplClass}</arg>
-            <arg>-userBrokerUrl</arg>
-            <arg>${userBrokerUrl}</arg>
-            <arg>-brokerTTL</arg>
-            <arg>${wf:conf("broker.ttlInMins")}</arg>
-            <arg>-feedNames</arg>
-            <arg>${feedNames}</arg>
-            <arg>-feedInstancePaths</arg>
-            <arg>${feedInstancePaths}</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
-            <arg>-workflowEngineUrl</arg>
-            <arg>${workflowEngineUrl}</arg>
-            <arg>-subflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-logDir</arg>
-            <arg>${logDir}/job-${nominalTime}/</arg>
-            <arg>-workflowUser</arg>
-            <arg>${wf:user()}</arg>
-            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
-            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
-            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
-            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
-            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
-            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
-        </java>
-        <ok to="fail"/>
-        <error to="fail"/>
-    </action>
-    <kill name="fail">
-        <message>
-            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-        </message>
-    </kill>
-    <end name='end'/>
-</workflow-app>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index c99c36c..fc1af7b 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -43,11 +43,11 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.process.AbstractTestBase;
 import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DECISION;
 import org.apache.falcon.oozie.workflow.JAVA;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -241,7 +241,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     }
 
     private String getWorkflowAppPath() {
-        return "${nameNode}/projects/falcon/REPLICATION/" + srcCluster.getName() + "/workflow.xml";
+        return "${nameNode}/projects/falcon/REPLICATION/" + srcCluster.getName();
     }
 
     private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
@@ -312,14 +312,9 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
 
         WORKFLOWAPP workflow = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
-        assertWorkflowDefinition(fsReplFeed, workflow);
-
-        List<Object> actions = workflow.getDecisionOrForkOrJoin();
-        System.out.println("actions = " + actions);
-
-        ACTION replicationActionNode = (ACTION) actions.get(4);
-        Assert.assertEquals(replicationActionNode.getName(), "replication");
+        assertWorkflowDefinition(fsReplFeed, workflow, false);
 
+        ACTION replicationActionNode = getAction(workflow, "replication");
         JAVA replication = replicationActionNode.getJava();
         List<String> args = replication.getArg();
         Assert.assertEquals(args.size(), 13);
@@ -339,20 +334,21 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, aFeed));
     }
 
-    public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {
-        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), parentWorkflow.getName());
-
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
-        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
-        Assert.assertEquals("replication-decision", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
-        Assert.assertEquals("table-export", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
-        Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
-        Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName());
-        Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
-        Assert.assertEquals("cleanup-table-staging-dir", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
-        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
-        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(9)).getName());
+    public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP workflow, boolean isTable) {
+        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), workflow.getName());
+
+        boolean preProcess = RuntimeProperties.get().getProperty("feed.late.allowed", "true").equalsIgnoreCase("true");
+        if (preProcess) {
+            assertAction(workflow, "pre-processing", true);
+        }
+        assertAction(workflow, "replication", false);
+        assertAction(workflow, "succeeded-post-processing", true);
+        assertAction(workflow, "failed-post-processing", true);
+
+        if (isTable) {
+            assertAction(workflow, "table-import", false);
+            assertAction(workflow, "table-export", false);
+        }
     }
 
     @DataProvider(name = "secureOptions")
@@ -409,7 +405,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals("${now(0,-40)}", outEventInstance);
 
         // assert FS staging area
-        Path wfPath = new Path(coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "")).getParent();
+        Path wfPath = new Path(coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""));
         final FileSystem fs = trgMiniDFS.getFileSystem();
         Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts")));
         Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
@@ -525,8 +521,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         List<Properties> coords = builder.buildCoords(srcCluster, new Path("/projects/falcon/"));
         COORDINATORAPP coord = getCoordinator(srcMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
 
-        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(),
-            "${nameNode}/projects/falcon/RETENTION/workflow.xml");
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
         Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
         Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
 
@@ -572,8 +567,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         List<Properties> coords = builder.buildCoords(trgCluster, new Path("/projects/falcon/"));
         COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
 
-        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(),
-            "${nameNode}/projects/falcon/RETENTION/workflow.xml");
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
         Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + tableFeed.getName());
         Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
 
@@ -608,7 +602,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
         assertHCatCredentials(
             getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
-            new Path(coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "")).getParent().toString());
+            new Path(coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "")).toString());
     }
 
     private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index cc0c419..a0962fc 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -173,7 +173,7 @@ public class AbstractTestBase {
     @SuppressWarnings("unchecked")
     protected WORKFLOWAPP getWorkflowapp(FileSystem fs, COORDINATORAPP coord) throws JAXBException, IOException {
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        return getWorkflowapp(fs, new Path(wfPath));
+        return getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
     }
 
     @SuppressWarnings("unchecked")
@@ -181,4 +181,22 @@ public class AbstractTestBase {
         JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
         return ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(fs.open(path))).getValue();
     }
+
+    protected ACTION getAction(WORKFLOWAPP wf, String name) {
+        for (Object action : wf.getDecisionOrForkOrJoin()) {
+            if (action instanceof ACTION && ((ACTION) action).getName().equals(name)) {
+                return (ACTION) action;
+            }
+        }
+        throw new IllegalArgumentException("Invalid action name " + name);
+    }
+
+    protected void assertAction(WORKFLOWAPP wf, String name, boolean assertRetry) {
+        ACTION action = getAction(wf, name);
+        Assert.assertNotNull(action);
+        if (assertRetry) {
+            Assert.assertEquals(action.getRetryMax(), "3");
+            Assert.assertEquals(action.getRetryInterval(), "1");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 3655af9..4b453c7 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -48,7 +48,6 @@ import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DECISION;
 import org.apache.falcon.oozie.workflow.PIG;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
@@ -211,6 +210,9 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
 
         WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
         testParentWorkflow(process, parentWorkflow);
+
+        ACTION oozieAction = getAction(parentWorkflow, "user-action");
+        Assert.assertNotNull(oozieAction.getSubWorkflow());
     }
 
     @Test
@@ -232,10 +234,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
         testParentWorkflow(process, parentWorkflow);
 
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-
-        ACTION pigActionNode = (ACTION) decisionOrForkOrJoin.get(3);
-        Assert.assertEquals("user-pig-job", pigActionNode.getName());
+        ACTION pigActionNode = getAction(parentWorkflow, "user-action");
 
         final PIG pigAction = pigActionNode.getPig();
         Assert.assertEquals(pigAction.getScript(), "${nameNode}/falcon/staging/workflows/pig-process/user/id.pig");
@@ -245,10 +244,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(5, pigAction.getParam().size());
         Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getArchive());
         Assert.assertTrue(pigAction.getFile().size() > 0);
-
-        ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(5);
-        Assert.assertEquals("user-oozie-workflow", oozieAction.getName());
-        Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
     }
 
     @DataProvider(name = "secureOptions")
@@ -305,13 +300,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath));
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
         testParentWorkflow(process, parentWorkflow);
 
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-
-        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
-        Assert.assertEquals("user-hive-job", hiveNode.getName());
+        ACTION hiveNode = getAction(parentWorkflow, "user-action");
 
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
@@ -363,13 +355,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath));
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
         testParentWorkflow(process, parentWorkflow);
 
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-
-        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
-        Assert.assertEquals("user-hive-job", hiveNode.getName());
+        ACTION hiveNode = getAction(parentWorkflow, "user-action");
 
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
@@ -421,13 +410,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath));
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
         testParentWorkflow(process, parentWorkflow);
 
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-
-        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
-        Assert.assertEquals("user-hive-job", hiveNode.getName());
+        ACTION hiveNode = getAction(parentWorkflow, "user-action");
 
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
@@ -475,13 +461,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath));
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
         testParentWorkflow(process, parentWorkflow);
 
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-
-        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
-        Assert.assertEquals("user-hive-job", hiveNode.getName());
+        ACTION hiveNode = getAction(parentWorkflow, "user-action");
 
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
@@ -497,7 +480,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
     }
 
     private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
-        Path hiveConfPath = new Path(new Path(wfPath).getParent(), "conf/hive-site.xml");
+        Path hiveConfPath = new Path(new Path(wfPath), "conf/hive-site.xml");
         Assert.assertTrue(fs.exists(hiveConfPath));
 
         if (SecurityUtil.isSecurityEnabled()) {
@@ -586,7 +569,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath));
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
 
         Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
         assertHCatCredentials(parentWorkflow, wfPath);
@@ -620,7 +603,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
             props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
             props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
             props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
-            props.put(prefix + "_datain_partitions_hive", "${coord:dataInPartitions('input', 'hive-export')}");
         } else if (prefix.equals("falcon_output")) {
             props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
         }
@@ -654,27 +636,18 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         assertEquals(coord.getControls().getTimeout(), timeout);
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        return getWorkflowapp(fs, new Path(wfPath));
+        return getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
     }
 
     public void testParentWorkflow(Process process, WORKFLOWAPP parentWorkflow) {
         Assert.assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), parentWorkflow.getName());
 
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
-        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
-        Assert.assertEquals("user-workflow", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
-        Assert.assertEquals("user-pig-job", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
-        Assert.assertEquals("user-hive-job", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
-        Assert.assertEquals("user-oozie-workflow", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
-        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
-        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
-        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryMax());
-        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryInterval());
-        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryMax());
-        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryInterval());
-        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryMax());
-        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryInterval());
+        if (process.getLateProcess() != null) {
+            assertAction(parentWorkflow, "pre-processing", true);
+        }
+        assertAction(parentWorkflow, "succeeded-post-processing", true);
+        assertAction(parentWorkflow, "failed-post-processing", true);
+        assertAction(parentWorkflow, "user-action", false);
     }
 
     @AfterMethod

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/src/main/examples/entity/hcat/hcat-out-feed.xml
----------------------------------------------------------------------
diff --git a/src/main/examples/entity/hcat/hcat-out-feed.xml b/src/main/examples/entity/hcat/hcat-out-feed.xml
index 26d1f18..befd88f 100644
--- a/src/main/examples/entity/hcat/hcat-out-feed.xml
+++ b/src/main/examples/entity/hcat/hcat-out-feed.xml
@@ -17,8 +17,6 @@
   limitations under the License.
   -->
 <feed description="output" name="hcat-out" xmlns="uri:falcon:feed:0.1">
-    <groups>output</groups>
-
     <frequency>minutes(5)</frequency>
     <timezone>UTC</timezone>
     <late-arrival cut-off="hours(1)"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/src/main/examples/entity/hcat/hcat-pig-process.xml
----------------------------------------------------------------------
diff --git a/src/main/examples/entity/hcat/hcat-pig-process.xml b/src/main/examples/entity/hcat/hcat-pig-process.xml
index 6ee17eb..60836c4 100644
--- a/src/main/examples/entity/hcat/hcat-pig-process.xml
+++ b/src/main/examples/entity/hcat/hcat-pig-process.xml
@@ -31,12 +31,12 @@
 
     <inputs>
         <!-- In the pig script, the input paths will be available in a variable 'inparts' -->
-        <input name="inparts" feed="in" start="now(0,-5)" end="now(0,-1)"/>
+        <input name="inparts" feed="hcat-in" start="now(0,-5)" end="now(0,-1)"/>
     </inputs>
 
     <outputs>
         <!-- In the pig script, the output path will be available in a variable 'outparts' -->
-        <output name="outpart" feed="out" instance="now(0,0)"/>
+        <output name="outpart" feed="hcat-out" instance="now(0,0)"/>
     </outputs>
 
     <workflow engine="pig" path="/app/pig/hcat-wordcount.pig"/>


Mime
View raw message