falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [4/9] FALCON-369 Refactor workflow builder. Contributed by Shwetha GS
Date Thu, 10 Jul 2014 06:57:33 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
new file mode 100644
index 0000000..5ceea75
--- /dev/null
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -0,0 +1,767 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Validity;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.messaging.EntityInstanceMessage;
+import org.apache.falcon.oozie.OozieEntityBuilder;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DECISION;
+import org.apache.falcon.oozie.workflow.PIG;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Test for the Falcon entities mapping into Oozie artifacts.
+ */
+public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
+    private static final String PROCESS_XML = "/config/process/process-0.1.xml";
+    private static final String FEED_XML = "/config/feed/feed-0.1.xml";
+    private static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
+    private static final String PIG_PROCESS_XML = "/config/process/pig-process-0.1.xml";
+
+    private String hdfsUrl;
+    private FileSystem fs;
+    private Cluster cluster;
+
+    @BeforeClass
+    public void setUpDFS() throws Exception {
+        CurrentUser.authenticate("falcon");
+
+        Configuration conf = EmbeddedCluster.newCluster("testCluster").getConf();
+        hdfsUrl = conf.get("fs.default.name");
+    }
+
+    private void storeEntity(EntityType type, String name, String resource) throws Exception {
+        storeEntity(type, name, resource, null);
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        storeEntity(EntityType.CLUSTER, "corp", CLUSTER_XML);
+        storeEntity(EntityType.FEED, "clicks", FEED_XML);
+        storeEntity(EntityType.FEED, "impressions", FEED_XML);
+        storeEntity(EntityType.FEED, "clicksummary", FEED_XML);
+        storeEntity(EntityType.PROCESS, "clicksummary", PROCESS_XML);
+        storeEntity(EntityType.PROCESS, "pig-process", PIG_PROCESS_XML);
+
+        ConfigurationStore store = ConfigurationStore.get();
+        cluster = store.get(EntityType.CLUSTER, "corp");
+        org.apache.falcon.entity.v0.cluster.Property property =
+                new org.apache.falcon.entity.v0.cluster.Property();
+        property.setName(OozieOrchestrationWorkflowBuilder.METASTORE_KERBEROS_PRINCIPAL);
+        property.setValue("hive/_HOST");
+        cluster.getProperties().getProperties().add(property);
+
+        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+        ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
+        fs = new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration());
+        fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close();
+
+        Process process = store.get(EntityType.PROCESS, "clicksummary");
+        Path wfpath = new Path(process.getWorkflow().getPath());
+        assert new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration()).mkdirs(wfpath);
+    }
+
+    public void testDefCoordMap(Process process, COORDINATORAPP coord) throws Exception {
+        assertEquals("FALCON_PROCESS_DEFAULT_" + process.getName(), coord.getName());
+        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
+        assertEquals(SchemaHelper.formatDateUTC(processValidity.getStart()), coord.getStart());
+        assertEquals(SchemaHelper.formatDateUTC(processValidity.getEnd()), coord.getEnd());
+        assertEquals("${coord:" + process.getFrequency().toString() + "}", coord.getFrequency());
+        assertEquals(process.getTimezone().getID(), coord.getTimezone());
+
+        assertEquals(process.getParallel() + "", coord.getControls().getConcurrency());
+        assertEquals(process.getOrder().name(), coord.getControls().getExecution());
+
+        assertEquals(process.getInputs().getInputs().get(0).getName(),
+                coord.getInputEvents().getDataIn().get(0).getName());
+        assertEquals(process.getInputs().getInputs().get(0).getName(),
+                coord.getInputEvents().getDataIn().get(0).getDataset());
+        assertEquals("${" + process.getInputs().getInputs().get(0).getStart() + "}",
+                coord.getInputEvents().getDataIn().get(0).getStartInstance());
+        assertEquals("${" + process.getInputs().getInputs().get(0).getEnd() + "}",
+                coord.getInputEvents().getDataIn().get(0).getEndInstance());
+
+        assertEquals(process.getInputs().getInputs().get(1).getName(),
+                coord.getInputEvents().getDataIn().get(1).getName());
+        assertEquals(process.getInputs().getInputs().get(1).getName(),
+                coord.getInputEvents().getDataIn().get(1).getDataset());
+        assertEquals("${" + process.getInputs().getInputs().get(1).getStart() + "}",
+                coord.getInputEvents().getDataIn().get(1).getStartInstance());
+        assertEquals("${" + process.getInputs().getInputs().get(1).getEnd() + "}",
+                coord.getInputEvents().getDataIn().get(1).getEndInstance());
+
+        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "stats",
+                coord.getOutputEvents().getDataOut().get(1).getName());
+        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "meta",
+                coord.getOutputEvents().getDataOut().get(2).getName());
+        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "tmp",
+                coord.getOutputEvents().getDataOut().get(3).getName());
+
+        assertEquals(process.getOutputs().getOutputs().get(0).getName(),
+                coord.getOutputEvents().getDataOut().get(0).getName());
+        assertEquals("${" + process.getOutputs().getOutputs().get(0).getInstance() + "}",
+                coord.getOutputEvents().getDataOut().get(0).getInstance());
+        assertEquals(process.getOutputs().getOutputs().get(0).getName(),
+                coord.getOutputEvents().getDataOut().get(0).getDataset());
+
+        assertEquals(6, coord.getDatasets().getDatasetOrAsyncDataset().size());
+
+        ConfigurationStore store = ConfigurationStore.get();
+        Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
+        SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
+
+        final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+        assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
+        assertEquals(feed.getTimezone().getID(), ds.getTimezone());
+        assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
+        assertEquals("", ds.getDoneFlag());
+        assertEquals(ds.getUriTemplate(),
+                FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+        assertEquals(props.get("mapred.job.priority"), "LOW");
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
+
+        assertLibExtensions(coord);
+    }
+
+    private String getLogPath(Process process) {
+        Path logPath = EntityUtil.getLogPath(cluster, process);
+        return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
+    }
+
+    @Test
+    public void testBundle() throws Exception {
+        String path = StartupProperties.get().getProperty("system.lib.location");
+        if (!new File(path).exists()) {
+            Assert.assertTrue(new File(path).mkdirs());
+        }
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+
+        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
+        testParentWorkflow(process, parentWorkflow);
+    }
+
+    @Test
+    public void testBundle1() throws Exception {
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+        process.setFrequency(Frequency.fromString("minutes(1)"));
+        process.setTimeout(Frequency.fromString("minutes(15)"));
+
+        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "30", "15");
+        testParentWorkflow(process, parentWorkflow);
+    }
+
+    @Test
+    public void testPigProcessMapper() throws Exception {
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
+        Assert.assertEquals("pig", process.getWorkflow().getEngine().value());
+
+        prepare(process);
+        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION pigActionNode = (ACTION) decisionOrForkOrJoin.get(3);
+        Assert.assertEquals("user-pig-job", pigActionNode.getName());
+
+        final PIG pigAction = pigActionNode.getPig();
+        Assert.assertEquals(pigAction.getScript(), "${nameNode}/falcon/staging/workflows/pig-process/user/id.pig");
+        Assert.assertNotNull(pigAction.getPrepare());
+        Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
+        Assert.assertFalse(pigAction.getParam().isEmpty());
+        Assert.assertEquals(5, pigAction.getParam().size());
+        Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getArchive());
+        Assert.assertTrue(pigAction.getFile().size() > 0);
+
+        ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(5);
+        Assert.assertEquals("user-oozie-workflow", oozieAction.getName());
+        Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
+    }
+
+    @DataProvider(name = "secureOptions")
+    private Object[][] createOptions() {
+        return new Object[][] {
+            {"simple"},
+            {"kerberos"},
+        };
+    }
+
+    @Test (dataProvider = "secureOptions")
+    public void testHiveProcessMapper(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/hive-process.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        prepare(process);
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        // verify table and hive props
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
+        expected.putAll(ClusterHelper.getHiveProperties(cluster));
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (expected.containsKey(entry.getKey())) {
+                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+            }
+        }
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+        Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Assert.assertEquals(hiveAction.getScript(),
+                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+        Assert.assertNull(hiveAction.getPrepare());
+        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+        Assert.assertFalse(hiveAction.getParam().isEmpty());
+        Assert.assertEquals(14, hiveAction.getParam().size());
+
+        Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+        assertHCatCredentials(parentWorkflow, wfPath);
+
+        ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+    }
+
+    @Test (dataProvider = "secureOptions")
+    public void testHiveProcessMapperWithFSInputFeedAndTableOutputFeed(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/hive-process-FSInputFeed.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        prepare(process);
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+        Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Assert.assertEquals(hiveAction.getScript(),
+                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+        Assert.assertNull(hiveAction.getPrepare());
+        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+        Assert.assertFalse(hiveAction.getParam().isEmpty());
+        Assert.assertEquals(10, hiveAction.getParam().size());
+
+        Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+        assertHCatCredentials(parentWorkflow, wfPath);
+
+        ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+    }
+
+    @Test (dataProvider = "secureOptions")
+    public void testHiveProcessMapperWithTableInputFeedAndFSOutputFeed(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/process/hive-process-FSOutputFeed.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        prepare(process);
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+        Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Assert.assertEquals(hiveAction.getScript(),
+                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+        Assert.assertNotNull(hiveAction.getPrepare());
+        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+        Assert.assertFalse(hiveAction.getParam().isEmpty());
+        Assert.assertEquals(6, hiveAction.getParam().size());
+
+        Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+        assertHCatCredentials(parentWorkflow, wfPath);
+
+        ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+    }
+
+    @Test (dataProvider = "secureOptions")
+    public void testHiveProcessWithNoInputsAndOutputs(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        URL resource = this.getClass().getResource("/config/process/dumb-hive-process.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        prepare(process);
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+        Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Assert.assertEquals(hiveAction.getScript(),
+                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+        Assert.assertNull(hiveAction.getPrepare());
+        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+        Assert.assertTrue(hiveAction.getParam().isEmpty());
+
+        ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+    }
+
+    private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
+        Path hiveConfPath = new Path(wfPath, "conf/hive-site.xml");
+        Assert.assertTrue(fs.exists(hiveConfPath));
+
+        if (SecurityUtil.isSecurityEnabled()) {
+            Assert.assertNotNull(wf.getCredentials());
+            Assert.assertEquals(1, wf.getCredentials().getCredential().size());
+        }
+
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+
+            ACTION action = (ACTION) obj;
+
+            if (!SecurityUtil.isSecurityEnabled()) {
+                Assert.assertNull(action.getCred());
+                return;
+            }
+
+            String actionName = action.getName();
+            if ("user-hive-job".equals(actionName) || "user-pig-job".equals(actionName)
+                    || "user-oozie-workflow".equals(actionName) || "recordsize".equals(actionName)) {
+                Assert.assertNotNull(action.getCred());
+                Assert.assertEquals(action.getCred(), "falconHiveAuth");
+            }
+        }
+    }
+
+    private void prepare(Process process) throws IOException {
+        Path wf = new Path(process.getWorkflow().getPath());
+        fs.mkdirs(wf.getParent());
+        fs.create(wf).close();
+    }
+
+    @Test (dataProvider = "secureOptions")
+    public void testProcessMapperForTableStorage(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/pig-process-table.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        // verify table props
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (expected.containsKey(entry.getKey())) {
+                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+            }
+        }
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+
+        Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+        assertHCatCredentials(parentWorkflow, wfPath);
+    }
+
+    private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed,
+                                                      Process process) throws FalconException {
+        Map<String, String> expected = new HashMap<String, String>();
+        for (Input input : process.getInputs().getInputs()) {
+            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
+            propagateStorageProperties(input.getName(), storage, expected);
+        }
+
+        for (Output output : process.getOutputs().getOutputs()) {
+            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
+            propagateStorageProperties(output.getName(), storage, expected);
+        }
+
+        return expected;
+    }
+
+    private void propagateStorageProperties(String feedName, CatalogStorage tableStorage,
+                                            Map<String, String> props) {
+        String prefix = "falcon_" + feedName;
+        props.put(prefix + "_storage_type", tableStorage.getType().name());
+        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+        props.put(prefix + "_database", tableStorage.getDatabase());
+        props.put(prefix + "_table", tableStorage.getTable());
+
+        if (prefix.equals("falcon_input")) {
+            props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
+            props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
+            props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
+            props.put(prefix + "_datain_partitions_hive", "${coord:dataInPartitions('input', 'hive-export')}");
+        } else if (prefix.equals("falcon_output")) {
+            props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
+        }
+    }
+
+    @Test
+    public void testProcessWorkflowMapper() throws Exception {
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+        Workflow processWorkflow = process.getWorkflow();
+        Assert.assertEquals("test", processWorkflow.getName());
+        Assert.assertEquals("1.0.0", processWorkflow.getVersion());
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+        WORKFLOWAPP wf = ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
+                fs.open(new Path(wfPath, "workflow.xml")))).getValue();
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            List<String> files = null;
+            if (action.getJava() != null) {
+                files = action.getJava().getFile();
+            } else if (action.getPig() != null) {
+                files = action.getPig().getFile();
+            } else if (action.getMapReduce() != null) {
+                files = action.getMapReduce().getFile();
+            }
+            if (files != null) {
+                Assert.assertTrue(files.get(files.size() - 1)
+                        .endsWith("/projects/falcon/working/libext/PROCESS/ext.jar"));
+            }
+        }
+    }
+
+    private WORKFLOWAPP initializeProcessMapper(Process process, String throttle, String timeout)
+        throws Exception {
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+            bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        testDefCoordMap(process, coord);
+        assertEquals(coord.getControls().getThrottle(), throttle);
+        assertEquals(coord.getControls().getTimeout(), timeout);
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        return getParentWorkflow(new Path(wfPath));
+    }
+
+    public void testParentWorkflow(Process process, WORKFLOWAPP parentWorkflow) {
+        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), parentWorkflow.getName());
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
+        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
+        Assert.assertEquals("user-workflow", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
+        Assert.assertEquals("user-pig-job", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
+        Assert.assertEquals("user-hive-job", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
+        Assert.assertEquals("user-oozie-workflow", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
+        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
+        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryMax());
+        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryInterval());
+        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryMax());
+        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryInterval());
+        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryMax());
+        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryInterval());
+    }
+
+    @SuppressWarnings("unchecked")
+    private WORKFLOWAPP getParentWorkflow(Path path) throws Exception {
+        String workflow = readFile(fs, new Path(path, "workflow.xml"));
+
+        JAXBContext wfAppContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+        Unmarshaller unmarshaller = wfAppContext.createUnmarshaller();
+        return ((JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+                new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())))).getValue();
+    }
+
+    @AfterMethod
+    public void cleanup() throws Exception {
+        cleanupStore();
+    }
+
+    @Test
+    public void testProcessWithNoInputsAndOutputs() throws Exception {
+        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+
+        URL resource = this.getClass().getResource("/config/process/dumb-process.xml");
+        Process processEntity = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, processEntity);
+
+        OozieEntityBuilder builder = OozieEntityBuilder.get(processEntity);
+        Path bundlePath = new Path("/falcon/staging/workflows", processEntity.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(processEntity).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, processEntity).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+        Assert.assertEquals(props.get("logDir"), getLogPath(processEntity));
+
+        String[] expected = {
+            EntityInstanceMessage.ARG.feedNames.getPropName(),
+            EntityInstanceMessage.ARG.feedInstancePaths.getPropName(),
+            "falconInputFeeds",
+            "falconInPaths",
+            "userWorkflowName",
+            "userWorkflowVersion",
+            "userWorkflowEngine",
+        };
+
+        for (String property : expected) {
+            Assert.assertTrue(props.containsKey(property), "expected property missing: " + property);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/cluster/cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/cluster/cluster-0.1.xml b/oozie/src/test/resources/config/cluster/cluster-0.1.xml
new file mode 100644
index 0000000..032cc77
--- /dev/null
+++ b/oozie/src/test/resources/config/cluster/cluster-0.1.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<cluster colo="gs" description="" name="corp" xmlns="uri:falcon:cluster:0.1"
+        >
+    <interfaces>
+        <interface type="readonly" endpoint="hftp://localhost:50010"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="Hcat" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/falcon/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+    </locations>
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+        <property name="hive.metastore.client.socket.timeout" value="20"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/feed/feed-0.1.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/feed/feed-0.1.xml b/oozie/src/test/resources/config/feed/feed-0.1.xml
new file mode 100644
index 0000000..fb9b707
--- /dev/null
+++ b/oozie/src/test/resources/config/feed/feed-0.1.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1"
+        >
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="country"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="corp" type="source">
+            <validity start="2011-11-01T00:00Z" end="2099-12-31T23:59Z"/>
+            <retention limit="hours(6)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <locations>
+                <location type="data" path="/projects/falcon/clicks/${YY}/${MM}"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2099-12-31T23:59Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+    </properties>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/feed/hive-table-feed-out.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/feed/hive-table-feed-out.xml b/oozie/src/test/resources/config/feed/hive-table-feed-out.xml
new file mode 100644
index 0000000..bd93a01
--- /dev/null
+++ b/oozie/src/test/resources/config/feed/hive-table-feed-out.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks summary table " name="clicks-summary-table" xmlns="uri:falcon:feed:0.1">
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="corp" type="source" partition="*/${cluster.colo}">
+            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:default:clicks-summary#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/feed/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/feed/hive-table-feed.xml b/oozie/src/test/resources/config/feed/hive-table-feed.xml
new file mode 100644
index 0000000..66d0742
--- /dev/null
+++ b/oozie/src/test/resources/config/feed/hive-table-feed.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log table " name="clicks-raw-table" xmlns="uri:falcon:feed:0.1">
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="corp" type="source" partition="*/${cluster.colo}">
+            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/late/late-cluster.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/late/late-cluster.xml b/oozie/src/test/resources/config/late/late-cluster.xml
new file mode 100644
index 0000000..ac0817f
--- /dev/null
+++ b/oozie/src/test/resources/config/late/late-cluster.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<cluster colo="gs" description="" name="late-cluster" xmlns="uri:falcon:cluster:0.1"
+        >
+    <interfaces>
+        <interface type="readonly" endpoint="hftp://localhost:50010"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="Hcat" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/falcon/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+    </locations>
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/late/late-feed1.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/late/late-feed1.xml b/oozie/src/test/resources/config/late/late-feed1.xml
new file mode 100644
index 0000000..c500c4c
--- /dev/null
+++ b/oozie/src/test/resources/config/late/late-feed1.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<feed description="clicks log" name="late-feed1" xmlns="uri:falcon:feed:0.1"
+        >
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="country"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="minutes(5)"/>
+
+    <clusters>
+        <cluster name="late-cluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2099-12-31T23:59Z"/>
+            <retention limit="hours(6)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+    </properties>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/late/late-feed2.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/late/late-feed2.xml b/oozie/src/test/resources/config/late/late-feed2.xml
new file mode 100644
index 0000000..6ccffe2
--- /dev/null
+++ b/oozie/src/test/resources/config/late/late-feed2.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<feed description="clicks log" name="late-feed2" xmlns="uri:falcon:feed:0.1"
+        >
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="country"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="minutes(5)"/>
+
+    <clusters>
+        <cluster name="late-cluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2099-12-31T23:59Z"/>
+            <retention limit="hours(6)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+    </properties>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/late/late-feed3.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/late/late-feed3.xml b/oozie/src/test/resources/config/late/late-feed3.xml
new file mode 100644
index 0000000..239f140
--- /dev/null
+++ b/oozie/src/test/resources/config/late/late-feed3.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<feed description="clicks log" name="late-feed3" xmlns="uri:falcon:feed:0.1"
+        >
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="country"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="late-cluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2099-12-31T23:59Z"/>
+            <retention limit="hours(6)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+    </properties>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/late/late-process1.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/late/late-process1.xml b/oozie/src/test/resources/config/late/late-process1.xml
new file mode 100644
index 0000000..aba5525
--- /dev/null
+++ b/oozie/src/test/resources/config/late/late-process1.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one ~ or more contributor license agreements. See the NOTICE file ~ distributed with this work for additional information 
+    ~ regarding copyright ownership. The ASF licenses this file ~ to you under the Apache License, Version 2.0 (the ~ "License"); you may not use this file except in compliance ~ with the 
+    License. You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 ~ ~ Unless required by applicable law or agreed to in writing, software ~ distributed 
+    under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing 
+    permissions and ~ limitations under the License. -->
+<process name="late-process1" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="late-cluster">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <concurrency>1</concurrency>
+    <execution>LIFO</execution>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="impression" feed="late-feed1" start-instance="today(0,0)" end-instance="today(0,2)"/>
+        <input name="clicks" feed="late-feed2" start-instance="yesterday(0,0)" end-instance="today(0,0)"
+               partition="*/US"/>
+    </inputs>
+
+    <outputs>
+        <output name="clicksummary" feed="late-feed3" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <properties>
+        <property name="procprop" value="procprop"/>
+    </properties>
+
+    <workflow engine="oozie" path="/user/guest/workflow"/>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/late/late-process2.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/late/late-process2.xml b/oozie/src/test/resources/config/late/late-process2.xml
new file mode 100644
index 0000000..bc507ad
--- /dev/null
+++ b/oozie/src/test/resources/config/late/late-process2.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<process name="late-process2" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="late-cluster">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <concurrency>1</concurrency>
+    <execution>LIFO</execution>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="impression" feed="late-feed1" start-instance="today(0,0)" end-instance="today(0,2)"/>
+        <input name="clicks" feed="late-feed2" start-instance="yesterday(0,0)" end-instance="today(0,0)"
+               partition="*/US"/>
+    </inputs>
+
+    <outputs>
+        <output name="clicksummary" feed="late-feed3" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <properties>
+        <property name="procprop" value="procprop"/>
+    </properties>
+
+    <workflow engine="oozie" path="/user/guest/workflow"/>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+    <late-process policy="exp-backoff" delay="hours(1)">
+        <late-input feed="impression" workflow-path="hdfs://impression/late/workflow"/>
+        <late-input feed="clicks" workflow-path="hdfs://clicks/late/workflow"/>
+    </late-process>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/process/dumb-hive-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/dumb-hive-process.xml b/oozie/src/test/resources/config/process/dumb-hive-process.xml
new file mode 100644
index 0000000..c504074
--- /dev/null
+++ b/oozie/src/test/resources/config/process/dumb-hive-process.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what = none -->
+
+    <!-- how -->
+    <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/process/dumb-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/dumb-process.xml b/oozie/src/test/resources/config/process/dumb-process.xml
new file mode 100644
index 0000000..b71f089
--- /dev/null
+++ b/oozie/src/test/resources/config/process/dumb-process.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="dumb-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what = none -->
+
+    <!-- how -->
+    <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/process/hive-process-FSInputFeed.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/hive-process-FSInputFeed.xml b/oozie/src/test/resources/config/process/hive-process-FSInputFeed.xml
new file mode 100644
index 0000000..d871377
--- /dev/null
+++ b/oozie/src/test/resources/config/process/hive-process-FSInputFeed.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/process/hive-process-FSOutputFeed.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/hive-process-FSOutputFeed.xml b/oozie/src/test/resources/config/process/hive-process-FSOutputFeed.xml
new file mode 100644
index 0000000..23d96c3
--- /dev/null
+++ b/oozie/src/test/resources/config/process/hive-process-FSOutputFeed.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/process/hive-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/hive-process.xml b/oozie/src/test/resources/config/process/hive-process.xml
new file mode 100644
index 0000000..4dac8e9
--- /dev/null
+++ b/oozie/src/test/resources/config/process/hive-process.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/process/pig-process-0.1.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/pig-process-0.1.xml b/oozie/src/test/resources/config/process/pig-process-0.1.xml
new file mode 100644
index 0000000..318f0da
--- /dev/null
+++ b/oozie/src/test/resources/config/process/pig-process-0.1.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<process name="pig-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="impression" feed="impressions" start="today(0,0)" end="today(0,2)"/>
+        <input name="click" feed="clicks" start="yesterday(0,0)" end="latest(0)" partition="*/US"/>
+    </inputs>
+
+    <outputs>
+        <output name="clicksummary" feed="impressions" instance="today(0,0)"/>
+    </outputs>
+
+    <properties>
+        <property name="procprop" value="procprop"/>
+        <property name="mapred.job.priority" value="LOW"/>
+    </properties>
+
+    <!-- how -->
+    <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/process/pig-process-table.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/pig-process-table.xml b/oozie/src/test/resources/config/process/pig-process-table.xml
new file mode 100644
index 0000000..37aca10
--- /dev/null
+++ b/oozie/src/test/resources/config/process/pig-process-table.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="table-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/process-0.1.xml b/oozie/src/test/resources/config/process/process-0.1.xml
new file mode 100644
index 0000000..6148441
--- /dev/null
+++ b/oozie/src/test/resources/config/process/process-0.1.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one ~ or more contributor license agreements. See the NOTICE file ~ distributed with this work for additional information 
+    ~ regarding copyright ownership. The ASF licenses this file ~ to you under the Apache License, Version 2.0 (the ~ "License"); you may not use this file except in compliance ~ with the 
+    License. You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 ~ ~ Unless required by applicable law or agreed to in writing, software ~ distributed 
+    under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing 
+    permissions and ~ limitations under the License. -->
+<process name="sample" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="impression" feed="impressions" start="today(0,0)" end="today(0,2)"/>
+        <input name="click" feed="clicks" start="yesterday(0,0)" end="latest(0)" partition="*/US"/>
+    </inputs>
+
+    <outputs>
+        <output name="clicksummary" feed="impressions" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <properties>
+        <property name="procprop" value="procprop"/>
+        <property name="mapred.job.priority" value="LOW"/>
+    </properties>
+
+    <workflow name="test" version="1.0.0" engine="oozie" path="/user/guest/workflow"/>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+    <late-process policy="exp-backoff" delay="hours(1)">
+        <late-input input="impression" workflow-path="hdfs://impression/late/workflow"/>
+        <late-input input="click" workflow-path="hdfs://clicks/late/workflow"/>
+    </late-process>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/feed/feed.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/feed.xml b/oozie/src/test/resources/feed/feed.xml
new file mode 100644
index 0000000..4da222e
--- /dev/null
+++ b/oozie/src/test/resources/feed/feed.xml
@@ -0,0 +1,56 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="raw-logs" xmlns="uri:falcon:feed:0.1"
+        >
+
+    <groups>online,bi</groups>
+
+    <frequency>minutes(20)</frequency>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="minutes(3)"/>
+    <clusters>
+        <cluster name="corp1" type="source" delay="minutes(40)">
+            <validity start="2010-01-01T00:00Z" end="2020-01-01T02:00Z"/>
+            <retention limit="minutes(5)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="corp2" type="target">
+            <validity start="2010-01-01T00:00Z" end="2020-01-01T02:00Z"/>
+            <retention limit="minutes(7)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data"
+                  path="/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+
+    <properties>
+        <property name="field3" value="value3"/>
+        <property name="field2" value="value2"/>
+
+        <property name="field4" value="value2"/>
+    </properties>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/feed/fs-replication-feed.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/fs-replication-feed.xml b/oozie/src/test/resources/feed/fs-replication-feed.xml
new file mode 100644
index 0000000..bada507
--- /dev/null
+++ b/oozie/src/test/resources/feed/fs-replication-feed.xml
@@ -0,0 +1,68 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="billing RC File" name="replication-test" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="colo"/>
+        <partition name="eventTime"/>
+        <partition name="impressionHour"/>
+        <partition name="pricingModel"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>minutes(5)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="minutes(1)"/>
+
+    <clusters>
+        <cluster partition="${cluster.colo}" type="source" name="corp1">
+            <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/>
+            <retention action="delete" limit="days(10000)"/>
+        </cluster>
+        <cluster type="target" name="alpha">
+            <validity end="2012-10-01T12:11Z" start="2012-10-01T12:05Z"/>
+            <retention action="delete" limit="days(10000)"/>
+            <locations>
+                <location path="/localDC/rc/billing/ua1/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/" type="data"/>
+            </locations>
+        </cluster>
+        <cluster type="target" name="beta">
+            <validity end="2012-10-01T12:26Z" start="2012-10-01T12:10Z"/>
+            <retention action="delete" limit="days(10000)"/>
+            <locations>
+                <location path="/localDC/rc/billing/ua2/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/" type="data"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location
+                path="/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/"
+                type="data"/>
+        <location path="/data/regression/fetlrc/billing/stats" type="stats"/>
+        <location path="/data/regression/fetlrc/billing/metadata"
+                  type="meta"/>
+    </locations>
+
+    <ACL permission="0x755" group="group" owner="fetl"/>
+    <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/>
+    <properties>
+        <property name="maxMaps" value="33" />
+        <property name="mapBandwidthKB" value="2048" />
+    </properties>
+</feed>


Mime
View raw message