falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [2/2] git commit: FACON-481 Simplify process parent workflow. Contributed by Shwetha GS
Date Thu, 31 Jul 2014 10:14:15 GMT
FACON-481 Simplify process parent workflow. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/3bb5a62a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/3bb5a62a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/3bb5a62a

Branch: refs/heads/master
Commit: 3bb5a62affc5a87df4232865c5e8894aef0333bd
Parents: ed100c8
Author: Shwetha GS <shwetha.gs@inmobi.com>
Authored: Thu Jul 31 15:44:05 2014 +0530
Committer: Shwetha GS <shwetha.gs@inmobi.com>
Committed: Thu Jul 31 15:44:05 2014 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/entity/EntityUtil.java    |  14 +
 .../apache/falcon/oozie/OozieBundleBuilder.java |   2 +-
 .../falcon/oozie/OozieCoordinatorBuilder.java   |  20 +-
 .../apache/falcon/oozie/OozieEntityBuilder.java |  74 ++---
 .../OozieOrchestrationWorkflowBuilder.java      | 124 +++++--
 .../feed/FSReplicationWorkflowBuilder.java      |  70 ++++
 .../feed/FeedReplicationCoordinatorBuilder.java |  24 +-
 .../feed/FeedReplicationWorkflowBuilder.java    |  63 +---
 .../feed/FeedRetentionCoordinatorBuilder.java   |   4 +-
 .../feed/FeedRetentionWorkflowBuilder.java      |  42 ++-
 .../feed/HCatReplicationWorkflowBuilder.java    | 133 ++++++++
 .../process/HiveProcessWorkflowBuilder.java     |   9 +-
 .../process/OozieProcessWorkflowBuilder.java    |   9 +-
 .../process/PigProcessWorkflowBuilder.java      |  12 +-
 .../ProcessExecutionCoordinatorBuilder.java     |  16 +-
 .../ProcessExecutionWorkflowBuilder.java        |  66 ++--
 .../java/org/apache/falcon/util/OozieUtils.java |   2 +
 .../resources/action/feed/eviction-action.xml   |  55 ++++
 .../action/feed/falcon-table-export.hql         |  18 +
 .../action/feed/falcon-table-import.hql         |  20 ++
 .../action/feed/replication-action.xml          |  59 ++++
 .../resources/action/feed/table-cleanup.xml     |  25 ++
 .../main/resources/action/feed/table-export.xml |  45 +++
 .../main/resources/action/feed/table-import.xml |  42 +++
 .../src/main/resources/action/post-process.xml  |  94 ++++++
 oozie/src/main/resources/action/pre-process.xml |  50 +++
 .../resources/action/process/hive-action.xml    |  37 +++
 .../resources/action/process/oozie-action.xml   |  25 ++
 .../resources/action/process/pig-action.xml     |  40 +++
 .../resources/workflow/falcon-table-export.hql  |  18 -
 .../resources/workflow/falcon-table-import.hql  |  20 --
 .../workflow/process-parent-workflow.xml        | 278 ----------------
 .../resources/workflow/replication-workflow.xml | 330 -------------------
 .../resources/workflow/retention-workflow.xml   | 208 ------------
 .../feed/OozieFeedWorkflowBuilderTest.java      |  52 ++-
 .../falcon/oozie/process/AbstractTestBase.java  |  20 +-
 .../OozieProcessWorkflowBuilderTest.java        |  69 ++--
 src/main/examples/entity/hcat/hcat-out-feed.xml |   2 -
 .../examples/entity/hcat/hcat-pig-process.xml   |   4 +-
 39 files changed, 1061 insertions(+), 1134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 6f50829..5909113 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -672,5 +672,19 @@ public final class EntityUtil {
         throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
     }
 
+    public static boolean isTableStorageType(Cluster cluster, Entity entity) throws FalconException {
+        return entity.getEntityType() == EntityType.PROCESS
+            ? isTableStorageType(cluster, (Process) entity) : isTableStorageType(cluster, (Feed) entity);
+    }
+
+    public static boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException {
+        Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
+        return Storage.TYPE.TABLE == storageType;
+    }
+
+    public static boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
+        Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
+        return Storage.TYPE.TABLE == storageType;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index 62d95fa..6185aaf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -122,7 +122,7 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
         properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib");
 
-        if (isTableStorageType(cluster)) {
+        if (EntityUtil.isTableStorageType(cluster, entity)) {
             properties.putAll(getHiveCredentials(cluster));
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index 5f483f0..e354011 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.oozie;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
@@ -40,10 +39,6 @@ import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.OozieClient;
 
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.InputStream;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Properties;
@@ -121,7 +116,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
             DEFAULT_BROKER_MSG_TTL.toString());
         props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
         props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
-        props.put("logDir", getStoragePath(new Path(EntityUtil.getBaseStagingPath(cluster, entity), "logs")));
+        props.put("logDir", getLogDirectory(cluster));
         props.put(OozieClient.EXTERNAL_ID,
             new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
                 "${coord:nominalTime()}").getId());
@@ -164,18 +159,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
     public abstract List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException;
 
     protected COORDINATORAPP unmarshal(String template) throws FalconException {
-        InputStream resourceAsStream = null;
-        try {
-            resourceAsStream = OozieCoordinatorBuilder.class.getResourceAsStream(template);
-            Unmarshaller unmarshaller = OozieUtils.COORD_JAXB_CONTEXT.createUnmarshaller();
-            @SuppressWarnings("unchecked") JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
-                unmarshaller.unmarshal(resourceAsStream);
-            return jaxbElement.getValue();
-        } catch (JAXBException e) {
-            throw new FalconException(e);
-        } finally {
-            IOUtils.closeQuietly(resourceAsStream);
-        }
+        return unmarshal(template, OozieUtils.COORD_JAXB_CONTEXT, COORDINATORAPP.class);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index 1238b82..7557e3d 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -18,19 +18,16 @@
 
 package org.apache.falcon.oozie;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -48,8 +45,12 @@ import org.slf4j.LoggerFactory;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -130,6 +131,14 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         try {
             Marshaller marshaller = jaxbContext.createMarshaller();
             marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+
+            if (LOG.isDebugEnabled()) {
+                StringWriter writer = new StringWriter();
+                marshaller.marshal(jaxbElement, writer);
+                LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName());
+                LOG.debug(writer.getBuffer().toString());
+            }
+
             FileSystem fs = HadoopClientFactory.get().createFileSystem(
                 outPath.toUri(), ClusterHelper.getConfiguration(cluster));
             OutputStream out = fs.create(outPath);
@@ -138,12 +147,6 @@ public abstract class OozieEntityBuilder<T extends Entity> {
             } finally {
                 out.close();
             }
-            if (LOG.isDebugEnabled()) {
-                StringWriter writer = new StringWriter();
-                marshaller.marshal(jaxbElement, writer);
-                LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName());
-                LOG.debug(writer.getBuffer().toString());
-            }
 
             LOG.info("Marshalled {} to {}", jaxbElement.getDeclaredType(), outPath);
             return outPath;
