falcon-commits mailing list archives

From shweth...@apache.org
Subject [5/9] FALCON-369 Refactor workflow builder. Contributed by Shwetha GS
Date Thu, 10 Jul 2014 06:57:34 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/resources/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/replication-workflow.xml b/oozie/src/main/resources/workflow/replication-workflow.xml
new file mode 100644
index 0000000..0748acf
--- /dev/null
+++ b/oozie/src/main/resources/workflow/replication-workflow.xml
@@ -0,0 +1,330 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-feed-parent-workflow'>
+    <start to='should-record'/>
+    <decision name='should-record'>
+        <switch>
+            <case to="recordsize">
+                ${shouldRecord == "true"}
+            </case>
+            <default to="replication-decision"/>
+        </switch>
+    </decision>
+    <action name='recordsize'>
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <!-- HCatalog jars -->
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>hcatalog</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
+            <arg>-out</arg>
+            <arg>${logDir}/latedata/${nominalTime}/${srcClusterName}</arg>
+            <arg>-paths</arg>
+            <arg>${falconInPaths}</arg>
+            <arg>-falconInputFeeds</arg>
+            <arg>${falconInputFeeds}</arg>
+            <arg>-falconInputFeedStorageTypes</arg>
+            <arg>${falconInputFeedStorageTypes}</arg>
+            <capture-output/>
+        </java>
+        <ok to="replication-decision"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <decision name="replication-decision">
+        <switch>
+            <case to="table-export">
+                ${falconFeedStorageType == "TABLE"}
+            </case>
+            <default to="replication"/>
+        </switch>
+    </decision>
+    <!-- Table Replication - Export data and metadata to HDFS Staging from Source Hive -->
+    <action name="table-export">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${falconSourceJobTracker}</job-tracker>
+            <name-node>${falconSourceNameNode}</name-node>
+            <prepare>
+                <delete path="${distcpSourcePaths}"/>
+            </prepare>
+            <job-xml>${wf:appPath()}/conf/falcon-source-hive-site.xml</job-xml>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <script>${wf:appPath()}/scripts/falcon-table-export.hql</script>
+            <param>falconSourceDatabase=${falconSourceDatabase}</param>
+            <param>falconSourceTable=${falconSourceTable}</param>
+            <param>falconSourcePartition=${falconSourcePartition}</param>
+            <param>falconSourceStagingDir=${distcpSourcePaths}</param>
+        </hive>
+        <ok to="replication"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <!-- Replication action -->
+    <action name="replication">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property> <!-- hadoop 1 parameter -->
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
+            <arg>-Dfalcon.include.path=${sourceRelativePaths}</arg>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-maxMaps</arg>
+            <arg>${maxMaps}</arg>
+            <arg>-mapBandwidthKB</arg>
+            <arg>${mapBandwidthKB}</arg>
+            <arg>-sourcePaths</arg>
+            <arg>${distcpSourcePaths}</arg>
+            <arg>-targetPath</arg>
+            <arg>${distcpTargetPaths}</arg>
+            <arg>-falconFeedStorageType</arg>
+            <arg>${falconFeedStorageType}</arg>
+            <file>${wf:conf("falcon.libpath")}/hadoop-distcp.jar</file>
+        </java>
+        <ok to="post-replication-decision"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <decision name="post-replication-decision">
+        <switch>
+            <case to="table-import">
+                ${falconFeedStorageType == "TABLE"}
+            </case>
+            <default to="succeeded-post-processing"/>
+        </switch>
+    </decision>
+    <!-- Table Replication - Import data and metadata from HDFS Staging into Target Hive -->
+    <action name="table-import">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${falconTargetJobTracker}</job-tracker>
+            <name-node>${falconTargetNameNode}</name-node>
+            <job-xml>${wf:appPath()}/conf/falcon-target-hive-site.xml</job-xml>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <script>${wf:appPath()}/scripts/falcon-table-import.hql</script>
+            <param>falconTargetDatabase=${falconTargetDatabase}</param>
+            <param>falconTargetTable=${falconTargetTable}</param>
+            <param>falconTargetPartition=${falconTargetPartition}</param>
+            <param>falconTargetStagingDir=${distcpTargetPaths}</param>
+        </hive>
+        <ok to="cleanup-table-staging-dir"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <action name="cleanup-table-staging-dir">
+        <fs>
+            <delete path="${distcpSourcePaths}"/>
+            <delete path="${distcpTargetPaths}"/>
+        </fs>
+        <ok to="succeeded-post-processing"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <action name='succeeded-post-processing'>
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+            <arg>-cluster</arg>
+            <arg>${cluster}</arg>
+            <arg>-entityType</arg>
+            <arg>${entityType}</arg>
+            <arg>-entityName</arg>
+            <arg>${entityName}</arg>
+            <arg>-nominalTime</arg>
+            <arg>${nominalTime}</arg>
+            <arg>-operation</arg>
+            <arg>REPLICATE</arg>
+            <arg>-workflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-runId</arg>
+            <arg>${wf:run()}</arg>
+            <arg>-status</arg>
+            <arg>SUCCEEDED</arg>
+            <arg>-timeStamp</arg>
+            <arg>${timeStamp}</arg>
+            <arg>-brokerImplClass</arg>
+            <arg>${wf:conf("broker.impl.class")}</arg>
+            <arg>-brokerUrl</arg>
+            <arg>${wf:conf("broker.url")}</arg>
+            <arg>-userBrokerImplClass</arg>
+            <arg>${userBrokerImplClass}</arg>
+            <arg>-userBrokerUrl</arg>
+            <arg>${userBrokerUrl}</arg>
+            <arg>-brokerTTL</arg>
+            <arg>${wf:conf("broker.ttlInMins")}</arg>
+            <arg>-feedNames</arg>
+            <arg>${feedNames}</arg>
+            <arg>-feedInstancePaths</arg>
+            <arg>${feedInstancePaths}</arg>
+            <arg>-logFile</arg>
+            <arg>${logDir}/instancePaths-${nominalTime}-${srcClusterName}.csv</arg>
+            <arg>-workflowEngineUrl</arg>
+            <arg>${workflowEngineUrl}</arg>
+            <arg>-userWorkflowName</arg>
+            <arg>${userWorkflowName}</arg>
+            <arg>-userWorkflowVersion</arg>
+            <arg>${userWorkflowVersion}</arg>
+            <arg>-userWorkflowEngine</arg>
+            <arg>${userWorkflowEngine}</arg>
+            <arg>-subflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-logDir</arg>
+            <arg>${logDir}/job-${nominalTime}/${srcClusterName}/</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg>
+            <arg>-falconInputFeeds</arg>
+            <arg>${falconInputFeeds}</arg>
+            <arg>-falconInputPaths</arg>
+            <arg>${falconInPaths}</arg>
+            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+        </java>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <action name='failed-post-processing'>
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+            <arg>-cluster</arg>
+            <arg>${cluster}</arg>
+            <arg>-entityType</arg>
+            <arg>${entityType}</arg>
+            <arg>-entityName</arg>
+            <arg>${entityName}</arg>
+            <arg>-nominalTime</arg>
+            <arg>${nominalTime}</arg>
+            <arg>-operation</arg>
+            <arg>REPLICATE</arg>
+            <arg>-workflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-runId</arg>
+            <arg>${wf:run()}</arg>
+            <arg>-status</arg>
+            <arg>FAILED</arg>
+            <arg>-timeStamp</arg>
+            <arg>${timeStamp}</arg>
+            <arg>-brokerImplClass</arg>
+            <arg>${wf:conf("broker.impl.class")}</arg>
+            <arg>-brokerUrl</arg>
+            <arg>${wf:conf("broker.url")}</arg>
+            <arg>-userBrokerImplClass</arg>
+            <arg>${userBrokerImplClass}</arg>
+            <arg>-userBrokerUrl</arg>
+            <arg>${userBrokerUrl}</arg>
+            <arg>-brokerTTL</arg>
+            <arg>${wf:conf("broker.ttlInMins")}</arg>
+            <arg>-feedNames</arg>
+            <arg>${feedNames}</arg>
+            <arg>-feedInstancePaths</arg>
+            <arg>${feedInstancePaths}</arg>
+            <arg>-logFile</arg>
+            <arg>${logDir}/instancePaths-${nominalTime}-${srcClusterName}.csv</arg>
+            <arg>-workflowEngineUrl</arg>
+            <arg>${workflowEngineUrl}</arg>
+            <arg>-subflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-logDir</arg>
+            <arg>${logDir}/job-${nominalTime}/${srcClusterName}/</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg>
+            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+        </java>
+        <ok to="fail"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>
+            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+        </message>
+    </kill>
+    <end name='end'/>
+</workflow-app>
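
Note: every EL variable referenced in this definition (jobTracker, nameNode, shouldRecord, falconFeedStorageType, the distcp paths, and so on) is injected by the Falcon-generated coordinator when an instance materializes. Purely as a hypothetical sketch for exercising the definition standalone (the Oozie endpoint, HDFS paths, and property values below are placeholders, not part of this patch), the workflow can be driven through the stock Oozie client API:

    import java.util.Properties;
    import org.apache.oozie.client.OozieClient;

    public final class ReplicationWorkflowSmokeTest {
        public static void main(String[] args) throws Exception {
            // Placeholder Oozie endpoint and HDFS locations; adjust for a real cluster.
            OozieClient oozie = new OozieClient("http://localhost:11000/oozie");
            Properties conf = oozie.createConfiguration();
            conf.setProperty(OozieClient.APP_PATH, "hdfs://localhost:8020/projects/falcon/REPLICATION");
            conf.setProperty("jobTracker", "localhost:8021");
            conf.setProperty("nameNode", "hdfs://localhost:8020");
            conf.setProperty("queueName", "default");
            conf.setProperty("jobPriority", "NORMAL");
            // Bypass the recordsize action and take the plain file-system replication branch.
            conf.setProperty("shouldRecord", "false");
            conf.setProperty("falconFeedStorageType", "FILESYSTEM");
            // The remaining variables (sourceRelativePaths, distcpSourcePaths, distcpTargetPaths,
            // maxMaps, mapBandwidthKB, logDir, ...) would also have to be supplied for a full run.
            String jobId = oozie.run(conf); // submits and starts the workflow
            System.out.println("Started workflow " + jobId);
        }
    }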

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/resources/workflow/retention-workflow.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/retention-workflow.xml b/oozie/src/main/resources/workflow/retention-workflow.xml
new file mode 100644
index 0000000..5138865
--- /dev/null
+++ b/oozie/src/main/resources/workflow/retention-workflow.xml
@@ -0,0 +1,208 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-feed-parent-workflow'>
+    <start to='eviction'/>
+    <action name="eviction">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <!-- HCatalog jars -->
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>hcatalog</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.retention.FeedEvictor</main-class>
+            <arg>-feedBasePath</arg>
+            <arg>${feedDataPath}</arg>
+            <arg>-falconFeedStorageType</arg>
+            <arg>${falconFeedStorageType}</arg>
+            <arg>-retentionType</arg>
+            <arg>instance</arg>
+            <arg>-retentionLimit</arg>
+            <arg>${limit}</arg>
+            <arg>-timeZone</arg>
+            <arg>${timeZone}</arg>
+            <arg>-frequency</arg>
+            <arg>${frequency}</arg>
+            <arg>-logFile</arg>
+            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+        </java>
+        <ok to="succeeded-post-processing"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <action name='succeeded-post-processing'>
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+            <arg>-cluster</arg>
+            <arg>${cluster}</arg>
+            <arg>-entityType</arg>
+            <arg>${entityType}</arg>
+            <arg>-entityName</arg>
+            <arg>${entityName}</arg>
+            <arg>-nominalTime</arg>
+            <arg>${nominalTime}</arg>
+            <arg>-operation</arg>
+            <arg>DELETE</arg>
+            <arg>-workflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-runId</arg>
+            <arg>${wf:run()}</arg>
+            <arg>-status</arg>
+            <arg>SUCCEEDED</arg>
+            <arg>-timeStamp</arg>
+            <arg>${timeStamp}</arg>
+            <arg>-brokerImplClass</arg>
+            <arg>${wf:conf("broker.impl.class")}</arg>
+            <arg>-brokerUrl</arg>
+            <arg>${wf:conf("broker.url")}</arg>
+            <arg>-userBrokerImplClass</arg>
+            <arg>${userBrokerImplClass}</arg>
+            <arg>-userBrokerUrl</arg>
+            <arg>${userBrokerUrl}</arg>
+            <arg>-brokerTTL</arg>
+            <arg>${wf:conf("broker.ttlInMins")}</arg>
+            <arg>-feedNames</arg>
+            <arg>${feedNames}</arg>
+            <arg>-feedInstancePaths</arg>
+            <arg>${feedInstancePaths}</arg>
+            <arg>-logFile</arg>
+            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+            <arg>-workflowEngineUrl</arg>
+            <arg>${workflowEngineUrl}</arg>
+            <arg>-subflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-logDir</arg>
+            <arg>${logDir}/job-${nominalTime}/</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg>
+            <arg>-userWorkflowName</arg>
+            <arg>${userWorkflowName}</arg>
+            <arg>-userWorkflowVersion</arg>
+            <arg>${userWorkflowVersion}</arg>
+            <arg>-userWorkflowEngine</arg>
+            <arg>${userWorkflowEngine}</arg>
+            <arg>-falconInputFeeds</arg>
+            <arg>${falconInputFeeds}</arg>
+            <arg>-falconInputPaths</arg>
+            <arg>${falconInPaths}</arg>
+            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+        </java>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <action name='failed-post-processing'>
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+            <arg>-cluster</arg>
+            <arg>${cluster}</arg>
+            <arg>-entityType</arg>
+            <arg>${entityType}</arg>
+            <arg>-entityName</arg>
+            <arg>${entityName}</arg>
+            <arg>-nominalTime</arg>
+            <arg>${nominalTime}</arg>
+            <arg>-operation</arg>
+            <arg>DELETE</arg>
+            <arg>-workflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-runId</arg>
+            <arg>${wf:run()}</arg>
+            <arg>-status</arg>
+            <arg>FAILED</arg>
+            <arg>-timeStamp</arg>
+            <arg>${timeStamp}</arg>
+            <arg>-brokerImplClass</arg>
+            <arg>${wf:conf("broker.impl.class")}</arg>
+            <arg>-brokerUrl</arg>
+            <arg>${wf:conf("broker.url")}</arg>
+            <arg>-userBrokerImplClass</arg>
+            <arg>${userBrokerImplClass}</arg>
+            <arg>-userBrokerUrl</arg>
+            <arg>${userBrokerUrl}</arg>
+            <arg>-brokerTTL</arg>
+            <arg>${wf:conf("broker.ttlInMins")}</arg>
+            <arg>-feedNames</arg>
+            <arg>${feedNames}</arg>
+            <arg>-feedInstancePaths</arg>
+            <arg>${feedInstancePaths}</arg>
+            <arg>-logFile</arg>
+            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+            <arg>-workflowEngineUrl</arg>
+            <arg>${workflowEngineUrl}</arg>
+            <arg>-subflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-logDir</arg>
+            <arg>${logDir}/job-${nominalTime}/</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg>
+            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+        </java>
+        <ok to="fail"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>
+            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+        </message>
+    </kill>
+    <end name='end'/>
+</workflow-app>
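
Note: the ${limit}, ${frequency}, and ${timeZone} arguments handed to FeedEvictor above come from the feed entity definition, not from this workflow. As a hypothetical illustration (feed name, dates, and paths are placeholders), a feed whose retention element would drive this workflow with limit=hours(6) looks roughly like:

    <feed name="clicks" xmlns="uri:falcon:feed:0.1">
        <frequency>hours(1)</frequency>
        <timezone>UTC</timezone>
        <clusters>
            <cluster name="corp" type="source">
                <validity start="2014-01-01T00:00Z" end="2030-01-01T00:00Z"/>
                <!-- evict instances once they are older than six hours -->
                <retention limit="hours(6)" action="delete"/>
            </cluster>
        </clusters>
        <locations>
            <location type="data" path="/data/clicks/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
        </locations>
        <ACL owner="falcon" group="users" permission="0755"/>
        <schema location="/none" provider="none"/>
    </feed>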

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
new file mode 100644
index 0000000..542634d
--- /dev/null
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.oozie.feed;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.oozie.OozieEntityBuilder;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.bundle.COORDINATOR;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.process.AbstractTestBase;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DECISION;
+import org.apache.falcon.oozie.workflow.JAVA;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Tests for the Oozie workflow definitions built for feed replication and retention.
+ */
+public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
+    private EmbeddedCluster srcMiniDFS;
+    private EmbeddedCluster trgMiniDFS;
+    private final ConfigurationStore store = ConfigurationStore.get();
+    private Cluster srcCluster;
+    private Cluster trgCluster;
+    private Cluster alphaTrgCluster;
+    private Cluster betaTrgCluster;
+    private Feed feed;
+    private Feed tableFeed;
+    private Feed fsReplFeed;
+
+    private static final String SRC_CLUSTER_PATH = "/feed/src-cluster.xml";
+    private static final String TRG_CLUSTER_PATH = "/feed/trg-cluster.xml";
+    private static final String FEED = "/feed/feed.xml";
+    private static final String TABLE_FEED = "/feed/table-replication-feed.xml";
+    private static final String FS_REPLICATION_FEED = "/feed/fs-replication-feed.xml";
+
+    @BeforeClass
+    public void setUpDFS() throws Exception {
+        CurrentUser.authenticate("falcon");
+
+        srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
+        String srcHdfsUrl = srcMiniDFS.getConf().get("fs.default.name");
+
+        trgMiniDFS = EmbeddedCluster.newCluster("cluster2");
+        String trgHdfsUrl = trgMiniDFS.getConf().get("fs.default.name");
+
+        cleanupStore();
+
+        org.apache.falcon.entity.v0.cluster.Property property =
+                new org.apache.falcon.entity.v0.cluster.Property();
+        property.setName(OozieOrchestrationWorkflowBuilder.METASTORE_KERBEROS_PRINCIPAL);
+        property.setValue("hive/_HOST");
+
+        srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH, srcHdfsUrl);
+        srcCluster.getProperties().getProperties().add(property);
+
+        trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH, trgHdfsUrl);
+        trgCluster.getProperties().getProperties().add(property);
+
+        alphaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/feed/trg-cluster-alpha.xml", trgHdfsUrl);
+        betaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/feed/trg-cluster-beta.xml", trgHdfsUrl);
+
+        feed = (Feed) storeEntity(EntityType.FEED, FEED);
+        fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED);
+        tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED);
+    }
+
+    private Entity storeEntity(EntityType type, String resource) throws Exception {
+        return storeEntity(type, null, resource, null);
+    }
+
+    private Entity storeEntity(EntityType type, String resource, String writeUrl) throws Exception {
+        return storeEntity(type, null, resource, writeUrl);
+    }
+
+    protected void cleanupStore() throws FalconException {
+        for (EntityType type : EntityType.values()) {
+            Collection<String> entities = store.getEntities(type);
+            for (String entity : entities) {
+                store.remove(type, entity);
+            }
+        }
+    }
+
+    @AfterClass
+    public void stopDFS() {
+        srcMiniDFS.shutdown();
+        trgMiniDFS.shutdown();
+    }
+
+    @Test
+    public void testReplicationCoordsForFSStorage() throws Exception {
+        OozieEntityBuilder builder = OozieEntityBuilder.get(feed);
+        Path bundlePath = new Path("/projects/falcon/");
+        builder.build(trgCluster, bundlePath);
+        BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath);
+        List<COORDINATOR> coords = bundle.getCoordinator();
+
+        //Assert retention coord
+        COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath());
+        assertLibExtensions(coord, "retention");
+
+        //Assert replication coord
+        coord = getCoordinator(trgMiniDFS, coords.get(1).getAppPath());
+        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
+        Assert.assertEquals(getWorkflowAppPath(), coord.getAction().getWorkflow().getAppPath());
+        Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
+                + srcCluster.getName(), coord.getName());
+        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
+        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(0);
+        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(1);
+
+        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
+        Assert.assertEquals("input-dataset", inputDataset.getName());
+        Assert.assertEquals(
+                ClusterHelper.getReadOnlyStorageUrl(srcCluster)
+                        + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+                inputDataset.getUriTemplate());
+
+        Assert.assertEquals("${coord:minutes(20)}",
+                outputDataset.getFrequency());
+        Assert.assertEquals("output-dataset", outputDataset.getName());
+        Assert.assertEquals(ClusterHelper.getStorageUrl(trgCluster)
+                + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+                outputDataset.getUriTemplate());
+        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
+        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
+        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
+        Assert.assertEquals("input", inEventName);
+        Assert.assertEquals("input-dataset", inEventDataset);
+        Assert.assertEquals("${now(0,-40)}", inEventInstance);
+
+        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
+        Assert.assertEquals("${now(0,-40)}", outEventInstance);
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        // verify the replication param that feed replicator depends on
+        String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
+        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
+
+        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
+        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, feed));
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), feed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+
+        // verify workflow params
+        Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
+        Assert.assertEquals(props.get("userWorkflowVersion"), "0.5");
+        Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
+
+        // verify default params
+        Assert.assertEquals(props.get("queueName"), "default");
+        Assert.assertEquals(props.get("jobPriority"), "NORMAL");
+        Assert.assertEquals(props.get("maxMaps"), "5");
+        Assert.assertEquals(props.get("mapBandwidthKB"), "102400");
+
+        assertLibExtensions(coord, "replication");
+        WORKFLOWAPP wf = getWorkflowapp(coord);
+        assertWorkflowRetries(wf);
+
+        Assert.assertFalse(Storage.TYPE.TABLE == FeedHelper.getStorageType(feed, trgCluster));
+    }
+
+    private COORDINATORAPP getCoordinator(EmbeddedCluster cluster, String appPath) throws Exception {
+        return getCoordinator(cluster.getFileSystem(), new Path(StringUtils.removeStart(appPath, "${nameNode}")));
+    }
+
+    private String getWorkflowAppPath() {
+        return "${nameNode}/projects/falcon/REPLICATION/" + srcCluster.getName();
+    }
+
+    private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
+        assertWorkflowRetries(getWorkflowapp(coord));
+    }
+
+    private void assertWorkflowRetries(WORKFLOWAPP wf) throws JAXBException, IOException {
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            String actionName = action.getName();
+            if (OozieOrchestrationWorkflowBuilder.FALCON_ACTIONS.contains(actionName)) {
+                Assert.assertEquals(action.getRetryMax(), "3");
+                Assert.assertEquals(action.getRetryInterval(), "1");
+            }
+        }
+    }
+
+    private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
+        WORKFLOWAPP wf = getWorkflowapp(coord);
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            List<String> files = null;
+            if (action.getJava() != null) {
+                files = action.getJava().getFile();
+            } else if (action.getPig() != null) {
+                files = action.getPig().getFile();
+            } else if (action.getMapReduce() != null) {
+                files = action.getMapReduce().getFile();
+            }
+            if (files != null) {
+                Assert.assertTrue(files.get(files.size() - 1).endsWith("/projects/falcon/working/libext/FEED/"
+                        + lifecycle + "/ext.jar"));
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private WORKFLOWAPP getWorkflowapp(COORDINATORAPP coord) throws JAXBException, IOException {
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+        return ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
+                trgMiniDFS.getFileSystem().open(new Path(wfPath, "workflow.xml")))).getValue();
+    }
+
+    @Test
+    public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
+        OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(fsReplFeed, Tag.REPLICATION);
+
+        List<Properties> alphaCoords = builder.buildCoords(alphaTrgCluster, new Path("/alpha/falcon/"));
+        final COORDINATORAPP alphaCoord = getCoordinator(trgMiniDFS,
+            alphaCoords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
+        Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
+        Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
+
+        String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed);
+        assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster.getName(), pathsWithPartitions);
+
+        List<Properties> betaCoords = builder.buildCoords(betaTrgCluster, new Path("/beta/falcon/"));
+        final COORDINATORAPP betaCoord = getCoordinator(trgMiniDFS,
+            betaCoords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
+        Assert.assertEquals(betaCoord.getStart(), "2012-10-01T12:10Z");
+        Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z");
+
+        pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed);
+        assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster.getName(), pathsWithPartitions);
+    }
+
+    private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
+                                          Feed aFeed) throws FalconException {
+        String srcPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(aFeed, sourceCluster.getName()).getPartition());
+        srcPart = FeedHelper.evaluateClusterExp(sourceCluster, srcPart);
+        String targetPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(aFeed, targetCluster.getName()).getPartition());
+        targetPart = FeedHelper.evaluateClusterExp(targetCluster, targetPart);
+
+        String pathsWithPartitions = "${coord:dataIn('input')}/"
+                + FeedHelper.normalizePartitionExpression(srcPart, targetPart);
+        String parts = pathsWithPartitions.replaceAll("//+", "/");
+        parts = StringUtils.stripEnd(parts, "/");
+        return parts;
+    }
+
+    private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, String clusterName,
+                                 String pathsWithPartitions) throws JAXBException, IOException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(aFeed, clusterName);
+        Date startDate = feedCluster.getValidity().getStart();
+        Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate));
+
+        Date endDate = feedCluster.getValidity().getEnd();
+        Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
+
+        WORKFLOWAPP workflow = getWorkflowapp(coord);
+        assertWorkflowDefinition(fsReplFeed, workflow);
+
+        List<Object> actions = workflow.getDecisionOrForkOrJoin();
+
+        ACTION replicationActionNode = (ACTION) actions.get(4);
+        Assert.assertEquals(replicationActionNode.getName(), "replication");
+
+        JAVA replication = replicationActionNode.getJava();
+        List<String> args = replication.getArg();
+        Assert.assertEquals(args.size(), 13);
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
+        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo());
+        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+        Assert.assertEquals(props.get("maxMaps"), "33");
+        Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
+        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, aFeed));
+    }
+
+    public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {
+        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), parentWorkflow.getName());
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
+        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
+        Assert.assertEquals("replication-decision", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
+        Assert.assertEquals("table-export", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
+        Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
+        Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName());
+        Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+        Assert.assertEquals("cleanup-table-staging-dir", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
+        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
+        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(9)).getName());
+    }
+
+    @DataProvider(name = "secureOptions")
+    private Object[][] createOptions() {
+        return new Object[][] {
+            {"simple"},
+            {"kerberos"},
+        };
+    }
+
+    @Test (dataProvider = "secureOptions")
+    public void testReplicationCoordsForTableStorage(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(tableFeed, Tag.REPLICATION);
+        List<Properties> coords = builder.buildCoords(trgCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
+
+        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
+        Assert.assertEquals(getWorkflowAppPath(),
+                coord.getAction().getWorkflow().getAppPath());
+        Assert.assertEquals("FALCON_FEED_REPLICATION_" + tableFeed.getName() + "_"
+                + srcCluster.getName(), coord.getName());
+        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
+
+        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(0);
+        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
+        Assert.assertEquals("input-dataset", inputDataset.getName());
+
+        String sourceRegistry = ClusterHelper.getInterface(srcCluster, Interfacetype.REGISTRY).getEndpoint();
+        sourceRegistry = sourceRegistry.replace("thrift", "hcat");
+        Assert.assertEquals(inputDataset.getUriTemplate(),
+                sourceRegistry + "/source_db/source_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
+
+        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(1);
+        Assert.assertEquals(outputDataset.getFrequency(), "${coord:minutes(20)}");
+        Assert.assertEquals("output-dataset", outputDataset.getName());
+
+        String targetRegistry = ClusterHelper.getInterface(trgCluster, Interfacetype.REGISTRY).getEndpoint();
+        targetRegistry = targetRegistry.replace("thrift", "hcat");
+        Assert.assertEquals(outputDataset.getUriTemplate(),
+                targetRegistry + "/target_db/target_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
+
+        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
+        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
+        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
+        Assert.assertEquals("input", inEventName);
+        Assert.assertEquals("input-dataset", inEventDataset);
+        Assert.assertEquals("${now(0,-40)}", inEventInstance);
+
+        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
+        Assert.assertEquals("${now(0,-40)}", outEventInstance);
+
+        // assert FS staging area
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        final FileSystem fs = trgMiniDFS.getFileSystem();
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-import.hql")));
+
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
+        final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
+
+        // verify the replication param that feed replicator depends on
+        Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
+
+        Assert.assertTrue(props.containsKey("distcpSourcePaths"));
+        Assert.assertEquals(props.get("distcpSourcePaths"),
+                FeedHelper.getStagingPath(srcCluster, tableFeed, srcStorage, Tag.REPLICATION,
+                        "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}" + "/" + trgCluster.getName()));
+
+        Assert.assertTrue(props.containsKey("distcpTargetPaths"));
+        Assert.assertEquals(props.get("distcpTargetPaths"),
+                FeedHelper.getStagingPath(trgCluster, tableFeed, trgStorage, Tag.REPLICATION,
+                        "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}" + "/" + trgCluster.getName()));
+
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name());
+
+        // verify table props
+        assertTableStorageProperties(srcCluster, srcStorage, props, "falconSource");
+        assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget");
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed));
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+
+        Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
+        assertReplicationHCatCredentials(getWorkflowapp(coord),
+                coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""));
+    }
+
+    private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
+        FileSystem fs = trgMiniDFS.getFileSystem();
+
+        Path hiveConfPath = new Path(wfPath, "conf/falcon-source-hive-site.xml");
+        Assert.assertTrue(fs.exists(hiveConfPath));
+
+        hiveConfPath = new Path(wfPath, "conf/falcon-target-hive-site.xml");
+        Assert.assertTrue(fs.exists(hiveConfPath));
+
+        boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled();
+        if (isSecurityEnabled) {
+            Assert.assertNotNull(wf.getCredentials());
+            Assert.assertEquals(2, wf.getCredentials().getCredential().size());
+        }
+
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            String actionName = action.getName();
+
+            if (!isSecurityEnabled) {
+                Assert.assertNull(action.getCred());
+            }
+
+            if ("recordsize".equals(actionName)) {
+                Assert.assertEquals(action.getJava().getJobXml(), "${wf:appPath()}/conf/falcon-source-hive-site.xml");
+                if (isSecurityEnabled) {
+                    Assert.assertNotNull(action.getCred());
+                    Assert.assertEquals(action.getCred(), "falconSourceHiveAuth");
+                }
+            } else if ("table-export".equals(actionName) && isSecurityEnabled) {
+                Assert.assertNotNull(action.getCred());
+                Assert.assertEquals(action.getCred(), "falconSourceHiveAuth");
+            } else if ("table-import".equals(actionName) && isSecurityEnabled) {
+                Assert.assertNotNull(action.getCred());
+                Assert.assertEquals(action.getCred(), "falconTargetHiveAuth");
+            }
+        }
+    }
+
+    private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+                                              Map<String, String> props, String prefix) {
+        Assert.assertEquals(props.get(prefix + "NameNode"), ClusterHelper.getStorageUrl(cluster));
+        Assert.assertEquals(props.get(prefix + "JobTracker"), ClusterHelper.getMREndPoint(cluster));
+        Assert.assertEquals(props.get(prefix + "HcatNode"), tableStorage.getCatalogUrl());
+
+        Assert.assertEquals(props.get(prefix + "Database"), tableStorage.getDatabase());
+        Assert.assertEquals(props.get(prefix + "Table"), tableStorage.getTable());
+        Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitions('input', 'hive-export')}");
+    }
+
+    @Test
+    public void testRetentionCoords() throws Exception {
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
+        final Calendar instance = Calendar.getInstance();
+        instance.roll(Calendar.YEAR, 1);
+        cluster.getValidity().setEnd(instance.getTime());
+
+        OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(feed, Tag.RETENTION);
+        List<Properties> coords = builder.buildCoords(srcCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = getCoordinator(srcMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
+
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
+        Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        String feedDataPath = props.get("feedDataPath");
+        String storageType = props.get("falconFeedStorageType");
+
+        // verify the param that feed evictor depends on
+        Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
+
+        final Storage storage = FeedHelper.createStorage(cluster, feed);
+        if (feedDataPath != null) {
+            Assert.assertEquals(feedDataPath, storage.getUriTemplate()
+                    .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+        }
+
+        if (storageType != null) {
+            Assert.assertEquals(storageType, storage.getType().name());
+        }
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), feed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+        Assert.assertEquals(props.get("logDir"), getLogPath(srcCluster, feed));
+
+        assertWorkflowRetries(coord);
+    }
+
+    @Test (dataProvider = "secureOptions")
+    public void testRetentionCoordsForTable(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(tableFeed, trgCluster.getName());
+        final Calendar instance = Calendar.getInstance();
+        instance.roll(Calendar.YEAR, 1);
+        cluster.getValidity().setEnd(instance.getTime());
+
+        OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(tableFeed, Tag.RETENTION);
+        List<Properties> coords = builder.buildCoords(trgCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
+
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + tableFeed.getName());
+        Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        String feedDataPath = props.get("feedDataPath");
+        String storageType = props.get("falconFeedStorageType");
+
+        // verify the params that the feed evictor depends on
+        Assert.assertEquals(storageType, Storage.TYPE.TABLE.name());
+
+        final Storage storage = FeedHelper.createStorage(cluster, tableFeed);
+        if (feedDataPath != null) {
+            Assert.assertEquals(feedDataPath, storage.getUriTemplate()
+                    .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+        }
+
+        if (storageType != null) {
+            Assert.assertEquals(storageType, storage.getType().name());
+        }
+
+        // verify the post-processing params
+        Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed));
+
+        assertWorkflowRetries(coord);
+
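+        // table storage implies HCatalog access, so the generated workflow must carry Hive credentials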
+        Assert.assertEquals(FeedHelper.getStorageType(tableFeed, trgCluster), Storage.TYPE.TABLE);
+        assertHCatCredentials(getWorkflowapp(coord),
+                coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""));
+    }
+
+    private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
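+        // hive-site.xml is staged under the workflow app path so Hive actions can reach the metastore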
+        Path hiveConfPath = new Path(wfPath, "conf/hive-site.xml");
+        FileSystem fs = trgMiniDFS.getFileSystem();
+        Assert.assertTrue(fs.exists(hiveConfPath));
+
+        if (SecurityUtil.isSecurityEnabled()) {
+            Assert.assertNotNull(wf.getCredentials());
+            Assert.assertEquals(wf.getCredentials().getCredential().size(), 1);
+        }
+
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            String actionName = action.getName();
+
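+            // the eviction action is the one that talks to Hive, so verify its job-xml and credential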
+            if ("eviction".equals(actionName)) {
+                Assert.assertEquals(action.getJava().getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+                if (SecurityUtil.isSecurityEnabled()) {
+                    Assert.assertNotNull(action.getCred());
+                    Assert.assertEquals(action.getCred(), "falconHiveAuth");
+                }
+            }
+        }
+    }
+
+    private String getLogPath(Cluster aCluster, Feed aFeed) {
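+        // scheme-less log paths are prefixed with ${nameNode} so Oozie can resolve them at runtime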
+        Path logPath = EntityUtil.getLogPath(aCluster, aFeed);
+        return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
new file mode 100644
index 0000000..54a2ea7
--- /dev/null
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.util.Collection;
+
+/**
+ * Base class for Falcon unit tests that use the configuration store.
+ */
+public class AbstractTestBase {
+    protected Entity storeEntity(EntityType type, String name, String resource, String writeEndpoint) throws Exception {
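+        // unmarshal the entity from a classpath resource, optionally rename it, and publish it to the config store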
+        Unmarshaller unmarshaller = type.getUnmarshaller();
+        ConfigurationStore store = ConfigurationStore.get();
+        switch (type) {
+        case CLUSTER:
+            Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(resource));
+            if (name != null) {
+                store.remove(type, name);
+                cluster.setName(name);
+            }
+            store.publish(type, cluster);
+
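+            // point the cluster's write interface at the test DFS and stage empty libext jars for the feed workflows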
+            if (writeEndpoint != null) {
+                ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
+                FileSystem fs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration());
+                fs.create(
+                    new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close();
+                fs.create(
+                    new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close();
+            }
+
+            return cluster;
+
+        case FEED:
+            Feed feed = (Feed) unmarshaller.unmarshal(this.getClass().getResource(resource));
+            if (name != null) {
+                store.remove(type, name);
+                feed.setName(name);
+            }
+            store.publish(type, feed);
+            return feed;
+
+        case PROCESS:
+            Process process = (Process) unmarshaller.unmarshal(this.getClass().getResource(resource));
+            if (name != null) {
+                store.remove(type, name);
+                process.setName(name);
+            }
+            store.publish(type, process);
+            return process;
+
+        default:
+            throw new IllegalArgumentException("Unhandled type: " + type);
+        }
+    }
+
+    protected COORDINATORAPP getCoordinator(FileSystem fs, Path path) throws Exception {
+        String coordStr = readFile(fs, new Path(path, "coordinator.xml"));
+
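+        // unmarshal with the bundled Oozie coordinator XSD so malformed output fails the test early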
+        Unmarshaller unmarshaller = JAXBContext.newInstance(COORDINATORAPP.class).createUnmarshaller();
+        SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
+        Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-coordinator-0.3.xsd"));
+        unmarshaller.setSchema(schema);
+        JAXBElement<COORDINATORAPP> jaxbBundle = unmarshaller.unmarshal(
+            new StreamSource(new ByteArrayInputStream(coordStr.trim().getBytes())), COORDINATORAPP.class);
+        return jaxbBundle.getValue();
+    }
+
+    protected BUNDLEAPP getBundle(FileSystem fs, Path path) throws Exception {
+        String bundleStr = readFile(fs, new Path(path, "bundle.xml"));
+
+        Unmarshaller unmarshaller = JAXBContext.newInstance(BUNDLEAPP.class).createUnmarshaller();
+        SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
+        Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-bundle-0.1.xsd"));
+        unmarshaller.setSchema(schema);
+        JAXBElement<BUNDLEAPP> jaxbBundle = unmarshaller.unmarshal(
+            new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), BUNDLEAPP.class);
+        return jaxbBundle.getValue();
+    }
+
+    protected String readFile(FileSystem fs, Path path) throws Exception {
+        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
+        try {
+            String line;
+            StringBuilder contents = new StringBuilder();
+            while ((line = reader.readLine()) != null) {
+                contents.append(line);
+            }
+            return contents.toString();
+        } finally {
+            reader.close();    // don't leak the HDFS stream across tests
+        }
+    }
+
+    protected void cleanupStore() throws FalconException {
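+        // remove every stored entity so state does not leak between tests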
+        ConfigurationStore store = ConfigurationStore.get();
+        for (EntityType type : EntityType.values()) {
+            Collection<String> entities = store.getEntities(type);
+            for (String entity : entities) {
+                store.remove(type, entity);
+            }
+        }
+    }
+}
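
For orientation, a minimal sketch of how a concrete test might drive the helpers above end to end. This is not part of the commit: the class name, fixture names, and classpath resources are hypothetical placeholders (the cluster and feed XMLs would need to reference each other consistently), the EmbeddedCluster and builder calls are assumed to keep the signatures used elsewhere in this patch, and ENTITY_PATH is assumed to point at the coordinator app directory.

    package org.apache.falcon.oozie.process;

    import org.apache.falcon.Tag;
    import org.apache.falcon.cluster.util.EmbeddedCluster;
    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.falcon.entity.v0.feed.Feed;
    import org.apache.falcon.oozie.OozieCoordinatorBuilder;
    import org.apache.falcon.oozie.OozieEntityBuilder;
    import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
    import org.apache.hadoop.fs.Path;
    import org.testng.Assert;
    import org.testng.annotations.AfterClass;
    import org.testng.annotations.BeforeClass;
    import org.testng.annotations.Test;

    import java.util.List;
    import java.util.Properties;

    public class ExampleRetentionTest extends AbstractTestBase {
        private EmbeddedCluster miniDFS;
        private Cluster cluster;
        private Feed feed;

        @BeforeClass
        public void setUp() throws Exception {
            // hypothetical fixtures: an embedded DFS plus cluster/feed definitions on the test classpath
            miniDFS = EmbeddedCluster.newCluster("example-cluster");
            cluster = (Cluster) storeEntity(EntityType.CLUSTER, "example-cluster",
                    "/config/cluster/cluster-0.1.xml", miniDFS.getFileSystem().getUri().toString());
            feed = (Feed) storeEntity(EntityType.FEED, null, "/config/feed/feed-0.1.xml", null);
        }

        @Test
        public void retentionCoordinatorIsSchemaValid() throws Exception {
            // build the retention coordinator, then re-read it with XSD validation via getCoordinator()
            List<Properties> coords = OozieCoordinatorBuilder.get(feed, Tag.RETENTION)
                    .buildCoords(cluster, new Path("/projects/falcon/"));
            COORDINATORAPP coord = getCoordinator(miniDFS.getFileSystem(),
                    new Path(coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH)));
            Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
        }

        @AfterClass
        public void tearDown() throws Exception {
            cleanupStore();
            miniDFS.shutdown();
        }
    }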

