falcon-commits mailing list archives

From: ajayyad...@apache.org
Subject: [1/2] falcon git commit: FALCON-1373 HiveDR does not work when job is run on destination cluster. Contributed by Balu Vellanki.
Date: Tue, 22 Sep 2015 16:39:25 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 75079c2e5 -> 48d0723cc


FALCON-1373 HiveDR does not work when job is run on destination cluster. Contributed by Balu Vellanki.
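
The replication job previously created the source staging directory through the job cluster's FileSystem, read event files through the source cluster's FileSystem, and handed unqualified staging paths to DistCp; all of this only works when the job happens to run on the source cluster. The fix keeps distinct source, job, and target FileSystem handles and fully qualifies the DistCp source and target paths with their NameNode URIs.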


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/11f20f47
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/11f20f47
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/11f20f47

Branch: refs/heads/master
Commit: 11f20f4734eccc01d63516b035073237bcb79837
Parents: 75079c2
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Tue Sep 22 21:52:41 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Tue Sep 22 21:52:41 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../java/org/apache/falcon/hive/HiveDRArgs.java |  8 +++---
 .../org/apache/falcon/hive/HiveDROptions.java   | 12 ++++++--
 .../java/org/apache/falcon/hive/HiveDRTool.java | 29 ++++++++++++--------
 .../org/apache/falcon/hive/util/EventUtils.java | 27 ++++++++++++++----
 .../hive-disaster-recovery-secure.properties    |  4 ++-
 .../resources/hive-disaster-recovery.properties |  4 ++-
 7 files changed, 61 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6836690..511c4ec 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,8 @@ Trunk (Unreleased)
     FALCON-1403 Revisit IT cleanup and teardown(Narayan Periwal via Pallavi Rao)
 
   BUG FIXES
+    FALCON-1373 HiveDR does not work when job is run on destination cluster(Balu Vellanki via Ajay Yadava)
+
     FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao)
 
     FALCON-1465 Cluster submission fails with java.lang.IllegalArgumentException in distributed mode(Ajay Yadava via Sowmya Ramesh)

http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
index 1ad6a62..574524d 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
@@ -57,9 +57,6 @@ public enum HiveDRArgs {
             "Target hive metastore kerberos principal", false),
     TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal", "Target hiveserver2 kerberos principal", false),
 
-    CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
-            "Namenode kerberos principal of cluster on which replication job runs", false),
-
     // num events
     MAX_EVENTS("maxEvents", "number of events to process in this run"),
 
@@ -73,7 +70,10 @@ public enum HiveDRArgs {
     JOB_NAME("drJobName", "unique job name"),
 
     CLUSTER_FOR_JOB_RUN("clusterForJobRun", "cluster where job runs"),
-    CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "cluster where job runs write EP"),
+    JOB_CLUSTER_NN("clusterForJobRunWriteEP", "write end point of cluster where job runs"),
+    JOB_CLUSTER_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
+            "Namenode kerberos principal of cluster on which replication job runs", false),
+
 
     FALCON_LIBPATH("falconLibPath", "Falcon Lib Path for Jar files", false),
 

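For illustration, a minimal, self-contained enum (hypothetical class name, not the Falcon source) showing why this rename is transparent to existing recipes: only the Java constant names change, while the wire-level option strings that lookups such as HiveDRArgs.JOB_CLUSTER_NN.getName() in the EventUtils hunk below resolve to stay the same.

// Hypothetical sketch of the rename; option strings are unchanged.
public enum HiveDRArgsSketch {
    JOB_CLUSTER_NN("clusterForJobRunWriteEP"),                              // was CLUSTER_FOR_JOB_RUN_WRITE_EP
    JOB_CLUSTER_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal"); // was CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL

    private final String optionName;

    HiveDRArgsSketch(String optionName) {
        this.optionName = optionName;
    }

    public String getName() {
        return optionName;
    }

    public static void main(String[] args) {
        // Recipe properties keep working: lookups go by option name.
        System.out.println(JOB_CLUSTER_NN.getName()); // prints: clusterForJobRunWriteEP
    }
}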
http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
index 026f6e3..28515e4 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
@@ -77,6 +77,14 @@ public class HiveDROptions {
         throw new HiveReplicationException("Source StagingPath cannot be empty");
     }
 
+    public String getSourceWriteEP() {
+        return context.get(HiveDRArgs.SOURCE_NN);
+    }
+
+    public String getSourceNNKerberosPrincipal() {
+        return context.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL);
+    }
+
     public String getTargetWriteEP() {
         return context.get(HiveDRArgs.TARGET_NN);
     }
@@ -120,11 +128,11 @@ public class HiveDROptions {
     }
 
     public String getJobClusterWriteEP() {
-        return context.get(HiveDRArgs.CLUSTER_FOR_JOB_RUN_WRITE_EP);
+        return context.get(HiveDRArgs.JOB_CLUSTER_NN);
     }
 
     public String getJobClusterNNPrincipal() {
-        return context.get(HiveDRArgs.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL);
+        return context.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL);
     }
 
     public void setSourceStagingDir(String path) {

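For reference, the getters above reduce to lookups in a context keyed by the option strings; a hypothetical, self-contained sketch of that pattern (sample values assumed, not the Falcon class itself):

import java.util.HashMap;
import java.util.Map;

public class OptionsSketch {
    // Context keyed by HiveDRArgs option names, as in HiveDROptions.
    private final Map<String, String> context = new HashMap<String, String>();

    public OptionsSketch() {
        context.put("sourceNN", "hdfs://source-nn:8020");             // assumed value
        context.put("clusterForJobRunWriteEP", "hdfs://job-nn:8020"); // assumed value
    }

    public String getSourceWriteEP() {
        return context.get("sourceNN");
    }

    public String getJobClusterWriteEP() {
        return context.get("clusterForJobRunWriteEP");
    }

    public static void main(String[] args) {
        OptionsSketch opts = new OptionsSketch();
        System.out.println(opts.getSourceWriteEP());     // hdfs://source-nn:8020
        System.out.println(opts.getJobClusterWriteEP()); // hdfs://job-nn:8020
    }
}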
http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index 712efe8..bebdb0b 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -62,7 +62,8 @@ public class HiveDRTool extends Configured implements Tool {
     private static final String META_PATH_FILE_SUFFIX = ".metapath";
 
     private FileSystem jobFS;
-    private FileSystem targetClusterFs;
+    private FileSystem sourceClusterFS;
+    private FileSystem targetClusterFS;
 
     private HiveDROptions inputOptions;
     private DRStatusStore drStore;
@@ -117,15 +118,18 @@ public class HiveDRTool extends Configured implements Tool {
         inputOptions = parseOptions(args);
         LOG.info("Input Options: {}", inputOptions);
 
+        Configuration sourceConf = FileUtils.getConfiguration(inputOptions.getSourceWriteEP(),
+                inputOptions.getSourceNNKerberosPrincipal());
+        sourceClusterFS = FileSystem.get(sourceConf);
         Configuration targetConf = FileUtils.getConfiguration(inputOptions.getTargetWriteEP(),
                 inputOptions.getTargetNNKerberosPrincipal());
-        targetClusterFs = FileSystem.get(targetConf);
+        targetClusterFS = FileSystem.get(targetConf);
         jobConf = FileUtils.getConfiguration(inputOptions.getJobClusterWriteEP(),
                 inputOptions.getJobClusterNNPrincipal());
         jobFS = FileSystem.get(jobConf);
 
         // init DR status store
-        drStore = new HiveDRStatusStore(targetClusterFs);
+        drStore = new HiveDRStatusStore(targetClusterFS);
         eventSoucerUtil = new EventSourcerUtils(jobConf, inputOptions.shouldKeepHistory(), inputOptions.getJobName());
     }
 
@@ -219,12 +223,12 @@ public class HiveDRTool extends Configured implements Tool {
         Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath());
         Path targetStagingPath = new Path(inputOptions.getTargetStagingPath());
         LOG.info("Source staging path: {}", sourceStagingPath);
-        if (!FileSystem.mkdirs(jobFS, sourceStagingPath, STAGING_DIR_PERMISSION)) {
+        if (!FileSystem.mkdirs(sourceClusterFS, sourceStagingPath, STAGING_DIR_PERMISSION)) {
             throw new IOException("mkdir failed for " + sourceStagingPath);
         }
 
         LOG.info("Target staging path: {}", targetStagingPath);
-        if (!FileSystem.mkdirs(targetClusterFs, targetStagingPath, STAGING_DIR_PERMISSION)) {
+        if (!FileSystem.mkdirs(targetClusterFS, targetStagingPath, STAGING_DIR_PERMISSION)) {
             throw new IOException("mkdir failed for " + targetStagingPath);
         }
     }
@@ -234,12 +238,12 @@ public class HiveDRTool extends Configured implements Tool {
         Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath());
         Path targetStagingPath = new Path(inputOptions.getTargetStagingPath());
         try {
-            if (jobFS.exists(sourceStagingPath)) {
-                jobFS.delete(sourceStagingPath, true);
+            if (sourceClusterFS.exists(sourceStagingPath)) {
+                sourceClusterFS.delete(sourceStagingPath, true);
             }
 
-            if (targetClusterFs.exists(targetStagingPath)) {
-                targetClusterFs.delete(targetStagingPath, true);
+            if (targetClusterFS.exists(targetStagingPath)) {
+                targetClusterFS.delete(targetStagingPath, true);
             }
         } catch (IOException e) {
             LOG.error("Unable to cleanup staging dir:", e);
@@ -320,8 +324,11 @@ public class HiveDRTool extends Configured implements Tool {
             if (jobFS != null) {
                 jobFS.close();
             }
-            if (targetClusterFs != null) {
-                targetClusterFs.close();
+            if (targetClusterFS != null) {
+                targetClusterFS.close();
+            }
+            if (sourceClusterFS != null) {
+                sourceClusterFS.close();
             }
         } catch (IOException e) {
             LOG.error("Closing FS failed", e);

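A compile-ready sketch (class name and NameNode endpoints assumed, not the Falcon source) of the pattern the hunks above adopt: each staging directory is created on the FileSystem of the cluster that owns it, so where the replication job runs no longer matters.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StagingDirSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Assumed endpoints; HiveDRTool derives these from HiveDROptions.
        FileSystem sourceClusterFS = FileSystem.get(URI.create("hdfs://source-nn:8020"), conf);
        FileSystem targetClusterFS = FileSystem.get(URI.create("hdfs://target-nn:8020"), conf);

        // Pre-fix code created the source staging dir via jobFS, which is only
        // correct when the job happens to run on the source cluster.
        if (!sourceClusterFS.mkdirs(new Path("/apps/hive/tools/dr"))) {
            throw new IOException("mkdir failed on source cluster");
        }
        if (!targetClusterFS.mkdirs(new Path("/apps/hive/tools/dr"))) {
            throw new IOException("mkdir failed on target cluster");
        }
    }
}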
http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
index 0b4200c..f8397ff 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -61,16 +61,19 @@ public class EventUtils {
     private String sourceDatabase = null;
     private String sourceNN = null;
     private String sourceNNKerberosPrincipal = null;
+    private String jobNN = null;
+    private String jobNNKerberosPrincipal = null;
     private String targetHiveServer2Uri = null;
     private String targetStagingPath = null;
     private String targetNN = null;
     private String targetNNKerberosPrincipal = null;
-    private String targetStagingUri = null;
+    private String fullyQualifiedTargetStagingPath = null;
     private List<Path> sourceCleanUpList = null;
     private List<Path> targetCleanUpList = null;
     private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class);
 
     private FileSystem sourceFileSystem = null;
+    private FileSystem jobFileSystem = null;
     private FileSystem targetFileSystem = null;
     private Connection sourceConnection = null;
     private Connection targetConnection = null;
@@ -85,6 +88,8 @@ public class EventUtils {
         sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
         sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
         sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
+        jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName());
+        jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName());
         targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
         targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName())
                 + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
@@ -128,14 +133,15 @@ public class EventUtils {
 
     public void initializeFS() throws IOException {
         LOG.info("Initializing staging directory");
-        targetStagingUri = new Path(targetNN, targetStagingPath).toString();
+        fullyQualifiedTargetStagingPath = new Path(targetNN, targetStagingPath).toString();
         sourceFileSystem = FileSystem.get(FileUtils.getConfiguration(sourceNN, sourceNNKerberosPrincipal));
+        jobFileSystem = FileSystem.get(FileUtils.getConfiguration(jobNN, jobNNKerberosPrincipal));
         targetFileSystem = FileSystem.get(FileUtils.getConfiguration(targetNN, targetNNKerberosPrincipal));
     }
 
     private String readEvents(Path eventFileName) throws IOException {
         StringBuilder eventString = new StringBuilder();
-        BufferedReader in = new BufferedReader(new InputStreamReader(sourceFileSystem.open(eventFileName)));
+        BufferedReader in = new BufferedReader(new InputStreamReader(jobFileSystem.open(eventFileName)));
         try {
             String line;
             while ((line=in.readLine())!=null) {
@@ -302,16 +308,25 @@ public class EventUtils {
         DistCpOptions options = getDistCpOptions(srcStagingPaths);
         DistCp distCp = new DistCp(conf, options);
         LOG.info("Started DistCp with source Path: {} \ttarget path: {}", StringUtils.join(srcStagingPaths.toArray()),
-                targetStagingUri);
+                fullyQualifiedTargetStagingPath);
         Job distcpJob = distCp.execute();
         LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString());
         LOG.info("Completed DistCp");
     }
 
     public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) {
-        srcStagingPaths.toArray(new Path[srcStagingPaths.size()]);
+        /*
+         * Add the fully qualified sourceNameNode to srcStagingPath uris. This will
+         * ensure DistCp will succeed when the job is run on target cluster.
+         */
+        List<Path> fullyQualifiedSrcStagingPaths = new ArrayList<Path>();
+        for (Path srcPath : srcStagingPaths) {
+            fullyQualifiedSrcStagingPaths.add(new Path(sourceNN, srcPath.toString()));
+        }
+        fullyQualifiedSrcStagingPaths.toArray(new Path[fullyQualifiedSrcStagingPaths.size()]);
 
-        DistCpOptions distcpOptions = new DistCpOptions(srcStagingPaths, new Path(targetStagingUri));
+        DistCpOptions distcpOptions = new DistCpOptions(fullyQualifiedSrcStagingPaths,
+                new Path(fullyQualifiedTargetStagingPath));
         /* setSyncFolder to false to retain dir structure as in source at the target. If set to true all files will be
         copied to the same staging dir at target resulting in DuplicateFileException in DistCp.
         */

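The qualification step added in getDistCpOptions() runs without a cluster, since Path manipulation is purely syntactic; a minimal sketch with assumed values (class name hypothetical):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;

public class QualifyPathsSketch {
    public static void main(String[] args) {
        String sourceNN = "hdfs://source-nn:8020"; // assumed endpoint
        List<Path> srcStagingPaths = new ArrayList<Path>();
        srcStagingPaths.add(new Path("/apps/hive/tools/dr/drJobName"));

        // Same idiom as the hunk above: Path(parent, child) joins the
        // NameNode URI with the absolute staging path.
        List<Path> fullyQualified = new ArrayList<Path>();
        for (Path p : srcStagingPaths) {
            fullyQualified.add(new Path(sourceNN, p.toString()));
        }
        // prints: [hdfs://source-nn:8020/apps/hive/tools/dr/drJobName]
        System.out.println(fullyQualified);
    }
}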
http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
index b2d670a..cc70ac4 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
@@ -72,6 +72,7 @@ sourceDatabase=default
 # For DB level replication specify * for sourceTable.
 # For table level replication to replicate multiple tables specify comma separated list of tables
 sourceTable=testtable_dr
+## Please specify staging dir in the source without fully qualified domain name.
 sourceStagingPath=/apps/hive/tools/dr
 sourceNN=hdfs://localhost:8020
 # Specify kerberos principal required to access source namenode and hive servers, optional on non-secure cluster.
@@ -83,6 +84,7 @@ sourceHive2KerberosPrincipal=hive/_HOST@EXAMPLE.COM
 targetCluster=backupCluster
 targetMetastoreUri=thrift://localhost:9083
 targetHiveServer2Uri=hive2://localhost:10000
+## Please specify staging dir in the target without fully qualified domain name.
 targetStagingPath=/apps/hive/tools/dr
 targetNN=hdfs://localhost:8020
 # Specify kerberos principal required to access target namenode and hive servers, optional on non-secure cluster.
@@ -101,4 +103,4 @@ distcpMaxMaps=1
 distcpMapBandwidth=100
 
 ##### Email on failure
-drNotificationReceivers=NA
\ No newline at end of file
+drNotificationReceivers=NA

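The new staging-dir comments ("without fully qualified domain name") line up with the EventUtils change above: the tool itself prefixes sourceNN/targetNN onto these paths. A one-file sketch (assumed values, hypothetical class name) of what an unqualified dir becomes:

import org.apache.hadoop.fs.Path;

public class UnqualifiedStagingSketch {
    public static void main(String[] args) {
        String targetNN = "hdfs://localhost:8020";        // targetNN from the recipe
        String targetStagingPath = "/apps/hive/tools/dr"; // unqualified, per the new comment
        // prints: hdfs://localhost:8020/apps/hive/tools/dr
        System.out.println(new Path(targetNN, targetStagingPath));
    }
}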
http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
index 42ae30b..c2cbf5d 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
@@ -70,6 +70,7 @@ sourceDatabase=default
 # For DB level replication specify * for sourceTable.
 # For table level replication to replicate multiple tables specify comma separated list of tables
 sourceTable=testtable_dr
+## Please specify staging dir in the source without fully qualified domain name.
 sourceStagingPath=/apps/hive/tools/dr
 sourceNN=hdfs://localhost:8020
 
@@ -77,6 +78,7 @@ sourceNN=hdfs://localhost:8020
 targetCluster=backupCluster
 targetMetastoreUri=thrift://localhost:9083
 targetHiveServer2Uri=hive2://localhost:10000
+## Please specify staging dir in the target without fully qualified domain name.
 targetStagingPath=/apps/hive/tools/dr
 targetNN=hdfs://localhost:8020
 
@@ -91,4 +93,4 @@ distcpMaxMaps=1
 distcpMapBandwidth=100
 
 ##### Email on failure
-drNotificationReceivers=NA
\ No newline at end of file
+drNotificationReceivers=NA

