falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject [03/12] falcon git commit: FALCON-1188 Falcon support for Hive Replication. Contributed by Venkat Ranganathan.
Date Thu, 13 Aug 2015 09:08:47 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
new file mode 100644
index 0000000..c441998
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
@@ -0,0 +1,293 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-dr-hive-workflow'>
+    <start to='last-event'/>
+    <action name="last-event"> <!-- first stage: runs HiveDRTool with executionStage "lastevents" -->
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>distcp,hive,hive2,hcatalog</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-falconLibPath</arg>
+            <arg>${wf:conf("falcon.libpath")}</arg>
+            <arg>-sourceCluster</arg>
+            <arg>${sourceCluster}</arg>
+            <arg>-sourceMetastoreUri</arg>
+            <arg>${sourceMetastoreUri}</arg>
+            <arg>-sourceHiveServer2Uri</arg>
+            <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
+            <arg>-sourceTable</arg>
+            <arg>${sourceTable}</arg>
+            <arg>-sourceStagingPath</arg>
+            <arg>${sourceStagingPath}</arg>
+            <arg>-sourceNN</arg>
+            <arg>${sourceNN}</arg>
+            <arg>-targetCluster</arg>
+            <arg>${targetCluster}</arg>
+            <arg>-targetMetastoreUri</arg>
+            <arg>${targetMetastoreUri}</arg>
+            <arg>-targetHiveServer2Uri</arg>
+            <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetStagingPath</arg>
+            <arg>${targetStagingPath}</arg>
+            <arg>-targetNN</arg>
+            <arg>${targetNN}</arg>
+            <arg>-maxEvents</arg>
+            <arg>${maxEvents}</arg>
+            <arg>-clusterForJobRun</arg>
+            <arg>${clusterForJobRun}</arg>
+            <arg>-clusterForJobRunWriteEP</arg>
+            <arg>${clusterForJobRunWriteEP}</arg>
+            <arg>-drJobName</arg>
+            <arg>${drJobName}-${nominalTime}</arg> <!-- job name is suffixed with the nominal time -->
+            <arg>-executionStage</arg>
+            <arg>lastevents</arg>
+        </java>
+        <ok to="export-dr-replication"/> <!-- on success continue with the export stage -->
+        <error to="failure"/> <!-- "failure" is the decision node that may send an alert before failing -->
+    </action>
+    <!-- Export Replication action: runs HiveDRTool with executionStage "export", adding the map-count and bandwidth arguments -->
+    <action name="export-dr-replication">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>distcp,hive,hive2,hcatalog</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-falconLibPath</arg>
+            <arg>${wf:conf("falcon.libpath")}</arg>
+            <arg>-replicationMaxMaps</arg>
+            <arg>${replicationMaxMaps}</arg>
+            <arg>-distcpMaxMaps</arg>
+            <arg>${distcpMaxMaps}</arg>
+            <arg>-sourceCluster</arg>
+            <arg>${sourceCluster}</arg>
+            <arg>-sourceMetastoreUri</arg>
+            <arg>${sourceMetastoreUri}</arg>
+            <arg>-sourceHiveServer2Uri</arg>
+            <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
+            <arg>-sourceTable</arg>
+            <arg>${sourceTable}</arg>
+            <arg>-sourceStagingPath</arg>
+            <arg>${sourceStagingPath}</arg>
+            <arg>-sourceNN</arg>
+            <arg>${sourceNN}</arg>
+            <arg>-targetCluster</arg>
+            <arg>${targetCluster}</arg>
+            <arg>-targetMetastoreUri</arg>
+            <arg>${targetMetastoreUri}</arg>
+            <arg>-targetHiveServer2Uri</arg>
+            <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetStagingPath</arg>
+            <arg>${targetStagingPath}</arg>
+            <arg>-targetNN</arg>
+            <arg>${targetNN}</arg>
+            <arg>-maxEvents</arg>
+            <arg>${maxEvents}</arg>
+            <arg>-distcpMapBandwidth</arg>
+            <arg>${distcpMapBandwidth}</arg>
+            <arg>-clusterForJobRun</arg>
+            <arg>${clusterForJobRun}</arg>
+            <arg>-clusterForJobRunWriteEP</arg>
+            <arg>${clusterForJobRunWriteEP}</arg>
+            <arg>-drJobName</arg>
+            <arg>${drJobName}-${nominalTime}</arg> <!-- job name is suffixed with the nominal time -->
+            <arg>-executionStage</arg>
+            <arg>export</arg>
+        </java>
+        <ok to="import-dr-replication"/> <!-- on success continue with the import stage -->
+        <error to="failure"/> <!-- "failure" is the decision node that may send an alert before failing -->
+    </action>
+    <!-- Import Replication action: runs HiveDRTool with executionStage "import"; last processing stage of the workflow -->
+    <action name="import-dr-replication">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>distcp,hive,hive2,hcatalog</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-falconLibPath</arg>
+            <arg>${wf:conf("falcon.libpath")}</arg>
+            <arg>-replicationMaxMaps</arg>
+            <arg>${replicationMaxMaps}</arg>
+            <arg>-distcpMaxMaps</arg>
+            <arg>${distcpMaxMaps}</arg>
+            <arg>-sourceCluster</arg>
+            <arg>${sourceCluster}</arg>
+            <arg>-sourceMetastoreUri</arg>
+            <arg>${sourceMetastoreUri}</arg>
+            <arg>-sourceHiveServer2Uri</arg>
+            <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
+            <arg>-sourceTable</arg>
+            <arg>${sourceTable}</arg>
+            <arg>-sourceStagingPath</arg>
+            <arg>${sourceStagingPath}</arg>
+            <arg>-sourceNN</arg>
+            <arg>${sourceNN}</arg>
+            <arg>-targetCluster</arg>
+            <arg>${targetCluster}</arg>
+            <arg>-targetMetastoreUri</arg>
+            <arg>${targetMetastoreUri}</arg>
+            <arg>-targetHiveServer2Uri</arg>
+            <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetStagingPath</arg>
+            <arg>${targetStagingPath}</arg>
+            <arg>-targetNN</arg>
+            <arg>${targetNN}</arg>
+            <arg>-maxEvents</arg>
+            <arg>${maxEvents}</arg>
+            <arg>-distcpMapBandwidth</arg>
+            <arg>${distcpMapBandwidth}</arg>
+            <arg>-clusterForJobRun</arg>
+            <arg>${clusterForJobRun}</arg>
+            <arg>-clusterForJobRunWriteEP</arg>
+            <arg>${clusterForJobRunWriteEP}</arg>
+            <arg>-drJobName</arg>
+            <arg>${drJobName}-${nominalTime}</arg> <!-- job name is suffixed with the nominal time -->
+            <arg>-executionStage</arg>
+            <arg>import</arg>
+        </java>
+        <ok to="success"/> <!-- "success" is the decision node that may send an alert before ending -->
+        <error to="failure"/> <!-- "failure" is the decision node that may send an alert before failing -->
+    </action>
+    <decision name="success"> <!-- reached after the import stage succeeds -->
+        <switch>
+            <case to="successAlert"> <!-- send the success e-mail unless receivers is the literal NA -->
+                ${drNotificationReceivers ne 'NA'}
+            </case>
+            <default to="end"/> <!-- no receivers configured: finish without notification -->
+        </switch>
+    </decision>
+    <decision name="failure"> <!-- reached when any of the three HiveDRTool stages errors out -->
+        <switch>
+            <case to="failureAlert"> <!-- send the failure e-mail unless receivers is the literal NA -->
+                ${drNotificationReceivers ne 'NA'}
+            </case>
+            <default to="fail"/> <!-- no receivers configured: go straight to the kill node -->
+        </switch>
+    </decision>
+    <action name="successAlert"> <!-- e-mails drNotificationReceivers after a successful run -->
+        <email xmlns="uri:oozie:email-action:0.2">
+            <to>${drNotificationReceivers}</to>
+            <subject>INFO: Hive DR workflow ${drJobName} completed successfully</subject>
+            <body>
+                The Hive DR workflow ${wf:id()} is successful.
+                Source          = ${sourceCluster}
+                Target          = ${targetCluster}
+                DB Name         = ${sourceDatabase}
+                Table Name      = ${sourceTable}
+            </body>
+        </email>
+        <ok to="end"/>
+        <error to="end"/> <!-- a failed notification still routes to the normal end node -->
+    </action>
+    <action name="failureAlert"> <!-- e-mails drNotificationReceivers after a failed stage, then kills the workflow -->
+        <email xmlns="uri:oozie:email-action:0.2">
+            <to>${drNotificationReceivers}</to>
+            <subject>ERROR: Hive DR workflow ${drJobName} failed</subject>
+            <body>
+                The Hive DR workflow ${wf:id()} had issues and was killed.  The error message is: ${wf:errorMessage(wf:lastErrorNode())}
+                Source          = ${sourceCluster}
+                Target          = ${targetCluster}
+                DB Name         = ${sourceDatabase}
+                Table Name      = ${sourceTable}
+            </body>
+        </email>
+        <ok to="fail"/> <!-- mail sent: still kill the workflow so a failed replication is never reported as successful -->
+        <error to="fail"/>
+    </action>
+    <kill name="fail"> <!-- terminal node for failed runs; the message carries the last failing node's error -->
+        <message>
+            Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+        </message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
new file mode 100644
index 0000000..42ae30b
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+##### NOTE: This is a TEMPLATE file which can be copied and edited
+
+##### Recipe properties
+falcon.recipe.name=hive-disaster-recovery
+
+
+##### Workflow properties
+falcon.recipe.workflow.name=hive-dr-workflow
+# Provide the absolute path of the workflow. This can be an HDFS or local FS path. If the workflow is on the local FS it will be copied to HDFS
+falcon.recipe.workflow.path=/recipes/hive-replication/hive-disaster-recovery-workflow.xml
+
+##### Cluster properties
+
+# Change the cluster name where replication job should run here
+falcon.recipe.cluster.name=backupCluster
+# Change the cluster hdfs write end point here. This is mandatory.
+falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://localhost:8020
+# Change the cluster validity start time here
+falcon.recipe.cluster.validity.start=2014-10-01T00:00Z
+# Change the cluster validity end time here
+falcon.recipe.cluster.validity.end=2016-12-30T00:00Z
+
+##### Scheduling properties
+
+# Change the process frequency here. Valid frequency types are minutes, hours, days, months
+falcon.recipe.process.frequency=minutes(60)
+
+##### Retry policy properties
+
+falcon.recipe.retry.policy=periodic
+falcon.recipe.retry.delay=minutes(30)
+falcon.recipe.retry.attempts=3
+
+##### Tag properties - An optional comma separated list of tags, each given as a key=value pair
+##### Uncomment to add tags
+#falcon.recipe.tags=owner=landing,pipeline=adtech
+
+##### ACL properties - Uncomment and change ACL if authorization is enabled
+
+#falcon.recipe.acl.owner=testuser
+#falcon.recipe.acl.group=group
+#falcon.recipe.acl.permission=0x755
+
+##### Custom Job properties
+
+##### Source Cluster DR properties
+sourceCluster=primaryCluster
+sourceMetastoreUri=thrift://localhost:9083
+sourceHiveServer2Uri=hive2://localhost:10000
+# For DB level replication, to replicate multiple databases specify a comma separated list of databases
+sourceDatabase=default
+# For DB level replication specify * for sourceTable.
+# For table level replication to replicate multiple tables specify comma separated list of tables
+sourceTable=testtable_dr
+sourceStagingPath=/apps/hive/tools/dr
+sourceNN=hdfs://localhost:8020
+
+##### Target Cluster DR properties
+targetCluster=backupCluster
+targetMetastoreUri=thrift://localhost:9083
+targetHiveServer2Uri=hive2://localhost:10000
+targetStagingPath=/apps/hive/tools/dr
+targetNN=hdfs://localhost:8020
+
+# Caps the maximum number of events processed each time the job runs. Set it to a maximum value depending on your bandwidth limit.
+# Setting it to -1 will process all the events but can hog up the bandwidth. Use it judiciously!
+maxEvents=-1
+# Change it to specify the maximum number of mappers for replication
+replicationMaxMaps=5
+# Change it to specify the maximum number of mappers for DistCP
+distcpMaxMaps=1
+# Change it to specify the bandwidth in MB for each mapper in DistCP
+distcpMapBandwidth=100
+
+##### Email receivers for the workflow's success and failure alerts. NA disables notifications
+drNotificationReceivers=NA
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index afa91c9..c162125 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -105,6 +105,12 @@
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-webhcat-java-client</artifactId>
+            <version>${hive.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 148f789..11f6bff 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -124,8 +124,17 @@ public class FalconCLI {
     // Recipe Command
     public static final String RECIPE_CMD = "recipe";
     public static final String RECIPE_NAME = "name";
+    public static final String RECIPE_OPERATION = "operation"; // name of the recipe command's operation option
     public static final String RECIPE_TOOL_CLASS_NAME = "tool";
 
+    /**
+     * Recipe operations accepted for the "operation" option; validateRecipeOperations matches them ignoring case.
+     */
+    public static enum RecipeOperation {
+        HDFS_REPLICATION,
+        HIVE_DISASTER_RECOVERY
+    }
+
     private final Properties clientProperties;
 
     public FalconCLI() throws Exception {
@@ -914,6 +923,9 @@ public class FalconCLI {
         Option recipeToolClassName = new Option(RECIPE_TOOL_CLASS_NAME, true, "recipe class");
         recipeOptions.addOption(recipeToolClassName);
 
+        Option recipeOperation = new Option(RECIPE_OPERATION, true, "recipe operation");
+        recipeOptions.addOption(recipeOperation);
+
         return recipeOptions;
     }
 
@@ -1005,11 +1017,23 @@ public class FalconCLI {
     private void recipeCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException {
         String recipeName = commandLine.getOptionValue(RECIPE_NAME);
         String recipeToolClass = commandLine.getOptionValue(RECIPE_TOOL_CLASS_NAME);
+        String recipeOperation = commandLine.getOptionValue(RECIPE_OPERATION);
 
         validateNotEmpty(recipeName, RECIPE_NAME);
+        validateNotEmpty(recipeOperation, RECIPE_OPERATION);
+        validateRecipeOperations(recipeOperation);
 
-        String result =
-            client.submitRecipe(recipeName, recipeToolClass).getMessage();
+        String result = client.submitRecipe(recipeName, recipeToolClass, recipeOperation).toString();
         OUT.get().println(result);
     }
+
+    private static void validateRecipeOperations(String recipeOperation) throws FalconCLIException {
+        for (RecipeOperation operation : RecipeOperation.values()) {
+            if (operation.toString().equalsIgnoreCase(recipeOperation)) { // constant name matches, ignoring case
+                return;
+            }
+        }
+        throw new FalconCLIException("Allowed Recipe operations: " // nothing matched: report the valid values
+                + java.util.Arrays.asList(RecipeOperation.values()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 9649e10..d9bdf64 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -963,7 +963,8 @@ public class FalconClient extends AbstractFalconClient {
     }
 
     public APIResult submitRecipe(String recipeName,
-                               String recipeToolClassName) throws FalconCLIException {
+                               String recipeToolClassName,
+                               final String recipeOperation) throws FalconCLIException {
         String recipePath = clientProperties.getProperty("falcon.recipe.path");
 
         if (StringUtils.isEmpty(recipePath)) {
@@ -999,6 +1000,7 @@ public class FalconClient extends AbstractFalconClient {
                 "-" + RecipeToolArgs.RECIPE_FILE_ARG.getName(), recipeFilePath,
                 "-" + RecipeToolArgs.RECIPE_PROPERTIES_FILE_ARG.getName(), propertiesFilePath,
                 "-" + RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG.getName(), processFile,
+                "-" + RecipeToolArgs.RECIPE_OPERATION_ARG.getName(), recipeOperation,
             };
 
             if (recipeToolClassName != null) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java
new file mode 100644
index 0000000..cf24078
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Properties;
+import java.io.File;
+
+/**
+ * Recipe tool for HDFS replication: checks required recipe properties and qualifies the source paths.
+ */
+public class HdfsReplicationRecipeTool implements Recipe {
+
+    private static final String COMMA_SEPARATOR = ",";
+
+    @Override
+    public void validate(final Properties recipeProperties) { // throws on the first required option that is unset
+        for (HdfsReplicationRecipeToolOptions option : HdfsReplicationRecipeToolOptions.values()) {
+            if (recipeProperties.getProperty(option.getName()) == null && option.isRequired()) {
+                throw new IllegalArgumentException("Missing argument: " + option.getName());
+            }
+        }
+    }
+
+    @Override
+    public Properties getAdditionalSystemProperties(final Properties recipeProperties) {
+        Properties additionalProperties = new Properties();
+
+        // Construct fully qualified hdfs src path
+        String srcPaths = recipeProperties.getProperty(HdfsReplicationRecipeToolOptions
+                .REPLICATION_SOURCE_DIR.getName());
+        StringBuilder absoluteSrcPaths = new StringBuilder();
+        String srcFsPath = recipeProperties.getProperty(
+                HdfsReplicationRecipeToolOptions.REPLICATION_SOURCE_CLUSTER_FS_WRITE_ENDPOINT.getName());
+        if (StringUtils.isNotEmpty(srcFsPath)) {
+            srcFsPath = StringUtils.removeEnd(srcFsPath, "/"); // URI separator is always "/", unlike File.separator
+        }
+        if (StringUtils.isNotEmpty(srcPaths)) {
+            String[] paths = srcPaths.split(COMMA_SEPARATOR);
+
+            for (String path : paths) { // prefix every comma separated source dir with the source FS end point
+                StringBuilder srcpath = new StringBuilder(srcFsPath);
+                srcpath.append(path.trim());
+                srcpath.append(COMMA_SEPARATOR);
+                absoluteSrcPaths.append(srcpath);
+            }
+        }
+
+        additionalProperties.put(HdfsReplicationRecipeToolOptions.REPLICATION_SOURCE_DIR.getName(),
+                StringUtils.removeEnd(absoluteSrcPaths.toString(), COMMA_SEPARATOR)); // drop the trailing comma
+        return additionalProperties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java
new file mode 100644
index 0000000..4c3b543
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+/**
+ * Property names, with descriptions, that the HDFS replication recipe tool looks up.
+ */
+public enum HdfsReplicationRecipeToolOptions {
+    REPLICATION_SOURCE_DIR("drSourceDir", "Location of source data to replicate"),
+    REPLICATION_SOURCE_CLUSTER_FS_WRITE_ENDPOINT("drSourceClusterFS", "Source replication cluster end point"),
+    REPLICATION_TARGET_DIR("drTargetDir", "Location on target cluster for replication"),
+    REPLICATION_TARGET_CLUSTER_FS_WRITE_ENDPOINT("drTargetClusterFS", "Target replication cluster end point"),
+    REPLICATION_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during replication"),
+    REPLICATION_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication");
+
+    private final String optionName;
+    private final String optionDescription;
+    private final boolean mandatory;
+
+    HdfsReplicationRecipeToolOptions(String optionName, String optionDescription, boolean mandatory) {
+        this.optionName = optionName;
+        this.optionDescription = optionDescription;
+        this.mandatory = mandatory;
+    }
+
+    HdfsReplicationRecipeToolOptions(String optionName, String optionDescription) {
+        this(optionName, optionDescription, true); // an option is required unless declared otherwise
+    }
+
+    public boolean isRequired() {
+        return this.mandatory;
+    }
+
+    public String getDescription() {
+        return this.optionDescription;
+    }
+
+    public String getName() {
+        return optionName;
+    }
+
+    @Override
+    public String toString() {
+        return this.optionName; // same text as getName()
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
new file mode 100644
index 0000000..8b39673
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatDatabase;
+import org.apache.hive.hcatalog.api.HCatTable;
+import org.apache.hive.hcatalog.api.ObjectNotFoundException;
+import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hive.hcatalog.common.HCatException;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Hive Replication recipe tool for Falcon recipes.
+ */
+public class HiveReplicationRecipeTool implements Recipe {
+    private static final String ALL_TABLES = "*";
+
+    @Override
+    public void validate(final Properties recipeProperties) throws Exception {
+        for (HiveReplicationRecipeToolOptions option : HiveReplicationRecipeToolOptions.values()) {
+            if (recipeProperties.getProperty(option.getName()) == null && option.isRequired()) {
+                throw new IllegalArgumentException("Missing argument: " + option.getName());
+            }
+        }
+
+        HCatClient sourceMetastoreClient = null;
+        HCatClient targetMetastoreClient = null;
+        try {
+            // Validate if DB exists - source and target
+            sourceMetastoreClient = getHiveMetaStoreClient(
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_SOURCE_METASTORE_URI.getName()),
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName()),
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName()));
+
+            String sourceDbList = recipeProperties.getProperty(
+                    HiveReplicationRecipeToolOptions.REPLICATION_SOURCE_DATABASE.getName());
+
+            if (StringUtils.isEmpty(sourceDbList)) {
+                throw new Exception("No source DB specified in property file");
+            }
+
+            String sourceTableList = recipeProperties.getProperty(
+                    HiveReplicationRecipeToolOptions.REPLICATION_SOURCE_TABLE.getName());
+            if (StringUtils.isEmpty(sourceTableList)) {
+                throw new Exception("No source table specified in property file. For DB replication please specify * "
+                        + "for sourceTable");
+            }
+
+            String[] srcDbs = sourceDbList.split(",");
+            if (srcDbs.length <= 0) {
+                throw new Exception("No source DB specified in property file");
+            }
+            for (String db : srcDbs) {
+                if (!dbExists(sourceMetastoreClient, db)) {
+                    throw new Exception("Database " + db + " doesn't exist on source cluster");
+                }
+            }
+
+            if (!sourceTableList.equals(ALL_TABLES)) {
+                String[] srcTables = sourceTableList.split(",");
+                if (srcTables.length > 0) {
+                    for (String table : srcTables) {
+                        if (!tableExists(sourceMetastoreClient, srcDbs[0], table)) {
+                            throw new Exception("Table " + table + " doesn't exist on source cluster");
+                        }
+                    }
+                }
+            }
+
+            targetMetastoreClient = getHiveMetaStoreClient(
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_TARGET_METASTORE_URI.getName()),
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName()),
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_TARGET_HIVE2_KERBEROS_PRINCIPAL.getName()));
+            // Verify db exists on target
+            for (String db : srcDbs) {
+                if (!dbExists(targetMetastoreClient, db)) {
+                    throw new Exception("Database " + db + " doesn't exist on target cluster");
+                }
+            }
+        } finally {
+            if (sourceMetastoreClient != null) {
+                sourceMetastoreClient.close();
+            }
+            if (targetMetastoreClient != null) {
+                targetMetastoreClient.close();
+            }
+        }
+    }
+
+    /**
+     * Derives the Hive DR system properties from the generic recipe properties.
+     *
+     * Maps the recipe name to the DR job name, the recipe cluster name to the
+     * cluster the job runs on, the cluster HDFS write endpoint to the job's write
+     * endpoint and, when non-empty, the recipe NN principal to the job's NN
+     * kerberos principal. A source property that is not set is skipped, because
+     * {@link Properties} rejects null values with a NullPointerException and the
+     * recipe name and cluster name are declared optional.
+     *
+     * @param recipeProperties properties of the recipe being submitted
+     * @return a new Properties holding the derived entries; never null
+     */
+    @Override
+    public Properties getAdditionalSystemProperties(final Properties recipeProperties) {
+        Properties additionalProperties = new Properties();
+        // Add recipe name as Hive DR job
+        putIfNotNull(additionalProperties, HiveReplicationRecipeToolOptions.HIVE_DR_JOB_NAME.getName(),
+                recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName()));
+        putIfNotNull(additionalProperties, HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_RUN.getName(),
+                recipeProperties.getProperty(RecipeToolOptions.CLUSTER_NAME.getName()));
+        putIfNotNull(additionalProperties, HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_RUN_WRITE_EP.getName(),
+                recipeProperties.getProperty(RecipeToolOptions.CLUSTER_HDFS_WRITE_ENDPOINT.getName()));
+        if (StringUtils.isNotEmpty(recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName()))) {
+            additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL.getName(),
+                    recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName()));
+        }
+        return additionalProperties;
+    }
+
+    /** Stores key/value in target unless value is null, which Properties cannot hold. */
+    private static void putIfNotNull(Properties target, String key, String value) {
+        if (value != null) {
+            target.put(key, value);
+        }
+    }
+
+    private HCatClient getHiveMetaStoreClient(String metastoreUrl, String metastorePrincipal,
+                                              String hive2Principal) throws Exception {
+        try {
+            HiveConf hcatConf = createHiveConf(new Configuration(false), metastoreUrl,
+                    metastorePrincipal, hive2Principal);
+            return HCatClient.create(hcatConf);
+        } catch (IOException e) {
+            throw new Exception("Exception creating HCatClient: " + e.getMessage(), e);
+        }
+    }
+
+    private static HiveConf createHiveConf(Configuration conf, String metastoreUrl, String metastorePrincipal,
+                                           String hive2Principal) throws IOException {
+        HiveConf hcatConf = new HiveConf(conf, HiveConf.class);
+
+        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
+        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+        hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+        hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        if (StringUtils.isNotEmpty(metastorePrincipal)) {
+            hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metastorePrincipal);
+            hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+            hcatConf.set(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI.varname, "true");
+        }
+        if (StringUtils.isNotEmpty(hive2Principal)) {
+            hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, hive2Principal);
+            hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "kerberos");
+        }
+
+        return hcatConf;
+    }
+
+    private static boolean tableExists(HCatClient client, final String database, final String tableName)
+        throws Exception {
+        try {
+            HCatTable table = client.getTable(database, tableName);
+            return table != null;
+        } catch (ObjectNotFoundException e) {
+            System.out.println(e.getMessage());
+            return false;
+        } catch (HCatException e) {
+            throw new Exception("Exception checking if the table exists:" + e.getMessage(), e);
+        }
+    }
+
+    private static boolean dbExists(HCatClient client, final String database)
+        throws Exception {
+        try {
+            HCatDatabase db = client.getDatabase(database);
+            return db != null;
+        } catch (ObjectNotFoundException e) {
+            System.out.println(e.getMessage());
+            return false;
+        } catch (HCatException e) {
+            throw new Exception("Exception checking if the db exists:" + e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
new file mode 100644
index 0000000..ec0465d
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+/**
+ * Options understood by the Hive replication recipe.
+ *
+ * Each constant pairs a property key with a human-readable description and a
+ * flag telling whether the property is mandatory. Constants declared with two
+ * arguments are mandatory; those passing {@code false} are optional.
+ */
+public enum HiveReplicationRecipeToolOptions {
+    REPLICATION_SOURCE_CLUSTER("sourceCluster", "Replication source cluster name"),
+    REPLICATION_SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri"),
+    REPLICATION_SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"),
+    REPLICATION_SOURCE_DATABASE("sourceDatabase", "List of databases to replicate"),
+    REPLICATION_SOURCE_TABLE("sourceTable", "List of tables to replicate"),
+    REPLICATION_SOURCE_STAGING_PATH("sourceStagingPath", "Location of source staging path"),
+    REPLICATION_SOURCE_NN("sourceNN", "Source name node"),
+    REPLICATION_SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal", "Source name node kerberos principal", false),
+    REPLICATION_SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL("sourceHiveMetastoreKerberosPrincipal",
+            "Source hive metastore kerberos principal", false),
+    REPLICATION_SOURCE_HIVE2_KERBEROS_PRINCIPAL("sourceHive2KerberosPrincipal",
+            "Source hiveserver2 kerberos principal", false),
+
+    REPLICATION_TARGET_CLUSTER("targetCluster", "Replication target cluster name"),
+    REPLICATION_TARGET_METASTORE_URI("targetMetastoreUri", "Target Hive metastore uri"),
+    REPLICATION_TARGET_HS2_URI("targetHiveServer2Uri", "Target HS2 uri"),
+    REPLICATION_TARGET_STAGING_PATH("targetStagingPath", "Location of target staging path"),
+    REPLICATION_TARGET_NN("targetNN", "Target name node"),
+    REPLICATION_TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name node kerberos principal", false),
+    REPLICATION_TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL("targetHiveMetastoreKerberosPrincipal",
+            "Target hive metastore kerberos principal", false),
+    REPLICATION_TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal",
+            "Target hiveserver2 kerberos principal", false),
+
+    REPLICATION_MAX_EVENTS("maxEvents", "Maximum events to replicate"),
+    REPLICATION_MAX_MAPS("replicationMaxMaps", "Maximum number of maps used during replication"),
+    DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp"),
+    REPLICATION_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication"),
+    CLUSTER_FOR_JOB_RUN("clusterForJobRun", "Cluster on which replication job runs", false),
+    // Description previously duplicated the write-endpoint text of CLUSTER_FOR_JOB_RUN_WRITE_EP.
+    CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
+            "Name node kerberos principal of cluster on which replication job runs", false),
+    CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which replication job runs", false),
+    HIVE_DR_JOB_NAME("drJobName", "Unique hive DR job name", false);
+
+    private final String name;
+    private final String description;
+    private final boolean isRequired;
+
+    /** Declares a mandatory option. */
+    HiveReplicationRecipeToolOptions(String name, String description) {
+        this(name, description, true);
+    }
+
+    /** Declares an option whose mandatory flag is given explicitly. */
+    HiveReplicationRecipeToolOptions(String name, String description, boolean isRequired) {
+        this.name = name;
+        this.description = description;
+        this.isRequired = isRequired;
+    }
+
+    /** @return the property key of this option */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @return the human-readable description of this option */
+    public String getDescription() {
+        return description;
+    }
+
+    /** @return true when the option is mandatory */
+    public boolean isRequired() {
+        return isRequired;
+    }
+
+    /** @return the property key, same value as {@link #getName()} */
+    @Override
+    public String toString() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/Recipe.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/Recipe.java b/client/src/main/java/org/apache/falcon/recipe/Recipe.java
new file mode 100644
index 0000000..609131d
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/Recipe.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import java.util.Properties;
+
+/**
+ * Contract implemented by each recipe type handled by the recipe tool.
+ */
+public interface Recipe {
+    /**
+     * Checks the recipe properties before the recipe is processed.
+     *
+     * @param recipeProperties properties of the recipe being submitted
+     * @throws Exception if the properties are not acceptable
+     */
+    void validate(Properties recipeProperties) throws Exception;
+
+    /**
+     * Supplies extra properties derived from the recipe properties.
+     *
+     * @param recipeProperties properties of the recipe being submitted
+     * @return the additional properties
+     */
+    Properties getAdditionalSystemProperties(Properties recipeProperties);
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java b/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java
new file mode 100644
index 0000000..32b0871
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import org.apache.falcon.cli.FalconCLI.RecipeOperation;
+
+/**
+ * Static factory mapping a recipe operation name to its {@link Recipe} implementation.
+ */
+public final class RecipeFactory {
+
+    /** Not instantiable; the class only exposes a static factory method. */
+    private RecipeFactory() {
+    }
+
+    /**
+     * Picks the recipe implementation for an operation name, ignoring case.
+     *
+     * @param recipeType operation name, may be null
+     * @return a new HdfsReplicationRecipeTool or HiveReplicationRecipeTool, or
+     *         null when recipeType is null or matches neither operation
+     */
+    public static Recipe getRecipeToolType(String recipeType) {
+        if (recipeType != null) {
+            if (RecipeOperation.HDFS_REPLICATION.toString().equalsIgnoreCase(recipeType)) {
+                return new HdfsReplicationRecipeTool();
+            }
+            if (RecipeOperation.HIVE_DISASTER_RECOVERY.toString().equalsIgnoreCase(recipeType)) {
+                return new HiveReplicationRecipeTool();
+            }
+        }
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
index 069db9f..243ff4d 100644
--- a/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
@@ -19,44 +19,44 @@
 package org.apache.falcon.recipe;
 
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.recipe.util.RecipeProcessBuilderUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.commons.cli.Options;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.InputStream;
-import java.io.IOException;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.OutputStream;
-import java.util.Map;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Base recipe tool for Falcon recipes.
  */
 public class RecipeTool extends Configured implements Tool {
     private static final String HDFS_WF_PATH = "falcon" + File.separator + "recipes" + File.separator;
-    private static final String RECIPE_PREFIX = "falcon.recipe.";
-    private static final Pattern RECIPE_VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##");
-
-    private FileSystem hdfsFileSystem;
+    private static final FsPermission FS_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.READ, FsAction.NONE);
+    private static final String FS_DEFAULT_NAME_KEY = "fs.defaultFS";
+    private static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
 
     public static void main(String[] args) throws Exception {
         ToolRunner.run(new Configuration(), new RecipeTool(), args);
@@ -64,25 +64,38 @@ public class RecipeTool extends Configured implements Tool {
 
     @Override
     public int run(String[] arguments) throws Exception {
+
         Map<RecipeToolArgs, String> argMap = setupArgs(arguments);
         if (argMap == null || argMap.isEmpty()) {
             throw new Exception("Arguments passed to recipe is null");
         }
-
+        Configuration conf = getConf();
         String recipePropertiesFilePath = argMap.get(RecipeToolArgs.RECIPE_PROPERTIES_FILE_ARG);
         Properties recipeProperties = loadProperties(recipePropertiesFilePath);
         validateProperties(recipeProperties);
 
-        FileSystem fs = getFileSystemForHdfs(recipeProperties);
+        String recipeOperation = argMap.get(RecipeToolArgs.RECIPE_OPERATION_ARG);
+        Recipe recipeType = RecipeFactory.getRecipeToolType(recipeOperation);
+        if (recipeType != null) {
+            recipeType.validate(recipeProperties);
+            Properties props = recipeType.getAdditionalSystemProperties(recipeProperties);
+            if (props != null && !props.isEmpty()) {
+                recipeProperties.putAll(props);
+            }
+        }
 
+        String processFilename;
+
+        FileSystem fs = getFileSystemForHdfs(recipeProperties, conf);
         validateArtifacts(recipeProperties, fs);
 
-        String recipeName = FilenameUtils.getBaseName(recipePropertiesFilePath);
+        String recipeName = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName());
         copyFilesToHdfsIfRequired(recipeProperties, fs, recipeName);
 
-        Map<String, String> overlayMap = getOverlay(recipeProperties);
-        String processFilename = overlayParametersOverTemplate(argMap.get(RecipeToolArgs.RECIPE_FILE_ARG),
-                argMap.get(RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG), overlayMap);
+        processFilename = RecipeProcessBuilderUtils.createProcessFromTemplate(argMap.get(RecipeToolArgs
+                .RECIPE_FILE_ARG), recipeProperties, argMap.get(RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG));
+
+
         System.out.println("Generated process file to be scheduled: ");
         System.out.println(FileUtils.readFileToString(new File(processFilename)));
 
@@ -98,7 +111,7 @@ public class RecipeTool extends Configured implements Tool {
             addOption(options, arg, arg.isRequired());
         }
 
-        CommandLine cmd =  new GnuParser().parse(options, arguments);
+        CommandLine cmd = new GnuParser().parse(options, arguments);
         for (RecipeToolArgs arg : RecipeToolArgs.values()) {
             String optionValue = arg.getOptionValue(cmd);
             if (StringUtils.isNotEmpty(optionValue)) {
@@ -135,7 +148,7 @@ public class RecipeTool extends Configured implements Tool {
         }
     }
 
-    private static void validateArtifacts(final Properties recipeProperties, final FileSystem fs) throws Exception{
+    private static void validateArtifacts(final Properties recipeProperties, final FileSystem fs) throws Exception {
         // validate the WF path
         String wfPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_PATH.getName());
 
@@ -156,53 +169,6 @@ public class RecipeTool extends Configured implements Tool {
         }
     }
 
-    private static Map<String, String> getOverlay(final Properties recipeProperties) {
-        Map<String, String> overlay = new HashMap<String, String>();
-        for (Map.Entry<Object, Object> entry : recipeProperties.entrySet()) {
-            String key = StringUtils.removeStart((String) entry.getKey(), RECIPE_PREFIX);
-            overlay.put(key, (String) entry.getValue());
-        }
-
-        return overlay;
-    }
-
-    private static String overlayParametersOverTemplate(final String templateFile,
-                                                        final String outFilename,
-                                                        Map<String, String> overlay) throws Exception {
-        if (templateFile == null || outFilename == null || overlay == null || overlay.isEmpty()) {
-            throw new IllegalArgumentException("Invalid arguments passed");
-        }
-
-        String line;
-        OutputStream out = null;
-        BufferedReader reader = null;
-
-        try {
-            out = new FileOutputStream(outFilename);
-
-            reader = new BufferedReader(new FileReader(templateFile));
-            while ((line = reader.readLine()) != null) {
-                Matcher matcher = RECIPE_VAR_PATTERN.matcher(line);
-                while (matcher.find()) {
-                    String variable = line.substring(matcher.start(), matcher.end());
-                    String paramString = overlay.get(variable.substring(2, variable.length() - 2));
-                    if (paramString == null) {
-                        throw new Exception("Match not found for the template: " + variable
-                                + ". Please add it in recipe properties file");
-                    }
-                    line = line.replace(variable, paramString);
-                    matcher = RECIPE_VAR_PATTERN.matcher(line);
-                }
-                out.write(line.getBytes());
-                out.write("\n".getBytes());
-            }
-        } finally {
-            IOUtils.closeQuietly(reader);
-            IOUtils.closeQuietly(out);
-        }
-        return outFilename;
-    }
-
     private static void copyFilesToHdfsIfRequired(final Properties recipeProperties,
                                                   final FileSystem fs,
                                                   final String recipeName) throws Exception {
@@ -262,7 +228,7 @@ public class RecipeTool extends Configured implements Tool {
     private static void createDirOnHdfs(String path, FileSystem fs) throws IOException {
         Path hdfsPath = new Path(path);
         if (!fs.exists(hdfsPath)) {
-            fs.mkdirs(hdfsPath);
+            FileSystem.mkdirs(fs, hdfsPath, FS_PERMISSION);
         }
     }
 
@@ -287,19 +253,33 @@ public class RecipeTool extends Configured implements Tool {
         fs.copyFromLocalFile(false, true, new Path(localFilePath), new Path(hdfsFilePath));
     }
 
-    private static Configuration getConfiguration(final String storageEndpoint) throws Exception {
-        Configuration conf = new Configuration();
-        conf.set("fs.defaultFS", storageEndpoint);
-        return conf;
+    private FileSystem getFileSystemForHdfs(final Properties recipeProperties,
+                                            final Configuration conf) throws Exception {
+        String storageEndpoint = RecipeToolOptions.CLUSTER_HDFS_WRITE_ENDPOINT.getName();
+        String nameNode = recipeProperties.getProperty(storageEndpoint);
+        conf.set(FS_DEFAULT_NAME_KEY, nameNode);
+        if (UserGroupInformation.isSecurityEnabled()) {
+            String nameNodePrincipal = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName());
+            conf.set(NN_PRINCIPAL, nameNodePrincipal);
+        }
+        return createFileSystem(UserGroupInformation.getLoginUser(), new URI(nameNode), conf);
     }
 
-    private FileSystem getFileSystemForHdfs(final Properties recipeProperties) throws Exception {
-        if (hdfsFileSystem == null) {
-            String storageEndpoint = RecipeToolOptions.SOURCE_CLUSTER_HDFS_WRITE_ENDPOINT.getName();
-            hdfsFileSystem =  FileSystem.get(
-                    getConfiguration(recipeProperties.getProperty(storageEndpoint)));
-        }
+    private FileSystem createFileSystem(UserGroupInformation ugi, final URI uri,
+                                       final Configuration conf) throws Exception {
+        try {
+            final String proxyUserName = ugi.getShortUserName();
+            if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
+                return FileSystem.get(uri, conf);
+            }
 
-        return hdfsFileSystem;
+            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+                public FileSystem run() throws Exception {
+                    return FileSystem.get(uri, conf);
+                }
+            });
+        } catch (InterruptedException ex) {
+            throw new IOException("Exception creating FileSystem:" + ex.getMessage(), ex);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java b/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
index baa4846..79d8f18 100644
--- a/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
@@ -25,10 +25,11 @@ import org.apache.commons.cli.Option;
  * Recipe tool args.
  */
 public enum RecipeToolArgs {
-    RECIPE_FILE_ARG("file", "recipe file path"),
+    RECIPE_FILE_ARG("file", "recipe template file path"),
     RECIPE_PROPERTIES_FILE_ARG("propertiesFile", "recipe properties file path"),
     RECIPE_PROCESS_XML_FILE_PATH_ARG(
-            "recipeProcessFilePath", "file path of recipe process to be submitted");
+            "recipeProcessFilePath", "file path of recipe process to be submitted"),
+    RECIPE_OPERATION_ARG("recipeOperation", "recipe operation");
 
     private final String name;
     private final String description;

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
index a1c29cd..5df9b0a 100644
--- a/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
@@ -18,19 +18,43 @@
 
 package org.apache.falcon.recipe;
 
+import java.util.Map;
+import java.util.HashMap;
+
 /**
  * Recipe tool options.
  */
 public enum RecipeToolOptions {
-    SOURCE_CLUSTER_HDFS_WRITE_ENDPOINT(
-            "falcon.recipe.src.cluster.hdfs.writeEndPoint", "source cluster HDFS write endpoint"),
+    RECIPE_NAME("falcon.recipe.name", "Recipe name", false),
+    CLUSTER_NAME("falcon.recipe.cluster.name", "Cluster name where replication job should run", false),
+    CLUSTER_HDFS_WRITE_ENDPOINT(
+            "falcon.recipe.cluster.hdfs.writeEndPoint", "Cluster HDFS write endpoint"),
+    CLUSTER_VALIDITY_START("falcon.recipe.cluster.validity.start", "Source cluster validity start", false),
+    CLUSTER_VALIDITY_END("falcon.recipe.cluster.validity.end", "Source cluster validity end", false),
+    WORKFLOW_NAME("falcon.recipe.workflow.name", "Workflow name", false),
     WORKFLOW_PATH("falcon.recipe.workflow.path", "Workflow path", false),
-    WORKFLOW_LIB_PATH("falcon.recipe.workflow.lib.path", "WF lib path", false);
+    WORKFLOW_LIB_PATH("falcon.recipe.workflow.lib.path", "WF lib path", false),
+    PROCESS_FREQUENCY("falcon.recipe.process.frequency", "Process frequency", false),
+    RETRY_POLICY("falcon.recipe.retry.policy", "Retry policy", false),
+    RETRY_DELAY("falcon.recipe.retry.delay", "Retry delay", false),
+    RETRY_ATTEMPTS("falcon.recipe.retry.attempts", "Retry attempts", false),
+    RECIPE_TAGS("falcon.recipe.tags", "Recipe tags", false),
+    RECIPE_ACL_OWNER("falcon.recipe.acl.owner", "Recipe acl owner", false),
+    RECIPE_ACL_GROUP("falcon.recipe.acl.group", "Recipe acl group", false),
+    RECIPE_ACL_PERMISSION("falcon.recipe.acl.permission", "Recipe acl permission", false),
+    RECIPE_NN_PRINCIPAL("falcon.recipe.nn.principal", "Recipe DFS NN principal", false);
 
     private final String name;
     private final String description;
     private final boolean isRequired;
 
+    public static final Map<String, RecipeToolOptions> OPTIONSMAP = new HashMap<>();
+    static {
+        for (RecipeToolOptions c : RecipeToolOptions.values()) {
+            OPTIONSMAP.put(c.getName(), c);
+        }
+    }
+
     RecipeToolOptions(String name, String description) {
         this(name, description, true);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java b/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java
new file mode 100644
index 0000000..9522816
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.process.ACL;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.PolicyType;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.entity.v0.process.Retry;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.recipe.RecipeToolOptions;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.ValidationEvent;
+import javax.xml.bind.ValidationEventHandler;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Recipe builder utility.
+ */
+public final class RecipeProcessBuilderUtils {
+
+    private static final Pattern RECIPE_VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##");
+
+    private RecipeProcessBuilderUtils() {
+    }
+
+    public static String createProcessFromTemplate(final String processTemplateFile, final Properties recipeProperties,
+                                                   final String processFilename) throws Exception {
+        org.apache.falcon.entity.v0.process.Process process = bindAttributesInTemplate(
+                processTemplateFile, recipeProperties);
+        String recipeProcessFilename = createProcessXmlFile(processFilename, process);
+
+        validateProcessXmlFile(recipeProcessFilename);
+        return recipeProcessFilename;
+    }
+
+    private static org.apache.falcon.entity.v0.process.Process
+    bindAttributesInTemplate(final String templateFile, final Properties recipeProperties)
+        throws Exception {
+        if (templateFile == null || recipeProperties == null) {
+            throw new IllegalArgumentException("Invalid arguments passed");
+        }
+
+        Unmarshaller unmarshaller = EntityType.PROCESS.getUnmarshaller();
+        // Validation can be skipped for unmarshalling as we want to bind template with the properties. Validation is
+        // handled as part of marshalling
+        unmarshaller.setSchema(null);
+        unmarshaller.setEventHandler(new ValidationEventHandler() {
+                public boolean handleEvent(ValidationEvent validationEvent) {
+                    return true;
+                }
+            }
+        );
+
+        URL processResourceUrl = new File(templateFile).toURI().toURL();
+        org.apache.falcon.entity.v0.process.Process process =
+                (org.apache.falcon.entity.v0.process.Process) unmarshaller.unmarshal(processResourceUrl);
+
+        /* For optional properties, the user might set them directly in the process xml rather than in the properties
+           file. Before submission, validation confirms the process xml has no remaining RECIPE_VAR_PATTERN matches.
+        */
+
+        String processName = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName());
+        if (StringUtils.isNotEmpty(processName)) {
+            process.setName(processName);
+        }
+
+        // DR process template has only one cluster
+        bindClusterProperties(process.getClusters().getClusters().get(0), recipeProperties);
+
+        // bind scheduling properties
+        String processFrequency = recipeProperties.getProperty(RecipeToolOptions.PROCESS_FREQUENCY.getName());
+        if (StringUtils.isNotEmpty(processFrequency)) {
+            process.setFrequency(Frequency.fromString(processFrequency));
+        }
+
+        bindWorkflowProperties(process.getWorkflow(), recipeProperties);
+        bindRetryProperties(process.getRetry(), recipeProperties);
+        bindACLProperties(process.getACL(), recipeProperties);
+        bindTagsProperties(process, recipeProperties);
+        bindCustomProperties(process.getProperties(), recipeProperties);
+
+        return process;
+    }
+
+    private static void bindClusterProperties(final Cluster cluster,
+                                              final Properties recipeProperties) {
+        // DR process template has only one cluster
+        String clusterName = recipeProperties.getProperty(RecipeToolOptions.CLUSTER_NAME.getName());
+        if (StringUtils.isNotEmpty(clusterName)) {
+            cluster.setName(clusterName);
+        }
+
+        String clusterStartValidity = recipeProperties.getProperty(RecipeToolOptions.CLUSTER_VALIDITY_START.getName());
+        if (StringUtils.isNotEmpty(clusterStartValidity)) {
+            cluster.getValidity().setStart(SchemaHelper.parseDateUTC(clusterStartValidity));
+        }
+
+        String clusterEndValidity = recipeProperties.getProperty(RecipeToolOptions.CLUSTER_VALIDITY_END.getName());
+        if (StringUtils.isNotEmpty(clusterEndValidity)) {
+            cluster.getValidity().setEnd(SchemaHelper.parseDateUTC(clusterEndValidity));
+        }
+    }
+
+    private static void bindWorkflowProperties(final Workflow wf,
+                                               final Properties recipeProperties) {
+        String wfName = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_NAME.getName());
+        if (StringUtils.isNotEmpty(wfName)) {
+            wf.setName(wfName);
+        }
+
+        String wfLibPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_LIB_PATH.getName());
+        if (StringUtils.isNotEmpty(wfLibPath)) {
+            wf.setLib(wfLibPath);
+        } else if (StringUtils.startsWith(wf.getLib(), "##")) { // null-safe: template may omit lib
+            wf.setLib(""); // clear the unresolved ##...## placeholder left in the template
+        }
+
+        String wfPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_PATH.getName());
+        if (StringUtils.isNotEmpty(wfPath)) {
+            wf.setPath(wfPath);
+        }
+    }
+
+    private static void bindRetryProperties(final Retry processRetry,
+                                            final Properties recipeProperties) {
+        String retryPolicy = recipeProperties.getProperty(RecipeToolOptions.RETRY_POLICY.getName());
+        if (StringUtils.isNotEmpty(retryPolicy)) {
+            processRetry.setPolicy(PolicyType.fromValue(retryPolicy));
+        }
+
+        String retryAttempts = recipeProperties.getProperty(RecipeToolOptions.RETRY_ATTEMPTS.getName());
+        if (StringUtils.isNotEmpty(retryAttempts)) {
+            processRetry.setAttempts(Integer.parseInt(retryAttempts));
+        }
+
+        String retryDelay = recipeProperties.getProperty(RecipeToolOptions.RETRY_DELAY.getName());
+        if (StringUtils.isNotEmpty(retryDelay)) {
+            processRetry.setDelay(Frequency.fromString(retryDelay));
+        }
+    }
+
+    private static void bindACLProperties(final ACL acl,
+                                          final Properties recipeProperties) {
+        String aclowner = recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_OWNER.getName());
+        if (StringUtils.isNotEmpty(aclowner)) {
+            acl.setOwner(aclowner);
+        }
+
+        String aclGroup = recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_GROUP.getName());
+        if (StringUtils.isNotEmpty(aclGroup)) {
+            acl.setGroup(aclGroup);
+        }
+
+        String aclPermission = recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_PERMISSION.getName());
+        if (StringUtils.isNotEmpty(aclPermission)) {
+            acl.setPermission(aclPermission);
+        }
+    }
+
+    private static void bindTagsProperties(final org.apache.falcon.entity.v0.process.Process process,
+                                           final Properties recipeProperties) {
+        String falconSystemTags = process.getTags();
+        String tags = recipeProperties.getProperty(RecipeToolOptions.RECIPE_TAGS.getName());
+        if (StringUtils.isNotEmpty(tags)) {
+            if (StringUtils.isNotEmpty(falconSystemTags)) {
+                tags += ", " + falconSystemTags;
+            }
+            process.setTags(tags);
+        }
+    }
+
+
+    private static void bindCustomProperties(final org.apache.falcon.entity.v0.process.Properties customProperties,
+                                             final Properties recipeProperties) {
+        List<Property> propertyList = new ArrayList<>();
+
+        for (Map.Entry<Object, Object> recipeProperty : recipeProperties.entrySet()) {
+            if (RecipeToolOptions.OPTIONSMAP.get(recipeProperty.getKey().toString()) == null) {
+                addProperty(propertyList, (String) recipeProperty.getKey(), (String) recipeProperty.getValue());
+            }
+        }
+
+        customProperties.getProperties().addAll(propertyList);
+    }
+
+    private static void addProperty(List<Property> propertyList, String name, String value) {
+        Property prop = new Property();
+        prop.setName(name);
+        prop.setValue(value);
+        propertyList.add(prop);
+    }
+
+    private static String createProcessXmlFile(final String outFilename,
+                                               final Entity entity) throws Exception {
+        if (outFilename == null || entity == null) {
+            throw new IllegalArgumentException("Invalid arguments passed");
+        }
+
+        EntityType type = EntityType.PROCESS;
+        OutputStream out = null;
+        try {
+            out = new FileOutputStream(outFilename);
+            type.getMarshaller().marshal(entity, out);
+        } catch (JAXBException e) {
+            throw new Exception("Unable to serialize the entity object " + type + "/" + entity.getName(), e);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+        return outFilename;
+    }
+
+    private static void validateProcessXmlFile(final String processFileName) throws Exception {
+        if (processFileName == null) {
+            throw new IllegalArgumentException("Invalid arguments passed");
+        }
+
+        String line;
+        BufferedReader reader = null;
+
+        try {
+            reader = new BufferedReader(new FileReader(processFileName));
+            while ((line = reader.readLine()) != null) {
+                Matcher matcher = RECIPE_VAR_PATTERN.matcher(line);
+                if (matcher.find()) {
+                    String variable = line.substring(matcher.start(), matcher.end());
+                    throw new Exception("Match not found for the template: " + variable
+                            + " in recipe template file. Please add it in recipe properties file");
+                }
+            }
+        } finally {
+            IOUtils.closeQuietly(reader);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index 3dd034b..90d765b 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -31,6 +31,8 @@ It builds and installs the package into the local repository, for use as a depen
 [optionally -Doozie.version=<<oozie version>> can be appended to build with a specific version of Oozie. Oozie versions
 >= 4 are supported]
 NOTE: Falcon builds with JDK 1.7 using -noverify option
+      To compile Falcon with Hive Replication, "-P hadoop-2,hivedr" can optionally be appended. This requires
+      Hive >= 1.2.0 and Oozie >= 4.2.0 to be available.
 
 
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index e3de6a4..49fb4f7 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Properties;
 
 /**
@@ -47,10 +48,15 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
 
     private static final String[] LIBS = StartupProperties.get().getProperty("shared.libs").split(",");
 
-    private static final FalconPathFilter NON_FALCON_JAR_FILTER = new FalconPathFilter() {
+    private static class FalconLibPath implements  FalconPathFilter {
+        private String[] shareLibs;
+        FalconLibPath(String[] libList) {
+            this.shareLibs = Arrays.copyOf(libList, libList.length);
+        }
+
         @Override
         public boolean accept(Path path) {
-            for (String jarName : LIBS) {
+            for (String jarName : shareLibs) {
                 if (path.getName().startsWith(jarName)) {
                     return true;
                 }
@@ -60,7 +66,7 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
 
         @Override
         public String getJarName(Path path) {
-            for (String jarName : LIBS) {
+            for (String jarName : shareLibs) {
                 if (path.getName().startsWith(jarName)) {
                     return jarName;
                 }
@@ -84,9 +90,10 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
                     "lib");
             Path libext = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(),
                     "libext");
+            FalconPathFilter nonFalconJarFilter = new FalconLibPath(LIBS);
             Properties properties = StartupProperties.get();
             pushLibsToHDFS(fs, properties.getProperty("system.lib.location"), lib,
-                    NON_FALCON_JAR_FILTER);
+                    nonFalconJarFilter);
             pushLibsToHDFS(fs, properties.getProperty("libext.paths"), libext, null);
             pushLibsToHDFS(fs, properties.getProperty("libext.feed.paths"),
                     new Path(libext, EntityType.FEED.name()) , null);
@@ -107,7 +114,6 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
         if (StringUtils.isEmpty(src)) {
             return;
         }
-
         LOG.debug("Copying libs from {}", src);
         createTargetPath(fs, target);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 34a5471..ec04bdf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,7 @@
 		                <artifactId>hadoop-client</artifactId>
 		                <version>${hadoop.version}</version>
                         <scope>provided</scope>
-		                <exclusions>
+                        <exclusions>
                             <exclusion>
                                 <groupId>com.sun.jersey</groupId>
                                 <artifactId>jersey-server</artifactId>
@@ -151,12 +151,12 @@
                                 <groupId>org.apache.hadoop</groupId>
                                 <artifactId>hadoop-annotations</artifactId>
                             </exclusion>
-		                </exclusions>
-		            </dependency>
-		            <dependency>
-		                <groupId>org.apache.hadoop</groupId>
-		                <artifactId>hadoop-hdfs</artifactId>
-		                <version>${hadoop.version}</version>
+                        </exclusions>
+                    </dependency>
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-hdfs</artifactId>
+                        <version>${hadoop.version}</version>
                         <scope>provided</scope>
                         <exclusions>
                             <exclusion>
@@ -166,10 +166,10 @@
                         </exclusions>
                     </dependency>
 
-		            <dependency>
-		                <groupId>org.apache.hadoop</groupId>
-		                <artifactId>hadoop-common</artifactId>
-		                <version>${hadoop.version}</version>
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-common</artifactId>
+                        <version>${hadoop.version}</version>
                         <scope>provided</scope>
                         <exclusions>
                             <exclusion>
@@ -179,10 +179,10 @@
                         </exclusions>
                     </dependency>
 
-		            <dependency>
-		                <groupId>org.apache.hadoop</groupId>
-		                <artifactId>hadoop-mapreduce-client-common</artifactId>
-		                <version>${hadoop.version}</version>
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-mapreduce-client-common</artifactId>
+                        <version>${hadoop.version}</version>
                         <scope>provided</scope>
                         <exclusions>
                             <exclusion>
@@ -200,18 +200,18 @@
                         <scope>test</scope>
                     </dependency>
 
-		            <dependency>
-		                <groupId>org.apache.hadoop</groupId>
-		                <artifactId>hadoop-common</artifactId>
-		                <version>${hadoop.version}</version>
-		                <classifier>tests</classifier>
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-common</artifactId>
+                        <version>${hadoop.version}</version>
+                        <classifier>tests</classifier>
                         <scope>test</scope>
                     </dependency>
 
-		            <dependency>
-		                <groupId>org.apache.hadoop</groupId>
-		                <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-		                <version>${hadoop.version}</version>
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+                        <version>${hadoop.version}</version>
                         <scope>provided</scope>
                     </dependency>
 
@@ -241,6 +241,13 @@
                         <version>${hadoop.version}</version>
                         <scope>provided</scope>
                     </dependency>
+
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-mapreduce-client-core</artifactId>
+                        <version>${hadoop.version}</version>
+                        <scope>provided</scope>
+                    </dependency>
                 </dependencies>
           </dependencyManagement>
         </profile>
@@ -255,7 +262,7 @@
                             <descriptors>
                                 <descriptor>src/main/assemblies/distributed-package.xml</descriptor>
                                 <descriptor>src/main/assemblies/src-package.xml</descriptor>
-			    </descriptors>
+			                </descriptors>
                             <finalName>apache-falcon-distributed-${project.version}</finalName>
                         </configuration>
                     </plugin>
@@ -368,11 +375,50 @@
             <properties>
                 <excluded.test.groups/>
             </properties>
-         </profile>
+        </profile>
+        <profile>
+            <id>hivedr</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-enforcer-plugin</artifactId>
+                        <version>1.3.1</version>
+                        <executions>
+                            <execution>
+                                <id>enforce-property</id>
+                                <goals>
+                                    <goal>enforce</goal>
+                                </goals>
+                                <configuration>
+                                    <rules>
+                                        <!-- requireProperty checks a single property, so each property gets its own rule -->
+                                        <requireProperty>
+                                            <property>hive.version</property>
+                                            <regex>^(1.2.*)</regex>
+                                            <regexMessage>HiveDR only supports hive version >= 1.2.0</regexMessage>
+                                        </requireProperty>
+                                        <requireProperty>
+                                            <property>oozie.version</property>
+                                            <regex>^(4.2.*)</regex>
+                                            <regexMessage>HiveDR only supports oozie version >= 4.2.0</regexMessage>
+                                        </requireProperty>
+                                    </rules>
+                                    <fail>true</fail>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+            <modules>
+                <module>addons/hivedr</module>
+            </modules>
+        </profile>
     </profiles>
 
     <modules>
-	<module>falcon-ui</module>
+        <module>falcon-ui</module>
         <module>checkstyle</module>
         <module>build-tools</module>
         <module>client</module>
@@ -882,12 +925,15 @@
 
             <!--  this is needed for embedded oozie -->
             <dependency>
-                <groupId>org.apache.hive.hcatalog</groupId>
-                <artifactId>hive-webhcat-java-client</artifactId>
+                <groupId>org.apache.hive</groupId>
+                <artifactId>hive-exec</artifactId>
                 <version>${hive.version}</version>
                 <exclusions>
                     <exclusion>
-                        <!-- This implies you cannot use orc files -->
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-hdfs</artifactId>
+                    </exclusion>
+                    <exclusion>
                         <groupId>com.google.protobuf</groupId>
                         <artifactId>protobuf-java</artifactId>
                     </exclusion>
@@ -907,6 +953,18 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.hive.hcatalog</groupId>
+                <artifactId>hive-webhcat-java-client</artifactId>
+                <version>${hive.version}</version>
+                <exclusions>
+                    <exclusion> <!-- conflict with hadoop-auth -->
+                        <groupId>org.apache.httpcomponents</groupId>
+                        <artifactId>httpclient</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
                 <groupId>com.github.stephenc.findbugs</groupId>
                 <artifactId>findbugs-annotations</artifactId>
                 <version>1.3.9-1</version>
@@ -1034,7 +1092,7 @@
                     <version>2.8.1</version>
                 </plugin>
 
-               <plugin>
+                <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-surefire-plugin</artifactId>
                     <version>2.16</version>

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/replication/pom.xml
----------------------------------------------------------------------
diff --git a/replication/pom.xml b/replication/pom.xml
index 8c4d6b4..43b6463 100644
--- a/replication/pom.xml
+++ b/replication/pom.xml
@@ -26,9 +26,9 @@
         <artifactId>falcon-main</artifactId>
         <version>0.7-SNAPSHOT</version>
     </parent>
-    <artifactId>falcon-replication</artifactId>
-    <description>Apache Falcon Replication Module</description>
-    <name>Apache Falcon Replication</name>
+    <artifactId>falcon-distcp-replication</artifactId>
+    <description>Apache Falcon Distcp Replication Module</description>
+    <name>Apache Falcon Distcp Replication</name>
     <packaging>jar</packaging>
 
     <profiles>
@@ -46,6 +46,10 @@
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-distcp</artifactId>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </dependency>
             </dependencies>
         </profile>
     </profiles>


Mime
View raw message