kylin-commits mailing list archives

From liy...@apache.org
Subject [08/48] kylin git commit: KYLIN-2811, refine spark cubing
Date Fri, 15 Sep 2017 14:42:28 GMT
KYLIN-2811, refine spark cubing


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2d939a59
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2d939a59
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2d939a59

Branch: refs/heads/ranger
Commit: 2d939a59fdf385bb20b4724e8a6f87879e26bdd7
Parents: ecf4819
Author: Cheng Wang <cheng.wang@kyligence.io>
Authored: Tue Sep 5 19:10:47 2017 +0800
Committer: Roger Shi <rogershijicheng@gmail.com>
Committed: Tue Sep 5 21:14:02 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    | 23 ++---
 .../engine/mr/common/AbstractHadoopJob.java     | 26 ++++++
 .../kylin/engine/mr/common/BatchConstants.java  |  1 +
 .../spark/SparkBatchCubingJobBuilder2.java      | 10 +--
 .../kylin/engine/spark/SparkCubingByLayer.java  | 95 +++++++++-----------
 .../kylin/engine/spark/SparkExecutable.java     | 30 +++++--
 6 files changed, 108 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index a003638..c7c7f60 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -18,15 +18,6 @@
 
 package org.apache.kylin.common;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.restclient.RestClient;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.OrderedProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -41,6 +32,16 @@ import java.nio.charset.Charset;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.OrderedProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
 /**
  */
 public class KylinConfig extends KylinConfigBase {
@@ -58,7 +59,7 @@ public class KylinConfig extends KylinConfigBase {
 
     // thread-local instances, will override SYS_ENV_INSTANCE
     private static transient ThreadLocal<KylinConfig> THREAD_ENV_INSTANCE = new ThreadLocal<>();
-    
+
     static {
         /*
          * Make Calcite to work with Unicode.
@@ -226,7 +227,7 @@ public class KylinConfig extends KylinConfigBase {
         }
     }
 
-    private static Properties streamToProps(InputStream is) throws IOException {
+    public static Properties streamToProps(InputStream is) throws IOException {
         Properties prop = new Properties();
         prop.load(is);
         IOUtils.closeQuietly(is);
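
streamToProps(InputStream) is widened from private to public so that code outside core-common (here, AbstractHadoopJob below) can turn a raw properties stream into a KylinConfig. A minimal sketch of that usage, not part of this commit; the local file path is only an illustration:

    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.Properties;

    import org.apache.kylin.common.KylinConfig;

    public class StreamToConfigExample {
        public static void main(String[] args) throws Exception {
            // Any InputStream will do; a local kylin.properties is used purely for illustration.
            InputStream is = new FileInputStream("/tmp/kylin.properties");
            Properties props = KylinConfig.streamToProps(is); // now public; also closes the stream
            KylinConfig config = KylinConfig.createKylinConfig(props);
            System.out.println("Loaded " + props.size() + " properties into KylinConfig");
        }
    }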

http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 081ac93..292c57d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -27,10 +27,12 @@ import static org.apache.hadoop.util.StringUtils.formatTime;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -464,6 +466,30 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
     }
 
+    public static KylinConfig loadKylinConfigFromHdfs(String uri) {
+        if (uri == null)
+            throw new IllegalArgumentException("meta url should not be null");
+
+        if (!uri.contains("@hdfs"))
+            throw new IllegalArgumentException("meta url should like @hdfs schema");
+
+        logger.info("Ready to load KylinConfig from uri: {}", uri);
+        KylinConfig config;
+        FileSystem fs;
+        int cut = uri.indexOf('@');
+        String realHdfsPath = uri.substring(0, cut) + "/" + KylinConfig.KYLIN_CONF_PROPERTIES_FILE;
+        try {
+            fs = HadoopUtil.getFileSystem(realHdfsPath);
+            InputStream is = fs.open(new Path(realHdfsPath));
+            Properties prop = KylinConfig.streamToProps(is);
+            config = KylinConfig.createKylinConfig(prop);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        KylinConfig.setKylinConfigThreadLocal(config);
+        return config;
+    }
+
     protected void attachTableMetadata(TableDesc table, Configuration conf) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.add(table.getResourcePath());
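
The new loadKylinConfigFromHdfs helper accepts a metadata URL that must contain "@hdfs"; everything before the '@' is treated as an HDFS directory holding kylin.properties, and the resulting config is also installed as the thread-local instance. A hedged usage sketch (the HDFS path is hypothetical):

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.engine.mr.common.AbstractHadoopJob;

    public class LoadConfigFromHdfsExample {
        public static void main(String[] args) {
            // Hypothetical URL: an HDFS directory containing kylin.properties, suffixed with "@hdfs".
            String metaUrl = "hdfs://sandbox/kylin/kylin_metadata/kylin-job-0001/meta@hdfs";
            // Reads <dir>/kylin.properties, builds the config, and registers it thread-locally.
            KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
            System.out.println("Config loaded from " + metaUrl);
        }
    }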

http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 84ca006..bbf38e5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -65,6 +65,7 @@ public interface BatchConstants {
     String CFG_OUTPUT_STATISTICS = "statistics";
     String CFG_OUTPUT_PARTITION = "partition";
     String CFG_MR_SPARK_JOB = "mr.spark.job";
+    String CFG_SPARK_META_URL = "spark.meta.url";
 
     /**
      * command line ARGuments
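
CFG_SPARK_META_URL is only declared in this commit; how it is consumed is not shown in this diff. A hedged illustration of the kind of use such a configuration key typically gets (hypothetical, not from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.kylin.engine.mr.common.BatchConstants;

    public class SparkMetaUrlConfigExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Hypothetical: stash the metadata URL so downstream tasks can find kylin.properties on HDFS.
            conf.set(BatchConstants.CFG_SPARK_META_URL, "hdfs://sandbox/kylin/meta@hdfs");
            System.out.println(conf.get(BatchConstants.CFG_SPARK_META_URL));
        }
    }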

http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 779f340..2773f97 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -47,9 +47,10 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfDir().getAbsolutePath());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
+                seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
+                getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
 
         StringBuilder jars = new StringBuilder();
@@ -57,9 +58,6 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         StringUtil.appendWithSeparator(jars, findJar("org.htrace.HTraceConfiguration", null)); // htrace-core.jar
         StringUtil.appendWithSeparator(jars, findJar("org.apache.htrace.Trace", null)); // htrace-core.jar
         StringUtil.appendWithSeparator(jars, findJar("org.cloudera.htrace.HTraceConfiguration", null)); // htrace-core.jar
-        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.client.HConnection", null)); // hbase-client.jar
-        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.HBaseConfiguration", null)); // hbase-common.jar
-        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.util.ByteStringer", null)); // hbase-protocol.jar
         StringUtil.appendWithSeparator(jars, findJar("com.yammer.metrics.core.Gauge", null)); // metrics-core.jar
         StringUtil.appendWithSeparator(jars, findJar("com.google.common.collect.Maps", "guava")); //guava.jar
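
With OPTION_CONF_PATH gone, the driver now passes only the metadata URL produced by getSegmentMetadataUrl, whose implementation is not shown in this diff. From loadKylinConfigFromHdfs one can infer only the required shape: the value contains "@hdfs" and the part before the '@' is an HDFS directory holding kylin.properties. A hedged illustration of composing such a value (the layout is an assumption, not the commit's actual implementation):

    public class SegmentMetaUrlExample {
        public static void main(String[] args) {
            // Hypothetical layout; getSegmentMetadataUrl's real implementation is not shown in this diff.
            String hdfsWorkingDir = "hdfs://sandbox/kylin";                 // assumed HDFS working dir
            String segmentUuid = "f2597c60-1111-2222-3333-444444444444";    // placeholder segment id
            String metaUrl = hdfsWorkingDir + "/metadata/" + segmentUuid + "@hdfs";
            // Satisfies loadKylinConfigFromHdfs: contains "@hdfs", and the prefix names the
            // directory expected to hold kylin.properties.
            System.out.println(metaUrl);
        }
    }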
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index dab5fb7..94435f5 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,7 +17,6 @@
 */
 package org.apache.kylin.engine.spark;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -54,6 +53,7 @@ import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
@@ -63,7 +63,6 @@ import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
@@ -74,7 +73,6 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.util.SizeEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,8 +95,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
             .withDescription("Hive Intermediate Table").create("hiveTable");
-    public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true)
-            .withDescription("Configuration Path").create("confPath");
 
     private Options options;
 
@@ -109,7 +105,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
-        options.addOption(OPTION_CONF_PATH);
     }
 
     @Override
@@ -117,22 +112,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return options;
     }
 
-    public static KylinConfig getKylinConfigForExecutor(String metaUrl) {
-        File file = new File(SparkFiles.get(KylinConfig.KYLIN_CONF_PROPERTIES_FILE));
-        String confPath = file.getParentFile().getAbsolutePath();
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        final KylinConfig config = KylinConfig.getInstanceFromEnv();
-        config.setMetadataUrl(metaUrl);
-        return config;
-    }
-
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
         String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
-        String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
 
         Class[] kryoClassArray = new Class[] { org.apache.hadoop.io.Text.class,
@@ -147,14 +132,19 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         JavaSparkContext sc = new JavaSparkContext(conf);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
 
-        sc.addFile(confPath + File.separator + KylinConfig.KYLIN_CONF_PROPERTIES_FILE);
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
 
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
 
+        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
+        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+        final Job job = Job.getInstance(confOverwrite);
+
+        logger.info("RDD Output path: {}", outputPath);
+        setHadoopConf(job);
+
         int countMeasureIndex = 0;
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
             if (measureDesc.getFunction().isCount() == true) {
@@ -194,26 +184,22 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final int totalLevels = cubeDesc.getBuildLevel();
         JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
-        long baseRDDSize = SizeEstimator.estimate(encodedBaseRDD) / (1024 * 1024);
-        int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) baseRDDSize);
+        int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
 
         // aggregate to calculate base cuboid
         allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
-        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
-        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
 
-        saveToHDFS(allRDDs[0], cubeName, metaUrl, cubeSegment, outputPath, 0, confOverwrite);
+        saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job);
 
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
-            long levelRddSize = SizeEstimator.estimate(allRDDs[level - 1]) / (1024 * 1024);
-            partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) levelRddSize);
+            partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
             allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl))
                     .reduceByKey(reducerFunction2, partition).persist(storageLevel);
             if (envConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
-            saveToHDFS(allRDDs[level], cubeName, metaUrl, cubeSegment, outputPath, level, confOverwrite);
+            saveToHDFS(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job);
             allRDDs[level - 1].unpersist();
         }
         allRDDs[totalLevels].unpersist();
@@ -221,9 +207,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         deleteHDFSMeta(metaUrl);
     }
 
-    private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig,
-            int rddSize) {
-        int baseCuboidSize = (int) Math.min(rddSize, statsReader.estimateLayerSize(level));
+    protected void setHadoopConf(Job job) throws Exception {
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+    }
+
+    protected int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
+        double baseCuboidSize = statsReader.estimateLayerSize(level);
         float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
         int partition = (int) (baseCuboidSize / rddCut);
         partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
@@ -231,20 +221,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return partition;
     }
 
-    private void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String cubeName, final String metaUrl,
-            final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Configuration conf) throws Exception {
+    protected void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String metaUrl, final String cubeName,
+            final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Job job) throws Exception {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
 
-        Job job = Job.getInstance(conf);
         IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
         outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, level);
 
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Text.class);
-        job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeSeg.getCubeInstance().getName());
-        job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, cubeSeg.getUuid());
-        job.getConfiguration().set(BatchConstants.CFG_MR_SPARK_JOB, "spark");
-
         rdd.mapToPair(
                 new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
                     private volatile transient boolean initialized = false;
@@ -253,10 +236,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                     @Override
                     public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(
                             Tuple2<ByteArray, Object[]> tuple2) throws Exception {
-                        if (!initialized) {
+
+                        if (initialized == false) {
                             synchronized (SparkCubingByLayer.class) {
-                                if (!initialized) {
-                                    KylinConfig kylinConfig = getKylinConfigForExecutor(metaUrl);
+                                if (initialized == false) {
+                                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
                                     CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
                                     codec = new BufferedMeasureCodec(desc.getMeasures());
                                     initialized = true;
@@ -269,21 +253,22 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                         return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()),
                                 new org.apache.hadoop.io.Text(encodedBytes));
                     }
-                }).sortByKey().saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+                }).saveAsNewAPIHadoopDataset(job.getConfiguration());
         logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
     }
 
-    static class EncodeBaseCuboid implements PairFunction<Row, ByteArray, Object[]> {
+    static public class EncodeBaseCuboid implements PairFunction<Row, ByteArray, Object[]> {
         private volatile transient boolean initialized = false;
         private BaseCuboidBuilder baseCuboidBuilder = null;
         private String cubeName;
         private String segmentId;
-        private String metaurl;
+        private String metaUrl;
 
         public EncodeBaseCuboid(String cubeName, String segmentId, String metaurl) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
-            this.metaurl = metaurl;
+            this.metaUrl = metaurl;
         }
 
         @Override
@@ -291,7 +276,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             if (initialized == false) {
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
-                        KylinConfig kConfig = getKylinConfigForExecutor(metaurl);
+                        KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
                         CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
                         CubeDesc cubeDesc = cubeInstance.getDescriptor();
                         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -327,7 +312,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
-    static class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
+    static public class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
         protected String cubeName;
         protected String metaUrl;
         protected CubeDesc cubeDesc;
@@ -341,7 +326,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         public void init() {
-            KylinConfig kConfig = getKylinConfigForExecutor(metaUrl);
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             cubeDesc = cubeInstance.getDescriptor();
             aggregators = new MeasureAggregators(cubeDesc.getMeasures());
@@ -364,7 +349,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
-    static class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
+    static public class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
         private boolean[] needAggr;
 
         public CuboidReducerFunction2(String cubeName, String metaUrl, boolean[] needAggr) {
@@ -390,7 +375,7 @@ public class SparkCubingByLayer extends AbstractApplication implements
Serializa
 
     private static final java.lang.Iterable<Tuple2<ByteArray, Object[]>> EMTPY_ITERATOR = new ArrayList(0);
 
-    static class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
+    static public class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
 
         private String cubeName;
         private String segmentId;
@@ -409,11 +394,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         public void init() {
-            KylinConfig kConfig = getKylinConfigForExecutor(metaUrl);
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             this.cubeSegment = cubeInstance.getSegmentById(segmentId);
             this.cubeDesc = cubeInstance.getDescriptor();
             this.cuboidScheduler = cubeDesc.getCuboidScheduler();
+
             this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
             this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
         }
@@ -456,8 +442,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
-    //sanity check
-    private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel,
+    protected void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel,
             CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
         int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
         Long count2 = getRDDCountSum(rdd, countMeasureIndex);
@@ -487,7 +472,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return count;
     }
 
-    private void deleteHDFSMeta(String metaUrl) throws IOException {
+    protected void deleteHDFSMeta(String metaUrl) throws IOException {
         int cut = metaUrl.indexOf('@');
         String path = metaUrl.substring(0, cut);
         HadoopUtil.getFileSystem(path).delete(new Path(path), true);
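
All of the executor-side functions above (EncodeBaseCuboid, the reducer functions, CuboidFlatMap, and the save-to-HDFS pair function) now share the same initialization pattern: instead of shipping kylin.properties with sc.addFile, each executor lazily loads the config from HDFS once, guarded by double-checked locking on a volatile flag. A condensed sketch of that pattern, using only calls that appear in this diff (the class name is illustrative):

    import java.io.Serializable;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.engine.mr.common.AbstractHadoopJob;

    public abstract class ExecutorSideInit implements Serializable {
        private volatile transient boolean initialized = false;
        private final String metaUrl;

        protected ExecutorSideInit(String metaUrl) {
            this.metaUrl = metaUrl;
        }

        protected void ensureInit() {
            if (!initialized) {
                synchronized (ExecutorSideInit.class) {
                    if (!initialized) {
                        // Executor reads kylin.properties straight from HDFS; nothing is shipped by the driver.
                        KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
                        init(config);
                        initialized = true;
                    }
                }
            }
        }

        // Subclasses build their per-executor state (cube desc, measure codec, ...) here.
        protected abstract void init(KylinConfig config);
    }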

http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index d369e3d..7f4b377 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -34,11 +34,13 @@ import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.slf4j.LoggerFactory;
 
@@ -50,11 +52,16 @@ public class SparkExecutable extends AbstractExecutable {
 
     private static final String CLASS_NAME = "className";
     private static final String JARS = "jars";
+    private static final String JOB_ID = "jobId";
 
     public void setClassName(String className) {
         this.setParam(CLASS_NAME, className);
     }
 
+    public void setJobId(String jobId) {
+        this.setParam(JOB_ID, jobId);
+    }
+
     public void setJars(String jars) {
         this.setParam(JARS, jars);
     }
@@ -66,7 +73,7 @@ public class SparkExecutable extends AbstractExecutable {
             tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
             if (entry.getKey().equals(CLASS_NAME)) {
                 stringBuilder.insert(0, tmp);
-            } else if (entry.getKey().equals(JARS)) {
+            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)) {
                 // JARS is for spark-submit, not for app
                 continue;
             } else {
@@ -86,6 +93,8 @@ public class SparkExecutable extends AbstractExecutable {
         CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
         final KylinConfig config = cube.getConfig();
 
+        setAlgorithmLayer();
+
         if (KylinConfig.getSparkHome() == null) {
             throw new NullPointerException();
         }
@@ -99,7 +108,8 @@ public class SparkExecutable extends AbstractExecutable {
         hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
 
         if (StringUtils.isEmpty(hadoopConf)) {
-            throw new RuntimeException("kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
+            throw new RuntimeException(
+                    "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
         }
 
         File hiveConfFile = new File(hadoopConf, "hive-site.xml");
@@ -124,7 +134,8 @@ public class SparkExecutable extends AbstractExecutable {
         }
 
         StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+        stringBuilder.append(
+                "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
 
         Map<String, String> sparkConfs = config.getSparkConfigOverride();
         for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
@@ -133,7 +144,8 @@ public class SparkExecutable extends AbstractExecutable {
 
         stringBuilder.append("--jars %s %s %s");
         try {
-            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar, formatArgs());
+            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar,
+                    formatArgs());
             logger.info("cmd: " + cmd);
             CliCommandExecutor exec = new CliCommandExecutor();
             PatternedLogger patternedLogger = new PatternedLogger(logger);
@@ -146,6 +158,13 @@ public class SparkExecutable extends AbstractExecutable {
         }
     }
 
+    // Spark Cubing can only work in layer algorithm
+    private void setAlgorithmLayer() {
+        ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubingJob cubingJob = (CubingJob) execMgr.getJob(this.getParam(JOB_ID));
+        cubingJob.setAlgorithm(CubingJob.AlgorithmEnum.LAYER);
+    }
+
     private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
@@ -154,7 +173,8 @@ public class SparkExecutable extends AbstractExecutable {
         dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig());
     }
 
-    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig) throws IOException {
+    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig)
+            throws IOException {
         File tmp = File.createTempFile("kylin_job_meta", "");
         FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
 

