kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject incubator-kylin git commit: KYLIN-1021 Upload dependent jars of kylin to HDFS and set tmpjars
Date Tue, 20 Oct 2015 07:59:13 GMT
Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 473180fca -> f4af90e7a


KYLIN-1021 Upload dependent jars of kylin to HDFS and set tmpjars

Signed-off-by: Li, Yang <yangli9@ebay.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f4af90e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f4af90e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f4af90e7

Branch: refs/heads/2.x-staging
Commit: f4af90e7af806d99ba0296902b50701987f14bcb
Parents: 473180f
Author: Feng Yu <hzfengyu@corp.netease.com>
Authored: Tue Oct 20 15:58:54 2015 +0800
Committer: Li, Yang <yangli9@ebay.com>
Committed: Tue Oct 20 15:58:54 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  6 +-
 .../engine/mr/common/AbstractHadoopJob.java     | 66 +++++++++++++++++++-
 2 files changed, 69 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f4af90e7/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index cd19f4c..5f8fb07 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -315,6 +315,10 @@ public class KylinConfig implements Serializable {
         return getFileName(kylinHome + File.separator + "lib", JOB_JAR_NAME_PATTERN);
     }
 
+    public String getKylinJobMRLibDir() {
+        return getOptional("kylin.job.mr.lib.dir", "");
+    }
+
     public String getKylinSparkJobJarPath() {
         final String jobJar = getOptional(KYLIN_JOB_JAR_SPARK);
         if (StringUtils.isNotEmpty(jobJar)) {
@@ -508,7 +512,7 @@ public class KylinConfig implements Serializable {
     }
 
     public String getHbaseDefaultCompressionCodec() {
-        return getOptional(HTABLE_DEFAULT_COMPRESSION_CODEC,"");
+        return getOptional(HTABLE_DEFAULT_COMPRESSION_CODEC, "");
     }
 
     public boolean isHiveKeepFlatTable() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f4af90e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index cf492ef..eb7951a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -37,6 +37,7 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -181,6 +182,66 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
 
         jobConf.set(MAP_REDUCE_CLASSPATH, classpath);
         logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
+
+        // set extra dependencies as tmpjars & tmpfiles if configured
+        setJobTmpJarsAndFiles(job);
+    }
+
+    private void setJobTmpJarsAndFiles(Job job) {
+        String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir();
+        if (StringUtils.isBlank(mrLibDir))
+            return;
+
+        try {
+            Configuration jobConf = job.getConfiguration();
+            FileSystem fs = FileSystem.get(new Configuration(jobConf));
+            FileStatus[] fList = fs.listStatus(new Path(mrLibDir));
+
+            StringBuilder jarList = new StringBuilder();
+            StringBuilder fileList = new StringBuilder();
+
+            for (FileStatus file : fList) {
+                Path p = file.getPath();
+                StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
+                if (list.length() > 0)
+                    list.append(",");
+                list.append(mrLibDir + "/" + file.getPath().getName());
+            }
+
+            appendTmpFiles(fileList.toString(), jobConf);
+            appendTmpJars(jarList.toString(), jobConf);
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void appendTmpJars(String jarList, Configuration conf) {
+        if (StringUtils.isBlank(jarList))
+            return;
+
+        String tmpJars = conf.get("tmpjars", null);
+        if (tmpJars == null) {
+            tmpJars = jarList;
+        } else {
+            tmpJars += "," + jarList;
+        }
+        conf.set("tmpjars", tmpJars);
+        logger.info("Job 'tmpjars' updated -- " + tmpJars);
+    }
+
+    private void appendTmpFiles(String fileList, Configuration conf) {
+        if (StringUtils.isBlank(fileList))
+            return;
+
+        String tmpFiles = conf.get("tmpfiles", null);
+        if (tmpFiles == null) {
+            tmpFiles = fileList;
+        } else {
+            tmpFiles += "," + fileList;
+        }
+        conf.set("tmpfiles", tmpFiles);
+        logger.info("Job 'tmpfiles' updated -- " + tmpFiles);
     }
 
     private String getDefaultMapRedClasspath() {
@@ -200,7 +261,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     public static void addInputDirs(String input, Job job) throws IOException {
         addInputDirs(StringSplitter.split(input, ","), job);
     }
-    
+
     public static void addInputDirs(String[] inputs, Job job) throws IOException {
         for (String inp : inputs) {
             inp = inp.trim();
@@ -283,7 +344,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         else
             hdfsMetaDir = "file:///" + hdfsMetaDir;
         logger.info("HDFS meta dir is: " + hdfsMetaDir);
-        conf.set("tmpfiles", hdfsMetaDir);
+
+        appendTmpFiles(hdfsMetaDir, conf);
     }
 
     protected void cleanupTempConfFile(Configuration conf) {


Mime
View raw message