falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject falcon git commit: FALCON-2017 Fix HiveDR extension issues
Date Tue, 07 Jun 2016 21:06:49 GMT
Repository: falcon
Updated Branches:
  refs/heads/0.10 377452395 -> 7d7b1fb16


FALCON-2017 Fix HiveDR extension issues

Author: Sowmya Ramesh <sramesh@hortonworks.com>

Reviewers: "Venkat Ranganathan <venkat@hortonworks.com>, Balu Vellanki <balu@apache.org>"

Closes #175 from sowmyaramesh/FALCON-2017

(cherry picked from commit ee644dd2a6639085b469f8fae96580671f4d0fe9)
Signed-off-by: bvellanki <bvellanki@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7d7b1fb1
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7d7b1fb1
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7d7b1fb1

Branch: refs/heads/0.10
Commit: 7d7b1fb16492f8d3d2cf8bd4b5e1642622738345
Parents: 3774523
Author: Sowmya Ramesh <sramesh@hortonworks.com>
Authored: Tue Jun 7 14:06:52 2016 -0700
Committer: bvellanki <bvellanki@hortonworks.com>
Committed: Tue Jun 7 14:07:05 2016 -0700

----------------------------------------------------------------------
 .../runtime/hive-mirroring-secure-workflow.xml  | 48 ++++++++++----------
 .../runtime/hive-mirroring-workflow.xml         | 48 ++++++++++----------
 .../java/org/apache/falcon/hive/HiveDRArgs.java |  7 +--
 .../org/apache/falcon/hive/HiveDROptions.java   | 37 ++++++++++-----
 .../java/org/apache/falcon/hive/HiveDRTool.java |  7 +++
 .../falcon/hive/mapreduce/CopyMapper.java       |  2 +-
 .../org/apache/falcon/hive/util/EventUtils.java | 23 ++++------
 .../org/apache/falcon/hive/util/FileUtils.java  |  4 +-
 .../java/org/apache/falcon/hive/DRTest.java     |  4 +-
 .../mirroring/hive/HiveMirroringExtension.java  | 17 +++++++
 .../hive/HiveMirroringExtensionProperties.java  |  5 +-
 11 files changed, 120 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7d7b1fb1/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
index 4bf048f..63e9a67 100644
--- a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
+++ b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
@@ -96,18 +96,16 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-sourceCluster</arg>
             <arg>${sourceCluster}</arg>
             <arg>-sourceMetastoreUri</arg>
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -144,8 +142,10 @@
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>lastevents</arg>
         </java>
@@ -190,8 +190,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -202,10 +200,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -244,8 +242,10 @@
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>export</arg>
             <arg>-counterLogDir</arg>
@@ -292,8 +292,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -304,10 +302,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -346,8 +344,10 @@
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>import</arg>
         </java>

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d7b1fb1/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
index 9f9bf92..4f6eec5 100644
--- a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
+++ b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
@@ -46,18 +46,16 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-sourceCluster</arg>
             <arg>${sourceCluster}</arg>
             <arg>-sourceMetastoreUri</arg>
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -80,8 +78,10 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>lastevents</arg>
         </java>
@@ -118,8 +118,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -130,10 +128,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -158,8 +156,10 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>export</arg>
             <arg>-counterLogDir</arg>
