falcon-commits mailing list archives

From: srik...@apache.org
Subject: [08/12] FALCON-85 Hive (HCatalog) integration. Contributed by Venkatesh Seetharam. FALCON-163 Merge FALCON-85 branch into main line. Contributed by Venkatesh Seetharam
Date: Tue, 12 Nov 2013 11:05:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index b6052e7..fb066ab 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -18,12 +18,15 @@
 
 package org.apache.falcon.converter;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -42,10 +45,13 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.*;
 
 /**
@@ -56,15 +62,8 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
 
     private static final Logger LOG = Logger.getLogger(OozieFeedMapper.class);
 
-    private static final int THIRTY_MINUTES = 30 * 60 * 1000;
-
-    private static final String RETENTION_WF_TEMPLATE = "/config/workflow/retention-workflow.xml";
-    private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
-    private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
-
-    private static final String FEED_PATH_SEP = "#";
-    private static final String TIMEOUT = "timeout";
-    private static final String PARALLEL = "parallel";
+    private final RetentionOozieWorkflowMapper retentionMapper = new RetentionOozieWorkflowMapper();
+    private final ReplicationOozieWorkflowMapper replicationMapper = new ReplicationOozieWorkflowMapper();
 
     public OozieFeedMapper(Feed feed) {
         super(feed);
@@ -92,61 +91,8 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                     + " is not in the future");
             return null;
         }
-        COORDINATORAPP retentionApp = new COORDINATORAPP();
-        String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
-        retentionApp.setName(coordName);
-        retentionApp.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
-        retentionApp.setStart(SchemaHelper.formatDateUTC(new Date()));
-        retentionApp.setTimezone(feed.getTimezone().getID());
-        TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
-        if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
-            retentionApp.setFrequency("${coord:hours(6)}");
-        } else {
-            retentionApp.setFrequency("${coord:days(1)}");
-        }
 
-        Path wfPath = getCoordPath(bundlePath, coordName);
-        retentionApp.setAction(getRetentionWorkflowAction(cluster, wfPath, coordName));
-        return retentionApp;
-    }
-
-    private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String wfName) throws FalconException {
-        Feed feed = getEntity();
-        ACTION retentionAction = new ACTION();
-        WORKFLOW retentionWorkflow = new WORKFLOW();
-        createRetentionWorkflow(cluster, wfPath, wfName);
-        retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
-
-        Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
-
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-        String feedPathMask = getLocationURI(cluster, feed, LocationType.DATA);
-        String metaPathMask = getLocationURI(cluster, feed, LocationType.META);
-        String statsPathMask = getLocationURI(cluster, feed, LocationType.STATS);
-        String tmpPathMask = getLocationURI(cluster, feed, LocationType.TMP);
-
-        StringBuilder feedBasePaths = new StringBuilder(feedPathMask);
-        if (metaPathMask != null) {
-            feedBasePaths.append(FEED_PATH_SEP).append(metaPathMask);
-        }
-        if (statsPathMask != null) {
-            feedBasePaths.append(FEED_PATH_SEP).append(statsPathMask);
-        }
-        if (tmpPathMask != null) {
-            feedBasePaths.append(FEED_PATH_SEP).append(tmpPathMask);
-        }
-
-        props.put("feedDataPath", feedBasePaths.toString().replaceAll("\\$\\{", "\\?\\{"));
-        props.put("timeZone", feed.getTimezone().getID());
-        props.put("frequency", feed.getFrequency().getTimeUnit().name());
-        props.put("limit", feedCluster.getRetention().getLimit().toString());
-        props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
-        props.put(ARG.feedNames.getPropName(), feed.getName());
-        props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
-
-        retentionWorkflow.setConfiguration(getCoordConfig(props));
-        retentionAction.setWorkflow(retentionWorkflow);
-        return retentionAction;
+        return retentionMapper.getRetentionCoordinator(cluster, bundlePath, feed, feedCluster);
     }
 
     private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
@@ -158,52 +104,241 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
         if (FeedHelper.getCluster(feed, targetCluster.getName()).getType() == ClusterType.TARGET) {
             String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, feed).toString();
             Path basePath = getCoordPath(bundlePath, coordName);
-            createReplicatonWorkflow(targetCluster, basePath, coordName);
+            replicationMapper.createReplicatonWorkflow(targetCluster, basePath, coordName);
 
             for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
                 if (feedCluster.getType() == ClusterType.SOURCE) {
-                    COORDINATORAPP coord = createAndGetCoord(feed,
+                    COORDINATORAPP coord = replicationMapper.createAndGetCoord(feed,
                             (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName()),
-                            targetCluster,
-                            bundlePath);
+                            targetCluster, bundlePath);
+
                     if (coord != null) {
                         replicationCoords.add(coord);
                     }
                 }
             }
-
         }
+
         return replicationCoords;
     }
 
-    private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster, Path bundlePath)
-        throws FalconException {
+    @Override
+    protected Map<String, String> getEntityProperties() {
+        Feed feed = getEntity();
+        Map<String, String> props = new HashMap<String, String>();
+        if (feed.getProperties() != null) {
+            for (Property prop : feed.getProperties().getProperties()) {
+                props.put(prop.getName(), prop.getValue());
+            }
+        }
+        return props;
+    }
+
+    private final class RetentionOozieWorkflowMapper {
+
+        private static final String RETENTION_WF_TEMPLATE = "/config/workflow/retention-workflow.xml";
+
+        private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath, Feed feed,
+                                                       org.apache.falcon.entity.v0.feed.Cluster feedCluster)
+            throws FalconException {
+
+            COORDINATORAPP retentionApp = new COORDINATORAPP();
+            String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
+            retentionApp.setName(coordName);
+            retentionApp.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
+            retentionApp.setStart(SchemaHelper.formatDateUTC(new Date()));
+            retentionApp.setTimezone(feed.getTimezone().getID());
+            TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
+            if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
+                retentionApp.setFrequency("${coord:hours(6)}");
+            } else {
+                retentionApp.setFrequency("${coord:days(1)}");
+            }
+
+            Path wfPath = getCoordPath(bundlePath, coordName);
+            retentionApp.setAction(getRetentionWorkflowAction(cluster, wfPath, coordName));
+            return retentionApp;
+        }
+
+        private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String wfName)
+            throws FalconException {
+            Feed feed = getEntity();
+            ACTION retentionAction = new ACTION();
+            WORKFLOW retentionWorkflow = new WORKFLOW();
+            createRetentionWorkflow(cluster, wfPath, wfName);
+            retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
+
+            Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
+            props.put("timeZone", feed.getTimezone().getID());
+            props.put("frequency", feed.getFrequency().getTimeUnit().name());
+
+            final Storage storage = FeedHelper.createStorage(cluster, feed);
+            props.put("falconFeedStorageType", storage.getType().name());
+
+            String feedDataPath = storage.getUriTemplate();
+            props.put("feedDataPath",
+                    feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                    FeedHelper.getCluster(feed, cluster.getName());
+            props.put("limit", feedCluster.getRetention().getLimit().toString());
+
+            props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
+            props.put(ARG.feedNames.getPropName(), feed.getName());
+            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+
+            retentionWorkflow.setConfiguration(getCoordConfig(props));
+            retentionAction.setWorkflow(retentionWorkflow);
+
+            return retentionAction;
+        }
+
+        private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
+            try {
+                WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
+                retWfApp.setName(wfName);
+                addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
+                marshal(cluster, retWfApp, wfPath);
+            } catch(IOException e) {
+                throw new FalconException("Unable to create retention workflow", e);
+            }
+        }
+    }
+
+    private class ReplicationOozieWorkflowMapper {
+        private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+
+        private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
+        private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
+
+        private static final String TIMEOUT = "timeout";
+        private static final String PARALLEL = "parallel";
+
+        private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName)
+            throws FalconException {
+            try {
+                WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
+                repWFapp.setName(wfName);
+                addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
+                marshal(cluster, repWFapp, wfPath);
+            } catch(IOException e) {
+                throw new FalconException("Unable to create replication workflow", e);
+            }
+        }
+
+        private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
+                                                 Path bundlePath) throws FalconException {
+            long replicationDelayInMillis = getReplicationDelayInMillis(feed, srcCluster);
+            Date sourceStartDate = getStartDate(feed, srcCluster, replicationDelayInMillis);
+            Date sourceEndDate = getEndDate(feed, srcCluster);
+
+            Date targetStartDate = getStartDate(feed, trgCluster, replicationDelayInMillis);
+            Date targetEndDate = getEndDate(feed, trgCluster);
+
+            if (noOverlapExists(sourceStartDate, sourceEndDate,
+                    targetStartDate, targetEndDate)) {
+                LOG.warn("Not creating replication coordinator, as the source cluster:"
+                        + srcCluster.getName()
+                        + " and target cluster: "
+                        + trgCluster.getName()
+                        + " do not have overlapping dates");
+                return null;
+            }
+
+            COORDINATORAPP replicationCoord;
+            try {
+                replicationCoord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
+            } catch (FalconException e) {
+                throw new FalconException("Cannot unmarshall replication coordinator template", e);
+            }
+
+            String coordName = EntityUtil.getWorkflowName(
+                    Tag.REPLICATION, Arrays.asList(srcCluster.getName()), feed).toString();
+            String start = sourceStartDate.after(targetStartDate)
+                    ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
+            String end = sourceEndDate.before(targetEndDate)
+                    ? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
+
+            initializeCoordAttributes(replicationCoord, coordName, feed, start, end, replicationDelayInMillis);
+            setCoordControls(feed, replicationCoord);
+
+            final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, feed);
+            initializeInputDataSet(feed, srcCluster, replicationCoord, sourceStorage);
+
+            final Storage targetStorage = FeedHelper.createStorage(trgCluster, feed);
+            initializeOutputDataSet(feed, trgCluster, replicationCoord, targetStorage);
+
+            Path wfPath = getCoordPath(bundlePath, coordName);
+            ACTION replicationWorkflowAction = getReplicationWorkflowAction(
+                    srcCluster, trgCluster, wfPath, coordName, sourceStorage, targetStorage);
+            replicationCoord.setAction(replicationWorkflowAction);
+
+            return replicationCoord;
+        }
+
+        private Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
+            Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
+            return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
+        }
+
+        private Date getEndDate(Feed feed, Cluster cluster) {
+            return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
+        }
+
+        private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
+                                        Date targetStartDate, Date targetEndDate) {
+            return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
+        }
 
-        COORDINATORAPP replicationCoord;
-        String coordName;
-        try {
-            replicationCoord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
-            coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(srcCluster.getName()),
-                    feed).toString();
+        private void initializeCoordAttributes(COORDINATORAPP replicationCoord, String coordName,
+                                               Feed feed, String start, String end, long delayInMillis) {
             replicationCoord.setName(coordName);
             replicationCoord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
 
-            long frequencyInMillis = ExpressionHelper.get().
-                    evaluate(feed.getFrequency().toString(), Long.class);
+            if (delayInMillis > 0) {
+                long delayInMins = -1 * delayInMillis / (1000 * 60);
+                String elExp = "${now(0," + delayInMins + ")}";
+
+                replicationCoord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
+                replicationCoord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
+            }
+
+            replicationCoord.setStart(start);
+            replicationCoord.setEnd(end);
+            replicationCoord.setTimezone(feed.getTimezone().getID());
+        }
+
+        private long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
+            Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
+            long delayInMillis=0;
+            if (replicationDelay != null) {
+                delayInMillis = ExpressionHelper.get().evaluate(
+                        replicationDelay.toString(), Long.class);
+            }
+
+            return delayInMillis;
+        }
+
+        private void setCoordControls(Feed feed, COORDINATORAPP replicationCoord) throws FalconException {
+            long frequencyInMillis = ExpressionHelper.get().evaluate(
+                    feed.getFrequency().toString(), Long.class);
             long timeoutInMillis = frequencyInMillis * 6;
             if (timeoutInMillis < THIRTY_MINUTES) {
                 timeoutInMillis = THIRTY_MINUTES;
             }
+
             Map<String, String> props = getEntityProperties();
             String timeout = props.get(TIMEOUT);
             if (timeout!=null) {
                 try{
-                    timeoutInMillis= ExpressionHelper.get().
-                            evaluate(timeout, Long.class);
+                    timeoutInMillis= ExpressionHelper.get().evaluate(timeout, Long.class);
                 } catch (Exception ignore) {
                     LOG.error("Unable to evaluate timeout:", ignore);
                 }
             }
+            replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+            replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+
             String parallelProp = props.get(PARALLEL);
             int parallel = 1;
             if (parallelProp != null) {
@@ -213,164 +348,199 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                     LOG.error("Unable to parse parallel:", ignore);
                 }
             }
-
-            replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
-            replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
             replicationCoord.getControls().setConcurrency(String.valueOf(parallel));
+        }
 
-            Frequency replicationDelay = FeedHelper.getCluster(feed,
-                    srcCluster.getName()).getDelay();
-            long delayInMillis=0;
-            if (replicationDelay != null) {
-                delayInMillis = ExpressionHelper.get().evaluate(
-                        replicationDelay.toString(), Long.class);
-                long delayInMins = -1 * delayInMillis / (1000 * 60);
-                String elExp = "${now(0," + delayInMins + ")}";
-                replicationCoord.getInputEvents().getDataIn().get(0)
-                .getInstance().set(0, elExp);
-                replicationCoord.getOutputEvents().getDataOut().get(0)
-                .setInstance(elExp);
-            }
-            Date srcStartDate = FeedHelper.getCluster(feed, srcCluster.getName()).getValidity().getStart();
-            srcStartDate=new Date(srcStartDate.getTime()+delayInMillis);
-            Date srcEndDate = FeedHelper.getCluster(feed, srcCluster.getName()).getValidity().getEnd();
-            Date trgStartDate = FeedHelper.getCluster(feed, trgCluster.getName()).getValidity().getStart();
-            Date trgEndDate = FeedHelper.getCluster(feed, trgCluster.getName()).getValidity().getEnd();
-            trgStartDate=new Date(trgStartDate.getTime()+delayInMillis);
-            if (srcStartDate.after(trgEndDate)
-                    || trgStartDate.after(srcEndDate)) {
-                LOG.warn("Not creating replication coordinator, as the source cluster:"
-                        + srcCluster.getName()
-                        + " and target cluster: "
-                        + trgCluster.getName()
-                        + " do not have overlapping dates");
-                return null;
+        private void initializeInputDataSet(Feed feed, Cluster srcCluster, COORDINATORAPP replicationCoord,
+                                            Storage sourceStorage) throws FalconException {
+            SYNCDATASET inputDataset = (SYNCDATASET)
+                    replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
+
+            String uriTemplate = sourceStorage.getUriTemplate(LocationType.DATA);
+            if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
             }
-            replicationCoord.setStart(
-                    srcStartDate.after(trgStartDate) ? SchemaHelper.formatDateUTC(srcStartDate) : SchemaHelper
-                            .formatDateUTC(trgStartDate));
-            replicationCoord.setEnd(
-                    srcEndDate.before(trgEndDate) ? SchemaHelper.formatDateUTC(srcEndDate) : SchemaHelper
-                            .formatDateUTC(trgEndDate));
-            replicationCoord.setTimezone(feed.getTimezone().getID());
-            SYNCDATASET inputDataset = (SYNCDATASET) replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
-            SYNCDATASET outputDataset = (SYNCDATASET) replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
+            inputDataset.setUriTemplate(uriTemplate);
 
-            inputDataset.setUriTemplate(new Path(ClusterHelper.getReadOnlyStorageUrl(srcCluster),
-                FeedHelper.getLocation(feed, LocationType.DATA, srcCluster.getName()).getPath()).toString());
-            outputDataset.setUriTemplate(getStoragePath(
-                FeedHelper.getLocation(feed, LocationType.DATA, trgCluster.getName()).getPath()));
             setDatasetValues(inputDataset, feed, srcCluster);
-            setDatasetValues(outputDataset, feed, srcCluster);
+
             if (feed.getAvailabilityFlag() == null) {
                 inputDataset.setDoneFlag("");
             } else {
                 inputDataset.setDoneFlag(feed.getAvailabilityFlag());
             }
+        }
 
-        } catch (FalconException e) {
-            throw new FalconException("Cannot unmarshall replication coordinator template", e);
+        private void initializeOutputDataSet(Feed feed, Cluster targetCluster, COORDINATORAPP replicationCoord,
+                                             Storage targetStorage) throws FalconException {
+            SYNCDATASET outputDataset = (SYNCDATASET)
+                    replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
+
+            String uriTemplate = targetStorage.getUriTemplate(LocationType.DATA);
+            if (targetStorage.getType() == Storage.TYPE.TABLE) {
+                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+            }
+            outputDataset.setUriTemplate(uriTemplate);
+
+            setDatasetValues(outputDataset, feed, targetCluster);
         }
 
-        Path wfPath = getCoordPath(bundlePath, coordName);
-        replicationCoord.setAction(getReplicationWorkflowAction(srcCluster, trgCluster, wfPath, coordName));
-        return replicationCoord;
-    }
+        private void setDatasetValues(SYNCDATASET dataset, Feed feed, Cluster cluster) {
+            dataset.setInitialInstance(SchemaHelper.formatDateUTC(
+                    FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
+            dataset.setTimezone(feed.getTimezone().getID());
+            dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+        }
 
-    private void setDatasetValues(SYNCDATASET dataset, Feed feed, Cluster cluster) {
-        dataset.setInitialInstance(
-                SchemaHelper.formatDateUTC(FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
-        dataset.setTimezone(feed.getTimezone().getID());
-        dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-    }
+        private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
+                                                    String wfName, Storage sourceStorage,
+                                                    Storage targetStorage) throws FalconException {
+            ACTION replicationAction = new ACTION();
+            WORKFLOW replicationWF = new WORKFLOW();
+            try {
+                replicationWF.setAppPath(getStoragePath(wfPath.toString()));
+                Feed feed = getEntity();
+
+                Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
+                props.put("srcClusterName", srcCluster.getName());
+                props.put("srcClusterColo", srcCluster.getColo());
+
+                // the storage type is uniform across source and target feeds for replication
+                props.put("falconFeedStorageType", sourceStorage.getType().name());
+
+                String instancePaths = null;
+                if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
+                    String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
+                    instancePaths = pathsWithPartitions;
+
+                    propagateFileSystemCopyProperties(pathsWithPartitions, props);
+                } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+                    instancePaths = "${coord:dataIn('input')}";
+
+                    final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
+                    propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
+                    final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
+                    propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
+                    propagateTableCopyProperties(srcCluster, sourceTableStorage,
+                            trgCluster, targetTableStorage, props);
+                    setupHiveConfiguration(trgCluster, sourceTableStorage, targetTableStorage, wfPath);
+                }
 
-    private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath, String wfName)
-        throws FalconException {
+                propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
 
-        ACTION replicationAction = new ACTION();
-        WORKFLOW replicationWF = new WORKFLOW();
-        try {
-            replicationWF.setAppPath(getStoragePath(wfPath.toString()));
-            Feed feed = getEntity();
+                replicationWF.setConfiguration(getCoordConfig(props));
+                replicationAction.setWorkflow(replicationWF);
+
+            } catch (Exception e) {
+                throw new FalconException("Unable to create replication workflow", e);
+            }
+
+            return replicationAction;
+        }
 
+        private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
+                                              Feed feed) throws FalconException {
             String srcPart = FeedHelper.normalizePartitionExpression(
                     FeedHelper.getCluster(feed, srcCluster.getName()).getPartition());
             srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
+
             String targetPart = FeedHelper.normalizePartitionExpression(
                     FeedHelper.getCluster(feed, trgCluster.getName()).getPartition());
             targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
 
             StringBuilder pathsWithPartitions = new StringBuilder();
-            pathsWithPartitions.append("${coord:dataIn('input')}/").append(
-                    FeedHelper.normalizePartitionExpression(srcPart, targetPart));
+            pathsWithPartitions.append("${coord:dataIn('input')}/")
+                    .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
 
-            Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
-            props.put("srcClusterName", srcCluster.getName());
-            props.put("srcClusterColo", srcCluster.getColo());
-            props.put(ARG.feedNames.getPropName(), feed.getName());
-            props.put(ARG.feedInstancePaths.getPropName(), pathsWithPartitions.toString());
             String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
             parts = StringUtils.stripEnd(parts, "/");
-            props.put("sourceRelativePaths", parts);
+            return parts;
+        }
+
+        private void propagateFileSystemCopyProperties(String pathsWithPartitions,
+                                                       Map<String, String> props) throws FalconException {
+            props.put("sourceRelativePaths", pathsWithPartitions);
+
             props.put("distcpSourcePaths", "${coord:dataIn('input')}");
             props.put("distcpTargetPaths", "${coord:dataOut('output')}");
-            props.put("falconInPaths", pathsWithPartitions.toString());
-            props.put("falconInputFeeds", feed.getName());
-            replicationWF.setConfiguration(getCoordConfig(props));
-            replicationAction.setWorkflow(replicationWF);
-        } catch (Exception e) {
-            throw new FalconException("Unable to create replication workflow", e);
         }
-        return replicationAction;
 
-    }
+        private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+                                                     Map<String, String> props, String prefix) {
+            props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
+            props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
+            props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
 
-    private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
-        try {
-            WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
-            repWFapp.setName(wfName);
-            addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
-            marshal(cluster, repWFapp, wfPath);
-        } catch(IOException e) {
-            throw new FalconException("Unable to create replication workflow", e);
+            props.put(prefix + "Database", tableStorage.getDatabase());
+            props.put(prefix + "Table", tableStorage.getTable());
+            props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
         }
-    }
 
-    private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
-        try {
-            WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
-            retWfApp.setName(wfName);
-            addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
-            marshal(cluster, retWfApp, wfPath);
-        } catch(IOException e) {
-            throw new FalconException("Unable to create retention workflow", e);
+        private void setupHiveConfiguration(Cluster trgCluster, CatalogStorage sourceStorage,
+                                            CatalogStorage targetStorage, Path wfPath) throws IOException {
+            FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(trgCluster));
+
+            // copy the import/export Hive scripts into the staging dir
+            Path scriptPath = new Path(wfPath, "scripts");
+            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
+            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
+
+            // write hive-site configs for the source and target catalogs into the staging dir
+            Path confPath = new Path(wfPath + "/conf");
+            createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), "falcon-source-");
+            createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), "falcon-target-");
         }
-    }
 
-    @Override
-    protected Map<String, String> getEntityProperties() {
-        Feed feed = getEntity();
-        Map<String, String> props = new HashMap<String, String>();
-        if (feed.getProperties() != null) {
-            for (Property prop : feed.getProperties().getProperties()) {
-                props.put(prop.getName(), prop.getValue());
+        private void copyHiveScript(FileSystem fs, Path scriptPath,
+                                    String localScriptPath, String scriptName) throws IOException {
+            OutputStream out = null;
+            InputStream in = null;
+            try {
+                out = fs.create(new Path(scriptPath, scriptName));
+                in = OozieFeedMapper.class.getResourceAsStream(localScriptPath + scriptName);
+                IOUtils.copy(in, out);
+            } finally {
+                IOUtils.closeQuietly(in);
+                IOUtils.closeQuietly(out);
             }
         }
-        return props;
-    }
 
-    private String getLocationURI(Cluster cluster, Feed feed, LocationType type) {
-        String path = FeedHelper.getLocation(feed, type, cluster.getName())
-                .getPath();
+        private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
+                                                  Cluster trgCluster, CatalogStorage targetStorage,
+                                                  Map<String, String> props) {
+            // create staging dirs for export at source & set it as distcpSourcePaths
+            String sourceDatedPartitionKey = sourceStorage.getDatedPartitionKey();
+            String sourceStagingDir =
+                    FeedHelper.getStagingDir(srcCluster, getEntity(), sourceStorage, Tag.REPLICATION)
+                    + "/" + sourceDatedPartitionKey
+                    + "=${coord:dataOutPartitionValue('output', '" + sourceDatedPartitionKey + "')}";
+            props.put("distcpSourcePaths", sourceStagingDir + "/" + NOMINAL_TIME_EL + "/data");
+
+            // create staging dirs for import at target & set it as distcpTargetPaths
+            String targetDatedPartitionKey = targetStorage.getDatedPartitionKey();
+            String targetStagingDir =
+                    FeedHelper.getStagingDir(trgCluster, getEntity(), targetStorage, Tag.REPLICATION)
+                    + "/" + targetDatedPartitionKey
+                    + "=${coord:dataOutPartitionValue('output', '" + targetDatedPartitionKey + "')}";
+            props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL + "/data");
+
+            props.put("sourceRelativePaths", "IGNORE"); // this will bot be used for Table storage.
+        }
 
-        if (!path.equals("/tmp")) {
-            if (new Path(path).toUri().getScheme() == null) {
-                return new Path(ClusterHelper.getStorageUrl(cluster), path)
-                        .toString();
-            } else {
-                return path;
-            }
+        private void propagateLateDataProperties(Feed feed, String instancePaths,
+                                                 String falconFeedStorageType, Map<String, String> props) {
+            // todo these pairs are the same but used in different contexts
+            // late data handler - should-record action
+            props.put("falconInputFeeds", feed.getName());
+            props.put("falconInPaths", instancePaths);
+
+            // storage type for each corresponding feed - in this case only one feed is involved
+            // needed to compute usage based on storage type in LateDataHandler
+            props.put("falconInputFeedStorageTypes", falconFeedStorageType);
+
+            // falcon post processing
+            props.put(ARG.feedNames.getPropName(), feed.getName());
+            props.put(ARG.feedInstancePaths.getPropName(), instancePaths);
         }
-        return null;
     }
 }
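
The timing logic that initializeCoordAttributes and setCoordControls introduce above is plain millisecond arithmetic. The following standalone sketch works through it with illustrative numbers (a 15-minute replication delay and a 20-minute feed frequency; neither value comes from the patch) to show how the delay becomes a ${now(0,-N)} EL instance and how the coordinator timeout and throttle are derived:

public final class ReplicationTimingSketch {
    private static final int THIRTY_MINUTES = 30 * 60 * 1000;

    public static void main(String[] args) {
        long delayInMillis = 15 * 60 * 1000L;      // hypothetical replication delay
        long frequencyInMillis = 20 * 60 * 1000L;  // hypothetical feed frequency

        // Same conversion as initializeCoordAttributes: negate the delay and express it in minutes.
        long delayInMins = -1 * delayInMillis / (1000 * 60);
        String elExp = "${now(0," + delayInMins + ")}";            // "${now(0,-15)}"

        // Same defaulting as setCoordControls: six times the frequency, floored at thirty minutes.
        long timeoutInMillis = Math.max(frequencyInMillis * 6, THIRTY_MINUTES);
        long timeoutInMins = timeoutInMillis / (1000 * 60);        // 120
        long throttle = timeoutInMillis / frequencyInMillis * 2;   // 12

        System.out.println(elExp + ", timeout=" + timeoutInMins + ", throttle=" + throttle);
    }
}
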

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/main/resources/config/workflow/falcon-table-export.hql
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/falcon-table-export.hql b/feed/src/main/resources/config/workflow/falcon-table-export.hql
new file mode 100644
index 0000000..37fd1b7
--- /dev/null
+++ b/feed/src/main/resources/config/workflow/falcon-table-export.hql
@@ -0,0 +1,18 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+export table ${falconSourceDatabase}.${falconSourceTable} partition ${falconSourcePartition} to '${falconSourceStagingDir}';

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/main/resources/config/workflow/falcon-table-import.hql
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/falcon-table-import.hql b/feed/src/main/resources/config/workflow/falcon-table-import.hql
new file mode 100644
index 0000000..653d580
--- /dev/null
+++ b/feed/src/main/resources/config/workflow/falcon-table-import.hql
@@ -0,0 +1,20 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+use ${falconTargetDatabase};
+alter table ${falconTargetTable} drop if exists partition ${falconTargetPartition};
+import table ${falconTargetTable} partition ${falconTargetPartition} from '${falconTargetStagingDir}';

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index d141476..91d0285 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -22,7 +22,7 @@
             <case to="recordsize">
                 ${shouldRecord=="true"}
             </case>
-            <default to="replication"/>
+            <default to="replication-decision"/>
         </switch>
     </decision>
     <action name='recordsize'>
@@ -38,6 +38,15 @@
                     <name>oozie.launcher.mapred.job.priority</name>
                     <value>${jobPriority}</value>
                 </property>
+                <!-- HCatalog jars -->
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>hcatalog</value>
+                </property>
             </configuration>
             <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
             <arg>-out</arg>
@@ -46,11 +55,50 @@
             <arg>${falconInPaths}</arg>
             <arg>-falconInputFeeds</arg>
             <arg>${falconInputFeeds}</arg>
+            <arg>-falconInputFeedStorageTypes</arg>
+            <arg>${falconInputFeedStorageTypes}</arg>
             <capture-output/>
         </java>
+        <ok to="replication-decision"/>
+        <error to="fail"/>
+    </action>
+    <decision name="replication-decision">
+        <switch>
+            <case to="table-export">
+                ${falconFeedStorageType == "TABLE"}
+            </case>
+            <default to="replication"/>
+        </switch>
+    </decision>
+    <!-- Table Replication - Export data and metadata to HDFS Staging from Source Hive -->
+    <action name="table-export">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${falconSourceJobTracker}</job-tracker>
+            <name-node>${falconSourceNameNode}</name-node>
+            <prepare>
+                <delete path="${distcpSourcePaths}"/>
+            </prepare>
+            <job-xml>${wf:appPath()}/conf/falcon-source-hive-site.xml</job-xml>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <script>${wf:appPath()}/scripts/falcon-table-export.hql</script>
+            <param>falconSourceDatabase=${falconSourceDatabase}</param>
+            <param>falconSourceTable=${falconSourceTable}</param>
+            <param>falconSourcePartition=${falconSourcePartition}</param>
+            <param>falconSourceStagingDir=${distcpSourcePaths}</param>
+        </hive>
         <ok to="replication"/>
         <error to="fail"/>
     </action>
+    <!-- Replication action -->
     <action name="replication">
         <java>
             <job-tracker>${jobTracker}</job-tracker>
@@ -75,8 +123,43 @@
             <arg>${distcpSourcePaths}</arg>
             <arg>-targetPath</arg>
             <arg>${distcpTargetPaths}</arg>
+            <arg>-falconFeedStorageType</arg>
+            <arg>${falconFeedStorageType}</arg>
             <file>${wf:conf("falcon.libpath")}/hadoop-distcp.jar</file>
         </java>
+        <ok to="post-replication-decision"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <decision name="post-replication-decision">
+        <switch>
+            <case to="table-import">
+                ${falconFeedStorageType == "TABLE"}
+            </case>
+            <default to="succeeded-post-processing"/>
+        </switch>
+    </decision>
+    <!-- Table Replication - Import data and metadata from HDFS Staging into Target Hive -->
+    <action name="table-import">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${falconTargetJobTracker}</job-tracker>
+            <name-node>${falconTargetNameNode}</name-node>
+            <job-xml>${wf:appPath()}/conf/falcon-target-hive-site.xml</job-xml>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <script>${wf:appPath()}/scripts/falcon-table-import.hql</script>
+            <param>falconTargetDatabase=${falconTargetDatabase}</param>
+            <param>falconTargetTable=${falconTargetTable}</param>
+            <param>falconTargetPartition=${falconTargetPartition}</param>
+            <param>falconTargetStagingDir=${distcpTargetPaths}</param>
+        </hive>
         <ok to="succeeded-post-processing"/>
         <error to="failed-post-processing"/>
     </action>
@@ -211,8 +294,8 @@
         <error to="fail"/>
     </action>
     <kill name="fail">
-        <message>Workflow failed, error
-            message[${wf:errorMessage(wf:lastErrorNode())}]
+        <message>
+            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
         </message>
     </kill>
     <end name='end'/>
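
Taken together, the new decision nodes make the replication path storage-aware: a FILESYSTEM feed goes straight to the distcp-based "replication" action, while a TABLE feed is bracketed by the "table-export" Hive action on the source side and the "table-import" Hive action on the target side, using the two .hql scripts added above. A minimal Java sketch of that routing follows; the node names mirror the workflow XML, but the class itself is illustrative and not part of Falcon, and the shouldRecord/late-data branch is omitted.

import java.util.ArrayList;
import java.util.List;

public final class ReplicationRoutingSketch {

    enum StorageType { FILESYSTEM, TABLE }   // mirrors ${falconFeedStorageType}

    /** Action names visited after replication-decision, in order. */
    static List<String> route(StorageType storageType) {
        List<String> nodes = new ArrayList<>();
        if (storageType == StorageType.TABLE) {
            nodes.add("table-export");              // export source Hive partition to HDFS staging
        }
        nodes.add("replication");                   // distcp from source paths/staging to target
        if (storageType == StorageType.TABLE) {
            nodes.add("table-import");              // import staged partition into the target Hive table
        }
        nodes.add("succeeded-post-processing");
        return nodes;
    }

    public static void main(String[] args) {
        System.out.println(route(StorageType.FILESYSTEM));  // [replication, succeeded-post-processing]
        System.out.println(route(StorageType.TABLE));
    }
}
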

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/main/resources/config/workflow/retention-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/retention-workflow.xml b/feed/src/main/resources/config/workflow/retention-workflow.xml
index 72e0133..8b444f5 100644
--- a/feed/src/main/resources/config/workflow/retention-workflow.xml
+++ b/feed/src/main/resources/config/workflow/retention-workflow.xml
@@ -30,10 +30,21 @@
                     <name>oozie.launcher.mapred.job.priority</name>
                     <value>${jobPriority}</value>
                 </property>
+                <!-- HCatalog jars -->
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>hcatalog</value>
+                </property>
             </configuration>
             <main-class>org.apache.falcon.retention.FeedEvictor</main-class>
             <arg>-feedBasePath</arg>
             <arg>${feedDataPath}</arg>
+            <arg>-falconFeedStorageType</arg>
+            <arg>${falconFeedStorageType}</arg>
             <arg>-retentionType</arg>
             <arg>instance</arg>
             <arg>-retentionLimit</arg>
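
The ${feedDataPath} argument handed to FeedEvictor above is the feed's storage URI template with every "${" rewritten to "?{"; the removed code in OozieFeedMapper spelled this regex out literally, and the new code presumably does the same through the Storage.DOLLAR_EXPR_START_REGEX/QUESTION_EXPR_START_REGEX constants (their values are not shown in this patch). The usual point of the rewrite is to keep Oozie's EL resolution from touching the feed's own pattern variables so the evictor can expand them itself. A small standalone sketch with a made-up path:

public final class FeedPathEscapingSketch {
    public static void main(String[] args) {
        // Hypothetical feed URI template; the real one comes from Storage.getUriTemplate().
        String uriTemplate = "hdfs://nn:8020/data/clicks/${YEAR}/${MONTH}/${DAY}";

        // Same literal rewrite the old getRetentionWorkflowAction used: "${" -> "?{".
        String feedDataPath = uriTemplate.replaceAll("\\$\\{", "\\?\\{");

        System.out.println(feedDataPath);
        // hdfs://nn:8020/data/clicks/?{YEAR}/?{MONTH}/?{DAY}
    }
}
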

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index 4fb6a58..de7f9e5 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -17,21 +17,19 @@
  */
 package org.apache.falcon.converter;
 
-import static org.testng.Assert.assertEquals;
-
-import java.util.Collection;
-import java.util.List;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.Unmarshaller;
-
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
@@ -39,6 +37,8 @@ import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DECISION;
+import org.apache.falcon.oozie.workflow.JAVA;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,6 +48,18 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Tests for Oozie workflow definition for feed replication & retention.
  */
@@ -57,11 +69,17 @@ public class OozieFeedMapperTest {
     private final ConfigurationStore store = ConfigurationStore.get();
     private Cluster srcCluster;
     private Cluster trgCluster;
+    private Cluster alphaTrgCluster;
+    private Cluster betaTrgCluster;
     private Feed feed;
+    private Feed tableFeed;
+    private Feed fsReplFeed;
 
     private static final String SRC_CLUSTER_PATH = "/src-cluster.xml";
     private static final String TRG_CLUSTER_PATH = "/trg-cluster.xml";
     private static final String FEED = "/feed.xml";
+    private static final String TABLE_FEED = "/table-replication-feed.xml";
+    private static final String FS_REPLICATION_FEED = "/fs-replication-feed.xml";
 
     @BeforeClass
     public void setUpDFS() throws Exception {
@@ -74,11 +92,13 @@ public class OozieFeedMapperTest {
         cleanupStore();
 
         srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH, srcHdfsUrl);
-
         trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH, trgHdfsUrl);
+        alphaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-alpha.xml", trgHdfsUrl);
+        betaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-beta.xml", trgHdfsUrl);
 
         feed = (Feed) storeEntity(EntityType.FEED, FEED, null);
-
+        fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED, null);
+        tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED, null);
     }
 
     protected Entity storeEntity(EntityType type, String template, String writeEndpoint) throws Exception {
@@ -114,7 +134,7 @@ public class OozieFeedMapperTest {
     }
 
     @Test
-    public void testFeedCoords() throws Exception {
+    public void testReplicationCoordsForFSStorage() throws Exception {
         OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
         List<COORDINATORAPP> coords = feedMapper.getCoordinators(trgCluster,
                 new Path("/projects/falcon/"));
@@ -145,9 +165,8 @@ public class OozieFeedMapperTest {
         Assert.assertEquals("${coord:minutes(20)}",
                 outputDataset.getFrequency());
         Assert.assertEquals("output-dataset", outputDataset.getName());
-        Assert.assertEquals(
-                "${nameNode}"
-                        + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+        Assert.assertEquals(ClusterHelper.getStorageUrl(trgCluster)
+                + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
                         outputDataset.getUriTemplate());
         String inEventName =coord.getInputEvents().getDataIn().get(0).getName();
         String inEventDataset =coord.getInputEvents().getDataIn().get(0).getDataset();
@@ -159,20 +178,35 @@ public class OozieFeedMapperTest {
         String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
         Assert.assertEquals("${now(0,-40)}", outEventInstance);
 
+        HashMap<String, String> props = new HashMap<String, String>();
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            if (prop.getName().equals("mapred.job.priority")) {
-                assertEquals(prop.getValue(), "NORMAL");
-                break;
-            }
+            props.put(prop.getName(), prop.getValue());
         }
+
+        // verify the replication param that feed replicator depends on
+        String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
+        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
+
+        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), feed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataIn('input')}");
+
         assertLibExtensions(coord, "replication");
     }
 
     private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
-        WORKFLOWAPP wf = ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
-                trgMiniDFS.getFileSystem().open(new Path(wfPath, "workflow.xml")))).getValue();
+        WORKFLOWAPP wf = getWorkflowapp(coord);
         List<Object> actions = wf.getDecisionOrForkOrJoin();
         for (Object obj : actions) {
             if (!(obj instanceof ACTION)) {
@@ -193,4 +227,246 @@ public class OozieFeedMapperTest {
             }
         }
     }
+
+    @SuppressWarnings("unchecked")
+    private WORKFLOWAPP getWorkflowapp(COORDINATORAPP coord) throws JAXBException, IOException {
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+        return ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
+                trgMiniDFS.getFileSystem().open(new Path(wfPath, "workflow.xml")))).getValue();
+    }
+
+    @Test
+    public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
+        OozieFeedMapper feedMapper = new OozieFeedMapper(fsReplFeed);
+
+        List<COORDINATORAPP> alphaCoords = feedMapper.getCoordinators(alphaTrgCluster, new Path("/alpha/falcon/"));
+        final COORDINATORAPP alphaCoord = alphaCoords.get(0);
+        Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
+        Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
+
+        String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed);
+        assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster.getName(), pathsWithPartitions);
+
+        List<COORDINATORAPP> betaCoords = feedMapper.getCoordinators(betaTrgCluster, new Path("/beta/falcon/"));
+        final COORDINATORAPP betaCoord = betaCoords.get(0);
+        Assert.assertEquals(betaCoord.getStart(), "2012-10-01T12:10Z");
+        Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z");
+
+        pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed);
+        assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster.getName(), pathsWithPartitions);
+    }
+
+    private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
+                                          Feed aFeed) throws FalconException {
+        String srcPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(aFeed, sourceCluster.getName()).getPartition());
+        srcPart = FeedHelper.evaluateClusterExp(sourceCluster, srcPart);
+        String targetPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(aFeed, targetCluster.getName()).getPartition());
+        targetPart = FeedHelper.evaluateClusterExp(targetCluster, targetPart);
+
+        StringBuilder pathsWithPartitions = new StringBuilder();
+        pathsWithPartitions.append("${coord:dataIn('input')}/")
+                .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
+
+        String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+        parts = StringUtils.stripEnd(parts, "/");
+        return parts;
+    }
+
+    private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, String clusterName,
+                                 String pathsWithPartitions) throws JAXBException, IOException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(aFeed, clusterName);
+        Date startDate = feedCluster.getValidity().getStart();
+        Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate));
+
+        Date endDate = feedCluster.getValidity().getEnd();
+        Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
+
+        WORKFLOWAPP workflow = getWorkflowapp(coord);
+        assertWorkflowDefinition(fsReplFeed, workflow);
+
+        List<Object> actions = workflow.getDecisionOrForkOrJoin();
+        System.out.println("actions = " + actions);
+
+        ACTION replicationActionNode = (ACTION) actions.get(4);
+        Assert.assertEquals(replicationActionNode.getName(), "replication");
+
+        JAVA replication = replicationActionNode.getJava();
+        List<String> args = replication.getArg();
+        Assert.assertEquals(args.size(), 11);
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
+        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo());
+        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+    }
+
+    public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {
+        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), parentWorkflow.getName());
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
+        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
+        Assert.assertEquals("replication-decision", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
+        Assert.assertEquals("table-export", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
+        Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
+        Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName());
+        Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
+        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
+    }
+
+    @Test
+    public void testReplicationCoordsForTableStorage() throws Exception {
+        OozieFeedMapper feedMapper = new OozieFeedMapper(tableFeed);
+        List<COORDINATORAPP> coords = feedMapper.getCoordinators(
+                trgCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = coords.get(0);
+
+        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
+        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION",
+                coord.getAction().getWorkflow().getAppPath());
+        Assert.assertEquals("FALCON_FEED_REPLICATION_" + tableFeed.getName() + "_"
+                + srcCluster.getName(), coord.getName());
+        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
+
+        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(0);
+        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
+        Assert.assertEquals("input-dataset", inputDataset.getName());
+
+        String sourceRegistry = ClusterHelper.getInterface(srcCluster, Interfacetype.REGISTRY).getEndpoint();
+        sourceRegistry = sourceRegistry.replace("thrift", "hcat");
+        Assert.assertEquals(inputDataset.getUriTemplate(),
+                sourceRegistry + "/source_db/source_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
+
+        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(1);
+        Assert.assertEquals(outputDataset.getFrequency(), "${coord:minutes(20)}");
+        Assert.assertEquals("output-dataset", outputDataset.getName());
+
+        String targetRegistry = ClusterHelper.getInterface(trgCluster, Interfacetype.REGISTRY).getEndpoint();
+        targetRegistry = targetRegistry.replace("thrift", "hcat");
+        Assert.assertEquals(outputDataset.getUriTemplate(),
+                targetRegistry + "/target_db/target_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
+
+        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
+        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
+        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
+        Assert.assertEquals("input", inEventName);
+        Assert.assertEquals("input-dataset", inEventDataset);
+        Assert.assertEquals("${now(0,-40)}", inEventInstance);
+
+        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
+        Assert.assertEquals("${now(0,-40)}", outEventInstance);
+
+        // assert FS staging area
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        final FileSystem fs = trgMiniDFS.getFileSystem();
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-import.hql")));
+
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
+        final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
+
+        // verify the replication param that feed replicator depends on
+        Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
+
+        Assert.assertTrue(props.containsKey("distcpSourcePaths"));
+        Assert.assertEquals(props.get("distcpSourcePaths"),
+                FeedHelper.getStagingDir(srcCluster, tableFeed, srcStorage, Tag.REPLICATION)
+                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
+                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
+
+        Assert.assertTrue(props.containsKey("distcpTargetPaths"));
+        Assert.assertEquals(props.get("distcpTargetPaths"),
+                FeedHelper.getStagingDir(trgCluster, tableFeed, trgStorage, Tag.REPLICATION)
+                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
+                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
+
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name());
+
+        // verify table props
+        assertTableStorageProperties(srcCluster, srcStorage, props, "falconSource");
+        assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget");
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataIn('input')}");
+    }
+
+    private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+                                              Map<String, String> props, String prefix) {
+        Assert.assertEquals(props.get(prefix + "NameNode"), ClusterHelper.getStorageUrl(cluster));
+        Assert.assertEquals(props.get(prefix + "JobTracker"), ClusterHelper.getMREndPoint(cluster));
+        Assert.assertEquals(props.get(prefix + "HcatNode"), tableStorage.getCatalogUrl());
+
+        Assert.assertEquals(props.get(prefix + "Database"), tableStorage.getDatabase());
+        Assert.assertEquals(props.get(prefix + "Table"), tableStorage.getTable());
+        Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitionFilter('input', 'hive')}");
+    }
+
+    @Test
+    public void testRetentionCoords() throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
+        final Calendar instance = Calendar.getInstance();
+        instance.roll(Calendar.YEAR, 1);
+        cluster.getValidity().setEnd(instance.getTime());
+
+        OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
+        List<COORDINATORAPP> coords = feedMapper.getCoordinators(srcCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = coords.get(0);
+
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
+        Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        String feedDataPath = props.get("feedDataPath");
+        String storageType = props.get("falconFeedStorageType");
+
+        // verify the param that feed evictor depends on
+        Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
+
+        final Storage storage = FeedHelper.createStorage(cluster, feed);
+        if (feedDataPath != null) {
+            Assert.assertEquals(feedDataPath, storage.getUriTemplate()
+                    .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+        }
+
+        if (storageType != null) {
+            Assert.assertEquals(storageType, storage.getType().name());
+        }
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), feed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/test/resources/fs-replication-feed.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/fs-replication-feed.xml b/feed/src/test/resources/fs-replication-feed.xml
new file mode 100644
index 0000000..c4ee460
--- /dev/null
+++ b/feed/src/test/resources/fs-replication-feed.xml
@@ -0,0 +1,64 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="billing RC File" name="replication-test" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="colo"/>
+        <partition name="eventTime"/>
+        <partition name="impressionHour"/>
+        <partition name="pricingModel"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>minutes(5)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="minutes(1)"/>
+
+    <clusters>
+        <cluster partition="${cluster.colo}" type="source" name="corp1">
+            <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/>
+            <retention action="delete" limit="days(10000)"/>
+        </cluster>
+        <cluster type="target" name="alpha">
+            <validity end="2012-10-01T12:11Z" start="2012-10-01T12:05Z"/>
+            <retention action="delete" limit="days(10000)"/>
+            <locations>
+                <location path="/localDC/rc/billing/ua1/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/" type="data"/>
+            </locations>
+        </cluster>
+        <cluster type="target" name="beta">
+            <validity end="2012-10-01T12:26Z" start="2012-10-01T12:10Z"/>
+            <retention action="delete" limit="days(10000)"/>
+            <locations>
+                <location path="/localDC/rc/billing/ua2/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/" type="data"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location
+                path="/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/"
+                type="data"/>
+        <location path="/data/regression/fetlrc/billing/stats" type="stats"/>
+        <location path="/data/regression/fetlrc/billing/metadata"
+                  type="meta"/>
+    </locations>
+
+    <ACL permission="0x755" group="group" owner="fetl"/>
+    <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/test/resources/src-cluster.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/src-cluster.xml b/feed/src/test/resources/src-cluster.xml
index 75d8ed0..730f8d2 100644
--- a/feed/src/test/resources/src-cluster.xml
+++ b/feed/src/test/resources/src-cluster.xml
@@ -27,7 +27,7 @@
                    version="3.1"/>
         <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
                    version="5.1.6"/>
-        <interface type="registry" endpoint="Hcat" version="1"/>
+        <interface type="registry" endpoint="thrift://localhost:49093" version="1"/>
     </interfaces>
     <locations>
         <location name="temp" path="/tmp"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/test/resources/table-replication-feed.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/table-replication-feed.xml b/feed/src/test/resources/table-replication-feed.xml
new file mode 100644
index 0000000..4c610f6
--- /dev/null
+++ b/feed/src/test/resources/table-replication-feed.xml
@@ -0,0 +1,42 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="raw-logs-table" xmlns="uri:falcon:feed:0.1">
+
+    <frequency>minutes(20)</frequency>
+    <timezone>UTC</timezone>
+
+    <clusters>
+        <cluster name="corp1" type="source" delay="minutes(40)">
+            <validity start="2010-01-01T00:00Z" end="2010-01-01T02:00Z"/>
+            <retention limit="minutes(5)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="corp2" type="target">
+            <validity start="2010-01-01T00:00Z" end="2010-01-01T02:00Z"/>
+            <retention limit="minutes(7)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <table uri="catalog:target_db:target_clicks_table#ds=${YEAR}${MONTH}${DAY};region=${region}" />
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:source_db:source_clicks_table#ds=${YEAR}${MONTH}${DAY};region=${region}" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/test/resources/trg-cluster-alpha.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/trg-cluster-alpha.xml b/feed/src/test/resources/trg-cluster-alpha.xml
new file mode 100644
index 0000000..1fb07cb
--- /dev/null
+++ b/feed/src/test/resources/trg-cluster-alpha.xml
@@ -0,0 +1,39 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<cluster colo="ua1" description="" name="alpha" xmlns="uri:falcon:cluster:0.1">
+    <interfaces>
+        <interface type="readonly" endpoint="http://localhost:50070"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="thrift://localhost:59093" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+        <location name="staging" path="/projects/falcon/staging2"/>
+    </locations>
+    <properties>
+        <property name="separator" value="-"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/test/resources/trg-cluster-beta.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/trg-cluster-beta.xml b/feed/src/test/resources/trg-cluster-beta.xml
new file mode 100644
index 0000000..0bf0bcd
--- /dev/null
+++ b/feed/src/test/resources/trg-cluster-beta.xml
@@ -0,0 +1,39 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<cluster colo="ua2" description="" name="beta" xmlns="uri:falcon:cluster:0.1">
+    <interfaces>
+        <interface type="readonly" endpoint="http://localhost:50070"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="thrift://localhost:59093" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+        <location name="staging" path="/projects/falcon/staging2"/>
+    </locations>
+    <properties>
+        <property name="separator" value="-"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/feed/src/test/resources/trg-cluster.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/trg-cluster.xml b/feed/src/test/resources/trg-cluster.xml
index 9a99b62..8260fda 100644
--- a/feed/src/test/resources/trg-cluster.xml
+++ b/feed/src/test/resources/trg-cluster.xml
@@ -27,7 +27,7 @@
                    version="3.1"/>
         <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
                    version="5.1.6"/>
-        <interface type="registry" endpoint="Hcat" version="1"/>
+        <interface type="registry" endpoint="thrift://localhost:59093" version="1"/>
     </interfaces>
     <locations>
         <location name="temp" path="/tmp"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/hadoop-webapp/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/pom.xml b/hadoop-webapp/pom.xml
index 00a8504..9770312 100644
--- a/hadoop-webapp/pom.xml
+++ b/hadoop-webapp/pom.xml
@@ -97,6 +97,33 @@
             <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-core</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>
+        </dependency>
+
+        <!-- Hive Metastore and WebHCat fail without these dependencies -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+
+        <!-- Oozie dependencies -->
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hcatalog</groupId>
+            <artifactId>webhcat-java-client</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

