kylin-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] shaofengshi closed pull request #204: KYLIN-3442 Fact distinct columns in Spark
Date Fri, 24 Aug 2018 09:48:18 GMT
shaofengshi closed pull request #204: KYLIN-3442 Fact distinct columns in Spark
URL: https://github.com/apache/kylin/pull/204
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 700aeb5cb0..ceb93373a2 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -100,6 +100,47 @@
             <version>0.9.10</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+        </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala b/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala
new file mode 100644
index 0000000000..cb5458da77
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.{DataInputBuffer, Writable}
+import org.apache.hadoop.mapred.RawKeyValueIterator
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.counters.GenericCounter
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter
+import org.apache.hadoop.mapreduce.task.{ReduceContextImpl, TaskAttemptContextImpl}
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, TaskAttemptID, TaskType}
+import org.apache.hadoop.util.Progress
+import org.apache.spark._
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+class MultipleOutputsRDD[K, V](self: RDD[(String, (K, V, String))])
+                              (implicit kt: ClassTag[K], vt: ClassTag[V]) extends Serializable {
+
+  def saveAsNewAPIHadoopDatasetWithMultipleOutputs(conf: Configuration) {
+    val hadoopConf = conf
+    val job = NewAPIHadoopJob.getInstance(hadoopConf)
+    val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+    val jobtrackerID = formatter.format(new Date())
+    val stageId = self.id
+    val jobConfiguration = job.getConfiguration
+    val wrappedConf = new SerializableWritable(jobConfiguration)
+    val outfmt = job.getOutputFormatClass
+    val jobFormat = outfmt.newInstance
+
+    if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+      jobFormat.checkOutputSpecs(job)
+    }
+
+    val writeShard = (context: TaskContext, itr: Iterator[(String, (K, V, String))]) => {
+      val config = wrappedConf.value
+
+      val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId,
+        context.attemptNumber)
+      val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
+      val format = outfmt.newInstance
+
+      format match {
+        case c: Configurable => c.setConf(wrappedConf.value)
+        case _ => ()
+      }
+
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+
+      val recordWriter = format.getRecordWriter(hadoopContext).asInstanceOf[RecordWriter[K, V]]
+
+      val taskInputOutputContext = new ReduceContextImpl(wrappedConf.value, attemptId, new InputIterator(itr), new GenericCounter, new GenericCounter,
+        recordWriter, committer, new DummyReporter, null, kt.runtimeClass, vt.runtimeClass)
+
+      // use hadoop MultipleOutputs
+      val writer = new MultipleOutputs(taskInputOutputContext)
+
+      try {
+        while (itr.hasNext) {
+          val pair = itr.next()
+          writer.write(pair._1, pair._2._1, pair._2._2, pair._2._3)
+        }
+      } finally {
+        writer.close()
+      }
+      committer.commitTask(hadoopContext)
+      1
+    }: Int
+
+    val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
+    val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+    jobCommitter.setupJob(jobTaskContext)
+    self.context.runJob(self, writeShard)
+    jobCommitter.commitJob(jobTaskContext)
+  }
+
+  class InputIterator(itr: Iterator[_]) extends RawKeyValueIterator {
+    def getKey: DataInputBuffer = null
+    def getValue: DataInputBuffer = null
+    def getProgress: Progress = null
+    def next = itr.hasNext
+    def close() { }
+  }
+}
+
+object MultipleOutputsRDD {
+  def rddToMultipleOutputsRDD[K, V](rdd: JavaPairRDD[String, (Writable, Writable, String)]) = {
+    new MultipleOutputsRDD(rdd)
+  }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index e5451663e5..5fd7213b2b 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.engine.spark;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.StringUtil;
@@ -32,9 +35,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  */
 public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
@@ -61,7 +61,7 @@ public CubingJob build() {
         inputSide.addStepPhase1_CreateFlatTable(result);
 
         // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStep(jobId));
+        result.addTask(createFactDistinctColumnsSparkStep(jobId));
 
         if (isEnableUHCDictStep()) {
             result.addTask(createBuildUHCDictStep(jobId));
@@ -87,6 +87,32 @@ public CubingJob build() {
         return result;
     }
 
+    public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) {
+        final SparkExecutable sparkExecutable = new SparkExecutable();
+        final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
+
+        sparkExecutable.setClassName(SparkFactDistinct.class.getName());
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId));
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_INPUT_PATH.getOpt(), tablePath);
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_OUTPUT_PATH.getOpt(), getFactDistinctColumnsPath(jobId));
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_STATS_SAMPLING_PERCENT.getOpt(), String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+
+        sparkExecutable.setJobId(jobId);
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+
+        sparkExecutable.setJars(jars.toString());
+
+        return sparkExecutable;
+    }
 
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         final SparkExecutable sparkExecutable = new SparkExecutable();
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cb3af317d9..9f4ae3482a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -27,13 +27,10 @@
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeDescManager;
@@ -68,9 +65,6 @@
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,37 +163,8 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
 
         boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