@@ -152,21 +155,6 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         }
     }
 
-    protected boolean isTableStorageType(Cluster cluster) throws FalconException {
-        return entity.getEntityType() == EntityType.PROCESS
-            ? isTableStorageType(cluster, (Process) entity) : isTableStorageType(cluster, (Feed) entity);
-    }
-
-    protected boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException {
-        Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
-        return Storage.TYPE.TABLE == storageType;
-    }
-
-    protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
-        Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
-        return Storage.TYPE.TABLE == storageType;
-    }
-
     protected Properties getHiveCredentials(Cluster cluster) {
         String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
         if (metaStoreUrl == null) {
@@ -238,21 +226,6 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         return properties;
     }
 
-    protected void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage, Properties props) {
-        String prefix = "falcon_" + input.getName();
-
-        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-
-        props.put(prefix + "_partition_filter_pig",
-            "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
-        props.put(prefix + "_partition_filter_hive",
-            "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
-        props.put(prefix + "_partition_filter_java",
-            "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
-        props.put(prefix + "_datain_partitions_hive",
-            "${coord:dataInPartitions('" + input.getName() + "', 'hive-export')}");
-    }
-
     protected void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage, Properties props) {
         String prefix = "falcon_" + output.getName();
 
@@ -304,4 +277,23 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         prop.setProperty(OozieEntityBuilder.ENTITY_NAME, name);
         return prop;
     }
+
+    protected String getLogDirectory(Cluster cluster) {
+        return getStoragePath(new Path(EntityUtil.getBaseStagingPath(cluster, entity), "logs"));
+    }
+
+    protected <T> T unmarshal(String template, JAXBContext context, Class<T> cls) throws FalconException {
+        InputStream resourceAsStream = null;
+        try {
+            resourceAsStream = OozieEntityBuilder.class.getResourceAsStream(template);
+            Unmarshaller unmarshaller = context.createUnmarshaller();
+            @SuppressWarnings("unchecked")
+            JAXBElement<T> jaxbElement = unmarshaller.unmarshal(new StreamSource(resourceAsStream), cls);
+            return jaxbElement.getValue();
+        } catch (JAXBException e) {
+            throw new FalconException("Failed to unmarshal " + template, e);
+        } finally {
+            IOUtils.closeQuietly(resourceAsStream);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 2ef162b..083f807 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -22,19 +22,24 @@ import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.oozie.feed.FeedReplicationWorkflowBuilder;
+import org.apache.falcon.oozie.feed.FSReplicationWorkflowBuilder;
 import org.apache.falcon.oozie.feed.FeedRetentionWorkflowBuilder;
+import org.apache.falcon.oozie.feed.HCatReplicationWorkflowBuilder;
 import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder;
 import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder;
 import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CREDENTIAL;
 import org.apache.falcon.oozie.workflow.CREDENTIALS;
+import org.apache.falcon.oozie.workflow.END;
+import org.apache.falcon.oozie.workflow.KILL;
+import org.apache.falcon.oozie.workflow.START;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.OozieUtils;
@@ -44,12 +49,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -63,8 +64,22 @@ import java.util.Set;
  */
 public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extends OozieEntityBuilder<T> {
     protected static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
+
+    protected static final String USER_ACTION_NAME = "user-action";
+    protected static final String PREPROCESS_ACTION_NAME = "pre-processing";
+    protected static final String SUCCESS_POSTPROCESS_ACTION_NAME = "succeeded-post-processing";
+    protected static final String FAIL_POSTPROCESS_ACTION_NAME = "failed-post-processing";
+    protected static final String OK_ACTION_NAME = "end";
+    protected static final String FAIL_ACTION_NAME = "fail";
+
+
+    private static final String POSTPROCESS_TEMPLATE = "/action/post-process.xml";
+    private static final String PREPROCESS_TEMPLATE = "/action/pre-process.xml";
+
     public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
-        Arrays.asList(new String[]{"recordsize", "succeeded-post-processing", "failed-post-processing", }));
+        Arrays.asList(new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME,
+            FAIL_POSTPROCESS_ACTION_NAME, }));
+
     private final Tag lifecycle;
 
     public OozieOrchestrationWorkflowBuilder(T entity, Tag lifecycle) {
@@ -72,7 +87,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         this.lifecycle = lifecycle;
     }
 
-    public static final OozieOrchestrationWorkflowBuilder get(Entity entity, Tag lifecycle) {
+    public static final OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
+        throws FalconException {
         switch(entity.getEntityType()) {
         case FEED:
             Feed feed = (Feed) entity;
@@ -81,7 +97,12 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
                 return new FeedRetentionWorkflowBuilder(feed);
 
             case REPLICATION:
-                return new FeedReplicationWorkflowBuilder(feed);
+                boolean isTable = EntityUtil.isTableStorageType(cluster, feed);
+                if (isTable) {
+                    return new HCatReplicationWorkflowBuilder(feed);
+                } else {
+                    return new FSReplicationWorkflowBuilder(feed);
+                }
 
             default:
                 throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle "
@@ -110,24 +131,76 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle " + lifecycle);
     }
 
+    protected void addTransition(ACTION action, String ok, String fail) {
+        action.getOk().setTo(ok);
+        action.getError().setTo(fail);
+    }
+
+    protected void decorateWorkflow(WORKFLOWAPP wf, String name, String startAction) {
+        wf.setName(name);
+        wf.setStart(new START());
+        wf.getStart().setTo(startAction);
+
+        wf.setEnd(new END());
+        wf.getEnd().setName(OK_ACTION_NAME);
+
+        KILL kill = new KILL();
+        kill.setName(FAIL_ACTION_NAME);
+        kill.setMessage("Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
+        wf.getDecisionOrForkOrJoin().add(kill);
+    }
+
+    protected ACTION getSuccessPostProcessAction() throws FalconException {
+        ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
+        decorateWithOozieRetries(action);
+        return action;
+    }
+
+    protected ACTION getFailPostProcessAction() throws FalconException {
+        ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
+        decorateWithOozieRetries(action);
+        action.setName(FAIL_POSTPROCESS_ACTION_NAME);
+        return action;
+    }
+
+    protected ACTION getPreProcessingAction(boolean isTableStorageType, Tag tag) throws FalconException {
+        ACTION action = unmarshalAction(PREPROCESS_TEMPLATE);
+        decorateWithOozieRetries(action);
+        if (isTableStorageType) {
+            // adds hive-site.xml in actions classpath
+            action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
+        }
+
+        List<String> args = action.getJava().getArg();
+        args.add("-out");
+        if (tag == Tag.REPLICATION) {
+            args.add("${logDir}/latedata/${nominalTime}/${srcClusterName}");
+        } else {
+            args.add("${logDir}/latedata/${nominalTime}");
+        }
+        return action;
+    }
+
     protected Path marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
         return marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
             OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
     }
 
     protected WORKFLOWAPP unmarshal(String template) throws FalconException {
-        InputStream resourceAsStream = null;
-        try {
-            resourceAsStream = OozieOrchestrationWorkflowBuilder.class.getResourceAsStream(template);
-            Unmarshaller unmarshaller = OozieUtils.WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
-            @SuppressWarnings("unchecked")
-            JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(resourceAsStream);
-            return jaxbElement.getValue();
-        } catch (JAXBException e) {
-            throw new FalconException(e);
-        } finally {
-            IOUtils.closeQuietly(resourceAsStream);
+        return unmarshal(template, OozieUtils.WORKFLOW_JAXB_CONTEXT, WORKFLOWAPP.class);
+    }
+
+    protected ACTION unmarshalAction(String template) throws FalconException {
+        return unmarshal(template, OozieUtils.ACTION_JAXB_CONTEXT, ACTION.class);
+    }
+
+    protected boolean shouldPreProcess() throws FalconException {
+        if (EntityUtil.getLateProcess(entity) == null
+            || EntityUtil.getLateProcess(entity).getLateInputs() == null
+            || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
+            return false;
         }
+        return true;
     }
 
     protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag)
@@ -281,19 +354,6 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         return property;
     }
 
-    protected void addOozieRetries(WORKFLOWAPP workflow) {
-        for (Object object : workflow.getDecisionOrForkOrJoin()) {
-            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
-                continue;
-            }
-            org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
-            String actionName = action.getName();
-            if (FALCON_ACTIONS.contains(actionName)) {
-                decorateWithOozieRetries(action);
-            }
-        }
-    }
-
     protected void decorateWithOozieRetries(ACTION action) {
         Properties props = RuntimeProperties.get();
         action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
new file mode 100644
index 0000000..6feb32e
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+
+import java.util.Arrays;
+
+/**
+ * Builds replication workflow for filesystem based feed.
+ */
+public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder{
+    public FSReplicationWorkflowBuilder(Feed entity) {
+        super(entity);
+    }
+
+    @Override protected WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException {
+        WORKFLOWAPP workflow = new WORKFLOWAPP();
+        String wfName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(src.getName()), entity).toString();
+
+        String start = REPLICATION_ACTION_NAME;
+
+        //Add pre-processing
+        if (shouldPreProcess()) {
+            ACTION action = getPreProcessingAction(false, Tag.REPLICATION);
+            addTransition(action, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(action);
+            start = PREPROCESS_ACTION_NAME;
+        }
+
+        //Add replication
+        ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
+        addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(replication);
+
+        //Add post-processing actions
+        ACTION success = getSuccessPostProcessAction();
+        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(success);
+
+        ACTION fail = getFailPostProcessAction();
+        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(fail);
+
+        decorateWorkflow(workflow, wfName, start);
+        return workflow;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 0b582ef..2798db2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -63,6 +63,9 @@ import java.util.Properties;
  */
 public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
     private static final String REPLICATION_COORD_TEMPLATE = "/coordinator/replication-coordinator.xml";
+    private static final String IMPORT_HQL = "/action/feed/falcon-table-import.hql";
+    private static final String EXPORT_HQL = "/action/feed/falcon-table-export.hql";
+
     private static final int THIRTY_MINUTES = 30 * 60 * 1000;
 
     private static final String PARALLEL = "parallel";
@@ -96,8 +99,9 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
 
         // Different workflow for each source since hive credentials vary for each cluster
-        OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, Tag.REPLICATION);
-        builder.build(trgCluster, buildPath);
+        OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, trgCluster,
+            Tag.REPLICATION);
+        Properties wfProps = builder.build(trgCluster, buildPath);
 
         long replicationDelayInMillis = getReplicationDelayInMillis(srcCluster);
         Date sourceStartDate = getStartDate(srcCluster, replicationDelayInMillis);
@@ -136,7 +140,8 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         coord.setAction(replicationWorkflowAction);
 
         Path marshalPath = marshal(trgCluster, coord, buildPath);
-        return getProperties(marshalPath, coordName);
+        wfProps.putAll(getProperties(marshalPath, coordName));
+        return wfProps;
     }
 
     private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path buildPath,
@@ -144,7 +149,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         ACTION action = new ACTION();
         WORKFLOW workflow = new WORKFLOW();
 
-        workflow.setAppPath(getStoragePath(new Path(buildPath, "workflow.xml")));
+        workflow.setAppPath(getStoragePath(buildPath));
         Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
         props.put("srcClusterName", srcCluster.getName());
         props.put("srcClusterColo", srcCluster.getColo());
@@ -266,8 +271,8 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         try {
             // copy import export scripts to stagingDir
             Path scriptPath = new Path(buildPath, "scripts");
-            copyHiveScript(fs, scriptPath, "/workflow/", "falcon-table-export.hql");
-            copyHiveScript(fs, scriptPath, "/workflow/", "falcon-table-import.hql");
+            copyHiveScript(fs, scriptPath, IMPORT_HQL);
+            copyHiveScript(fs, scriptPath, EXPORT_HQL);
 
             // create hive conf to stagingDir
             Path confPath = new Path(buildPath + "/conf");
@@ -278,13 +283,12 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         }
     }
 
-    private void copyHiveScript(FileSystem fs, Path scriptPath, String localScriptPath,
-        String scriptName) throws IOException {
+    private void copyHiveScript(FileSystem fs, Path scriptPath, String resource) throws IOException {
         OutputStream out = null;
         InputStream in = null;
         try {
-            out = fs.create(new Path(scriptPath, scriptName));
-            in = FeedReplicationCoordinatorBuilder.class.getResourceAsStream(localScriptPath + scriptName);
+            out = fs.create(new Path(scriptPath, new Path(resource).getName()));
+            in = FeedReplicationCoordinatorBuilder.class.getResourceAsStream(resource);
             IOUtils.copy(in, out);
         } finally {
             IOUtils.closeQuietly(in);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index 2537725..eafef32 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -34,67 +34,34 @@ import java.util.Properties;
 /**
  * Builds feed replication workflow, one per source-target cluster combination.
  */
-public class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
-    private static final String REPLICATION_WF_TEMPLATE = "/workflow/replication-workflow.xml";
-    private static final String SOURCE_HIVE_CREDENTIAL_NAME = "falconSourceHiveAuth";
-    private static final String TARGET_HIVE_CREDENTIAL_NAME = "falconTargetHiveAuth";
+public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
+    protected static final String REPLICATION_ACTION_TEMPLATE = "/action/feed/replication-action.xml";
+    protected static final String REPLICATION_ACTION_NAME = "replication";
 
     public FeedReplicationWorkflowBuilder(Feed entity) {
         super(entity, Tag.REPLICATION);
     }
 
     @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
-        WORKFLOWAPP workflow = unmarshal(REPLICATION_WF_TEMPLATE);
         Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, buildPath.getName());
+
+        WORKFLOWAPP workflow = getWorkflow(srcCluster, cluster);
         String wfName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
         workflow.setName(wfName);
 
         addLibExtensionsToWorkflow(cluster, workflow, Tag.REPLICATION);
 
-        addOozieRetries(workflow);
-
-        if (isTableStorageType(cluster)) {
-            setupHiveCredentials(cluster, srcCluster, workflow);
-        }
-
-        Path marshalPath = marshal(cluster, workflow, buildPath);
-        return getProperties(marshalPath, wfName);
+        marshal(cluster, workflow, buildPath);
+        Properties props = getProperties(buildPath, wfName);
+        props.putAll(getWorkflowProperties());
+        return props;
     }
 
-    private void setupHiveCredentials(Cluster targetCluster, Cluster sourceCluster, WORKFLOWAPP workflowApp) {
-        if (isSecurityEnabled) {
-            // add hcatalog credentials for secure mode and add a reference to each action
-            addHCatalogCredentials(workflowApp, sourceCluster, SOURCE_HIVE_CREDENTIAL_NAME);
-            addHCatalogCredentials(workflowApp, targetCluster, TARGET_HIVE_CREDENTIAL_NAME);
-        }
-
-        // hive-site.xml file is created later in coordinator initialization but
-        // actions are set to point to that here
-
-        for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
-            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
-                continue;
-            }
-
-            org.apache.falcon.oozie.workflow.ACTION action =
-                (org.apache.falcon.oozie.workflow.ACTION) object;
-            String actionName = action.getName();
-            if ("recordsize".equals(actionName)) {
-                // add reference to hive-site conf to each action
-                action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml");
-
-                if (isSecurityEnabled) { // add a reference to credential in the action
-                    action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
-                }
-            } else if ("table-export".equals(actionName)) {
-                if (isSecurityEnabled) { // add a reference to credential in the action
-                    action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
-                }
-            } else if ("table-import".equals(actionName)) {
-                if (isSecurityEnabled) { // add a reference to credential in the action
-                    action.setCred(TARGET_HIVE_CREDENTIAL_NAME);
-                }
-            }
-        }
+    private Properties getWorkflowProperties() {
+        Properties props = new Properties();
+        props.setProperty("falconDataOperation", "REPLICATE");
+        return props;
     }
+
+    protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index ac38532..2238778 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -95,8 +95,10 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
         props.putAll(FeedHelper.getUserWorkflowProperties("eviction"));
 
         WORKFLOW workflow = new WORKFLOW();
-        Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, Tag.RETENTION).build(cluster, coordPath);
+        Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.RETENTION).build(cluster,
+            coordPath);
         workflow.setAppPath(getStoragePath(wfProp.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+        props.putAll(wfProp);
         workflow.setConfiguration(getConfig(props));
         ACTION action = new ACTION();
         action.setWorkflow(workflow);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index eee4fe9..3aabb19 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.hadoop.fs.Path;
 
@@ -33,25 +34,50 @@ import java.util.Properties;
  * Builds feed retention workflow.
  */
 public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
-    private static final String RETENTION_WF_TEMPLATE = "/workflow/retention-workflow.xml";
+    private static final String EVICTION_ACTION_TEMPLATE = "/action/feed/eviction-action.xml";
+
+    private static final String EVICTION_ACTION_NAME = "eviction";
 
     public FeedRetentionWorkflowBuilder(Feed entity) {
         super(entity, Tag.DEFAULT);
     }
 
     @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
-        WORKFLOWAPP workflow = unmarshal(RETENTION_WF_TEMPLATE);
+        WORKFLOWAPP workflow = new WORKFLOWAPP();
         String wfName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
-        workflow.setName(wfName);
+
+        //Add eviction action
+        ACTION eviction = unmarshalAction(EVICTION_ACTION_TEMPLATE);
+        addTransition(eviction, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(eviction);
+
+        //Add post-processing actions
+        ACTION success = getSuccessPostProcessAction();
+        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(success);
+
+        ACTION fail = getFailPostProcessAction();
+        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(fail);
+
+        decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
         addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION);
-        addOozieRetries(workflow);
 
-        if (isTableStorageType(cluster)) {
+        if (EntityUtil.isTableStorageType(cluster, entity)) {
             setupHiveCredentials(cluster, buildPath, workflow);
         }
 
-        Path marshalPath = marshal(cluster, workflow, buildPath);
-        return getProperties(marshalPath, wfName);
+        marshal(cluster, workflow, buildPath);
+        Properties props = getProperties(buildPath, wfName);
+        props.putAll(getWorkflowProperties());
+        return props;
+    }
+
+    private Properties getWorkflowProperties() {
+        Properties props = new Properties();
+        props.setProperty("falconDataOperation", "DELETE");
+        props.setProperty("srcClusterName", "NA");
+        return props;
     }
 
     private void setupHiveCredentials(Cluster cluster, Path wfPath,
@@ -72,7 +98,7 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
             org.apache.falcon.oozie.workflow.ACTION action =
                 (org.apache.falcon.oozie.workflow.ACTION) object;
             String actionName = action.getName();
-            if ("eviction".equals(actionName)) {
+            if (EVICTION_ACTION_NAME.equals(actionName)) {
                 // add reference to hive-site conf to each action
                 action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
new file mode 100644
index 0000000..61739a5
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+
+import java.util.Arrays;
+
+/**
+ * Builds replication workflow for hcat based feed.
+ */
+public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder {
+    private static final String EXPORT_ACTION_TEMPLATE = "/action/feed/table-export.xml";
+    private static final String IMPORT_ACTION_TEMPLATE = "/action/feed/table-import.xml";
+    private static final String CLEANUP_ACTION_TEMPLATE = "/action/feed/table-cleanup.xml";
+
+    private static final String SOURCE_HIVE_CREDENTIAL_NAME = "falconSourceHiveAuth";
+    private static final String TARGET_HIVE_CREDENTIAL_NAME = "falconTargetHiveAuth";
+    public static final String EXPORT_ACTION_NAME = "table-export";
+    public static final String IMPORT_ACTION_NAME = "table-import";
+    private static final String CLEANUP_ACTION_NAME = "cleanup-table-staging-dir";
+
+    public HCatReplicationWorkflowBuilder(Feed entity) {
+        super(entity);
+    }
+
+    @Override protected WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException {
+        WORKFLOWAPP workflow = new WORKFLOWAPP();
+        String wfName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(src.getName()), entity).toString();
+
+        String start = EXPORT_ACTION_NAME;
+
+        //Add pre-processing
+        if (shouldPreProcess()) {
+            ACTION action = getPreProcessingAction(false, Tag.REPLICATION);
+            addTransition(action, EXPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(action);
+            start = PREPROCESS_ACTION_NAME;
+        }
+
+        //Add export action
+        ACTION export = unmarshalAction(EXPORT_ACTION_TEMPLATE);
+        addTransition(export, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(export);
+
+        //Add replication
+        ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
+        addTransition(replication, IMPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(replication);
+
+        //Add import action
+        ACTION importAction = unmarshalAction(IMPORT_ACTION_TEMPLATE);
+        addTransition(importAction, CLEANUP_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(importAction);
+
+        //Add cleanup action
+        ACTION cleanup = unmarshalAction(CLEANUP_ACTION_TEMPLATE);
+        addTransition(cleanup, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(cleanup);
+
+        //Add post-processing actions
+        ACTION success = getSuccessPostProcessAction();
+        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(success);
+
+        ACTION fail = getFailPostProcessAction();
+        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(fail);
+
+        decorateWorkflow(workflow, wfName, start);
+        setupHiveCredentials(src, target, workflow);
+        return workflow;
+    }
+
+    private void setupHiveCredentials(Cluster sourceCluster, Cluster targetCluster, WORKFLOWAPP workflowApp) {
+        if (isSecurityEnabled) {
+            // add hcatalog credentials for secure mode and add a reference to each action
+            addHCatalogCredentials(workflowApp, sourceCluster, SOURCE_HIVE_CREDENTIAL_NAME);
+            addHCatalogCredentials(workflowApp, targetCluster, TARGET_HIVE_CREDENTIAL_NAME);
+        }
+
+        // hive-site.xml file is created later in coordinator initialization but
+        // actions are set to point to that here
+
+        for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+                continue;
+            }
+
+            org.apache.falcon.oozie.workflow.ACTION action =
+                (org.apache.falcon.oozie.workflow.ACTION) object;
+            String actionName = action.getName();
+            if (PREPROCESS_ACTION_NAME.equals(actionName)) {
+                // add reference to hive-site conf to each action
+                action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml");
+
+                if (isSecurityEnabled) { // add a reference to credential in the action
+                    action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
+                }
+            } else if (EXPORT_ACTION_NAME.equals(actionName)) {
+                if (isSecurityEnabled) { // add a reference to credential in the action
+                    action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
+                }
+            } else if (IMPORT_ACTION_NAME.equals(actionName)) {
+                if (isSecurityEnabled) { // add a reference to credential in the action
+                    action.setCred(TARGET_HIVE_CREDENTIAL_NAME);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
index 79a1883..358475d 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
@@ -35,14 +35,14 @@ import java.util.List;
  * Builds orchestration workflow for process where engine is hive.
  */
 public class HiveProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+    private static final String ACTION_TEMPLATE = "/action/process/hive-action.xml";
+
     public HiveProcessWorkflowBuilder(Process entity) {
         super(entity);
     }
 
-    @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
-        if (!action.getName().equals("user-hive-job")) {
-            return;
-        }
+    @Override protected ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconException {
+        ACTION action = unmarshalAction(ACTION_TEMPLATE);
 
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(action);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
@@ -65,6 +65,7 @@ public class HiveProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
             buildPath));
 
         OozieUtils.marshalHiveAction(action, actionJaxbElement);
+        return action;
     }
 
     private void propagateEntityProperties(org.apache.falcon.oozie.hive.ACTION hiveAction) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
index 977d8c1..14668f0 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
@@ -29,15 +29,16 @@ import org.apache.hadoop.fs.Path;
  * Builds oozie workflow for process where the engine is oozie.
  */
 public class OozieProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+    private static final String ACTION_TEMPLATE = "/action/process/oozie-action.xml";
+
     public OozieProcessWorkflowBuilder(Process entity) {
         super(entity);
     }
 
-    @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
-        if (!action.getName().equals("user-oozie-workflow")) {
-            return;
-        }
+    @Override protected ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconException {
+        ACTION action = unmarshalAction(ACTION_TEMPLATE);
         action.getSubWorkflow().setAppPath(getStoragePath(ProcessHelper.getUserWorkflowPath(entity, cluster,
             buildPath)));
+        return action;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
index 29f601d..6a83ddf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.oozie.process;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
@@ -34,15 +35,14 @@ import java.util.List;
  * Builds orchestration workflow for process where engine is pig.
  */
 public class PigProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+    private static final String ACTION_TEMPLATE = "/action/process/pig-action.xml";
 
     public PigProcessWorkflowBuilder(Process entity) {
         super(entity);
     }
 
-    @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
-        if (!action.getName().equals("user-pig-job")) {
-            return;
-        }
+    @Override protected ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconException {
+        ACTION action = unmarshalAction(ACTION_TEMPLATE);
 
         PIG pigAction = action.getPig();
         Path userWfPath = ProcessHelper.getUserWorkflowPath(entity, cluster, buildPath);
@@ -56,12 +56,14 @@ public class PigProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
 
         propagateEntityProperties(pigAction.getConfiguration(), pigAction.getParam());
 
-        if (isTableStorageType(cluster)) { // adds hive-site.xml in pig classpath
+        if (EntityUtil.isTableStorageType(cluster, entity)) { // adds hive-site.xml in pig classpath
             pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
         }
 
         addArchiveForCustomJars(cluster, pigAction.getArchive(), ProcessHelper.getUserLibPath(entity, cluster,
             buildPath));
+
+        return action;
     }
 
     private void addPrepareDeleteOutputPath(PIG pigAction) throws FalconException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index e46ae6e..e907087 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -89,10 +89,12 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
         propagateUserWorkflowProperties(processWorkflow, props);
 
         // create parent wf
-        Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, Tag.DEFAULT).build(cluster, coordPath);
+        Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT).build(cluster,
+            coordPath);
 
         WORKFLOW wf = new WORKFLOW();
         wf.setAppPath(getStoragePath(wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+        props.putAll(wfProps);
         wf.setConfiguration(getConfig(props));
 
         // set coord action to parent wf
@@ -333,4 +335,16 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
         props.put("userWorkflowEngine", processWorkflow.getEngine().value());
     }
 
+    protected void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage, Properties props) {
+        String prefix = "falcon_" + input.getName();
+
+        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+        props.put(prefix + "_partition_filter_pig",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
+        props.put(prefix + "_partition_filter_hive",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
+        props.put(prefix + "_partition_filter_java",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 2e3a5c1..2eae7ca 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -53,50 +53,66 @@ import java.util.Set;
  * Base class for building orchestration workflow for process.
  */
 public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Process> {
-    private static final String DEFAULT_WF_TEMPLATE = "/workflow/process-parent-workflow.xml";
+
     private static final Set<String> FALCON_PROCESS_HIVE_ACTIONS = new HashSet<String>(
-        Arrays.asList(new String[]{"recordsize", "user-oozie-workflow", "user-pig-job", "user-hive-job", }));
+        Arrays.asList(new String[]{PREPROCESS_ACTION_NAME, USER_ACTION_NAME, }));
 
     protected ProcessExecutionWorkflowBuilder(Process entity) {
         super(entity, Tag.DEFAULT);
     }
 
     @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
-        WORKFLOWAPP wfApp = unmarshal(DEFAULT_WF_TEMPLATE);
+        WORKFLOWAPP wfApp = new WORKFLOWAPP();
         String wfName = EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString();
-        wfApp.setName(wfName);
+
+        String startAction = USER_ACTION_NAME;
+        final boolean isTableStorageType = EntityUtil.isTableStorageType(cluster, entity);
+
+        //Add pre-processing action
+        if (shouldPreProcess()) {
+            ACTION preProcessAction = getPreProcessingAction(isTableStorageType, Tag.DEFAULT);
+            addTransition(preProcessAction, USER_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+            wfApp.getDecisionOrForkOrJoin().add(preProcessAction);
+            startAction = PREPROCESS_ACTION_NAME;
+        }
+
+        //Add user action
+        ACTION userAction = getUserAction(cluster, buildPath);
+        addTransition(userAction, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        wfApp.getDecisionOrForkOrJoin().add(userAction);
+
+        //Add post-processing
+        ACTION success = getSuccessPostProcessAction();
+        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
+        wfApp.getDecisionOrForkOrJoin().add(success);
+
+        ACTION fail = getFailPostProcessAction();
+        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+        wfApp.getDecisionOrForkOrJoin().add(fail);
+
+        decorateWorkflow(wfApp, wfName, startAction);
 
         addLibExtensionsToWorkflow(cluster, wfApp, null);
 
-        final boolean isTableStorageType = isTableStorageType(cluster);
         if (isTableStorageType) {
             setupHiveCredentials(cluster, buildPath, wfApp);
         }
 
-        for (Object object : wfApp.getDecisionOrForkOrJoin()) {
-            if (!(object instanceof ACTION)) {
-                continue;
-            }
+        marshal(cluster, wfApp, buildPath);
+        Properties props = getProperties(buildPath, wfName);
+        props.putAll(getWorkflowProperties(cluster));
 
-            ACTION action = (ACTION) object;
-            String actionName = action.getName();
-            if (FALCON_ACTIONS.contains(actionName)) {
-                decorateWithOozieRetries(action);
-                if (isTableStorageType && actionName.equals("recordsize")) {
-                    // adds hive-site.xml in actions classpath
-                    action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
-                }
-            }
-
-            decorateAction(action, cluster, buildPath);
-        }
+        return props;
+    }
 
-        //Create parent workflow
-        Path marshalPath = marshal(cluster, wfApp, buildPath);
-        return getProperties(marshalPath, wfName);
+    private Properties getWorkflowProperties(Cluster cluster) {
+        Properties props = new Properties();
+        props.setProperty("falconDataOperation", "GENERATE");
+        props.setProperty("srcClusterName", "NA");
+        return props;
     }
 
-    protected abstract void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException;
+    protected abstract ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconException;
 
     private void setupHiveCredentials(Cluster cluster, Path buildPath, WORKFLOWAPP wfApp) throws FalconException {
         // create hive-site.xml file so actions can use it in the classpath

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
index 9e1c82d..0ae229c 100644
--- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
+++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
@@ -40,6 +40,7 @@ import java.util.Properties;
  */
 public final class OozieUtils {
     public static final JAXBContext WORKFLOW_JAXB_CONTEXT;
+    public static final JAXBContext ACTION_JAXB_CONTEXT;
     public static final JAXBContext COORD_JAXB_CONTEXT;
     public static final JAXBContext BUNDLE_JAXB_CONTEXT;
     protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
@@ -47,6 +48,7 @@ public final class OozieUtils {
     static {
         try {
             WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
+            ACTION_JAXB_CONTEXT = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.ACTION.class);
             COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
             BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
             HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/eviction-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/eviction-action.xml b/oozie/src/main/resources/action/feed/eviction-action.xml
new file mode 100644
index 0000000..6d03eb0
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/eviction-action.xml
@@ -0,0 +1,55 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<action name="eviction" xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+            <!-- HCatalog jars -->
+            <property>
+                <name>oozie.action.sharelib.for.java</name>
+                <value>hcatalog</value>
+            </property>
+        </configuration>
+        <main-class>org.apache.falcon.retention.FeedEvictor</main-class>
+        <arg>-feedBasePath</arg>
+        <arg>${feedDataPath}</arg>
+        <arg>-falconFeedStorageType</arg>
+        <arg>${falconFeedStorageType}</arg>
+        <arg>-retentionType</arg>
+        <arg>instance</arg>
+        <arg>-retentionLimit</arg>
+        <arg>${limit}</arg>
+        <arg>-timeZone</arg>
+        <arg>${timeZone}</arg>
+        <arg>-frequency</arg>
+        <arg>${frequency}</arg>
+        <arg>-logFile</arg>
+        <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+    </java>
+    <ok to="succeeded-post-processing"/>
+    <error to="failed-post-processing"/>
+</action>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/falcon-table-export.hql
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/falcon-table-export.hql b/oozie/src/main/resources/action/feed/falcon-table-export.hql
new file mode 100644
index 0000000..37fd1b7
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/falcon-table-export.hql
@@ -0,0 +1,18 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+export table ${falconSourceDatabase}.${falconSourceTable} partition ${falconSourcePartition} to '${falconSourceStagingDir}';

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/falcon-table-import.hql
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/falcon-table-import.hql b/oozie/src/main/resources/action/feed/falcon-table-import.hql
new file mode 100644
index 0000000..653d580
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/falcon-table-import.hql
@@ -0,0 +1,20 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+use ${falconTargetDatabase};
+alter table ${falconTargetTable} drop if exists partition ${falconTargetPartition};
+import table ${falconTargetTable} partition ${falconTargetPartition} from '${falconTargetStagingDir}';

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/replication-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/replication-action.xml b/oozie/src/main/resources/action/feed/replication-action.xml
new file mode 100644
index 0000000..9da0396
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/replication-action.xml
@@ -0,0 +1,59 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!-- Replication action -->
+<action name="replication" xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property> <!-- hadoop 2 parameter -->
+                <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                <value>true</value>
+            </property>
+            <property> <!-- hadoop 1 parameter -->
+                <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                <value>true</value>
+            </property>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+        </configuration>
+        <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
+        <arg>-Dfalcon.include.path=${sourceRelativePaths}</arg>
+        <arg>-Dmapred.job.queue.name=${queueName}</arg>
+        <arg>-Dmapred.job.priority=${jobPriority}</arg>
+        <arg>-maxMaps</arg>
+        <arg>${maxMaps}</arg>
+        <arg>-mapBandwidthKB</arg>
+        <arg>${mapBandwidthKB}</arg>
+        <arg>-sourcePaths</arg>
+        <arg>${distcpSourcePaths}</arg>
+        <arg>-targetPath</arg>
+        <arg>${distcpTargetPaths}</arg>
+        <arg>-falconFeedStorageType</arg>
+        <arg>${falconFeedStorageType}</arg>
+        <file>${wf:conf("falcon.libpath")}/hadoop-distcp.jar</file>
+    </java>
+    <ok to="succeeded-post-processing"/>
+    <error to="failed-post-processing"/>
+</action>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/table-cleanup.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/table-cleanup.xml b/oozie/src/main/resources/action/feed/table-cleanup.xml
new file mode 100644
index 0000000..23e8df1
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/table-cleanup.xml
@@ -0,0 +1,25 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<action name="cleanup-table-staging-dir" xmlns='uri:oozie:workflow:0.3'>
+    <fs>
+        <delete path="${distcpSourcePaths}"/>
+        <delete path="${distcpTargetPaths}"/>
+    </fs>
+    <ok to="succeeded-post-processing"/>
+    <error to="failed-post-processing"/>
+</action>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/table-export.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/table-export.xml b/oozie/src/main/resources/action/feed/table-export.xml
new file mode 100644
index 0000000..f5f7e66
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/table-export.xml
@@ -0,0 +1,45 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!-- Table Replication - Export data and metadata to HDFS Staging from Source Hive -->
+<action name="table-export" xmlns='uri:oozie:workflow:0.3'>
+    <hive xmlns="uri:oozie:hive-action:0.2">
+        <job-tracker>${falconSourceJobTracker}</job-tracker>
+        <name-node>${falconSourceNameNode}</name-node>
+        <prepare>
+            <delete path="${distcpSourcePaths}"/>
+        </prepare>
+        <job-xml>${wf:appPath()}/conf/falcon-source-hive-site.xml</job-xml>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+        </configuration>
+        <script>${wf:appPath()}/scripts/falcon-table-export.hql</script>
+        <param>falconSourceDatabase=${falconSourceDatabase}</param>
+        <param>falconSourceTable=${falconSourceTable}</param>
+        <param>falconSourcePartition=${falconSourcePartition}</param>
+        <param>falconSourceStagingDir=${distcpSourcePaths}</param>
+    </hive>
+    <ok to="replication"/>
+    <error to="failed-post-processing"/>
+</action>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/table-import.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/table-import.xml b/oozie/src/main/resources/action/feed/table-import.xml
new file mode 100644
index 0000000..6e9a073
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/table-import.xml
@@ -0,0 +1,42 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!-- Table Replication - Import data and metadata from HDFS Staging into Target Hive -->
+<action name="table-import" xmlns='uri:oozie:workflow:0.3'>
+    <hive xmlns="uri:oozie:hive-action:0.2">
+        <job-tracker>${falconTargetJobTracker}</job-tracker>
+        <name-node>${falconTargetNameNode}</name-node>
+        <job-xml>${wf:appPath()}/conf/falcon-target-hive-site.xml</job-xml>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+        </configuration>
+        <script>${wf:appPath()}/scripts/falcon-table-import.hql</script>
+        <param>falconTargetDatabase=${falconTargetDatabase}</param>
+        <param>falconTargetTable=${falconTargetTable}</param>
+        <param>falconTargetPartition=${falconTargetPartition}</param>
+        <param>falconTargetStagingDir=${distcpTargetPaths}</param>
+    </hive>
+    <ok to="cleanup-table-staging-dir"/>
+    <error to="failed-post-processing"/>
+</action>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
new file mode 100644
index 0000000..1631d63
--- /dev/null
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -0,0 +1,94 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<action name='succeeded-post-processing' xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+        </configuration>
+        <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+        <arg>-cluster</arg>
+        <arg>${cluster}</arg>
+        <arg>-entityType</arg>
+        <arg>${entityType}</arg>
+        <arg>-entityName</arg>
+        <arg>${entityName}</arg>
+        <arg>-nominalTime</arg>
+        <arg>${nominalTime}</arg>
+        <arg>-operation</arg>
+        <arg>${falconDataOperation}</arg>
+        <arg>-workflowId</arg>
+        <arg>${wf:id()}</arg>
+        <arg>-runId</arg>
+        <arg>${wf:run()}</arg>
+        <arg>-status</arg>
+        <arg>${wf:lastErrorNode() == '' ? 'SUCCEEDED' : 'FAILED'}</arg>
+        <arg>-timeStamp</arg>
+        <arg>${timeStamp}</arg>
+        <arg>-brokerImplClass</arg>
+        <arg>${wf:conf("broker.impl.class")}</arg>
+        <arg>-brokerUrl</arg>
+        <arg>${wf:conf("broker.url")}</arg>
+        <arg>-userBrokerImplClass</arg>
+        <arg>${userBrokerImplClass}</arg>
+        <arg>-userBrokerUrl</arg>
+        <arg>${userBrokerUrl}</arg>
+        <arg>-brokerTTL</arg>
+        <arg>${wf:conf("broker.ttlInMins")}</arg>
+        <arg>-feedNames</arg>
+        <arg>${feedNames}</arg>
+        <arg>-feedInstancePaths</arg>
+        <arg>${feedInstancePaths}</arg>
+        <arg>-logFile</arg>
+        <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+        <arg>-workflowEngineUrl</arg>
+        <arg>${workflowEngineUrl}</arg>
+        <arg>-subflowId</arg>
+        <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-action" : ""}</arg>
+        <arg>-userWorkflowEngine</arg>
+        <arg>${userWorkflowEngine}</arg>
+        <arg>-userWorkflowName</arg>
+        <arg>${userWorkflowName}</arg>
+        <arg>-userWorkflowVersion</arg>
+        <arg>${userWorkflowVersion}</arg>
+        <arg>-logDir</arg>
+        <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}/</arg>
+        <arg>-workflowUser</arg>
+        <arg>${wf:user()}</arg>
+        <arg>-falconInputFeeds</arg>
+        <arg>${falconInputFeeds}</arg>
+        <arg>-falconInputPaths</arg>
+        <arg>${falconInPaths}</arg>
+        <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+        <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+        <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+        <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+        <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+        <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+    </java>
+    <ok to="end"/>
+    <error to="fail"/>
+</action>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/pre-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/pre-process.xml b/oozie/src/main/resources/action/pre-process.xml
new file mode 100644
index 0000000..127ab80
--- /dev/null
+++ b/oozie/src/main/resources/action/pre-process.xml
@@ -0,0 +1,50 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<action name='pre-processing' xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+            <!-- HCatalog jars -->
+            <property>
+                <name>oozie.action.sharelib.for.java</name>
+                <value>hcatalog</value>
+            </property>
+        </configuration>
+        <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
+        <arg>-out</arg>
+        <arg>${logDir}/latedata/${nominalTime}/${srcClusterName}</arg>
+        <arg>-paths</arg>
+        <arg>${falconInPaths}</arg>
+        <arg>-falconInputFeeds</arg>
+        <arg>${falconInputFeeds}</arg>
+        <arg>-falconInputFeedStorageTypes</arg>
+        <arg>${falconInputFeedStorageTypes}</arg>
+        <capture-output/>
+    </java>
+    <ok to="user-action"/>
+    <error to="failed-post-processing"/>
+</action>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/process/hive-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/process/hive-action.xml b/oozie/src/main/resources/action/process/hive-action.xml
new file mode 100644
index 0000000..0e20557
--- /dev/null
+++ b/oozie/src/main/resources/action/process/hive-action.xml
@@ -0,0 +1,37 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<action name="user-action" xmlns="uri:oozie:workflow:0.3">
+    <hive xmlns="uri:oozie:hive-action:0.2">
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+        </configuration>
+        <script>#USER_WF_PATH#</script>
+    </hive>
+    <ok to="succeeded-post-processing"/>
+    <error to="failed-post-processing"/>
+</action>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/process/oozie-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/process/oozie-action.xml b/oozie/src/main/resources/action/process/oozie-action.xml
new file mode 100644
index 0000000..b6fdec1
--- /dev/null
+++ b/oozie/src/main/resources/action/process/oozie-action.xml
@@ -0,0 +1,25 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<action name='user-action' xmlns="uri:oozie:workflow:0.3">
+    <sub-workflow>
+        <app-path>#USER_WF_PATH#</app-path>
+        <propagate-configuration/>
+    </sub-workflow>
+    <ok to="succeeded-post-processing"/>
+    <error to="failed-post-processing"/>
+</action>


Mime
View raw message