falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [4/5] FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS
Date Tue, 18 Mar 2014 11:41:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
deleted file mode 100644
index e610df2..0000000
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.converter;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DECISION;
-import org.apache.falcon.oozie.workflow.JAVA;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tests for Oozie workflow definition for feed replication & retention.
- */
-public class OozieFeedMapperTest {
-    private EmbeddedCluster srcMiniDFS;
-    private EmbeddedCluster trgMiniDFS;
-    private final ConfigurationStore store = ConfigurationStore.get();
-    private Cluster srcCluster;
-    private Cluster trgCluster;
-    private Cluster alphaTrgCluster;
-    private Cluster betaTrgCluster;
-    private Feed feed;
-    private Feed tableFeed;
-    private Feed fsReplFeed;
-
-    private static final String SRC_CLUSTER_PATH = "/src-cluster.xml";
-    private static final String TRG_CLUSTER_PATH = "/trg-cluster.xml";
-    private static final String FEED = "/feed.xml";
-    private static final String TABLE_FEED = "/table-replication-feed.xml";
-    private static final String FS_REPLICATION_FEED = "/fs-replication-feed.xml";
-
-    @BeforeClass
-    public void setUpDFS() throws Exception {
-        CurrentUser.authenticate("falcon");
-
-        srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
-        String srcHdfsUrl = srcMiniDFS.getConf().get("fs.default.name");
-
-        trgMiniDFS = EmbeddedCluster.newCluster("cluster2");
-        String trgHdfsUrl = trgMiniDFS.getConf().get("fs.default.name");
-
-        cleanupStore();
-
-        srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH, srcHdfsUrl);
-        trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH, trgHdfsUrl);
-        alphaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-alpha.xml", trgHdfsUrl);
-        betaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-beta.xml", trgHdfsUrl);
-
-        feed = (Feed) storeEntity(EntityType.FEED, FEED, null);
-        fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED, null);
-        tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED, null);
-    }
-
-    protected Entity storeEntity(EntityType type, String template, String writeEndpoint) throws Exception {
-        Unmarshaller unmarshaller = type.getUnmarshaller();
-        Entity entity = (Entity) unmarshaller
-                .unmarshal(OozieFeedMapperTest.class.getResource(template));
-        store.publish(type, entity);
-
-        if (type == EntityType.CLUSTER) {
-            Cluster cluster = (Cluster) entity;
-            ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
-            FileSystem fs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration());
-            fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close();
-            fs.create(
-                    new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close();
-        }
-        return entity;
-    }
-
-    protected void cleanupStore() throws FalconException {
-        for (EntityType type : EntityType.values()) {
-            Collection<String> entities = store.getEntities(type);
-            for (String entity : entities) {
-                store.remove(type, entity);
-            }
-        }
-    }
-
-    @AfterClass
-    public void stopDFS() {
-        srcMiniDFS.shutdown();
-        trgMiniDFS.shutdown();
-    }
-
-    @Test
-    public void testReplicationCoordsForFSStorage() throws Exception {
-        OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
-        List<COORDINATORAPP> coords = feedMapper.getCoordinators(trgCluster,
-                new Path("/projects/falcon/"));
-        //Assert retention coord
-        COORDINATORAPP coord = coords.get(0);
-        assertLibExtensions(coord, "retention");
-
-        //Assert replication coord
-        coord = coords.get(1);
-        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
-        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION", coord
-                .getAction().getWorkflow().getAppPath());
-        Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
-                + srcCluster.getName(), coord.getName());
-        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
-        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
-                .getDatasetOrAsyncDataset().get(0);
-        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
-                .getDatasetOrAsyncDataset().get(1);
-
-        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
-        Assert.assertEquals("input-dataset", inputDataset.getName());
-        Assert.assertEquals(
-                ClusterHelper.getReadOnlyStorageUrl(srcCluster)
-                        + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
-                inputDataset.getUriTemplate());
-
-        Assert.assertEquals("${coord:minutes(20)}",
-                outputDataset.getFrequency());
-        Assert.assertEquals("output-dataset", outputDataset.getName());
-        Assert.assertEquals(ClusterHelper.getStorageUrl(trgCluster)
-                + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
-                        outputDataset.getUriTemplate());
-        String inEventName =coord.getInputEvents().getDataIn().get(0).getName();
-        String inEventDataset =coord.getInputEvents().getDataIn().get(0).getDataset();
-        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
-        Assert.assertEquals("input", inEventName);
-        Assert.assertEquals("input-dataset", inEventDataset);
-        Assert.assertEquals("${now(0,-40)}", inEventInstance);
-
-        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
-        Assert.assertEquals("${now(0,-40)}", outEventInstance);
-
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        // verify the replication param that feed replicator depends on
-        String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
-        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
-
-        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
-        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
-
-        // verify the late data params
-        Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
-        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
-        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
-
-        // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), feed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
-
-        // verify workflow params
-        Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
-        Assert.assertEquals(props.get("userWorkflowVersion"), "0.5");
-        Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
-
-        // verify default params
-        Assert.assertEquals(props.get("queueName"), "default");
-        Assert.assertEquals(props.get("jobPriority"), "NORMAL");
-        Assert.assertEquals(props.get("maxMaps"), "5");
-
-        assertLibExtensions(coord, "replication");
-        assertWorkflowRetries(coord);
-    }
-
-    private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
-        WORKFLOWAPP wf = getWorkflowapp(coord);
-        List<Object> actions = wf.getDecisionOrForkOrJoin();
-        for (Object obj : actions) {
-            if (!(obj instanceof ACTION)) {
-                continue;
-            }
-            ACTION action = (ACTION) obj;
-            String actionName = action.getName();
-            if (AbstractOozieEntityMapper.FALCON_ACTIONS.contains(actionName)) {
-                Assert.assertEquals(action.getRetryMax(), "3");
-                Assert.assertEquals(action.getRetryInterval(), "1");
-            }
-        }
-    }
-
-    private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
-        WORKFLOWAPP wf = getWorkflowapp(coord);
-        List<Object> actions = wf.getDecisionOrForkOrJoin();
-        for (Object obj : actions) {
-            if (!(obj instanceof ACTION)) {
-                continue;
-            }
-            ACTION action = (ACTION) obj;
-            List<String> files = null;
-            if (action.getJava() != null) {
-                files = action.getJava().getFile();
-            } else if (action.getPig() != null) {
-                files = action.getPig().getFile();
-            } else if (action.getMapReduce() != null) {
-                files = action.getMapReduce().getFile();
-            }
-            if (files != null) {
-                Assert.assertTrue(files.get(files.size() - 1).endsWith("/projects/falcon/working/libext/FEED/"
-                        + lifecycle + "/ext.jar"));
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private WORKFLOWAPP getWorkflowapp(COORDINATORAPP coord) throws JAXBException, IOException {
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
-        return ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
-                trgMiniDFS.getFileSystem().open(new Path(wfPath, "workflow.xml")))).getValue();
-    }
-
-    @Test
-    public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
-        OozieFeedMapper feedMapper = new OozieFeedMapper(fsReplFeed);
-
-        List<COORDINATORAPP> alphaCoords = feedMapper.getCoordinators(alphaTrgCluster, new Path("/alpha/falcon/"));
-        final COORDINATORAPP alphaCoord = alphaCoords.get(0);
-        Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
-        Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
-
-        String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed);
-        assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster.getName(), pathsWithPartitions);
-
-        List<COORDINATORAPP> betaCoords = feedMapper.getCoordinators(betaTrgCluster, new Path("/beta/falcon/"));
-        final COORDINATORAPP betaCoord = betaCoords.get(0);
-        Assert.assertEquals(betaCoord.getStart(), "2012-10-01T12:10Z");
-        Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z");
-
-        pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed);
-        assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster.getName(), pathsWithPartitions);
-    }
-
-    private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
-                                          Feed aFeed) throws FalconException {
-        String srcPart = FeedHelper.normalizePartitionExpression(
-                FeedHelper.getCluster(aFeed, sourceCluster.getName()).getPartition());
-        srcPart = FeedHelper.evaluateClusterExp(sourceCluster, srcPart);
-        String targetPart = FeedHelper.normalizePartitionExpression(
-                FeedHelper.getCluster(aFeed, targetCluster.getName()).getPartition());
-        targetPart = FeedHelper.evaluateClusterExp(targetCluster, targetPart);
-
-        StringBuilder pathsWithPartitions = new StringBuilder();
-        pathsWithPartitions.append("${coord:dataIn('input')}/")
-                .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
-
-        String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
-        parts = StringUtils.stripEnd(parts, "/");
-        return parts;
-    }
-
-    private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, String clusterName,
-                                 String pathsWithPartitions) throws JAXBException, IOException {
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(aFeed, clusterName);
-        Date startDate = feedCluster.getValidity().getStart();
-        Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate));
-
-        Date endDate = feedCluster.getValidity().getEnd();
-        Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
-
-        WORKFLOWAPP workflow = getWorkflowapp(coord);
-        assertWorkflowDefinition(fsReplFeed, workflow);
-
-        List<Object> actions = workflow.getDecisionOrForkOrJoin();
-        System.out.println("actions = " + actions);
-
-        ACTION replicationActionNode = (ACTION) actions.get(4);
-        Assert.assertEquals(replicationActionNode.getName(), "replication");
-
-        JAVA replication = replicationActionNode.getJava();
-        List<String> args = replication.getArg();
-        Assert.assertEquals(args.size(), 11);
-
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
-        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo());
-        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
-        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
-        Assert.assertEquals(props.get("maxMaps"), "33");
-    }
-
-    public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {
-        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), parentWorkflow.getName());
-
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
-        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
-        Assert.assertEquals("replication-decision", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
-        Assert.assertEquals("table-export", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
-        Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
-        Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName());
-        Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
-        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
-        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
-    }
-
-    @Test
-    public void testReplicationCoordsForTableStorage() throws Exception {
-        OozieFeedMapper feedMapper = new OozieFeedMapper(tableFeed);
-        List<COORDINATORAPP> coords = feedMapper.getCoordinators(
-                trgCluster, new Path("/projects/falcon/"));
-        COORDINATORAPP coord = coords.get(0);
-
-        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
-        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION",
-                coord.getAction().getWorkflow().getAppPath());
-        Assert.assertEquals("FALCON_FEED_REPLICATION_" + tableFeed.getName() + "_"
-                + srcCluster.getName(), coord.getName());
-        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
-
-        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
-                .getDatasetOrAsyncDataset().get(0);
-        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
-        Assert.assertEquals("input-dataset", inputDataset.getName());
-
-        String sourceRegistry = ClusterHelper.getInterface(srcCluster, Interfacetype.REGISTRY).getEndpoint();
-        sourceRegistry = sourceRegistry.replace("thrift", "hcat");
-        Assert.assertEquals(inputDataset.getUriTemplate(),
-                sourceRegistry + "/source_db/source_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
-
-        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
-                .getDatasetOrAsyncDataset().get(1);
-        Assert.assertEquals(outputDataset.getFrequency(), "${coord:minutes(20)}");
-        Assert.assertEquals("output-dataset", outputDataset.getName());
-
-        String targetRegistry = ClusterHelper.getInterface(trgCluster, Interfacetype.REGISTRY).getEndpoint();
-        targetRegistry = targetRegistry.replace("thrift", "hcat");
-        Assert.assertEquals(outputDataset.getUriTemplate(),
-                targetRegistry + "/target_db/target_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
-
-        String inEventName =coord.getInputEvents().getDataIn().get(0).getName();
-        String inEventDataset =coord.getInputEvents().getDataIn().get(0).getDataset();
-        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
-        Assert.assertEquals("input", inEventName);
-        Assert.assertEquals("input-dataset", inEventDataset);
-        Assert.assertEquals("${now(0,-40)}", inEventInstance);
-
-        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
-        Assert.assertEquals("${now(0,-40)}", outEventInstance);
-
-        // assert FS staging area
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        final FileSystem fs = trgMiniDFS.getFileSystem();
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts")));
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-import.hql")));
-
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf")));
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));
-
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
-        final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
-
-        // verify the replication param that feed replicator depends on
-        Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
-
-        Assert.assertTrue(props.containsKey("distcpSourcePaths"));
-        Assert.assertEquals(props.get("distcpSourcePaths"),
-                FeedHelper.getStagingDir(srcCluster, tableFeed, srcStorage, Tag.REPLICATION)
-                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
-                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
-
-        Assert.assertTrue(props.containsKey("distcpTargetPaths"));
-        Assert.assertEquals(props.get("distcpTargetPaths"),
-                FeedHelper.getStagingDir(trgCluster, tableFeed, trgStorage, Tag.REPLICATION)
-                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
-                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
-
-        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name());
-
-        // verify table props
-        assertTableStorageProperties(srcCluster, srcStorage, props, "falconSource");
-        assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget");
-
-        // verify the late data params
-        Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
-        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
-
-        // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
-    }
-
-    private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
-                                              Map<String, String> props, String prefix) {
-        Assert.assertEquals(props.get(prefix + "NameNode"), ClusterHelper.getStorageUrl(cluster));
-        Assert.assertEquals(props.get(prefix + "JobTracker"), ClusterHelper.getMREndPoint(cluster));
-        Assert.assertEquals(props.get(prefix + "HcatNode"), tableStorage.getCatalogUrl());
-
-        Assert.assertEquals(props.get(prefix + "Database"), tableStorage.getDatabase());
-        Assert.assertEquals(props.get(prefix + "Table"), tableStorage.getTable());
-        Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitionFilter('input', 'hive')}");
-    }
-
-    @Test
-    public void testRetentionCoords() throws FalconException, JAXBException, IOException {
-        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
-        final Calendar instance = Calendar.getInstance();
-        instance.roll(Calendar.YEAR, 1);
-        cluster.getValidity().setEnd(instance.getTime());
-
-        OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
-        List<COORDINATORAPP> coords = feedMapper.getCoordinators(srcCluster, new Path("/projects/falcon/"));
-        COORDINATORAPP coord = coords.get(0);
-
-        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
-        Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
-        Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
-
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        String feedDataPath = props.get("feedDataPath");
-        String storageType = props.get("falconFeedStorageType");
-
-        // verify the param that feed evictor depends on
-        Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
-
-        final Storage storage = FeedHelper.createStorage(cluster, feed);
-        if (feedDataPath != null) {
-            Assert.assertEquals(feedDataPath, storage.getUriTemplate()
-                    .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
-        }
-
-        if (storageType != null) {
-            Assert.assertEquals(storageType, storage.getType().name());
-        }
-
-        // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), feed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
-
-        assertWorkflowRetries(coord);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
new file mode 100644
index 0000000..182d9cb
--- /dev/null
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.converter;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DECISION;
+import org.apache.falcon.oozie.workflow.JAVA;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.workflow.OozieFeedWorkflowBuilder;
+import org.apache.falcon.workflow.OozieWorkflowBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests the Oozie coordinator/workflow definitions that
+ * OozieFeedWorkflowBuilder generates for feed replication and retention,
+ * covering both filesystem and Hive-table (catalog) storage.
+ */
+public class OozieFeedWorkflowBuilderTest {
+    private EmbeddedCluster srcMiniDFS;
+    private EmbeddedCluster trgMiniDFS;
+    private final ConfigurationStore store = ConfigurationStore.get();
+    private Cluster srcCluster;
+    private Cluster trgCluster;
+    private Cluster alphaTrgCluster;
+    private Cluster betaTrgCluster;
+    private Feed feed;
+    private Feed tableFeed;
+    private Feed fsReplFeed;
+
+    private static final String SRC_CLUSTER_PATH = "/src-cluster.xml";
+    private static final String TRG_CLUSTER_PATH = "/trg-cluster.xml";
+    private static final String FEED = "/feed.xml";
+    private static final String TABLE_FEED = "/table-replication-feed.xml";
+    private static final String FS_REPLICATION_FEED = "/fs-replication-feed.xml";
+
+    /**
+     * Starts two embedded DFS clusters (source and target) and publishes the
+     * cluster and feed entities that the tests operate on.
+     */
+    @BeforeClass
+    public void setUpDFS() throws Exception {
+        CurrentUser.authenticate("falcon");
+
+        srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
+        String srcHdfsUrl = srcMiniDFS.getConf().get("fs.default.name");
+
+        trgMiniDFS = EmbeddedCluster.newCluster("cluster2");
+        String trgHdfsUrl = trgMiniDFS.getConf().get("fs.default.name");
+
+        cleanupStore();
+
+        srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH, srcHdfsUrl);
+        trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH, trgHdfsUrl);
+        alphaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-alpha.xml", trgHdfsUrl);
+        betaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-beta.xml", trgHdfsUrl);
+
+        feed = (Feed) storeEntity(EntityType.FEED, FEED, null);
+        fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED, null);
+        tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED, null);
+    }
+
+    /**
+     * Unmarshals the classpath template into an entity and publishes it.
+     * For clusters, also points the WRITE interface at the given endpoint and
+     * seeds the working dir with the libext jars the builder is expected to
+     * pick up for retention/replication workflows.
+     *
+     * @param type          entity type of the template
+     * @param template      classpath resource holding the entity XML
+     * @param writeEndpoint HDFS write endpoint; only used for CLUSTER entities
+     */
+    protected Entity storeEntity(EntityType type, String template, String writeEndpoint) throws Exception {
+        Unmarshaller unmarshaller = type.getUnmarshaller();
+        Entity entity = (Entity) unmarshaller
+                .unmarshal(OozieFeedWorkflowBuilderTest.class.getResource(template));
+        store.publish(type, entity);
+
+        if (type == EntityType.CLUSTER) {
+            Cluster cluster = (Cluster) entity;
+            ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
+            FileSystem fs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration());
+            fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close();
+            fs.create(
+                    new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close();
+        }
+        return entity;
+    }
+
+    /** Removes every entity of every type from the configuration store. */
+    protected void cleanupStore() throws FalconException {
+        for (EntityType type : EntityType.values()) {
+            Collection<String> entities = store.getEntities(type);
+            for (String entity : entities) {
+                store.remove(type, entity);
+            }
+        }
+    }
+
+    /** Shuts down both embedded DFS clusters. */
+    @AfterClass
+    public void stopDFS() {
+        srcMiniDFS.shutdown();
+        trgMiniDFS.shutdown();
+    }
+
+    /**
+     * Flattens the coordinator action's workflow configuration into a
+     * name-to-value map for easy assertion.
+     */
+    private Map<String, String> getCoordProperties(COORDINATORAPP coord) {
+        Map<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+        return props;
+    }
+
+    @Test
+    public void testReplicationCoordsForFSStorage() throws Exception {
+        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
+        List<COORDINATORAPP> coords = builder.getCoordinators(trgCluster, new Path("/projects/falcon/"));
+        //Assert retention coord
+        COORDINATORAPP coord = coords.get(0);
+        assertLibExtensions(coord, "retention");
+
+        //Assert replication coord
+        coord = coords.get(1);
+        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
+        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION", coord
+                .getAction().getWorkflow().getAppPath());
+        Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
+                + srcCluster.getName(), coord.getName());
+        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
+        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(0);
+        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(1);
+
+        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
+        Assert.assertEquals("input-dataset", inputDataset.getName());
+        Assert.assertEquals(
+                ClusterHelper.getReadOnlyStorageUrl(srcCluster)
+                        + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+                inputDataset.getUriTemplate());
+
+        Assert.assertEquals("${coord:minutes(20)}",
+                outputDataset.getFrequency());
+        Assert.assertEquals("output-dataset", outputDataset.getName());
+        Assert.assertEquals(ClusterHelper.getStorageUrl(trgCluster)
+                + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+                        outputDataset.getUriTemplate());
+        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
+        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
+        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
+        Assert.assertEquals("input", inEventName);
+        Assert.assertEquals("input-dataset", inEventDataset);
+        Assert.assertEquals("${now(0,-40)}", inEventInstance);
+
+        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
+        Assert.assertEquals("${now(0,-40)}", outEventInstance);
+
+        Map<String, String> props = getCoordProperties(coord);
+
+        // verify the replication param that feed replicator depends on
+        String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
+        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
+
+        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), feed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+
+        // verify workflow params
+        Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
+        Assert.assertEquals(props.get("userWorkflowVersion"), "0.5");
+        Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
+
+        // verify default params
+        Assert.assertEquals(props.get("queueName"), "default");
+        Assert.assertEquals(props.get("jobPriority"), "NORMAL");
+        Assert.assertEquals(props.get("maxMaps"), "5");
+
+        assertLibExtensions(coord, "replication");
+        assertWorkflowRetries(coord);
+    }
+
+    /**
+     * Asserts that every Falcon-owned action in the coordinator's workflow is
+     * configured with retry-max=3 and retry-interval=1.
+     */
+    private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
+        WORKFLOWAPP wf = getWorkflowapp(coord);
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            String actionName = action.getName();
+            if (OozieWorkflowBuilder.FALCON_ACTIONS.contains(actionName)) {
+                Assert.assertEquals(action.getRetryMax(), "3");
+                Assert.assertEquals(action.getRetryInterval(), "1");
+            }
+        }
+    }
+
+    /**
+     * Asserts that each java/pig/map-reduce action in the coordinator's
+     * workflow has the lifecycle-specific libext jar as its last file entry.
+     *
+     * @param lifecycle "retention" or "replication"
+     */
+    private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
+        WORKFLOWAPP wf = getWorkflowapp(coord);
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            List<String> files = null;
+            if (action.getJava() != null) {
+                files = action.getJava().getFile();
+            } else if (action.getPig() != null) {
+                files = action.getPig().getFile();
+            } else if (action.getMapReduce() != null) {
+                files = action.getMapReduce().getFile();
+            }
+            if (files != null) {
+                Assert.assertTrue(files.get(files.size() - 1).endsWith("/projects/falcon/working/libext/FEED/"
+                        + lifecycle + "/ext.jar"));
+            }
+        }
+    }
+
+    /**
+     * Reads workflow.xml from the coordinator's app path on the target DFS
+     * and unmarshals it into a WORKFLOWAPP.
+     */
+    @SuppressWarnings("unchecked")
+    private WORKFLOWAPP getWorkflowapp(COORDINATORAPP coord) throws JAXBException, IOException {
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+        return ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
+                trgMiniDFS.getFileSystem().open(new Path(wfPath, "workflow.xml")))).getValue();
+    }
+
+    @Test
+    public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
+        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(fsReplFeed);
+
+        List<COORDINATORAPP> alphaCoords = builder.getCoordinators(alphaTrgCluster, new Path("/alpha/falcon/"));
+        final COORDINATORAPP alphaCoord = alphaCoords.get(0);
+        Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
+        Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
+
+        String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed);
+        assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster.getName(), pathsWithPartitions);
+
+        List<COORDINATORAPP> betaCoords = builder.getCoordinators(betaTrgCluster, new Path("/beta/falcon/"));
+        final COORDINATORAPP betaCoord = betaCoords.get(0);
+        Assert.assertEquals(betaCoord.getStart(), "2012-10-01T12:10Z");
+        Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z");
+
+        pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed);
+        assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster.getName(), pathsWithPartitions);
+    }
+
+    /**
+     * Builds the expected source-relative-paths expression: the coordinator
+     * dataIn EL joined with the normalized source/target partition expression,
+     * with duplicate and trailing slashes removed.
+     */
+    private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
+                                          Feed aFeed) throws FalconException {
+        String srcPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(aFeed, sourceCluster.getName()).getPartition());
+        srcPart = FeedHelper.evaluateClusterExp(sourceCluster, srcPart);
+        String targetPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(aFeed, targetCluster.getName()).getPartition());
+        targetPart = FeedHelper.evaluateClusterExp(targetCluster, targetPart);
+
+        StringBuilder pathsWithPartitions = new StringBuilder();
+        pathsWithPartitions.append("${coord:dataIn('input')}/")
+                .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
+
+        String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+        parts = StringUtils.stripEnd(parts, "/");
+        return parts;
+    }
+
+    /**
+     * Asserts the replication coordinator's validity window, workflow
+     * definition and replication-action configuration for the given feed and
+     * target cluster.
+     */
+    private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, String clusterName,
+                                 String pathsWithPartitions) throws JAXBException, IOException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(aFeed, clusterName);
+        Date startDate = feedCluster.getValidity().getStart();
+        Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate));
+
+        Date endDate = feedCluster.getValidity().getEnd();
+        Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
+
+        WORKFLOWAPP workflow = getWorkflowapp(coord);
+        // Use the feed passed in by the caller, not the fsReplFeed field:
+        // hard-coding the field silently broke this assertion for any other feed.
+        assertWorkflowDefinition(aFeed, workflow);
+
+        List<Object> actions = workflow.getDecisionOrForkOrJoin();
+
+        ACTION replicationActionNode = (ACTION) actions.get(4);
+        Assert.assertEquals(replicationActionNode.getName(), "replication");
+
+        JAVA replication = replicationActionNode.getJava();
+        List<String> args = replication.getArg();
+        Assert.assertEquals(args.size(), 11);
+
+        Map<String, String> props = getCoordProperties(coord);
+
+        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
+        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo());
+        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+        Assert.assertEquals(props.get("maxMaps"), "33");
+    }
+
+    /**
+     * Asserts the replication parent workflow's name and the fixed order of
+     * its decision/action nodes.
+     */
+    public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {
+        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), parentWorkflow.getName());
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
+        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
+        Assert.assertEquals("replication-decision", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
+        Assert.assertEquals("table-export", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
+        Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
+        Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName());
+        Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
+        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
+    }
+
+    @Test
+    public void testReplicationCoordsForTableStorage() throws Exception {
+        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(tableFeed);
+        List<COORDINATORAPP> coords = builder.getCoordinators(
+                trgCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = coords.get(0);
+
+        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
+        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION",
+                coord.getAction().getWorkflow().getAppPath());
+        Assert.assertEquals("FALCON_FEED_REPLICATION_" + tableFeed.getName() + "_"
+                + srcCluster.getName(), coord.getName());
+        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
+
+        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(0);
+        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
+        Assert.assertEquals("input-dataset", inputDataset.getName());
+
+        String sourceRegistry = ClusterHelper.getInterface(srcCluster, Interfacetype.REGISTRY).getEndpoint();
+        sourceRegistry = sourceRegistry.replace("thrift", "hcat");
+        Assert.assertEquals(inputDataset.getUriTemplate(),
+                sourceRegistry + "/source_db/source_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
+
+        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(1);
+        Assert.assertEquals(outputDataset.getFrequency(), "${coord:minutes(20)}");
+        Assert.assertEquals("output-dataset", outputDataset.getName());
+
+        String targetRegistry = ClusterHelper.getInterface(trgCluster, Interfacetype.REGISTRY).getEndpoint();
+        targetRegistry = targetRegistry.replace("thrift", "hcat");
+        Assert.assertEquals(outputDataset.getUriTemplate(),
+                targetRegistry + "/target_db/target_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
+
+        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
+        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
+        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
+        Assert.assertEquals("input", inEventName);
+        Assert.assertEquals("input-dataset", inEventDataset);
+        Assert.assertEquals("${now(0,-40)}", inEventInstance);
+
+        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
+        Assert.assertEquals("${now(0,-40)}", outEventInstance);
+
+        // assert FS staging area
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        final FileSystem fs = trgMiniDFS.getFileSystem();
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-import.hql")));
+
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));
+
+        Map<String, String> props = getCoordProperties(coord);
+
+        final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
+        final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
+
+        // verify the replication param that feed replicator depends on
+        Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
+
+        Assert.assertTrue(props.containsKey("distcpSourcePaths"));
+        Assert.assertEquals(props.get("distcpSourcePaths"),
+                FeedHelper.getStagingDir(srcCluster, tableFeed, srcStorage, Tag.REPLICATION)
+                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
+                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
+
+        Assert.assertTrue(props.containsKey("distcpTargetPaths"));
+        Assert.assertEquals(props.get("distcpTargetPaths"),
+                FeedHelper.getStagingDir(trgCluster, tableFeed, trgStorage, Tag.REPLICATION)
+                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
+                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
+
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name());
+
+        // verify table props
+        assertTableStorageProperties(srcCluster, srcStorage, props, "falconSource");
+        assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget");
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+    }
+
+    /**
+     * Asserts the Hive-table storage properties (namenode, jobtracker, hcat
+     * endpoint, db/table/partition) published under the given prefix.
+     */
+    private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+                                              Map<String, String> props, String prefix) {
+        Assert.assertEquals(props.get(prefix + "NameNode"), ClusterHelper.getStorageUrl(cluster));
+        Assert.assertEquals(props.get(prefix + "JobTracker"), ClusterHelper.getMREndPoint(cluster));
+        Assert.assertEquals(props.get(prefix + "HcatNode"), tableStorage.getCatalogUrl());
+
+        Assert.assertEquals(props.get(prefix + "Database"), tableStorage.getDatabase());
+        Assert.assertEquals(props.get(prefix + "Table"), tableStorage.getTable());
+        Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitionFilter('input', 'hive')}");
+    }
+
+    @Test
+    public void testRetentionCoords() throws FalconException, JAXBException, IOException {
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
+        // push the validity end a year out so the retention coord is still live
+        final Calendar instance = Calendar.getInstance();
+        instance.roll(Calendar.YEAR, 1);
+        cluster.getValidity().setEnd(instance.getTime());
+
+        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
+        List<COORDINATORAPP> coords = builder.getCoordinators(srcCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = coords.get(0);
+
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
+        Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
+
+        Map<String, String> props = getCoordProperties(coord);
+
+        String feedDataPath = props.get("feedDataPath");
+        String storageType = props.get("falconFeedStorageType");
+
+        // verify the param that feed evictor depends on
+        Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
+
+        final Storage storage = FeedHelper.createStorage(cluster, feed);
+        if (feedDataPath != null) {
+            Assert.assertEquals(feedDataPath, storage.getUriTemplate()
+                    .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+        }
+
+        if (storageType != null) {
+            Assert.assertEquals(storageType, storage.getType().name());
+        }
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), feed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+
+        assertWorkflowRetries(coord);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
deleted file mode 100644
index f443939..0000000
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.FalconRuntimException;
-import org.apache.falcon.Tag;
-import org.apache.commons.io.IOUtils;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.ExternalId;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.oozie.bundle.BUNDLEAPP;
-import org.apache.falcon.oozie.bundle.COORDINATOR;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.ObjectFactory;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.falcon.service.FalconPathFilter;
-import org.apache.falcon.service.SharedLibraryHostingService;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.OozieClient;
-
-import javax.xml.bind.*;
-import java.io.*;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * Entity mapper base class that allows an entity to be mapped to oozie bundle.
- * @param <T>
- */
-public abstract class AbstractOozieEntityMapper<T extends Entity> {
-
-    private static final Logger LOG = Logger.getLogger(AbstractOozieEntityMapper.class);
-
-    protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
-
-    protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
-    protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
-    protected static final String MR_QUEUE_NAME = "queueName";
-    protected static final String MR_JOB_PRIORITY = "jobPriority";
-
-    protected static final JAXBContext WORKFLOW_JAXB_CONTEXT;
-    protected static final JAXBContext COORD_JAXB_CONTEXT;
-    protected static final JAXBContext BUNDLE_JAXB_CONTEXT;
-    protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
-    public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(new String[] { "recordsize",
-        "succeeded-post-processing", "failed-post-processing", }));
-
-    protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
-        @Override
-        public boolean accept(Path path) {
-            return path.getName().startsWith("falcon");
-        }
-
-        @Override
-        public String getJarName(Path path) {
-            String name = path.getName();
-            if (name.endsWith(".jar")) {
-                name = name.substring(0, name.indexOf(".jar"));
-            }
-            return name;
-        }
-    };
-
-    static {
-        try {
-            WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
-            COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
-            BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
-            HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
-                    org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
-        } catch (JAXBException e) {
-            throw new RuntimeException("Unable to create JAXB context", e);
-        }
-    }
-
-    private final T entity;
-
-    protected AbstractOozieEntityMapper(T entity) {
-        this.entity = entity;
-    }
-
-    protected T getEntity() {
-        return entity;
-    }
-
-    protected Path getCoordPath(Path bundlePath, String coordName) {
-        Tag tag = EntityUtil.getWorkflowNameTag(coordName, getEntity());
-        return new Path(bundlePath, tag.name());
-    }
-
-    protected abstract Map<String, String> getEntityProperties();
-
-    public boolean map(Cluster cluster, Path bundlePath) throws FalconException {
-        BUNDLEAPP bundleApp = new BUNDLEAPP();
-        bundleApp.setName(EntityUtil.getWorkflowName(entity).toString());
-        // all the properties are set prior to bundle and coordinators creation
-
-        List<COORDINATORAPP> coordinators = getCoordinators(cluster, bundlePath);
-        if (coordinators.size() == 0) {
-            return false;
-        }
-        for (COORDINATORAPP coordinatorapp : coordinators) {
-            Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
-            String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
-                    EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
-            createLogsDir(cluster, coordPath);
-            COORDINATOR bundleCoord = new COORDINATOR();
-            bundleCoord.setName(coordinatorapp.getName());
-            bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
-            bundleApp.getCoordinator().add(bundleCoord);
-
-            copySharedLibs(cluster, coordPath);
-        }
-
-        marshal(cluster, bundleApp, bundlePath);
-        return true;
-    }
-
-    private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
-        FileStatus[] libs = null;
-        try {
-            libs = fs.listStatus(path);
-        } catch(FileNotFoundException ignore) {
-            //Ok if the libext is not configured
-        }
-
-        if (libs == null) {
-            return;
-        }
-
-        for(FileStatus lib : libs) {
-            if (lib.isDir()) {
-                continue;
-            }
-
-            for(Object obj: wf.getDecisionOrForkOrJoin()) {
-                if (!(obj instanceof ACTION)) {
-                    continue;
-                }
-                ACTION action = (ACTION) obj;
-                List<String> files = null;
-                if (action.getJava() != null) {
-                    files = action.getJava().getFile();
-                } else if (action.getPig() != null) {
-                    files = action.getPig().getFile();
-                } else if (action.getMapReduce() != null) {
-                    files = action.getMapReduce().getFile();
-                }
-                if (files != null) {
-                    files.add(lib.getPath().toString());
-                }
-            }
-        }
-    }
-
-    protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
-        throws IOException, FalconException {
-        String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
-        addExtensionJars(fs, new Path(libext), wf);
-        addExtensionJars(fs, new Path(libext, type.name()), wf);
-        if (StringUtils.isNotEmpty(lifecycle)) {
-            addExtensionJars(fs, new Path(libext, type.name() + "/" + lifecycle), wf);
-        }
-    }
-
-    private void copySharedLibs(Cluster cluster, Path coordPath) throws FalconException {
-        try {
-            Path libPath = new Path(coordPath, "lib");
-            SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
-                    libPath, cluster, FALCON_JAR_FILTER);
-        } catch (IOException e) {
-            throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
-        }
-    }
-
-    protected abstract List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException;
-
-    protected org.apache.falcon.oozie.coordinator.CONFIGURATION getCoordConfig(Map<String, String> propMap) {
-        org.apache.falcon.oozie.coordinator.CONFIGURATION conf
-            = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
-        List<org.apache.falcon.oozie.coordinator.CONFIGURATION.Property> props = conf.getProperty();
-        for (Entry<String, String> prop : propMap.entrySet()) {
-            props.add(createCoordProperty(prop.getKey(), prop.getValue()));
-        }
-        return conf;
-    }
-
-    protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, Path coordPath, String coordName) {
-        Map<String, String> props = new HashMap<String, String>();
-        props.put(ARG.entityName.getPropName(), entity.getName());
-        props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
-        props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
-        props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
-        props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
-        String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(),
-                "tcp://localhost:61616?daemon=true");
-        props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
-        String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(),
-                ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
-        props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
-        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
-                DEFAULT_BROKER_MSG_TTL.toString());
-        props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
-        props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
-        props.put("logDir", getStoragePath(new Path(coordPath, "../../logs")));
-        props.put(OozieClient.EXTERNAL_ID,
-                new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
-                        "${coord:nominalTime()}").getId());
-        props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
-        try {
-            if (EntityUtil.getLateProcess(entity) == null
-                    || EntityUtil.getLateProcess(entity).getLateInputs() == null
-                    || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
-                props.put("shouldRecord", "false");
-            } else {
-                props.put("shouldRecord", "true");
-            }
-        } catch (FalconException e) {
-            LOG.error("Unable to get Late Process for entity:" + entity, e);
-            throw new FalconRuntimException(e);
-        }
-        props.put("entityName", entity.getName());
-        props.put("entityType", entity.getEntityType().name().toLowerCase());
-        props.put(ARG.cluster.getPropName(), cluster.getName());
-        if (cluster.getProperties() != null) {
-            for (Property prop : cluster.getProperties().getProperties()) {
-                props.put(prop.getName(), prop.getValue());
-            }
-        }
-
-        props.put(MR_QUEUE_NAME, "default");
-        props.put(MR_JOB_PRIORITY, "NORMAL");
-        //props in entity override the set props.
-        props.putAll(getEntityProperties());
-        return props;
-    }
-
-    protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
-                                                                                             String value) {
-        org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
-            = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
-        prop.setName(name);
-        prop.setValue(value);
-        return prop;
-    }
-
-    protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
-        throws FalconException {
-        try {
-            Marshaller marshaller = jaxbContext.createMarshaller();
-            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(
-                    outPath.toUri(), ClusterHelper.getConfiguration(cluster));
-            OutputStream out = fs.create(outPath);
-            try {
-                marshaller.marshal(jaxbElement, out);
-            } finally {
-                out.close();
-            }
-            if (LOG.isDebugEnabled()) {
-                StringWriter writer = new StringWriter();
-                marshaller.marshal(jaxbElement, writer);
-                LOG.debug("Writing definition to " + outPath + " on cluster " + cluster.getName());
-                LOG.debug(writer.getBuffer());
-            }
-
-            LOG.info("Marshalled " + jaxbElement.getDeclaredType() + " to " + outPath);
-        } catch (Exception e) {
-            throw new FalconException("Unable to marshall app object", e);
-        }
-    }
-
-    private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(
-                    coordPath.toUri(), ClusterHelper.getConfiguration(cluster));
-            Path logsDir = new Path(coordPath, "../../logs");
-            fs.mkdirs(logsDir);
-
-            // logs are copied with in oozie as the user in Post Processing and hence 777 permissions
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(logsDir, permission);
-        } catch (Exception e) {
-            throw new FalconException("Unable to create temp dir in " + coordPath, e);
-        }
-    }
-
-    protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath, String name) throws FalconException {
-        if (StringUtils.isEmpty(name)) {
-            name = "coordinator";
-        }
-        name = name + ".xml";
-        marshal(cluster, new ObjectFactory().createCoordinatorApp(coord), COORD_JAXB_CONTEXT, new Path(outPath, name));
-        return name;
-    }
-
-    protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
-
-        marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
-                BUNDLE_JAXB_CONTEXT,
-                new Path(outPath, "bundle.xml"));
-    }
-
-    protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
-
-        marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
-                WORKFLOW_JAXB_CONTEXT,
-                new Path(outPath, "workflow.xml"));
-    }
-
-    protected String getStoragePath(Path path) {
-        if (path != null) {
-            return getStoragePath(path.toString());
-        }
-        return null;
-    }
-
-    protected String getStoragePath(String path) {
-        if (StringUtils.isNotEmpty(path)) {
-            if (new Path(path).toUri().getScheme() == null) {
-                path = "${nameNode}" + path;
-            }
-        }
-        return path;
-    }
-
-    protected WORKFLOWAPP getWorkflowTemplate(String template) throws FalconException {
-        InputStream resourceAsStream = null;
-        try {
-            resourceAsStream = AbstractOozieEntityMapper.class.getResourceAsStream(template);
-            Unmarshaller unmarshaller = WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
-            @SuppressWarnings("unchecked")
-            JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
-                    resourceAsStream);
-            return jaxbElement.getValue();
-        } catch (JAXBException e) {
-            throw new FalconException(e);
-        } finally {
-            IOUtils.closeQuietly(resourceAsStream);
-        }
-    }
-
-    protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
-        InputStream resourceAsStream = null;
-        try {
-            resourceAsStream = AbstractOozieEntityMapper.class.getResourceAsStream(template);
-            Unmarshaller unmarshaller = COORD_JAXB_CONTEXT.createUnmarshaller();
-            @SuppressWarnings("unchecked")
-            JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
-                    unmarshaller.unmarshal(resourceAsStream);
-            return jaxbElement.getValue();
-        } catch (JAXBException e) {
-            throw new FalconException(e);
-        } finally {
-            IOUtils.closeQuietly(resourceAsStream);
-        }
-    }
-
-    protected void createHiveConf(FileSystem fs, Path confPath, String metastoreUrl,
-                                  Cluster cluster, String prefix) throws IOException {
-        Configuration hiveConf = new Configuration(false);
-        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUrl);
-        hiveConf.set("hive.metastore.local", "false");
-
-        if (UserGroupInformation.isSecurityEnabled()) {
-            hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
-                    ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL));
-            hiveConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
-        }
-
-        OutputStream out = null;
-        try {
-            out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
-            hiveConf.writeXml(out);
-        } finally {
-            IOUtils.closeQuietly(out);
-        }
-    }
-
-    protected void decorateWithOozieRetries(ACTION action) {
-        Properties props = RuntimeProperties.get();
-        action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
-        action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
index 2f53370..9e1c82d 100644
--- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
+++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
@@ -17,8 +17,20 @@
  */
 package org.apache.falcon.util;
 
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.hive.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.xerces.dom.ElementNSImpl;
+import org.w3c.dom.Document;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.dom.DOMResult;
 import java.io.ByteArrayInputStream;
 import java.util.Map;
 import java.util.Properties;
@@ -27,6 +39,23 @@ import java.util.Properties;
  * Help methods relating to oozie configuration.
  */
 public final class OozieUtils {
+    public static final JAXBContext WORKFLOW_JAXB_CONTEXT;
+    public static final JAXBContext COORD_JAXB_CONTEXT;
+    public static final JAXBContext BUNDLE_JAXB_CONTEXT;
+    protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
+
+    static {
+        try {
+            WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
+            COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
+            BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
+            HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
+                org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXB context", e);
+        }
+    }
+
 
     private OozieUtils() {}
 
@@ -39,4 +68,29 @@ public final class OozieUtils {
         }
         return jobprops;
     }
+
+    @SuppressWarnings("unchecked")
+    public static  JAXBElement<ACTION> unMarshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction) {
+        try {
+            Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
+            unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
+            return (JAXBElement<org.apache.falcon.oozie.hive.ACTION>)
+                unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to unmarshall hive action.", e);
+        }
+    }
+
+    public static  void marshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction,
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionjaxbElement) {
+        try {
+            DOMResult hiveActionDOM = new DOMResult();
+            Marshaller marshaller = HIVE_ACTION_JAXB_CONTEXT.createMarshaller();
+            marshaller.marshal(actionjaxbElement, hiveActionDOM);
+            wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to marshall hive action.", e);
+        }
+    }
+
 }


Mime
View raw message