-        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
-
-        if (isSequenceFile) {
-            encodedBaseRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
-                    .map(new Function<Text, String[]>() {
-                        @Override
-                        public String[] call(Text text) throws Exception {
-                            String s = Bytes.toString(text.getBytes(), 0, text.getLength());
-                            return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
-                        }
-                    }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
-        } else {
-            SparkSession sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
-            final Dataset intermediateTable = sparkSession.table(hiveTable);
-            encodedBaseRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
-                @Override
-                public String[] call(Row row) throws Exception {
-                    String[] result = new String[row.size()];
-                    for (int i = 0; i < row.size(); i++) {
-                        final Object o = row.get(i);
-                        if (o != null) {
-                            result[i] = o.toString();
-                        } else {
-                            result[i] = null;
-                        }
-                    }
-                    return result;
-                }
-            }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
-
-        }
+        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
+                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 637382cba9..612239741f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -375,7 +375,11 @@ private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOExcepti
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
         dumpList.addAll(segment.getDictionaryPaths());
-        dumpList.add(segment.getStatisticsResourcePath());
+        ResourceStore rs = ResourceStore.getStore(segment.getConfig());
+        if (rs.exists(segment.getStatisticsResourcePath())) {
+            // cube statistics is not available for new segment
+            dumpList.add(segment.getStatisticsResourcePath());
+        }
         JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig(), this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
     }
 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