@@ -198,8 +198,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -210,10 +208,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -238,8 +236,10 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>import</arg>
         </java>

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d7b1fb1/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
index 71b9043..d891487 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
@@ -30,8 +30,9 @@ public enum HiveDRArgs {
     SOURCE_CLUSTER("sourceCluster", "source cluster"),
     SOURCE_METASTORE_URI("sourceMetastoreUri", "source meta store uri"),
     SOURCE_HS2_URI("sourceHiveServer2Uri", "source HS2 uri"),
-    SOURCE_DATABASE("sourceDatabase", "comma source databases"),
-    SOURCE_TABLE("sourceTable", "comma source tables"),
+    SOURCE_DATABASES("sourceDatabases", "comma source databases"),
+    SOURCE_DATABASE("sourceDatabase", "First source database"),
+    SOURCE_TABLES("sourceTables", "comma source tables"),
     SOURCE_STAGING_PATH("sourceStagingPath", "source staging path for data", false),
 
     // source hadoop endpoints
@@ -70,7 +71,7 @@ public enum HiveDRArgs {
     // Map Bandwidth
     DISTCP_MAP_BANDWIDTH("distcpMapBandwidth", "map bandwidth in mb", false),
 
-    JOB_NAME("jobName", "unique job name"),
+    JOB_NAME("hiveJobName", "unique job name"),
 
     CLUSTER_FOR_JOB_RUN("clusterForJobRun", "cluster where job runs"),
     JOB_CLUSTER_NN("clusterForJobRunWriteEP", "write end point of cluster where job runs"),

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d7b1fb1/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
index 0096727..215be35 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
@@ -63,21 +63,29 @@ public class HiveDROptions {
     }
 
     public List<String> getSourceDatabases() {
-        return Arrays.asList(context.get(HiveDRArgs.SOURCE_DATABASE).trim().split(","));
+        return Arrays.asList(context.get(HiveDRArgs.SOURCE_DATABASES).trim().split(","));
     }
 
     public List<String> getSourceTables() {
-        return Arrays.asList(context.get(HiveDRArgs.SOURCE_TABLE).trim().split(","));
+        return Arrays.asList(context.get(HiveDRArgs.SOURCE_TABLES).trim().split(","));
     }
 
     public String getSourceStagingPath() {
+        return context.get(HiveDRArgs.SOURCE_STAGING_PATH);
+    }
+
+
+    public void setSourceStagingPath() {
         String stagingPath = context.get(HiveDRArgs.SOURCE_STAGING_PATH);
-        if (StringUtils.isNotBlank(stagingPath)) {
-            stagingPath = StringUtils.removeEnd(stagingPath, File.separator);
-            return stagingPath + File.separator + getJobName();
+        String srcStagingPath;
+        if ("NA".equalsIgnoreCase(stagingPath)) {
+            stagingPath = StringUtils.removeEnd(FileUtils.DEFAULT_EVENT_STORE_PATH, File.separator);
+            srcStagingPath = stagingPath + File.separator + getJobName();
         } else {
-            return FileUtils.DEFAULT_EVENT_STORE_PATH + getJobName();
+            stagingPath = StringUtils.removeEnd(stagingPath, File.separator);
+            srcStagingPath = stagingPath + File.separator + getJobName();
         }
+        context.put(HiveDRArgs.SOURCE_STAGING_PATH, srcStagingPath);
     }
 
     public String getSourceWriteEP() {
@@ -109,13 +117,20 @@ public class HiveDROptions {
     }
 
     public String getTargetStagingPath() {
+        return context.get(HiveDRArgs.TARGET_STAGING_PATH);
+    }
+
+    public void setTargetStagingPath() {
         String stagingPath = context.get(HiveDRArgs.TARGET_STAGING_PATH);
-        if (StringUtils.isNotBlank(stagingPath)) {
-            stagingPath = StringUtils.removeEnd(stagingPath, File.separator);
-            return stagingPath + File.separator + getJobName();
+        String targetStagingPath;
+        if ("NA".equalsIgnoreCase(stagingPath)) {
+            stagingPath = StringUtils.removeEnd(FileUtils.DEFAULT_EVENT_STORE_PATH, File.separator);
+            targetStagingPath = stagingPath + File.separator + getJobName();
         } else {
-            return FileUtils.DEFAULT_EVENT_STORE_PATH + getJobName();
+            stagingPath = StringUtils.removeEnd(stagingPath, File.separator);
+            targetStagingPath = stagingPath + File.separator + getJobName();
         }
+        context.put(HiveDRArgs.TARGET_STAGING_PATH, targetStagingPath);
     }
 
     public String getReplicationMaxMaps() {
@@ -151,7 +166,7 @@ public class HiveDROptions {
     }
 
     public static HiveDROptions create(String[] args) throws ParseException {
-        Map<HiveDRArgs, String> options = new HashMap<HiveDRArgs, String>();
+        Map<HiveDRArgs, String> options = new HashMap<>();
 
         CommandLine cmd = getCommand(args);
         for (HiveDRArgs arg : HiveDRArgs.values()) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d7b1fb1/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index 17eec22..e45b0d8 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -136,6 +136,13 @@ public class HiveDRTool extends Configured implements Tool {
         inputOptions = parseOptions(args);
         LOG.info("Input Options: {}", inputOptions);
 
+        // Update the source staging path
+        inputOptions.setSourceStagingPath();
+        inputOptions.setTargetStagingPath();
+
+        LOG.info("srcStaginPath: {}", inputOptions.getSourceStagingPath());
+        LOG.info("tgtStaginPath: {}", inputOptions.getTargetStagingPath());
+
         Configuration sourceConf = FileUtils.getConfiguration(inputOptions.getSourceWriteEP(),
                 inputOptions.getSourceNNKerberosPrincipal());
         sourceClusterFS = FileSystem.get(sourceConf);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d7b1fb1/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
index 08e0551..e2297ef 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -75,7 +75,7 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text>
{
         // In case of export stage, populate custom counters
         if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName())
                 .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())
-                && !eventUtils.isCountersMapEmtpy()) {
+                && !eventUtils.isCountersMapEmpty()) {
             context.getCounter(ReplicationJobCountersList.BYTESCOPIED).increment(
                     eventUtils.getCounterValue(ReplicationJobCountersList.BYTESCOPIED.getName()));
             context.getCounter(ReplicationJobCountersList.COPY).increment(

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d7b1fb1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
index 3b088f7..590a7e3 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.sql.Connection;
@@ -95,17 +94,15 @@ public class EventUtils {
         sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
         sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
         sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
-        sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName())
-                + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
+        sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName());
         jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName());
         jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName());
         targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
-        targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName())
-                + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
+        targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName());
         targetNN = conf.get(HiveDRArgs.TARGET_NN.getName());
         targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName());
-        sourceCleanUpList = new ArrayList<Path>();
-        targetCleanUpList = new ArrayList<Path>();
+        sourceCleanUpList = new ArrayList<>();
+        targetCleanUpList = new ArrayList<>();
         countersMap = new HashMap<>();
     }
 
@@ -169,7 +166,7 @@ public class EventUtils {
     }
 
     public void processEvents(String event) throws Exception {
-        listReplicationStatus = new ArrayList<ReplicationStatus>();
+        listReplicationStatus = new ArrayList<>();
         String[] eventSplit = event.split(DelimiterUtils.FIELD_DELIM);
         String dbName = new String(Base64.decodeBase64(eventSplit[0]), "UTF-8");
         String tableName = new String(Base64.decodeBase64(eventSplit[1]), "UTF-8");
@@ -203,7 +200,7 @@ public class EventUtils {
                                  List<Path> cleanUpList, boolean isImportStatements)
         throws SQLException, HiveReplicationException, IOException {
         String[] commandList = eventStr.split(DelimiterUtils.NEWLINE_DELIM);
-        List<Command> deserializeCommand = new ArrayList<Command>();
+        List<Command> deserializeCommand = new ArrayList<>();
         for (String command : commandList) {
             Command cmd = ReplicationUtils.deserializeCommand(command);
             deserializeCommand.add(cmd);
@@ -269,7 +266,7 @@ public class EventUtils {
     }
 
     private static List<Path> getCleanUpPaths(List<String> cleanupLocations)
{
-        List<Path> cleanupLocationPaths = new ArrayList<Path>();
+        List<Path> cleanupLocationPaths = new ArrayList<>();
         for (String cleanupLocation : cleanupLocations) {
             cleanupLocationPaths.add(new Path(cleanupLocation));
         }
@@ -330,7 +327,7 @@ public class EventUtils {
 
     public DistCpOptions getDistCpOptions() {
         // DistCpOptions expects the first argument to be a file OR a list of Paths
-        List<Path> sourceUris=new ArrayList<Path>();
+        List<Path> sourceUris=new ArrayList<>();
         sourceUris.add(new Path(sourceStagingUri));
         DistCpOptions distcpOptions = new DistCpOptions(sourceUris, new Path(targetStagingUri));
 
@@ -350,8 +347,8 @@ public class EventUtils {
         return countersMap.get(counterKey);
     }
 
-    public boolean isCountersMapEmtpy() {
-        return countersMap.size() == 0 ? true : false;
+    public boolean isCountersMapEmpty() {
+        return countersMap.size() == 0;
     }
 
     public void cleanEventsDirectory() throws IOException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d7b1fb1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
index 001d10a..ce80586 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
@@ -33,8 +33,8 @@ import java.io.IOException;
  */
 public final class FileUtils {
 
-    public static final String DEFAULT_EVENT_STORE_PATH = DRStatusStore.BASE_DEFAULT_STORE_PATH
-            + File.separator + "Events";
+    public static final String DEFAULT_EVENT_STORE_PATH = StringUtils.removeEnd(DRStatusStore
+            .BASE_DEFAULT_STORE_PATH,  File.separator) + File.separator + "Events";
     public static final FsPermission FS_PERMISSION_700 = new FsPermission(FsAction.ALL, FsAction.NONE,
FsAction.NONE);
 
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d7b1fb1/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
index 1f44b62..a9c5661 100644
--- a/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
@@ -25,8 +25,8 @@ public class DRTest {
     public void testHiveDr(String[] args) {
         String[] testArgs = {
             "-sourceMetastoreUri", "thrift://localhost:9083",
-            "-sourceDatabase", "default",
-            "-sourceTable", "test",
+            "-sourceDatabases", "default",
+            "-sourceTables", "test",
             "-sourceStagingPath", "/apps/hive/tools/dr",
             "-sourceNN", "hdfs://localhost:8020",
             "-sourceRM", "local",

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d7b1fb1/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
index 949aea5..75759df 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
@@ -39,6 +39,7 @@ public class HiveMirroringExtension extends AbstractExtension {
     private static final String ALL_TABLES = "*";
     private static final String COMMA_DELIMITER = ",";
     private static final String SECURE_RESOURCE = "-secure";
+    private static final String NOT_APPLICABLE = "NA";
 
     @Override
     public String getName() {
@@ -122,6 +123,12 @@ public class HiveMirroringExtension extends AbstractExtension {
         additionalProperties.put(HiveMirroringExtensionProperties.HIVE_MIRRORING_JOB_NAME.getName(),
                 jobName);
 
+        // Get the first source DB
+        additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_DATABASE.getName(),
+                extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_DATABASES
+                .getName()).trim().split(",")[0]
+        );
+
         String clusterName = extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName());
         // Add required properties of cluster where job should run
         additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN.getName(),
@@ -230,6 +237,16 @@ public class HiveMirroringExtension extends AbstractExtension {
             additionalProperties.put(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName(),
"false");
         }
 
+        if (StringUtils.isBlank(
+                extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName())))
{
+            additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName(),
NOT_APPLICABLE);
+        }
+
+        if (StringUtils.isBlank(
+                extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName())))
{
+            additionalProperties.put(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName(),
NOT_APPLICABLE);
+        }
+
         return additionalProperties;
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d7b1fb1/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
index 6c4f58d..7e80712 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
@@ -27,6 +27,7 @@ public enum HiveMirroringExtensionProperties {
     SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri", false),
     SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"),
     SOURCE_DATABASES("sourceDatabases", "List of databases to replicate"),
+    SOURCE_DATABASE("sourceDatabase", "Database to verify the setup connection", false),
     SOURCE_TABLES("sourceTables", "List of tables to replicate", false),
     SOURCE_STAGING_PATH("sourceStagingPath", "Location of source staging path", false),
     SOURCE_NN("sourceNN", "Source name node", false),
@@ -50,13 +51,13 @@ public enum HiveMirroringExtensionProperties {
     MAX_EVENTS("maxEvents", "Maximum events to replicate", false),
     MAX_MAPS("replicationMaxMaps", "Maximum number of maps used during replication", false),
     DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp", false),
-    MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during
replication"),
+    MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during
replication", false),
     CLUSTER_FOR_JOB_RUN("clusterForJobRun", "Cluster on which replication job runs", false),
     CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("Job cluster kerberos principal",
             "Write EP of cluster on which replication job runs", false),
     CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which
replication job runs", false),
     TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled",
false),
-    HIVE_MIRRORING_JOB_NAME("jobName", "Unique hive replication job name", false);
+    HIVE_MIRRORING_JOB_NAME("hiveJobName", "Unique hive replication job name", false);
 
     private final String name;
     private final String description;


Mime
View raw message