falcon-commits mailing list archives

From shweth...@apache.org
Subject [9/9] git commit: FALCON-369 Refactor workflow builder. Contributed by Shwetha GS
Date Thu, 10 Jul 2014 06:57:38 GMT
FALCON-369 Refactor workflow builder. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/185b5888
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/185b5888
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/185b5888

Branch: refs/heads/master
Commit: 185b58885df921a0179ae2aff8e32b4ec591fa15
Parents: 57953f7
Author: Shwetha GS <shwetha.gs@inmobi.com>
Authored: Thu Jul 10 12:27:13 2014 +0530
Committer: Shwetha GS <shwetha.gs@inmobi.com>
Committed: Thu Jul 10 12:27:13 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/falcon/entity/CatalogStorage.java    |  11 +-
 .../org/apache/falcon/entity/ClusterHelper.java |   2 +-
 .../org/apache/falcon/entity/EntityUtil.java    |  47 +-
 .../org/apache/falcon/entity/FeedHelper.java    |  18 +-
 .../org/apache/falcon/entity/ProcessHelper.java |  39 +
 .../apache/falcon/workflow/WorkflowBuilder.java |  51 --
 .../falcon/entity/CatalogStorageTest.java       |   4 +-
 feed/pom.xml                                    | 105 ---
 .../workflow/OozieFeedWorkflowBuilder.java      | 728 ---------------
 .../coordinator/replication-coordinator.xml     |  51 --
 .../config/workflow/falcon-table-export.hql     |  18 -
 .../config/workflow/falcon-table-import.hql     |  20 -
 .../config/workflow/replication-workflow.xml    | 330 -------
 .../config/workflow/retention-workflow.xml      | 208 -----
 .../converter/OozieFeedWorkflowBuilderTest.java | 669 --------------
 feed/src/test/resources/feed.xml                |  56 --
 feed/src/test/resources/fs-replication-feed.xml |  68 --
 feed/src/test/resources/src-cluster.xml         |  40 -
 .../test/resources/table-replication-feed.xml   |  42 -
 feed/src/test/resources/trg-cluster-alpha.xml   |  39 -
 feed/src/test/resources/trg-cluster-beta.xml    |  39 -
 feed/src/test/resources/trg-cluster.xml         |  40 -
 .../apache/falcon/oozie/OozieBundleBuilder.java | 143 +++
 .../falcon/oozie/OozieCoordinatorBuilder.java   | 181 ++++
 .../apache/falcon/oozie/OozieEntityBuilder.java | 306 +++++++
 .../OozieOrchestrationWorkflowBuilder.java      | 302 +++++++
 .../falcon/oozie/feed/FeedBundleBuilder.java    |  65 ++
 .../feed/FeedReplicationCoordinatorBuilder.java | 418 +++++++++
 .../feed/FeedReplicationWorkflowBuilder.java    | 101 +++
 .../feed/FeedRetentionCoordinatorBuilder.java   | 110 +++
 .../feed/FeedRetentionWorkflowBuilder.java      |  87 ++
 .../process/HiveProcessWorkflowBuilder.java     | 103 +++
 .../process/OozieProcessWorkflowBuilder.java    |  43 +
 .../process/PigProcessWorkflowBuilder.java      |  87 ++
 .../oozie/process/ProcessBundleBuilder.java     | 152 ++++
 .../ProcessExecutionCoordinatorBuilder.java     | 336 +++++++
 .../ProcessExecutionWorkflowBuilder.java        | 233 +++++
 .../falcon/workflow/OozieWorkflowBuilder.java   | 636 -------------
 .../workflow/engine/OozieWorkflowEngine.java    |  95 +-
 .../coordinator/replication-coordinator.xml     |  51 ++
 .../resources/workflow/falcon-table-export.hql  |  18 +
 .../resources/workflow/falcon-table-import.hql  |  20 +
 .../workflow/process-parent-workflow.xml        | 278 ++++++
 .../resources/workflow/replication-workflow.xml | 330 +++++++
 .../resources/workflow/retention-workflow.xml   | 208 +++++
 .../feed/OozieFeedWorkflowBuilderTest.java      | 673 ++++++++++++++
 .../falcon/oozie/process/AbstractTestBase.java  | 141 +++
 .../OozieProcessWorkflowBuilderTest.java        | 767 ++++++++++++++++
 .../resources/config/cluster/cluster-0.1.xml    |  44 +
 .../src/test/resources/config/feed/feed-0.1.xml |  63 ++
 .../config/feed/hive-table-feed-out.xml         |  43 +
 .../resources/config/feed/hive-table-feed.xml   |  43 +
 .../test/resources/config/late/late-cluster.xml |  43 +
 .../test/resources/config/late/late-feed1.xml   |  53 ++
 .../test/resources/config/late/late-feed2.xml   |  53 ++
 .../test/resources/config/late/late-feed3.xml   |  53 ++
 .../resources/config/late/late-process1.xml     |  41 +
 .../resources/config/late/late-process2.xml     |  57 ++
 .../config/process/dumb-hive-process.xml        |  39 +
 .../resources/config/process/dumb-process.xml   |  40 +
 .../config/process/hive-process-FSInputFeed.xml |  46 +
 .../process/hive-process-FSOutputFeed.xml       |  46 +
 .../resources/config/process/hive-process.xml   |  46 +
 .../config/process/pig-process-0.1.xml          |  53 ++
 .../config/process/pig-process-table.xml        |  46 +
 .../resources/config/process/process-0.1.xml    |  45 +
 oozie/src/test/resources/feed/feed.xml          |  56 ++
 .../test/resources/feed/fs-replication-feed.xml |  68 ++
 oozie/src/test/resources/feed/src-cluster.xml   |  40 +
 .../resources/feed/table-replication-feed.xml   |  42 +
 .../test/resources/feed/trg-cluster-alpha.xml   |  39 +
 .../test/resources/feed/trg-cluster-beta.xml    |  39 +
 oozie/src/test/resources/feed/trg-cluster.xml   |  40 +
 pom.xml                                         |   2 -
 .../org/apache/falcon/util/EmbeddedServer.java  |   1 +
 process/pom.xml                                 | 118 ---
 .../workflow/OozieProcessWorkflowBuilder.java   | 904 -------------------
 .../config/workflow/process-parent-workflow.xml | 278 ------
 .../falcon/converter/AbstractTestBase.java      |  83 --
 .../OozieProcessWorkflowBuilderTest.java        | 799 ----------------
 .../resources/config/cluster/cluster-0.1.xml    |  44 -
 .../src/test/resources/config/feed/feed-0.1.xml |  63 --
 .../config/feed/hive-table-feed-out.xml         |  43 -
 .../resources/config/feed/hive-table-feed.xml   |  43 -
 .../test/resources/config/late/late-cluster.xml |  43 -
 .../test/resources/config/late/late-feed1.xml   |  53 --
 .../test/resources/config/late/late-feed2.xml   |  53 --
 .../test/resources/config/late/late-feed3.xml   |  53 --
 .../resources/config/late/late-process1.xml     |  41 -
 .../resources/config/late/late-process2.xml     |  57 --
 .../config/process/dumb-hive-process.xml        |  39 -
 .../resources/config/process/dumb-process.xml   |  40 -
 .../config/process/hive-process-FSInputFeed.xml |  46 -
 .../process/hive-process-FSOutputFeed.xml       |  46 -
 .../resources/config/process/hive-process.xml   |  46 -
 .../config/process/pig-process-0.1.xml          |  53 --
 .../config/process/pig-process-table.xml        |  46 -
 .../resources/config/process/process-0.1.xml    |  45 -
 src/main/examples/app/hive/wordcount.hql        |   2 +-
 src/main/examples/app/pig/hcat-wordcount.pig    |   6 +-
 src/main/examples/data/hcat-generate.sh         |   2 +
 .../examples/entity/filesystem/pig-process.xml  |   4 +-
 .../entity/filesystem/replication-feed.xml      |  46 +
 .../entity/filesystem/standalone-cluster.xml    |   2 +-
 .../filesystem/standalone-target-cluster.xml    |  43 +
 src/main/examples/entity/hcat/hcat-in-feed.xml  |   4 +-
 src/main/examples/entity/hcat/hcat-out-feed.xml |   4 +-
 .../examples/entity/hcat/hcat-pig-process.xml   |  10 +-
 .../entity/hcat/hcat-replication-feed.xml       |  42 +
 .../entity/hcat/hcat-standalone-cluster.xml     |   8 +-
 .../hcat/hcat-standalone-target-cluster.xml     |  45 +
 src/main/examples/entity/hcat/hive-process.xml  |  10 +-
 webapp/pom.xml                                  |   1 +
 114 files changed, 6686 insertions(+), 6271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4fd1cae..35eaf7e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ Trunk (Unreleased)
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS
+   FALCON-369 Refactor workflow builder. (Shwetha GS)
+
    FALCON-280 Validate the ACL in Feed entity with the user submitting the entity
    (Jean-Baptiste Onofré via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index f7b592d..89e5b3e 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -28,7 +28,9 @@ import org.apache.falcon.entity.v0.feed.LocationType;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 
@@ -229,19 +231,18 @@ public class CatalogStorage implements Storage {
         return partitions.containsKey(key);
     }
 
-    public String getDatedPartitionKey() {
-        String datedPartitionKey = null;
+    public List<String> getDatedPartitionKeys() {
+        List<String> keys = new ArrayList<String>();
 
         for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
 
             Matcher matcher = FeedDataPath.PATTERN.matcher(entry.getValue());
             if (matcher.find()) {
-                datedPartitionKey = entry.getKey();
-                break;
+                keys.add(entry.getKey());
             }
         }
 
-        return datedPartitionKey;
+        return keys;
     }
 
     /**
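Call sites migrate from the removed single-key getDatedPartitionKey() to the
first element of the new list; a minimal sketch of the equivalent behaviour
(storage construction elided, as in CatalogStorageTest below):

    List<String> keys = storage.getDatedPartitionKeys();
    String datedPartitionKey = keys.isEmpty() ? null : keys.get(0);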

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 5284d68..cb3ea08 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -140,7 +140,7 @@ public final class ClusterHelper {
         return null;
     }
 
-    public static Map<String, String> geHiveProperties(Cluster cluster) {
+    public static Map<String, String> getHiveProperties(Cluster cluster) {
         if (cluster.getProperties() != null) {
             List<Property> properties = cluster.getProperties().getProperties();
             if (properties != null && !properties.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index b4bc07d..a38e553 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -29,7 +29,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.*;
@@ -435,6 +435,20 @@ public final class EntityUtil {
         return builder.getWorkflowTag(workflowName);
     }
 
+    public static List<String> getWorkflowNames(Entity entity, String cluster) {
+        switch(entity.getEntityType()) {
+        case FEED:
+            return Arrays.asList(getWorkflowName(Tag.RETENTION, entity).toString(),
+                getWorkflowName(Tag.REPLICATION, entity).toString());
+
+        case PROCESS:
+            return Arrays.asList(getWorkflowName(Tag.DEFAULT, entity).toString());
+
+        default:
+        }
+        throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
+    }
+
     public static <T extends Entity> T getClusterView(T entity, String clusterName) {
         switch (entity.getEntityType()) {
         case CLUSTER:
@@ -442,10 +456,10 @@ public final class EntityUtil {
 
         case FEED:
             Feed feed = (Feed) entity.copy();
-            Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
-            Iterator<Cluster> itr = feed.getClusters().getClusters().iterator();
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
+            Iterator<org.apache.falcon.entity.v0.feed.Cluster> itr = feed.getClusters().getClusters().iterator();
             while (itr.hasNext()) {
-                Cluster cluster = itr.next();
+                org.apache.falcon.entity.v0.feed.Cluster cluster = itr.next();
                //In addition to retaining the required cluster, retain the source clusters if this is the target
                 // cluster
                 //1. Retain cluster if cluster n
@@ -482,7 +496,7 @@ public final class EntityUtil {
 
         case FEED:
             Feed feed = (Feed) entity;
-            for (Cluster cluster : feed.getClusters().getClusters()) {
+            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
                 clusters.add(cluster.getName());
             }
             break;
@@ -642,4 +656,27 @@ public final class EntityUtil {
         return DeploymentUtil.isEmbeddedMode() || (!DeploymentUtil.isPrism()
                 && colo.equals(DeploymentUtil.getCurrentColo()));
     }
+
+    public static Date getNextStartTime(Entity entity, Cluster cluster, Date effectiveTime) {
+        switch(entity.getEntityType()) {
+        case FEED:
+            Feed feed = (Feed) entity;
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+            return getNextStartTime(feedCluster.getValidity().getStart(), feed.getFrequency(), feed.getTimezone(),
+                effectiveTime);
+
+        case PROCESS:
+            Process process = (Process) entity;
+            org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process,
+                cluster.getName());
+            return getNextStartTime(processCluster.getValidity().getStart(), process.getFrequency(),
+                process.getTimezone(), effectiveTime);
+
+        default:
+        }
+
+        throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
+    }
+
+
 }
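The new getNextStartTime(Entity, Cluster, Date) overload only picks the
per-cluster validity start, frequency and timezone before delegating to the
existing getNextStartTime(Date, Frequency, TimeZone, Date); a hedged usage
sketch (the feed and clusterEntity lookups are assumed):

    // roughly: the first frequency-aligned instance at or after effectiveTime,
    // counted from the cluster's validity start
    Date next = EntityUtil.getNextStartTime(feed, clusterEntity, new Date());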

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 44d8d01..8c61ac2 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.util.BuildProperties;
 
 import java.net.URISyntaxException;
 import java.util.Arrays;
@@ -252,7 +253,7 @@ public final class FeedHelper {
                                        Feed feed, CatalogStorage storage, Tag tag, String suffix) {
         String stagingDirPath = getStagingDir(clusterEntity, feed, storage, tag);
 
-        String datedPartitionKey = storage.getDatedPartitionKey();
+        String datedPartitionKey = storage.getDatedPartitionKeys().get(0);
         String datedPartitionKeySuffix = datedPartitionKey + "=${coord:dataOutPartitionValue('output',"
                 + "'" + datedPartitionKey + "')}";
         return stagingDirPath + "/"
@@ -273,4 +274,19 @@ public final class FeedHelper {
                 + storage.getDatabase() + "/"
                 + storage.getTable();
     }
+
+    public static Properties getUserWorkflowProperties(String policy) {
+        Properties props = new Properties();
+        props.put("userWorkflowName", policy + "-policy");
+        props.put("userWorkflowEngine", "falcon");
+
+        String version;
+        try {
+            version = BuildProperties.get().getProperty("build.version");
+        } catch (Exception e) {  // unfortunate that this is only available in prism/webapp
+            version = "0.5";
+        }
+        props.put("userWorkflowVersion", version);
+        return props;
+    }
 }
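For a given policy name the new helper always yields the same three
properties; e.g. the retention flow (propagateUserWorkflowProperties(props,
"eviction") in the deleted builder below) amounts to:

    Properties props = FeedHelper.getUserWorkflowProperties("eviction");
    // userWorkflowName=eviction-policy
    // userWorkflowEngine=falcon
    // userWorkflowVersion=<build.version, or "0.5" where BuildProperties is unavailable>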

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index ece8982..44dac3c 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -26,6 +26,11 @@ import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
 
 /**
  * Helper methods for accessing process members.
@@ -75,4 +80,38 @@ public final class ProcessHelper {
 
         return storageType;
     }
+
+    public static Path getUserWorkflowPath(Process process, org.apache.falcon.entity.v0.cluster.Cluster cluster,
+        Path buildPath) throws FalconException {
+        try {
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+            Path wfPath = new Path(process.getWorkflow().getPath());
+            if (fs.isFile(wfPath)) {
+                return new Path(buildPath.getParent(), EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
+            } else {
+                return new Path(buildPath.getParent(), EntityUtil.PROCESS_USER_DIR);
+            }
+        } catch(IOException e) {
+            throw new FalconException("Failed to get workflow path", e);
+        }
+    }
+
+    public static Path getUserLibPath(Process process, org.apache.falcon.entity.v0.cluster.Cluster cluster,
+        Path buildPath) throws FalconException {
+        try {
+            if (process.getWorkflow().getLib() == null) {
+                return null;
+            }
+            Path libPath = new Path(process.getWorkflow().getLib());
+
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+            if (fs.isFile(libPath)) {
+                return new Path(buildPath.getParent(), EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
+            } else {
+                return new Path(buildPath.getParent(), EntityUtil.PROCESS_USERLIB_DIR);
+            }
+        } catch(IOException e) {
+            throw new FalconException("Failed to get user lib path", e);
+        }
+    }
 }
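A worked example of the staging-path resolution, with hypothetical values: if
process.getWorkflow().getPath() is the file /apps/pig/id.pig and buildPath is
<staging>/bundle/DEFAULT, then getUserWorkflowPath() returns
<staging>/bundle/<EntityUtil.PROCESS_USER_DIR>/id.pig; for a directory it
returns <staging>/bundle/<EntityUtil.PROCESS_USER_DIR> with no name appended.
getUserLibPath() follows the same rule against PROCESS_USERLIB_DIR.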

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
deleted file mode 100644
index 1f9a8c8..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.util.ReflectionUtils;
-
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Builder for building workflow definition for the underlying scheduler.
- * @param <T>
- */
-public abstract class WorkflowBuilder<T extends Entity> {
-    protected T entity;
-
-    protected WorkflowBuilder(T entity) {
-        this.entity = entity;
-    }
-
-    public T getEntity() {
-        return  entity;
-    }
-
-    public static WorkflowBuilder<Entity> getBuilder(String engine, Entity entity) throws FalconException {
-        String classKey = engine + "." + entity.getEntityType().name().toLowerCase() + ".workflow.builder";
-        return ReflectionUtils.getInstance(classKey, entity.getEntityType().getEntityClass(), entity);
-    }
-
-    public abstract Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException;
-
-    public abstract String[] getWorkflowNames();
-}
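The deleted contract resolved engine-specific builders reflectively from
startup properties keyed as "<engine>.<entity-type>.workflow.builder"; a
sketch of the call it supported (cluster name hypothetical):

    // resolves the class registered under "oozie.process.workflow.builder"
    WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder("oozie", process);
    Map<String, Properties> schedule = builder.newWorkflowSchedule("corp-cluster");

Its replacement is the typed hierarchy under oozie/ in the diffstat above
(OozieEntityBuilder, OozieBundleBuilder, OozieCoordinatorBuilder and their
feed/process subclasses).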

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
index 972066d..5d06431 100644
--- a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
@@ -51,7 +51,7 @@ public class CatalogStorageTest {
         Assert.assertTrue(storage.hasPartition("region"));
         Assert.assertNull(storage.getPartitionValue("unknown"));
         Assert.assertFalse(storage.hasPartition("unknown"));
-        Assert.assertEquals(storage.getDatedPartitionKey(), "ds");
+        Assert.assertEquals(storage.getDatedPartitionKeys().get(0), "ds");
     }
 
     @Test
@@ -67,7 +67,7 @@ public class CatalogStorageTest {
         Assert.assertTrue(storage.hasPartition("region"));
         Assert.assertNull(storage.getPartitionValue("unknown"));
         Assert.assertFalse(storage.hasPartition("unknown"));
-        Assert.assertEquals(storage.getDatedPartitionKey(), "ds");
+        Assert.assertEquals(storage.getDatedPartitionKeys().get(0), "ds");
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/feed/pom.xml
----------------------------------------------------------------------
diff --git a/feed/pom.xml b/feed/pom.xml
deleted file mode 100644
index ab82b77..0000000
--- a/feed/pom.xml
+++ /dev/null
@@ -1,105 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-    
-       http://www.apache.org/licenses/LICENSE-2.0
-        
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.falcon</groupId>
-        <artifactId>falcon-main</artifactId>
-        <version>0.6-incubating-SNAPSHOT</version>
-    </parent>
-    <artifactId>falcon-feed</artifactId>
-    <description>Apache Falcon Feed Module</description>
-    <name>Apache Falcon Feed</name>
-    <packaging>jar</packaging>
-
-    <profiles>
-        <profile>
-            <id>hadoop-1</id>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-core</artifactId>
-                </dependency>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-test</artifactId>
-                </dependency>
-            </dependencies>
-        </profile>
-        <profile>
-            <id>hadoop-2</id>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-common</artifactId>
-                </dependency>
-                <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-hdfs</artifactId>
-                </dependency>
-                <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-hdfs</artifactId>
-                       <classifier>tests</classifier>
-                 </dependency>
-                 <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-common</artifactId>
-                       <classifier>tests</classifier>
-                 </dependency>
-            </dependencies>
-        </profile>
-    </profiles>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.falcon</groupId>
-            <artifactId>falcon-common</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.falcon</groupId>
-            <artifactId>falcon-oozie-adaptor</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.falcon</groupId>
-            <artifactId>falcon-messaging</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.falcon</groupId>
-            <artifactId>falcon-test-util</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.testng</groupId>
-            <artifactId>testng</artifactId>
-        </dependency>
-    </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
deleted file mode 100644
index 4e300bf..0000000
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ /dev/null
@@ -1,728 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
-import org.apache.falcon.oozie.coordinator.ACTION;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.util.BuildProperties;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Workflow definition builder for feed replication & retention.
- */
-public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
-    private static final Logger LOG = LoggerFactory.getLogger(OozieFeedWorkflowBuilder.class);
-
-    public OozieFeedWorkflowBuilder(Feed entity) {
-        super(entity);
-    }
-
-    @Override
-    public Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException {
-        Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
-
-        for (String clusterName : clusters) {
-            org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, clusterName);
-            if (!feedCluster.getValidity().getStart().before(feedCluster.getValidity().getEnd())) {
-                LOG.info("feed validity start <= end for cluster {}. Skipping schedule", clusterName);
-                break;
-            }
-
-            Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, feedCluster.getName());
-            Path bundlePath = EntityUtil.getNewStagingPath(cluster, entity);
-
-            if (!map(cluster, bundlePath)) {
-                break;
-            }
-            propertiesMap.put(clusterName, createAppProperties(clusterName, bundlePath, CurrentUser.getUser()));
-        }
-        return propertiesMap;
-    }
-
-    @Override
-    public Date getNextStartTime(Feed feed, String cluster, Date now) throws FalconException {
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster);
-        return EntityUtil.getNextStartTime(feedCluster.getValidity().getStart(),
-                feed.getFrequency(), feed.getTimezone(), now);
-    }
-
-    @Override
-    public String[] getWorkflowNames() {
-        return new String[]{
-                EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString(),
-                EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString(), };
-    }
-
-    private final RetentionOozieWorkflowMapper retentionMapper = new RetentionOozieWorkflowMapper();
-    private final ReplicationOozieWorkflowMapper replicationMapper = new ReplicationOozieWorkflowMapper();
-
-    @Override
-    public List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
-        List<COORDINATORAPP> coords = new ArrayList<COORDINATORAPP>();
-        COORDINATORAPP retentionCoord = getRetentionCoordinator(cluster, bundlePath);
-        if (retentionCoord != null) {
-            coords.add(retentionCoord);
-        }
-        List<COORDINATORAPP> replicationCoords = getReplicationCoordinators(cluster, bundlePath);
-        coords.addAll(replicationCoords);
-        return coords;
-    }
-
-    private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
-
-        if (feedCluster.getValidity().getEnd().before(new Date())) {
-            LOG.warn("Feed Retention is not applicable as Feed's end time for cluster {} is not in the future",
-                    cluster.getName());
-            return null;
-        }
-
-        return retentionMapper.getRetentionCoordinator(cluster, bundlePath, entity, feedCluster);
-    }
-
-    private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
-        throws FalconException {
-        List<COORDINATORAPP> replicationCoords = new ArrayList<COORDINATORAPP>();
-        if (FeedHelper.getCluster(entity, targetCluster.getName()).getType() == ClusterType.TARGET) {
-            for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : entity.getClusters().getClusters()) {
-                if (feedCluster.getType() == ClusterType.SOURCE) {
-                    String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
-                    Path coordPath = getCoordPath(bundlePath, coordName);
-                    Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName());
-
-                    // workflow is serialized to a specific dir
-                    Path sourceSpecificWfPath = new Path(coordPath, srcCluster.getName());
-
-                    // Different workflow for each source since hive credentials vary for each cluster
-                    replicationMapper.createReplicationWorkflow(
-                            targetCluster, srcCluster, sourceSpecificWfPath, coordName);
-
-                    COORDINATORAPP coord = replicationMapper.createAndGetCoord(
-                            entity, srcCluster, targetCluster, sourceSpecificWfPath);
-
-                    if (coord != null) {
-                        replicationCoords.add(coord);
-                    }
-                }
-            }
-        }
-
-        return replicationCoords;
-    }
-
-    @Override
-    protected Map<String, String> getEntityProperties() {
-        Map<String, String> props = new HashMap<String, String>();
-        if (entity.getProperties() != null) {
-            for (Property prop : entity.getProperties().getProperties()) {
-                props.put(prop.getName(), prop.getValue());
-            }
-        }
-        return props;
-    }
-
-    private final class RetentionOozieWorkflowMapper {
-
-        private static final String RETENTION_WF_TEMPLATE = "/config/workflow/retention-workflow.xml";
-
-        private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath, Feed feed,
-            org.apache.falcon.entity.v0.feed.Cluster feedCluster) throws FalconException {
-            COORDINATORAPP retentionApp = new COORDINATORAPP();
-            String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
-            retentionApp.setName(coordName);
-            retentionApp.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
-            retentionApp.setStart(SchemaHelper.formatDateUTC(new Date()));
-            retentionApp.setTimezone(feed.getTimezone().getID());
-            TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
-            if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
-                retentionApp.setFrequency("${coord:hours(6)}");
-            } else {
-                retentionApp.setFrequency("${coord:days(1)}");
-            }
-
-            Path wfPath = getCoordPath(bundlePath, coordName);
-            retentionApp.setAction(getRetentionWorkflowAction(cluster, wfPath, coordName));
-            return retentionApp;
-        }
-
-        private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String wfName)
-            throws FalconException {
-            ACTION retentionAction = new ACTION();
-            WORKFLOW retentionWorkflow = new WORKFLOW();
-            createRetentionWorkflow(cluster, wfPath, wfName);
-            retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
-
-            Map<String, String> props = createCoordDefaultConfiguration(cluster, wfName);
-            props.put("timeZone", entity.getTimezone().getID());
-            props.put("frequency", entity.getFrequency().getTimeUnit().name());
-
-            final Storage storage = FeedHelper.createStorage(cluster, entity);
-            props.put("falconFeedStorageType", storage.getType().name());
-
-            String feedDataPath = storage.getUriTemplate();
-            props.put("feedDataPath",
-                feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
-
-            org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
-            props.put("limit", feedCluster.getRetention().getLimit().toString());
-
-            props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
-            props.put(ARG.feedNames.getPropName(), entity.getName());
-            props.put(ARG.feedInstancePaths.getPropName(), IGNORE);
-
-            props.put("falconInputFeeds", entity.getName());
-            props.put("falconInPaths", IGNORE);
-
-            propagateUserWorkflowProperties(props, "eviction");
-
-            retentionWorkflow.setConfiguration(getCoordConfig(props));
-            retentionAction.setWorkflow(retentionWorkflow);
-            return retentionAction;
-        }
-
-        private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
-            try {
-                WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
-                retWfApp.setName(wfName);
-                addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
-                addOozieRetries(retWfApp);
-
-                if (shouldSetupHiveConfiguration(cluster, entity)) {
-                    setupHiveCredentials(cluster, wfPath, retWfApp);
-                }
-
-                marshal(cluster, retWfApp, wfPath);
-            } catch(IOException e) {
-                throw new FalconException("Unable to create retention workflow", e);
-            }
-        }
-
-        private void setupHiveCredentials(Cluster cluster, Path wfPath,
-                                          WORKFLOWAPP workflowApp) throws FalconException {
-            if (isSecurityEnabled) {
-                // add hcatalog credentials for secure mode and add a reference to each action
-                addHCatalogCredentials(workflowApp, cluster, HIVE_CREDENTIAL_NAME);
-            }
-
-            // create hive-site.xml file so actions can use it in the classpath
-            createHiveConfiguration(cluster, wfPath, ""); // no prefix since only one hive instance
-
-            for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
-                if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
-                    continue;
-                }
-
-                org.apache.falcon.oozie.workflow.ACTION action =
-                        (org.apache.falcon.oozie.workflow.ACTION) object;
-                String actionName = action.getName();
-                if ("eviction".equals(actionName)) {
-                    // add reference to hive-site conf to each action
-                    action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
-
-                    if (isSecurityEnabled) {
-                        // add a reference to credential in the action
-                        action.setCred(HIVE_CREDENTIAL_NAME);
-                    }
-                }
-            }
-        }
-    }
-
-    private class ReplicationOozieWorkflowMapper {
-        private static final String MR_MAX_MAPS = "maxMaps";
-        private static final String MR_MAP_BANDWIDTH = "mapBandwidthKB";
-
-        private static final int THIRTY_MINUTES = 30 * 60 * 1000;
-
-        private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
-        private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
-
-        private static final String TIMEOUT = "timeout";
-        private static final String PARALLEL = "parallel";
-
-        private static final String SOURCE_HIVE_CREDENTIAL_NAME = "falconSourceHiveAuth";
-        private static final String TARGET_HIVE_CREDENTIAL_NAME = "falconTargetHiveAuth";
-
-        /**
-         * This method is called for each source serializing a workflow for each source per
-         * target. Additionally, hive credentials are recorded in the workflow definition.
-         *
-         * @param targetCluster target cluster
-         * @param sourceCluster source cluster
-         * @param wfPath workflow path
-         * @param wfName workflow name
-         * @throws FalconException
-         */
-        private void createReplicationWorkflow(Cluster targetCluster, Cluster sourceCluster,
-                                               Path wfPath, String wfName) throws FalconException {
-            WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
-            repWFapp.setName(wfName);
-
-            try {
-                addLibExtensionsToWorkflow(targetCluster, repWFapp, EntityType.FEED, "replication");
-            } catch (IOException e) {
-                throw new FalconException("Unable to add lib extensions to workflow", e);
-            }
-
-            addOozieRetries(repWFapp);
-
-            if (shouldSetupHiveConfiguration(targetCluster, entity)) {
-                setupHiveCredentials(targetCluster, sourceCluster, repWFapp);
-            }
-
-            marshal(targetCluster, repWFapp, wfPath);
-        }
-
-        private void setupHiveCredentials(Cluster targetCluster, Cluster sourceCluster,
-                                          WORKFLOWAPP workflowApp) {
-            if (isSecurityEnabled) {
-                // add hcatalog credentials for secure mode and add a reference to each action
-                addHCatalogCredentials(workflowApp, sourceCluster, SOURCE_HIVE_CREDENTIAL_NAME);
-                addHCatalogCredentials(workflowApp, targetCluster, TARGET_HIVE_CREDENTIAL_NAME);
-            }
-
-            // hive-site.xml file is created later in coordinator initialization but
-            // actions are set to point to that here
-
-            for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
-                if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
-                    continue;
-                }
-
-                org.apache.falcon.oozie.workflow.ACTION action =
-                        (org.apache.falcon.oozie.workflow.ACTION) object;
-                String actionName = action.getName();
-                if ("recordsize".equals(actionName)) {
-                    // add reference to hive-site conf to each action
-                    action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml");
-
-                    if (isSecurityEnabled) { // add a reference to credential in the action
-                        action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
-                    }
-                } else if ("table-export".equals(actionName)) {
-                    if (isSecurityEnabled) { // add a reference to credential in the action
-                        action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
-                    }
-                } else if ("table-import".equals(actionName)) {
-                    if (isSecurityEnabled) { // add a reference to credential in the action
-                        action.setCred(TARGET_HIVE_CREDENTIAL_NAME);
-                    }
-                }
-            }
-        }
-
-        private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
-                                                 Path wfPath) throws FalconException {
-            long replicationDelayInMillis = getReplicationDelayInMillis(feed, srcCluster);
-            Date sourceStartDate = getStartDate(feed, srcCluster, replicationDelayInMillis);
-            Date sourceEndDate = getEndDate(feed, srcCluster);
-
-            Date targetStartDate = getStartDate(feed, trgCluster, replicationDelayInMillis);
-            Date targetEndDate = getEndDate(feed, trgCluster);
-
-            if (noOverlapExists(sourceStartDate, sourceEndDate,
-                targetStartDate, targetEndDate)) {
-                LOG.warn("Not creating replication coordinator, as the source cluster: {} and target cluster: {} do "
-                    + "not have overlapping dates", srcCluster.getName(), trgCluster.getName());
-                return null;
-            }
-
-            COORDINATORAPP replicationCoord;
-            try {
-                replicationCoord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
-            } catch (FalconException e) {
-                throw new FalconException("Cannot unmarshall replication coordinator template", e);
-            }
-
-            String coordName = EntityUtil.getWorkflowName(
-                Tag.REPLICATION, Arrays.asList(srcCluster.getName()), feed).toString();
-            String start = sourceStartDate.after(targetStartDate)
-                ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
-            String end = sourceEndDate.before(targetEndDate)
-                ? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
-
-            initializeCoordAttributes(replicationCoord, coordName, feed, start, end, replicationDelayInMillis);
-            setCoordControls(feed, replicationCoord);
-
-            final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, feed);
-            initializeInputDataSet(feed, srcCluster, replicationCoord, sourceStorage);
-
-            final Storage targetStorage = FeedHelper.createStorage(trgCluster, feed);
-            initializeOutputDataSet(feed, trgCluster, replicationCoord, targetStorage);
-
-            ACTION replicationWorkflowAction = getReplicationWorkflowAction(
-                srcCluster, trgCluster, wfPath, coordName, sourceStorage, targetStorage);
-            replicationCoord.setAction(replicationWorkflowAction);
-
-            return replicationCoord;
-        }
-
-        private Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
-            Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
-            return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
-        }
-
-        private Date getEndDate(Feed feed, Cluster cluster) {
-            return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
-        }
-
-        private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
-            Date targetStartDate, Date targetEndDate) {
-            return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
-        }
-
-        private void initializeCoordAttributes(COORDINATORAPP replicationCoord, String coordName,
-            Feed feed, String start, String end, long delayInMillis) {
-            replicationCoord.setName(coordName);
-            replicationCoord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-
-            if (delayInMillis > 0) {
-                long delayInMins = -1 * delayInMillis / (1000 * 60);
-                String elExp = "${now(0," + delayInMins + ")}";
-
-                replicationCoord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
-                replicationCoord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
-            }
-
-            replicationCoord.setStart(start);
-            replicationCoord.setEnd(end);
-            replicationCoord.setTimezone(feed.getTimezone().getID());
-        }
-
-        private long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
-            Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
-            long delayInMillis=0;
-            if (replicationDelay != null) {
-                delayInMillis = ExpressionHelper.get().evaluate(
-                    replicationDelay.toString(), Long.class);
-            }
-
-            return delayInMillis;
-        }
-
-        private void setCoordControls(Feed feed, COORDINATORAPP replicationCoord) throws FalconException {
-            long frequencyInMillis = ExpressionHelper.get().evaluate(
-                feed.getFrequency().toString(), Long.class);
-            long timeoutInMillis = frequencyInMillis * 6;
-            if (timeoutInMillis < THIRTY_MINUTES) {
-                timeoutInMillis = THIRTY_MINUTES;
-            }
-
-            Map<String, String> props = getEntityProperties();
-            String timeout = props.get(TIMEOUT);
-            if (timeout!=null) {
-                try{
-                    timeoutInMillis= ExpressionHelper.get().evaluate(timeout, Long.class);
-                } catch (Exception ignore) {
-                    LOG.error("Unable to evaluate timeout:", ignore);
-                }
-            }
-            replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
-            replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
-
-            String parallelProp = props.get(PARALLEL);
-            int parallel = 1;
-            if (parallelProp != null) {
-                try {
-                    parallel = Integer.parseInt(parallelProp);
-                } catch (NumberFormatException ignore) {
-                    LOG.error("Unable to parse parallel:", ignore);
-                }
-            }
-            replicationCoord.getControls().setConcurrency(String.valueOf(parallel));
-        }
-
-        private void initializeInputDataSet(Feed feed, Cluster srcCluster, COORDINATORAPP replicationCoord,
-            Storage sourceStorage) throws FalconException {
-            SYNCDATASET inputDataset = (SYNCDATASET)
-                replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
-
-            String uriTemplate = sourceStorage.getUriTemplate(LocationType.DATA);
-            if (sourceStorage.getType() == Storage.TYPE.TABLE) {
-                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
-            }
-            inputDataset.setUriTemplate(uriTemplate);
-
-            setDatasetValues(inputDataset, feed, srcCluster);
-
-            if (feed.getAvailabilityFlag() == null) {
-                inputDataset.setDoneFlag("");
-            } else {
-                inputDataset.setDoneFlag(feed.getAvailabilityFlag());
-            }
-        }
-
-        private void initializeOutputDataSet(Feed feed, Cluster targetCluster, COORDINATORAPP replicationCoord,
-            Storage targetStorage) throws FalconException {
-            SYNCDATASET outputDataset = (SYNCDATASET)
-                replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
-
-            String uriTemplate = targetStorage.getUriTemplate(LocationType.DATA);
-            if (targetStorage.getType() == Storage.TYPE.TABLE) {
-                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
-            }
-            outputDataset.setUriTemplate(uriTemplate);
-
-            setDatasetValues(outputDataset, feed, targetCluster);
-        }
-
-        private void setDatasetValues(SYNCDATASET dataset, Feed feed, Cluster cluster) {
-            dataset.setInitialInstance(SchemaHelper.formatDateUTC(
-                FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
-            dataset.setTimezone(feed.getTimezone().getID());
-            dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-        }
-
-        private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
-                                                    String wfName, Storage sourceStorage,
-                                                    Storage targetStorage) throws FalconException {
-            ACTION replicationAction = new ACTION();
-            WORKFLOW replicationWF = new WORKFLOW();
-
-            replicationWF.setAppPath(getStoragePath(wfPath.toString()));
-            Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfName);
-            props.put("srcClusterName", srcCluster.getName());
-            props.put("srcClusterColo", srcCluster.getColo());
-            if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
-                props.put(MR_MAX_MAPS, getDefaultMaxMaps());
-            }
-            if (props.get(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
-                props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
-            }
-
-            // the storage type is uniform across source and target feeds for replication
-            props.put("falconFeedStorageType", sourceStorage.getType().name());
-
-            String instancePaths = null;
-            if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
-                String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, entity);
-                instancePaths = pathsWithPartitions;
-
-                propagateFileSystemCopyProperties(pathsWithPartitions, props);
-            } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
-                instancePaths = "${coord:dataIn('input')}";
-
-                final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
-                propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
-                final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
-                propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
-                propagateTableCopyProperties(srcCluster, sourceTableStorage,
-                    trgCluster, targetTableStorage, props);
-                setupHiveConfiguration(srcCluster, trgCluster, wfPath);
-            }
-
-            propagateLateDataProperties(entity, instancePaths, sourceStorage.getType().name(), props);
-            propagateUserWorkflowProperties(props, "replication");
-
-            replicationWF.setConfiguration(getCoordConfig(props));
-            replicationAction.setWorkflow(replicationWF);
-
-            return replicationAction;
-        }
-
-        private String getDefaultMaxMaps() {
-            return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
-        }
-
-        private String getDefaultMapBandwidth() {
-            return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidthKB", "102400");
-        }
-
-        private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
-            Feed feed) throws FalconException {
-            String srcPart = FeedHelper.normalizePartitionExpression(
-                FeedHelper.getCluster(feed, srcCluster.getName()).getPartition());
-            srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
-
-            String targetPart = FeedHelper.normalizePartitionExpression(
-                FeedHelper.getCluster(feed, trgCluster.getName()).getPartition());
-            targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
-
-            StringBuilder pathsWithPartitions = new StringBuilder();
-            pathsWithPartitions.append("${coord:dataIn('input')}/")
-                .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
-
-            String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
-            parts = StringUtils.stripEnd(parts, "/");
-            return parts;
-        }
-
-        private void propagateFileSystemCopyProperties(String pathsWithPartitions,
-            Map<String, String> props) throws FalconException {
-            props.put("sourceRelativePaths", pathsWithPartitions);
-
-            props.put("distcpSourcePaths", "${coord:dataIn('input')}");
-            props.put("distcpTargetPaths", "${coord:dataOut('output')}");
-        }
-
-        private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
-            Map<String, String> props, String prefix) {
-            props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
-            props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
-            props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
-
-            props.put(prefix + "Database", tableStorage.getDatabase());
-            props.put(prefix + "Table", tableStorage.getTable());
-            props.put(prefix + "Partition", "(${coord:dataInPartitions('input', 'hive-export')})");
-        }
-
-        private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
-                                            Path wfPath) throws FalconException {
-            Configuration conf = ClusterHelper.getConfiguration(trgCluster);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
-
-            try {
-                // copy import/export scripts to stagingDir
-                Path scriptPath = new Path(wfPath, "scripts");
-                copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
-                copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
-
-                // persist hive conf files under stagingDir
-                Path confPath = new Path(wfPath, "conf");
-                persistHiveConfiguration(fs, confPath, srcCluster, "falcon-source-");
-                persistHiveConfiguration(fs, confPath, trgCluster, "falcon-target-");
-            } catch (IOException e) {
-                throw new FalconException("Unable to create hive conf files", e);
-            }
-        }
-
-        private void copyHiveScript(FileSystem fs, Path scriptPath,
-            String localScriptPath, String scriptName) throws IOException {
-            OutputStream out = null;
-            InputStream in = null;
-            try {
-                out = fs.create(new Path(scriptPath, scriptName));
-                in = OozieFeedWorkflowBuilder.class.getResourceAsStream(localScriptPath + scriptName);
-                IOUtils.copy(in, out);
-            } finally {
-                IOUtils.closeQuietly(in);
-                IOUtils.closeQuietly(out);
-            }
-        }
-
-        private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
-            Cluster trgCluster, CatalogStorage targetStorage,
-            Map<String, String> props) {
-            // create staging dirs for export at source & set it as distcpSourcePaths
-            String sourceStagingPath =
-                FeedHelper.getStagingPath(srcCluster, entity, sourceStorage, Tag.REPLICATION,
-                        NOMINAL_TIME_EL + "/" + trgCluster.getName());
-            props.put("distcpSourcePaths", sourceStagingPath);
-
-            // create staging dirs for import at target & set it as distcpTargetPaths
-            String targetStagingPath =
-                FeedHelper.getStagingPath(trgCluster, entity, targetStorage, Tag.REPLICATION,
-                        NOMINAL_TIME_EL + "/" + trgCluster.getName());
-            props.put("distcpTargetPaths", targetStagingPath);
-
-            props.put("sourceRelativePaths", IGNORE); // this will bot be used for Table storage.
-        }
-
-        private void propagateLateDataProperties(Feed feed, String instancePaths,
-            String falconFeedStorageType, Map<String, String> props) {
-            // todo these pairs are the same but are used in different contexts
-            // late data handler - should-record action
-            props.put("falconInputFeeds", feed.getName());
-            props.put("falconInPaths", instancePaths);
-
-            // storage type for each corresponding feed - in this case only one feed is involved
-            // needed to compute usage based on storage type in LateDataHandler
-            props.put("falconInputFeedStorageTypes", falconFeedStorageType);
-
-            // falcon post processing
-            props.put(ARG.feedNames.getPropName(), feed.getName());
-            props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
-        }
-    }
-
-    private void addOozieRetries(WORKFLOWAPP workflow) {
-        for (Object object : workflow.getDecisionOrForkOrJoin()) {
-            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
-                continue;
-            }
-            org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
-            String actionName = action.getName();
-            if (FALCON_ACTIONS.contains(actionName)) {
-                decorateWithOozieRetries(action);
-            }
-        }
-    }
-
-    private void propagateUserWorkflowProperties(Map<String, String> props, String policy) {
-        props.put("userWorkflowName", policy + "-policy");
-        props.put("userWorkflowEngine", "falcon");
-
-        String version;
-        try {
-            version = BuildProperties.get().getProperty("build.version");
-        } catch (Exception e) {  // unfortunate that this is only available in prism/webapp
-            version = "0.5";
-        }
-        props.put("userWorkflowVersion", version);
-    }
-
-    protected boolean shouldSetupHiveConfiguration(Cluster cluster,
-                                                   Feed feed) throws FalconException {
-        Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
-        return Storage.TYPE.TABLE == storageType;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/feed/src/main/resources/config/coordinator/replication-coordinator.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/coordinator/replication-coordinator.xml b/feed/src/main/resources/config/coordinator/replication-coordinator.xml
deleted file mode 100644
index 693b0bd..0000000
--- a/feed/src/main/resources/config/coordinator/replication-coordinator.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<coordinator-app name="#NAME#" frequency="#FREQUENCY#"
-                 start="#START_TIME#" end="#END_TIME" timezone="#TIMEZONE#"
-                 xmlns="uri:oozie:coordinator:0.3">
-    <controls>
-        <concurrency>1</concurrency>
-        <execution>FIFO</execution>
-    </controls>
-    <datasets>
-        <dataset name="input-dataset" frequency="#FEED_FREQ#"
-                 initial-instance="#START_TIME#" timezone="#TIMEZONE#">
-            <uri-template>#FEED_PATH#</uri-template>
-        </dataset>
-        <dataset name="output-dataset" frequency="#FEED_FREQ#"
-                 initial-instance="#START_TIME#" timezone="#TIMEZONE#">
-            <uri-template>#FEED_PATH#</uri-template>
-        </dataset>
-    </datasets>
-    <input-events>
-        <data-in name="input" dataset="input-dataset">
-            <instance>${coord:current(0)}</instance>
-        </data-in>
-    </input-events>
-    <output-events>
-        <data-out name="output" dataset="output-dataset">
-            <instance>${coord:current(0)}</instance>
-        </data-out>
-    </output-events>
-    <action>
-        <workflow>
-            <app-path>#WF_PATH#</app-path>
-            <configuration/>
-        </workflow>
-    </action>
-</coordinator-app>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/feed/src/main/resources/config/workflow/falcon-table-export.hql
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/falcon-table-export.hql b/feed/src/main/resources/config/workflow/falcon-table-export.hql
deleted file mode 100644
index 37fd1b7..0000000
--- a/feed/src/main/resources/config/workflow/falcon-table-export.hql
+++ /dev/null
@@ -1,18 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
---
-export table ${falconSourceDatabase}.${falconSourceTable} partition ${falconSourcePartition} to '${falconSourceStagingDir}';

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/feed/src/main/resources/config/workflow/falcon-table-import.hql
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/falcon-table-import.hql b/feed/src/main/resources/config/workflow/falcon-table-import.hql
deleted file mode 100644
index 653d580..0000000
--- a/feed/src/main/resources/config/workflow/falcon-table-import.hql
+++ /dev/null
@@ -1,20 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
---
-use ${falconTargetDatabase};
-alter table ${falconTargetTable} drop if exists partition ${falconTargetPartition};
-import table ${falconTargetTable} partition ${falconTargetPartition} from '${falconTargetStagingDir}';

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
deleted file mode 100644
index 0748acf..0000000
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ /dev/null
@@ -1,330 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-feed-parent-workflow'>
-    <start to='should-record'/>
-    <decision name='should-record'>
-        <switch>
-            <case to="recordsize">
-                ${shouldRecord=="true"}
-            </case>
-            <default to="replication-decision"/>
-        </switch>
-    </decision>
-    <action name='recordsize'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-                <!-- HCatalog jars -->
-                <property>
-                    <name>oozie.action.sharelib.for.java</name>
-                    <value>hcatalog</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
-            <arg>-out</arg>
-            <arg>${logDir}/latedata/${nominalTime}/${srcClusterName}</arg>
-            <arg>-paths</arg>
-            <arg>${falconInPaths}</arg>
-            <arg>-falconInputFeeds</arg>
-            <arg>${falconInputFeeds}</arg>
-            <arg>-falconInputFeedStorageTypes</arg>
-            <arg>${falconInputFeedStorageTypes}</arg>
-            <capture-output/>
-        </java>
-        <ok to="replication-decision"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <decision name="replication-decision">
-        <switch>
-            <case to="table-export">
-                ${falconFeedStorageType == "TABLE"}
-            </case>
-            <default to="replication"/>
-        </switch>
-    </decision>
-    <!-- Table Replication - Export data and metadata to HDFS Staging from Source Hive -->
-    <action name="table-export">
-        <hive xmlns="uri:oozie:hive-action:0.2">
-            <job-tracker>${falconSourceJobTracker}</job-tracker>
-            <name-node>${falconSourceNameNode}</name-node>
-            <prepare>
-                <delete path="${distcpSourcePaths}"/>
-            </prepare>
-            <job-xml>${wf:appPath()}/conf/falcon-source-hive-site.xml</job-xml>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <script>${wf:appPath()}/scripts/falcon-table-export.hql</script>
-            <param>falconSourceDatabase=${falconSourceDatabase}</param>
-            <param>falconSourceTable=${falconSourceTable}</param>
-            <param>falconSourcePartition=${falconSourcePartition}</param>
-            <param>falconSourceStagingDir=${distcpSourcePaths}</param>
-        </hive>
-        <ok to="replication"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <!-- Replication action -->
-    <action name="replication">
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property> <!-- hadoop 2 parameter -->
-                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
-                    <value>true</value>
-                </property>
-                <property> <!-- hadoop 1 parameter -->
-                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
-                    <value>true</value>
-                </property>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
-            <arg>-Dfalcon.include.path=${sourceRelativePaths}</arg>
-            <arg>-Dmapred.job.queue.name=${queueName}</arg>
-            <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-maxMaps</arg>
-            <arg>${maxMaps}</arg>
-            <arg>-mapBandwidthKB</arg>
-            <arg>${mapBandwidthKB}</arg>
-            <arg>-sourcePaths</arg>
-            <arg>${distcpSourcePaths}</arg>
-            <arg>-targetPath</arg>
-            <arg>${distcpTargetPaths}</arg>
-            <arg>-falconFeedStorageType</arg>
-            <arg>${falconFeedStorageType}</arg>
-            <file>${wf:conf("falcon.libpath")}/hadoop-distcp.jar</file>
-        </java>
-        <ok to="post-replication-decision"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <decision name="post-replication-decision">
-        <switch>
-            <case to="table-import">
-                ${falconFeedStorageType == "TABLE"}
-            </case>
-            <default to="succeeded-post-processing"/>
-        </switch>
-    </decision>
-    <!-- Table Replication - Import data and metadata from HDFS Staging into Target Hive -->
-    <action name="table-import">
-        <hive xmlns="uri:oozie:hive-action:0.2">
-            <job-tracker>${falconTargetJobTracker}</job-tracker>
-            <name-node>${falconTargetNameNode}</name-node>
-            <job-xml>${wf:appPath()}/conf/falcon-target-hive-site.xml</job-xml>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <script>${wf:appPath()}/scripts/falcon-table-import.hql</script>
-            <param>falconTargetDatabase=${falconTargetDatabase}</param>
-            <param>falconTargetTable=${falconTargetTable}</param>
-            <param>falconTargetPartition=${falconTargetPartition}</param>
-            <param>falconTargetStagingDir=${distcpTargetPaths}</param>
-        </hive>
-        <ok to="cleanup-table-staging-dir"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <action name="cleanup-table-staging-dir">
-        <fs>
-            <delete path="${distcpSourcePaths}"/>
-            <delete path="${distcpTargetPaths}"/>
-        </fs>
-        <ok to="succeeded-post-processing"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <action name='succeeded-post-processing'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
-            <arg>-cluster</arg>
-            <arg>${cluster}</arg>
-            <arg>-entityType</arg>
-            <arg>${entityType}</arg>
-            <arg>-entityName</arg>
-            <arg>${entityName}</arg>
-            <arg>-nominalTime</arg>
-            <arg>${nominalTime}</arg>
-            <arg>-operation</arg>
-            <arg>REPLICATE</arg>
-            <arg>-workflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-runId</arg>
-            <arg>${wf:run()}</arg>
-            <arg>-status</arg>
-            <arg>SUCCEEDED</arg>
-            <arg>-timeStamp</arg>
-            <arg>${timeStamp}</arg>
-            <arg>-brokerImplClass</arg>
-            <arg>${wf:conf("broker.impl.class")}</arg>
-            <arg>-brokerUrl</arg>
-            <arg>${wf:conf("broker.url")}</arg>
-            <arg>-userBrokerImplClass</arg>
-            <arg>${userBrokerImplClass}</arg>
-            <arg>-userBrokerUrl</arg>
-            <arg>${userBrokerUrl}</arg>
-            <arg>-brokerTTL</arg>
-            <arg>${wf:conf("broker.ttlInMins")}</arg>
-            <arg>-feedNames</arg>
-            <arg>${feedNames}</arg>
-            <arg>-feedInstancePaths</arg>
-            <arg>${feedInstancePaths}</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}-${srcClusterName}.csv</arg>
-            <arg>-workflowEngineUrl</arg>
-            <arg>${workflowEngineUrl}</arg>
-            <arg>-userWorkflowName</arg>
-            <arg>${userWorkflowName}</arg>
-            <arg>-userWorkflowVersion</arg>
-            <arg>${userWorkflowVersion}</arg>
-            <arg>-userWorkflowEngine</arg>
-            <arg>${userWorkflowEngine}</arg>
-            <arg>-subflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-logDir</arg>
-            <arg>${logDir}/job-${nominalTime}/${srcClusterName}/</arg>
-            <arg>-workflowUser</arg>
-            <arg>${wf:user()}</arg>
-            <arg>-falconInputFeeds</arg>
-            <arg>${falconInputFeeds}</arg>
-            <arg>-falconInputPaths</arg>
-            <arg>${falconInPaths}</arg>
-            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
-            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
-            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
-            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
-            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
-            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
-        </java>
-        <ok to="end"/>
-        <error to="fail"/>
-    </action>
-    <action name='failed-post-processing'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
-            <arg>-cluster</arg>
-            <arg>${cluster}</arg>
-            <arg>-entityType</arg>
-            <arg>${entityType}</arg>
-            <arg>-entityName</arg>
-            <arg>${entityName}</arg>
-            <arg>-nominalTime</arg>
-            <arg>${nominalTime}</arg>
-            <arg>-operation</arg>
-            <arg>REPLICATE</arg>
-            <arg>-workflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-runId</arg>
-            <arg>${wf:run()}</arg>
-            <arg>-status</arg>
-            <arg>FAILED</arg>
-            <arg>-timeStamp</arg>
-            <arg>${timeStamp}</arg>
-            <arg>-brokerImplClass</arg>
-            <arg>${wf:conf("broker.impl.class")}</arg>
-            <arg>-brokerUrl</arg>
-            <arg>${wf:conf("broker.url")}</arg>
-            <arg>-userBrokerImplClass</arg>
-            <arg>${userBrokerImplClass}</arg>
-            <arg>-userBrokerUrl</arg>
-            <arg>${userBrokerUrl}</arg>
-            <arg>-brokerTTL</arg>
-            <arg>${wf:conf("broker.ttlInMins")}</arg>
-            <arg>-feedNames</arg>
-            <arg>${feedNames}</arg>
-            <arg>-feedInstancePaths</arg>
-            <arg>${feedInstancePaths}</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}-${srcClusterName}.csv</arg>
-            <arg>-workflowEngineUrl</arg>
-            <arg>${workflowEngineUrl}</arg>
-            <arg>-subflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-logDir</arg>
-            <arg>${logDir}/job-${nominalTime}/${srcClusterName}/</arg>
-            <arg>-workflowUser</arg>
-            <arg>${wf:user()}</arg>
-            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
-            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
-            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
-            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
-            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
-            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
-        </java>
-        <ok to="fail"/>
-        <error to="fail"/>
-    </action>
-    <kill name="fail">
-        <message>
-            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-        </message>
-    </kill>
-    <end name='end'/>
-</workflow-app>