new file mode 100644
index 0000000000..61e2e534e8
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -0,0 +1,866 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IDictionaryBuilder;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
+import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+import scala.Tuple2;
+import scala.Tuple3;
+
+public class SparkFactDistinct extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkFactDistinct.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_STATS_SAMPLING_PERCENT = OptionBuilder
+            .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(true)
+            .withDescription("Statistics sampling percent").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+            .withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+
+    private Options options;
+
+    public SparkFactDistinct() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_INPUT_TABLE);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
+
+        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") };
+
+        SparkConf conf = new SparkConf().setAppName("Fact distinct columns for:" + cubeName + " segment " + segmentId);
+        //serialization conf
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        sc.sc().addSparkListener(jobListener);
+        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+
+        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+
+        final Job job = Job.getInstance(sConf.get());
+
+        final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+        logger.info("RDD Output path: {}", outputPath);
+        logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
+        logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+
+        boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+        // calculate source record bytes size
+        final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+
+        final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+
+        JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
+
+        JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD.groupByKey(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
+
+        JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD.mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
+
+        // make each reducer output to respective dir
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);
+
+        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+        // prevent to create zero-sized default output
+        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+
+        MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
+
+        multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
+
+        // only work for client mode, not work when spark.submit.deployMode=cluster
+        logger.info("Map input records={}", recordRDD.count());
+        logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+
+        HadoopUtil.deleteHDFSMeta(metaUrl);
+    }
+
+    static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int samplingPercent;
+        private CuboidStatCalculator cuboidStatCalculator;
+        private FactDistinctColumnsReducerMapping reducerMapping;
+        private List<TblColRef> allCols;
+        private int[] columnIndex;
+        private DictColDeduper dictColDeduper;
+        private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap;
+        private ByteBuffer tmpbuf;
+        private LongAccumulator bytesWritten;
+
+        public FlatOutputFucntion(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, int samplingPercent, LongAccumulator bytesWritten) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaurl;
+            this.conf = conf;
+            this.samplingPercent = samplingPercent;
+            this.dimensionRangeInfoMap = Maps.newHashMap();
+            this.bytesWritten = bytesWritten;
+        }
+
+        private void init() {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+            CubeDesc cubeDesc = cubeInstance.getDescriptor();
+            CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+            CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+
+            reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+            tmpbuf = ByteBuffer.allocate(4096);
+
+            int[] rokeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+
+            Long[] cuboidIds = getCuboidIds(cubeSegment);
+
+            Integer[][] cuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, rokeyColumnIndexes.length);
+
+            boolean isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm(cubeDesc);
+
+            HLLCounter[] cuboidsHLL = getInitCuboidsHLL(cuboidIds.length, cubeDesc.getConfig().getCubeStatsHLLPrecision());
+
+            cuboidStatCalculator = new CuboidStatCalculator(rokeyColumnIndexes, cuboidIds, cuboidsBitSet, isNewAlgorithm, cuboidsHLL);
+            allCols = reducerMapping.getAllDimDictCols();
+
+            initDictColDeduper(cubeDesc);
+            initColumnIndex(intermediateTableDesc);
+
+            initialized = true;
+        }
+
+        @Override
+        public Iterator<Tuple2<SelfDefineSortableKey, Text>> call(Iterator<String[]> rowIterator) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkFactDistinct.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            List<String[]> rows = Lists.newArrayList(rowIterator);
+            List<Tuple2<SelfDefineSortableKey, Text>> result = Lists.newArrayList();
+
+            int rowCount = 0;
+
+            for (String[] row : rows) {
+                bytesWritten.add(countSizeInBytes(row));
+
+                for (int i = 0; i < allCols.size(); i++) {
+                    String fieldValue = row[columnIndex[i]];
+                    if (fieldValue == null)
+                        continue;
+
+                    final DataType type = allCols.get(i).getType();
+
+                    //for dic column, de dup before write value; for dim not dic column, hold util doCleanup()
+                    if (dictColDeduper.isDictCol(i)) {
+                        if (dictColDeduper.add(i, fieldValue)) {
+                            addFieldValue(type, i, fieldValue, result);
+                        }
+                    } else {
+                        DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
+                        if (old == null) {
+                            old = new DimensionRangeInfo(fieldValue, fieldValue);
+                            dimensionRangeInfoMap.put(i, old);
+                        } else {
+                            old.setMax(type.getOrder().max(old.getMax(), fieldValue));
+                            old.setMin(type.getOrder().min(old.getMin(), fieldValue));
+                        }
+                    }
+                }
+
+                if (rowCount % 100 < samplingPercent) {
+                    cuboidStatCalculator.putRow(row);
+                }
+
+                if (rowCount % 100 == 0) {
+                    dictColDeduper.resetIfShortOfMem();
+                }
+
+                rowCount++;
+            }
+
+            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+            // output each cuboid's hll to reducer, key is 0 - cuboidId
+            Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+            HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+            HLLCounter hll;
+
+            for (int i = 0; i < cuboidIds.length; i++) {
+                hll = cuboidsHLL[i];
+                tmpbuf.clear();
+                tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
+                tmpbuf.putLong(cuboidIds[i]);
+                Text outputKey = new Text();
+                Text outputValue = new Text();
+                SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+
+                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                hllBuf.clear();
+                hll.writeRegisters(hllBuf);
+                outputValue.set(hllBuf.array(), 0, hllBuf.position());
+
+                sortableKey.init(outputKey, (byte) 0);
+
+                result.add(new Tuple2<SelfDefineSortableKey, Text>(sortableKey, outputValue));
+            }
+
+            for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
+                DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
+                DataType dataType = allCols.get(colIndex).getType();
+                addFieldValue(dataType, colIndex, rangeInfo.getMin(), result);
+                addFieldValue(dataType, colIndex, rangeInfo.getMax(), result);
+            }
+
+            return result.iterator();
+        }
+
+        private boolean isUsePutRowKeyToHllNewAlgorithm(CubeDesc cubeDesc) {
+            boolean isUsePutRowKeyToHllNewAlgorithm;
+            if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
+                isUsePutRowKeyToHllNewAlgorithm = false;
+                logger.info("Found KylinVersion: {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
+            } else {
+                isUsePutRowKeyToHllNewAlgorithm = true;
+                logger.info(
+                        "Found KylinVersion: {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+                        cubeDesc.getVersion());
+            }
+            return isUsePutRowKeyToHllNewAlgorithm;
+        }
+
+        private Long[] getCuboidIds(CubeSegment cubeSegment) {
+            Set<Long> cuboidIdSet = Sets.newHashSet(cubeSegment.getCuboidScheduler().getAllCuboidIds());
+            if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSegment)) {
+                // For cube planner, for every prebuilt cuboid, its related row count stats should be calculated
+                // If the precondition for trigger cube planner phase one is satisfied, we need to calculate row count stats for mandatory cuboids.
+                cuboidIdSet.addAll(cubeSegment.getCubeDesc().getMandatoryCuboids());
+            }
+
+            return cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+        }
+
+        private HLLCounter[] getInitCuboidsHLL(int cuboidSize, int hllPrecision) {
+            HLLCounter[] cuboidsHLL = new HLLCounter[cuboidSize];
+            for (int i = 0; i < cuboidSize; i++) {
+                cuboidsHLL[i] = new HLLCounter(hllPrecision, RegisterType.DENSE);
+            }
+            return cuboidsHLL;
+        }
+
+        private void initDictColDeduper(CubeDesc cubeDesc) {
+            // setup dict col deduper
+            dictColDeduper = new DictColDeduper();
+            Set<TblColRef> dictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt();
+            for (int i = 0; i < allCols.size(); i++) {
+                if (dictCols.contains(allCols.get(i)))
+                    dictColDeduper.setIsDictCol(i);
+            }
+        }
+
+        private void initColumnIndex(CubeJoinedFlatTableEnrich intermediateTableDesc) {
+            columnIndex = new int[allCols.size()];
+            for (int i = 0; i < allCols.size(); i++) {
+                TblColRef colRef = allCols.get(i);
+                int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+                columnIndex[i] = columnIndexOnFlatTbl;
+            }
+        }
+
+        private void addFieldValue(DataType type, Integer colIndex, String value,
+                List<Tuple2<SelfDefineSortableKey, Text>> result) {
+            int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, value);
+            tmpbuf.clear();
+            byte[] valueBytes = Bytes.toBytes(value);
+            int size = valueBytes.length + 1;
+            if (size >= tmpbuf.capacity()) {
+                tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+            }
+            tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+            tmpbuf.put(valueBytes);
+
+            Text outputKey = new Text();
+            SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+
+            outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+            sortableKey.init(outputKey, type);
+
+            result.add(new Tuple2<SelfDefineSortableKey, Text>(sortableKey, new Text()));
+
+            // log a few rows for troubleshooting
+            if (result.size() < 10) {
+                logger.info("Sample output: " + allCols.get(colIndex) + " '" + value + "' => reducer " + reducerIndex);
+            }
+        }
+
+        private int countNewSize(int oldSize, int dataSize) {
+            int newSize = oldSize * 2;
+            while (newSize < dataSize) {
+                newSize = newSize * 2;
+            }
+            return newSize;
+        }
+
+        private int countSizeInBytes(String[] row) {
+            int size = 0;
+            for (String s : row) {
+                size += s == null ? 1 : StringUtil.utf8Length(s);
+                size++; // delimiter
+            }
+            return size;
+        }
+    }
+
+    static class CuboidStatCalculator {
+        private final int nRowKey;
+        private final int[] rowkeyColIndex;
+        private final Long[] cuboidIds;
+        private final Integer[][] cuboidsBitSet;
+        private volatile HLLCounter[] cuboidsHLL;
+
+        //about details of the new algorithm, please see KYLIN-2518
+        private final boolean isNewAlgorithm;
+        private final HashFunction hf;
+        private long[] rowHashCodesLong;
+
+        public CuboidStatCalculator(int[] rowkeyColIndex, Long[] cuboidIds, Integer[][] cuboidsBitSet,
+                boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[] cuboidsHLL) {
+            this.nRowKey = rowkeyColIndex.length;
+            this.rowkeyColIndex = rowkeyColIndex;
+            this.cuboidIds = cuboidIds;
+            this.cuboidsBitSet = cuboidsBitSet;
+            this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm;
+            if (!isNewAlgorithm) {
+                this.hf = Hashing.murmur3_32();
+            } else {
+                rowHashCodesLong = new long[nRowKey];
+                this.hf = Hashing.murmur3_128();
+            }
+            this.cuboidsHLL = cuboidsHLL;
+        }
+
+        public void putRow(final String[] row) {
+            String[] copyRow = Arrays.copyOf(row, row.length);
+
+            if (isNewAlgorithm) {
+                putRowKeyToHLLNew(copyRow);
+            } else {
+                putRowKeyToHLLOld(copyRow);
+            }
+        }
+
+        private void putRowKeyToHLLOld(String[] row) {
+            //generate hash for each row key column
+            byte[][] rowHashCodes = new byte[nRowKey][];
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue != null) {
+                    rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+                } else {
+                    rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+                }
+            }
+
+            // user the row key column hash to get a consolidated hash for each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                Hasher hc = hf.newHasher();
+                for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                    hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]);
+                }
+
+                cuboidsHLL[i].add(hc.hash().asBytes());
+            }
+        }
+
+        private void putRowKeyToHLLNew(String[] row) {
+            //generate hash for each row key column
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue == null)
+                    colValue = "0";
+                byte[] bytes = hc.putString(colValue).hash().asBytes();
+                rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+            }
+
+            // user the row key column hash to get a consolidated hash for each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                long value = 0;
+                for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                    value += rowHashCodesLong[cuboidsBitSet[i][position]];
+                }
+                cuboidsHLL[i].addHashDirectly(value);
+            }
+        }
+
+        public HLLCounter[] getHLLCounters() {
+            return cuboidsHLL;
+        }
+
+        public Long[] getCuboidIds() {
+            return cuboidIds;
+        }
+    }
+
+    static class FactDistinctPartitioner extends Partitioner {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int totalReducerNum;
+        private FactDistinctColumnsReducerMapping reducerMapping;
+
+        public FactDistinctPartitioner(String cubeName, String metaUrl, SerializableConfiguration conf, int totalReducerNum) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+            this.totalReducerNum = totalReducerNum;
+        }
+
+        private void init() {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+            reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+            initialized = true;
+        }
+
+        @Override
+        public int numPartitions() {
+            return totalReducerNum;
+        }
+
+        @Override
+        public int getPartition(Object o) {
+            if (initialized == false) {
+                synchronized (SparkFactDistinct.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            SelfDefineSortableKey skey = (SelfDefineSortableKey) o;
+            Text key = skey.getText();
+            if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
+                Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
+                return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
+            } else {
+                return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
+            }
+        }
+    }
+
+    static class MultiOutputFunction implements
+            PairFlatMapFunction<Iterator<Tuple2<SelfDefineSortableKey, Iterable<Text>>>, String, Tuple3<Writable, Writable, String>> {
+        private volatile transient boolean initialized = false;
+        private String DICT_FILE_POSTFIX = ".rldict";
+        private String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
+        private String cubeName;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int samplingPercent;
+        private FactDistinctColumnsReducerMapping reducerMapping;
+        private int taskId;
+        private boolean isStatistics = false;
+        private long baseCuboidId;
+        private List<Long> baseCuboidRowCountInMappers;
+        private Map<Long, HLLCounter> cuboidHLLMap;
+        private TblColRef col;
+        private boolean buildDictInReducer;
+        private IDictionaryBuilder builder;
+        private int rowCount = 0;
+        private long totalRowsBeforeMerge = 0;
+        private KylinConfig cubeConfig;
+        private CubeDesc cubeDesc;
+        private String maxValue = null;
+        private String minValue = null;
+        private List<Tuple2<String, Tuple3<Writable, Writable, String>>> result;
+
+        public MultiOutputFunction(String cubeName, String metaurl, SerializableConfiguration conf, int samplingPercent) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaurl;
+            this.conf = conf;
+            this.samplingPercent = samplingPercent;
+        }
+
+        private void init() throws IOException {
+            taskId = TaskContext.getPartitionId();
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+            cubeDesc = cubeInstance.getDescriptor();
+            cubeConfig = cubeInstance.getConfig();
+            reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+            result = Lists.newArrayList();
+
+            if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
+                // hll
+                isStatistics = true;
+                baseCuboidId = cubeInstance.getCuboidScheduler().getBaseCuboidId();
+                baseCuboidRowCountInMappers = Lists.newArrayList();
+                cuboidHLLMap = Maps.newHashMap();
+
+                logger.info("Partition " + taskId + " handling stats");
+            } else {
+                // normal col
+                col = reducerMapping.getColForReducer(taskId);
+                Preconditions.checkNotNull(col);
+
+                // local build dict
+                buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
+                if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
+                    buildDictInReducer = false;
+                }
+
+                if (reducerMapping.getReducerNumForDimCol(col) > 1) {
+                    buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
+                }
+
+                if (buildDictInReducer) {
+                    builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+                    builder.init(null, 0, null);
+                }
+                logger.info("Partition " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
+            }
+
+            initialized = true;
+        }
+
+        private void logAFewRows(String value) {
+            if (rowCount < 10) {
+                logger.info("Received value: " + value);
+            }
+        }
+
+        @Override
+        public Iterator<Tuple2<String, Tuple3<Writable, Writable, String>>> call(
+                Iterator<Tuple2<SelfDefineSortableKey, Iterable<Text>>> tuple2Iterator) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkFactDistinct.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            List<Tuple2<SelfDefineSortableKey, Iterable<Text>>> tuples = Lists.newArrayList(tuple2Iterator);
+
+            for (Tuple2<SelfDefineSortableKey, Iterable<Text>> tuple : tuples) {
+                Text key = tuple._1.getText();
+
+                if (isStatistics) {
+                    // for hll
+                    long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
+
+                    for (Text value : tuple._2) {
+                        HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
+                        ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
+                        hll.readRegisters(bf);
+
+                        totalRowsBeforeMerge += hll.getCountEstimate();
+
+                        if (cuboidId == baseCuboidId) {
+                            baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+                        }
+
+                        if (cuboidHLLMap.get(cuboidId) != null) {
+                            cuboidHLLMap.get(cuboidId).merge(hll);
+                        } else {
+                            cuboidHLLMap.put(cuboidId, hll);
+                        }
+                    }
+
+                } else {
+                    String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
+                    logAFewRows(value);
+                    // if dimension col, compute max/min value
+                    if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                        if (minValue == null || col.getType().compare(minValue, value) > 0) {
+                            minValue = value;
+                        }
+                        if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
+                            maxValue = value;
+                        }
+                    }
+
+                    //if dict column
+                    if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+                        if (buildDictInReducer) {
+                            builder.addValue(value);
+                        } else {
+                            byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+                            // output written to baseDir/colName/-r-00000 (etc)
+                            String fileName = col.getIdentity() + "/";
+                            result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(
+                                    BatchConstants.CFG_OUTPUT_COLUMN, new Tuple3<Writable, Writable, String>(
+                                    NullWritable.get(), new Text(keyBytes), fileName)));
+                        }
+                    }
+                }
+
+                rowCount++;
+            }
+
+            if (isStatistics) {
+                //output the hll info;
+                List<Long> allCuboids = Lists.newArrayList();
+                allCuboids.addAll(cuboidHLLMap.keySet());
+                Collections.sort(allCuboids);
+
+                logMapperAndCuboidStatistics(allCuboids); // for human check
+                outputStatistics(allCuboids, result);
+            } else {
+                //dimension col
+                if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                    outputDimRangeInfo(result);
+                }
+                // dic col
+                if (buildDictInReducer) {
+                    Dictionary<String> dict = builder.build();
+                    outputDict(col, dict, result);
+                }
+            }
+
+            return result.iterator();
+        }
+
+        private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws IOException {
+            logger.info("Cuboid number for task: " + taskId + "\t" + allCuboids.size());
+            logger.info("Samping percentage: \t" + samplingPercent);
+            logger.info("The following statistics are collected based on sampling data. ");
+            logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size());
+
+            for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+                if (baseCuboidRowCountInMappers.get(i) > 0) {
+                    logger.info("Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i));
+                }
+            }
+
+            long grantTotal = 0;
+            for (long i : allCuboids) {
+                grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+                logger.info("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate());
+            }
+
+            logger.info("Sum of row counts (before merge) is: \t " + totalRowsBeforeMerge);
+            logger.info("After merge, the row count: \t " + grantTotal);
+        }
+
+        private void outputDimRangeInfo(List<Tuple2<String, Tuple3<Writable, Writable, String>>> result) {
+            if (col != null && minValue != null) {
+                // output written to baseDir/colName/colName.dci-r-00000 (etc)
+                String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_PARTITION,
+                        new Tuple3<Writable, Writable, String>(NullWritable.get(), new Text(minValue.getBytes()),
+                                dimRangeFileName)));
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_PARTITION,
+                        new Tuple3<Writable, Writable, String>(NullWritable.get(), new Text(maxValue.getBytes()),
+                                dimRangeFileName)));
+                logger.info("write dimension range info for col : " + col.getName() + "  minValue:" + minValue
+                        + " maxValue:" + maxValue);
+            }
+        }
+
+        private void outputDict(TblColRef col, Dictionary<String> dict, List<Tuple2<String, Tuple3<Writable, Writable, String>>> result)
+                throws IOException, InterruptedException {
+            // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+            String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
+
+            try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos)) {
+                outputStream.writeUTF(dict.getClass().getName());
+                dict.write(outputStream);
+
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_DICT,
+                        new Tuple3<Writable, Writable, String>(NullWritable.get(),
+                                new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName)));
+            }
+        }
+
+        private void outputStatistics(List<Long> allCuboids, List<Tuple2<String, Tuple3<Writable, Writable, String>>> result)
+                throws IOException, InterruptedException {
+            // output written to baseDir/statistics/statistics-r-00000 (etc)
+            String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+
+            // mapper overlap ratio at key -1
+            long grandTotal = 0;
+
+            for (HLLCounter hll : cuboidHLLMap.values()) {
+                grandTotal += hll.getCountEstimate();
+            }
+
+            double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+            result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                    new Tuple3<Writable, Writable, String>(new LongWritable(-1),
+                            new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName)));
+
+            // mapper number at key -2
+            result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                    new Tuple3<Writable, Writable, String>(new LongWritable(-2),
+                            new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName)));
+
+            // sampling percentage at key 0
+            result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                    new Tuple3<Writable, Writable, String>(new LongWritable(0L),
+                            new BytesWritable(Bytes.toBytes(samplingPercent)), statisticsFileName)));
+
+            ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+            for (long i : allCuboids) {
+                valueBuf.clear();
+                cuboidHLLMap.get(i).writeRegisters(valueBuf);
+                valueBuf.flip();
+
+                byte[] valueCopy = new byte[valueBuf.limit()];
+                System.arraycopy(valueBuf.array(), 0, valueCopy, 0, valueBuf.limit());
+
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                        new Tuple3<Writable, Writable, String>(new LongWritable(i),
+                                new BytesWritable(valueCopy, valueCopy.length), statisticsFileName)));
+            }
+        }
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 31eebc83c3..82a1a9be06 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -24,23 +24,31 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
 
 public class SparkUtil {
 
@@ -130,6 +138,41 @@ public static void modifySparkHadoopConfiguration(SparkContext sc) throws Except
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
-  }
+    }
+
+    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) {
+        JavaRDD<String[]> recordRDD;
+
+        if (isSequenceFile) {
+            recordRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+                    .map(new Function<Text, String[]>() {
+                        @Override
+                        public String[] call(Text text) throws Exception {
+                            String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+                            return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+                        }
+                    });
+        } else {
+            SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
+            final Dataset intermediateTable = sparkSession.table(hiveTable);
+            recordRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
+                @Override
+                public String[] call(Row row) throws Exception {
+                    String[] result = new String[row.size()];
+                    for (int i = 0; i < row.size(); i++) {
+                        final Object o = row.get(i);
+                        if (o != null) {
+                            result[i] = o.toString();
+                        } else {
+                            result[i] = null;
+                        }
+                    }
+                    return result;
+                }
+            });
+        }
+
+        return recordRDD;
+    }
 
 }
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 16bedb50e8..a3e7e68517 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -307,17 +307,14 @@
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
-            <version>2.11.0</version>
         </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-compiler</artifactId>
-            <version>2.11.0</version>
         </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-reflect</artifactId>
-            <version>2.11.0</version>
         </dependency>
     </dependencies>
 
diff --git a/pom.xml b/pom.xml
index d9b9efe20b..cd186599cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,9 @@
         <spark.version>2.1.2</spark.version>
         <kryo.version>4.0.0</kryo.version>
 
+        <!-- Scala versions -->
+        <scala.version>2.11.0</scala.version>
+
         <!-- <reflections.version>0.9.10</reflections.version> -->
 
         <!-- Calcite Version -->
@@ -895,6 +898,24 @@
                 <version>${tomcat.version}</version>
                 <scope>provided</scope>
             </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-library</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-compiler</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-reflect</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message