kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject [1/2] kylin git commit: KYLIN-2764 Build UHC Dict Use MR
Date Mon, 23 Oct 2017 03:59:07 GMT
Repository: kylin
Updated Branches:
  refs/heads/master d0ff77444 -> 48f3fb195


KYLIN-2764 Build UHC Dict Use MR


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3941b4da
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3941b4da
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3941b4da

Branch: refs/heads/master
Commit: 3941b4da36b255c8d44ce3dcea75ecef11577155
Parents: d0ff774
Author: kangkaisen <kangkaisen@meituan.com>
Authored: Fri Jul 21 15:22:30 2017 +0800
Committer: shaofengshi <shaofengshi@apache.org>
Committed: Sat Oct 21 22:17:42 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  11 +-
 .../java/org/apache/kylin/cube/CubeManager.java | 102 ++++++++----
 .../apache/kylin/dict/DictionaryGenerator.java  |  14 +-
 .../org/apache/kylin/dict/DictionaryInfo.java   |   4 +
 .../apache/kylin/dict/DictionaryManager.java    |  40 +++++
 .../kylin/dict/GlobalDictionaryBuilder.java     |   9 +-
 .../apache/kylin/dict/IDictionaryBuilder.java   |   2 +-
 .../global/SegmentAppendTrieDictBuilder.java    |   4 +-
 .../kylin/dict/DictionaryProviderTest.java      |   2 +-
 .../kylin/job/constant/ExecutableConstants.java |   1 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |  26 +++-
 .../kylin/engine/mr/JobBuilderSupport.java      |  24 ++-
 .../engine/mr/common/AbstractHadoopJob.java     |   2 +
 .../kylin/engine/mr/common/BatchConstants.java  |   2 +
 .../engine/mr/steps/CreateDictionaryJob.java    |  16 +-
 .../mr/steps/FactDistinctColumnsReducer.java    |   2 +-
 .../kylin/engine/mr/steps/UHCDictionaryJob.java | 154 +++++++++++++++++++
 .../engine/mr/steps/UHCDictionaryMapper.java    | 101 ++++++++++++
 .../mr/steps/UHCDictionaryPartitioner.java      |  30 ++++
 .../engine/mr/steps/UHCDictionaryReducer.java   | 113 ++++++++++++++
 .../test_case_data/sandbox/kylin.properties     |   5 +
 .../dict/ITGlobalDictionaryBuilderTest.java     |   4 +-
 22 files changed, 611 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f3cf6c0..d204f71 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -967,6 +967,10 @@ abstract public class KylinConfigBase implements Serializable {
         return getPropertiesByPrefix("kylin.engine.mr.config-override.");
     }
 
+    public Map<String, String> getUHCMRConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.mr.uhc-config-override.");
+    }
+
     public Map<String, String> getSparkConfigOverride() {
         return getPropertiesByPrefix("kylin.engine.spark-conf.");
     }
@@ -993,9 +997,14 @@ abstract public class KylinConfigBase implements Serializable {
 
     //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
     public int getUHCReducerCount() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "1"));
+        return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "5"));
     }
 
+    public boolean isBuildUHCDictWithMREnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-uhc-dict", "true"));
+    }
+
+
     public boolean isBuildDictInReducerEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-dict-in-reducer", "true"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3bb9f21..180c3f4 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -30,7 +30,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -832,6 +831,74 @@ public class CubeManager implements IRealizationProvider {
         return getCube(name);
     }
 
+    // ============================================================================
+
+
+    /**
+     * Get the columns whose dictionaries need to be built from the fact table (the column exists on the fact table and is not an FK).
+     * @param cubeDesc
+     * @return
+     * @throws IOException
+     */
+    public List<TblColRef> getAllDictColumnsOnFact(CubeDesc cubeDesc) throws IOException {
+        List<TblColRef> factDictCols = new ArrayList<TblColRef>();
+        DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+        for (TblColRef col : cubeDesc.getAllColumnsNeedDictionaryBuilt()) {
+
+            String scanTable = dictMgr.decideSourceData(cubeDesc.getModel(), col).getTable();
+            if (cubeDesc.getModel().isFactTable(scanTable)) {
+                factDictCols.add(col);
+            }
+        }
+        return factDictCols;
+    }
+
+    public List<TblColRef> getAllGlobalDictColumns(CubeDesc cubeDesc) {
+        List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
+        List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
+
+        if (dictionaryDescList == null) {
+            return globalDictCols;
+        }
+
+        for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+            if (dictionaryDesc.getBuilderClass() != null) {
+                globalDictCols.add(dictionaryDesc.getColumnRef());
+            }
+        }
+        return globalDictCols;
+    }
+
+    //UHC (ultra high cardinality) columns: contains the ShardByColumns and the GlobalDictionaryColumns
+    public List<TblColRef> getAllUHCColumns(CubeDesc cubeDesc) {
+        List<TblColRef> uhcColumns = new ArrayList<TblColRef>();
+        uhcColumns.addAll(getAllGlobalDictColumns(cubeDesc));
+        uhcColumns.addAll(cubeDesc.getShardByColumns());
+
+        //handle PK-FK, see getAllDictColumnsOnFact
+        try {
+            uhcColumns.retainAll(getAllDictColumnsOnFact(cubeDesc));
+        } catch (IOException e) {
+            throw new RuntimeException("Get all dict columns on fact failed");
+        }
+
+        return uhcColumns;
+    }
+
+    public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+        List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc);
+        List<TblColRef> uhcColumns = getAllUHCColumns(cubeDesc);
+        int[] uhcIndex = new int[factDictCols.size()];
+
+        for (int i = 0; i < factDictCols.size(); i++) {
+            if (uhcColumns.contains(factDictCols.get(i))) {
+                uhcIndex[i] = 1;
+            }
+        }
+
+        return uhcIndex;
+    }
+
     /**
      * Calculate the holes (gaps) in segments.
      * @param cubeName
@@ -872,37 +939,4 @@ public class CubeManager implements IRealizationProvider {
         return holes;
     }
 
-    private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
-
-    //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
-    public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
-        List<TblColRef> dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
-        int[] uhcIndex = new int[dictCols.size()];
-
-        //add GlobalDictionaryColumns
-        List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
-        if (dictionaryDescList != null) {
-            for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
-                if (dictionaryDesc.getBuilderClass() != null
-                        && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
-                    for (int i = 0; i < dictCols.size(); i++) {
-                        if (dictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
-                            uhcIndex[i] = 1;
-                            break;
-                        }
-                    }
-                }
-            }
-        }
-
-        //add ShardByColumns
-        Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
-        for (int i = 0; i < dictCols.size(); i++) {
-            if (shardByColumns.contains(dictCols.get(i))) {
-                uhcIndex[i] = 1;
-            }
-        }
-
-        return uhcIndex;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 61a0664..5fdecdb 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -70,7 +70,7 @@ public class DictionaryGenerator {
         ArrayList<String> samples = new ArrayList<String>(nSamples);
 
         // init the builder
-        builder.init(dictInfo, baseId);
+        builder.init(dictInfo, baseId, null);
 
         // add values
         while (valueEnumerator.moveNext()) {
@@ -111,7 +111,7 @@ public class DictionaryGenerator {
         private String datePattern;
 
         @Override
-        public void init(DictionaryInfo info, int baseId) throws IOException {
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
             this.baseId = baseId;
         }
 
@@ -152,7 +152,7 @@ public class DictionaryGenerator {
     private static class TimeDictBuilder implements IDictionaryBuilder {
 
         @Override
-        public void init(DictionaryInfo info, int baseId) throws IOException {
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
         }
 
         @Override
@@ -176,7 +176,7 @@ public class DictionaryGenerator {
         TrieDictionaryBuilder builder;
         
         @Override
-        public void init(DictionaryInfo info, int baseId) throws IOException {
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
             this.baseId = baseId;
             this.builder = new TrieDictionaryBuilder(new StringBytesConverter());
         }
@@ -200,7 +200,7 @@ public class DictionaryGenerator {
         TrieDictionaryForestBuilder builder;
 
         @Override
-        public void init(DictionaryInfo info, int baseId) throws IOException {
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
             builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
         }
 
@@ -225,7 +225,7 @@ public class DictionaryGenerator {
         NumberDictionaryBuilder builder;
         
         @Override
-        public void init(DictionaryInfo info, int baseId) throws IOException {
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
             this.baseId = baseId;
             this.builder = new NumberDictionaryBuilder();
         }
@@ -249,7 +249,7 @@ public class DictionaryGenerator {
         NumberDictionaryForestBuilder builder;
 
         @Override
-        public void init(DictionaryInfo info, int baseId) throws IOException {
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
             builder = new NumberDictionaryForestBuilder(baseId);
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index ae5c0f1..bfb1995 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -52,6 +52,10 @@ public class DictionaryInfo extends RootPersistentEntity {
     public DictionaryInfo() {
     }
 
+    public DictionaryInfo(ColumnDesc col, String dataType) {
+        this(col.getTable().getIdentity(), col.getName(), col.getZeroBasedIndex(), dataType, null);
+    }
+
     public DictionaryInfo(ColumnDesc col, String dataType, TableSignature input) {
         this(col.getTable().getIdentity(), col.getName(), col.getZeroBasedIndex(), dataType, input);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index e97899c..7ce608b 100755
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -29,13 +29,17 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.DataModelManager;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.IReadableTable.TableSignature;
@@ -342,6 +346,42 @@ public class DictionaryManager {
         return dictInfo;
     }
 
+    /**
+     * Decide a dictionary's source data, leverage PK-FK relationship.
+     */
+    public TblColRef decideSourceData(DataModelDesc model, TblColRef col) {
+        // Note: an FK on the fact table is supported by scanning the related PK on the lookup table.
+        // If the column is an FK on the fact table and the join type is inner, use the PK from the lookup table instead.
+        if (model.isFactTable(col.getTable()) == false)
+            return col;
+
+        // find a lookup table that the col joins as FK
+        for (TableRef lookup : model.getLookupTables()) {
+            JoinDesc lookupJoin = model.getJoinByPKSide(lookup);
+            int find = ArrayUtils.indexOf(lookupJoin.getForeignKeyColumns(), col);
+            if (find < 0)
+                continue;
+
+            // make sure the joins are all inner up to the root
+            if (isAllInnerJoinsToRoot(model, lookupJoin))
+                return lookupJoin.getPrimaryKeyColumns()[find];
+        }
+
+        return col;
+    }
+
+    private boolean isAllInnerJoinsToRoot(DataModelDesc model, JoinDesc join) {
+        while (join != null) {
+            if (join.isInnerJoin() == false)
+                return false;
+
+            TableRef table = join.getFKSide();
+            join = model.getJoinByPKSide(table);
+        }
+        return true;
+    }
+
+
     private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException {
         final ResourceStore store = DataModelManager.getInstance(config).getStore();
         final List<DictionaryInfo> allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index 404d53c..8250fed 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -42,14 +42,17 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class);
 
-    @Override
-    public void init(DictionaryInfo dictInfo, int baseId) throws IOException {
+    public void init(DictionaryInfo dictInfo, int baseId, String hdfsDir) throws IOException {
         sourceColumn = dictInfo.getSourceTable() + "_" + dictInfo.getSourceColumn();
         lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
         lock.lock(getLockPath(sourceColumn), Long.MAX_VALUE);
 
         int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
-        String baseDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + dictInfo.getResourceDir() + "/";
+        if (hdfsDir == null) {
+            //build in Kylin job server
+            hdfsDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
+        }
+        String baseDir = hdfsDir + "resources/GlobalDict" + dictInfo.getResourceDir() + "/";
         this.builder = new AppendTrieDictionaryBuilder(baseDir, maxEntriesPerSlice, true);
         this.baseId = baseId;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
index 0934a7d..18bbb07 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
@@ -28,7 +28,7 @@ import org.apache.kylin.common.util.Dictionary;
 public interface IDictionaryBuilder {
 
     /** Sets the dictionary info for the dictionary being built. Mainly for GlobalDictionaryBuilder. */
-    void init(DictionaryInfo info, int baseId) throws IOException;
+    void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException;
     
     /** Add a new value into dictionary, returns it is accepted (not null) or not. */
     boolean addValue(String value);

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
index c8bc13d..e5f2d57 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
@@ -38,14 +38,14 @@ public class SegmentAppendTrieDictBuilder implements IDictionaryBuilder {
     private String sourceColumn;
 
     @Override
-    public void init(DictionaryInfo dictInfo, int baseId) throws IOException {
+    public void init(DictionaryInfo dictInfo, int baseId, String hdfsDir) throws IOException {
         sourceColumn = dictInfo.getSourceTable() + "." + dictInfo.getSourceColumn();
 
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         int maxEntriesPerSlice = config.getAppendDictEntrySize();
         //use UUID to make each segment dict in different HDFS dir and support concurrent build
         //use timestamp to make the segment dict easily to delete
-        String baseDir = config.getHdfsWorkingDirectory() + "resources/SegmentDict" + dictInfo.getResourceDir() + "/" + UUID.randomUUID().toString() + "_" + System.currentTimeMillis()+ "/";
+        String baseDir = hdfsDir + "resources/SegmentDict" + dictInfo.getResourceDir() + "/" + UUID.randomUUID().toString() + "_" + System.currentTimeMillis()+ "/";
 
         this.builder = new AppendTrieDictionaryBuilder(baseDir, maxEntriesPerSlice, false);
         this.baseId = baseId;

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
index 4b386a7..7e2e218 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
@@ -84,7 +84,7 @@ public class DictionaryProviderTest extends LocalFileMetadataTestCase{
 
     private Dictionary<String> getDict(DataType type, Iterator<String> values) throws Exception {
         IDictionaryBuilder builder = DictionaryGenerator.newDictionaryBuilder(type);
-        builder.init(null, 0);
+        builder.init(null, 0, null);
         while (values.hasNext()) {
             builder.addValue(values.next());
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 2de3efa..36496fe 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -33,6 +33,7 @@ public final class ExecutableConstants {
     public static final String SOURCE_RECORDS_SIZE = "source_records_size";
 
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
+    public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
     public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 51c9056..4b808d1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.engine.mr;
 
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
@@ -31,9 +32,12 @@ import org.apache.kylin.engine.mr.steps.NDCuboidJob;
 import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 public class BatchCubingJobBuilder2 extends JobBuilderSupport {
     private static final Logger logger = LoggerFactory.getLogger(BatchCubingJobBuilder2.class);
 
@@ -58,6 +62,11 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
 
         // Phase 2: Build Dictionary
         result.addTask(createFactDistinctColumnsStepWithStats(jobId));
+
+        if (isEnableUHCDictStep()) {
+            result.addTask(createBuildUHCDictStep(jobId));
+        }
+
         result.addTask(createBuildDictionaryStep(jobId));
         result.addTask(createSaveStatisticsStep(jobId));
         outputSide.addStepPhase2_BuildDictionary(result);
@@ -75,13 +84,26 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
+    private boolean isEnableUHCDictStep() {
+        if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
+            return false;
+        }
+
+        List<TblColRef> uhcColumns = CubeManager.getInstance(config.getConfig()).getAllUHCColumns(seg.getCubeDesc());
+        if (uhcColumns.size() == 0) {
+            return false;
+        }
+
+        return true;
+    }
+
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         final int maxLevel = seg.getCuboidScheduler().getBuildLevel();
         // base cuboid step
         result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
         // n dim cuboid steps
         for (int i = 1; i <= maxLevel; i++) {
-            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
+            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i - 1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
         }
     }
 
@@ -138,7 +160,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob());
-//        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+        //        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
         return baseCuboidStep;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index c1ed345..2a51c89 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -26,6 +26,7 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
+import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
 import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
@@ -81,6 +82,22 @@ public class JobBuilderSupport {
         return result;
     }
 
+    public MapReduceExecutable createBuildUHCDictStep(String jobId) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_BUILD_UHC_DICTIONARY);
+        result.setMapReduceJobClass(UHCDictionaryJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getDictRootPath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Build_UHC_Dict" + seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+        result.setMapReduceParams(cmd.toString());
+        result.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+        return result;
+    }
+
     public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
         // base cuboid job
         HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
@@ -89,6 +106,7 @@ public class JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_DICT_PATH, getDictRootPath(jobId));
 
         buildDictionaryStep.setJobParams(cmd.toString());
         buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
@@ -104,7 +122,6 @@ public class JobBuilderSupport {
         CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
         CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
 
-
         return result;
     }
 
@@ -176,6 +193,10 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + BatchConstants.CFG_OUTPUT_STATISTICS;
     }
 
+    public String getDictRootPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/dict";
+    }
+
     // ============================================================================
     // static methods also shared by other job flow participant
     // ----------------------------------------------------------------------------
@@ -203,5 +224,4 @@ public class JobBuilderSupport {
         }
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index babf69b..1756251 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -97,6 +97,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             .hasArg().isRequired(false).withDescription("Input format").create(BatchConstants.ARG_INPUT_FORMAT);
     protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
             .isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT);
+    protected static final Option OPTION_DICT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH).hasArg()
+            .isRequired(false).withDescription("Dict path").create(BatchConstants.ARG_DICT_PATH);
     protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg()
             .isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create(BatchConstants.ARG_LEVEL);
     protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 52b6af5..25a67f9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -66,6 +66,7 @@ public interface BatchConstants {
     String CFG_OUTPUT_PARTITION = "partition";
     String CFG_MR_SPARK_JOB = "mr.spark.job";
     String CFG_SPARK_META_URL = "spark.meta.url";
+    String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir";
 
     /**
      * command line ARGuments
@@ -87,6 +88,7 @@ public interface BatchConstants {
     String ARG_INPUT_FORMAT = "inputformat";
     String ARG_LEVEL = "level";
     String ARG_CONF = "conf";
+    String ARG_DICT_PATH = "dictPath";
 
     /**
      * logger and counter

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 98ebbb4..d64d300 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -21,6 +21,7 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,6 +36,8 @@ import org.apache.kylin.common.util.ByteBufferBackedInputStream;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
 import org.apache.kylin.dict.DictionaryProvider;
 import org.apache.kylin.dict.DistinctColumnValuesProvider;
@@ -55,11 +58,13 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
         options.addOption(OPTION_CUBE_NAME);
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_DICT_PATH);
         parseOptions(options, args);
 
         final String cubeName = getOptionValue(OPTION_CUBE_NAME);
         final String segmentID = getOptionValue(OPTION_SEGMENT_ID);
         final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
+        final String dictPath = getOptionValue(OPTION_DICT_PATH);
 
         final KylinConfig config = KylinConfig.getInstanceFromEnv();
 
@@ -72,7 +77,16 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
 
             @Override
             public Dictionary<String> getDictionary(TblColRef col) throws IOException {
-                Path colDir = new Path(factColumnsInputPath, col.getIdentity());
+                CubeManager cubeManager = CubeManager.getInstance(config);
+                CubeInstance cube = cubeManager.getCube(cubeName);
+                List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cube.getDescriptor());
+
+                Path colDir;
+                if (uhcColumns.contains(col)) {
+                    colDir = new Path(dictPath, col.getIdentity());
+                } else {
+                    colDir = new Path(factColumnsInputPath, col.getIdentity());
+                }
                 FileSystem fs = HadoopUtil.getWorkingFileSystem();
 
                 Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 73c8a20..74c8c2c 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -141,7 +141,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
             }
             if (buildDictInReducer) {
                 builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
-                builder.init(null, 0);
+                builder.init(null, 0, null);
             }
             logger.info("Reducer " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
new file mode 100644
index 0000000..485975a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class UHCDictionaryJob extends AbstractHadoopJob { // wires UHCDictionaryMapper/Partitioner/Reducer into one MR job, one reduce task per UHC column
+    protected static final Logger logger = LoggerFactory.getLogger(UHCDictionaryJob.class);
+
+    private boolean isSkipped = false; // set to true when no UHC column has an input directory and the job is not submitted
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_INPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String job_id = getOptionValue(OPTION_CUBING_JOB_ID);
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+
+            //add metadata to distributed cache
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            attachCubeMetadata(cube, job.getConfiguration());
+
+            List<TblColRef> uhcColumns = cubeMgr.getAllUHCColumns(cube.getDescriptor());
+            int reducerCount = uhcColumns.size(); // counts every UHC column, including those without an input directory
+
+            //Note: only UHC columns whose input directory exists are added as input; if none exists the job is skipped.
+            boolean hasUHCValue = false;
+            for (TblColRef tblColRef : uhcColumns) {
+                Path path = new Path(input.toString() + "/" + tblColRef.getIdentity());
+                if (HadoopUtil.getFileSystem(path).exists(path)) {
+                    FileInputFormat.addInputPath(job, path);
+                    hasUHCValue = true;
+                }
+            }
+
+            if (!hasUHCValue) {
+                isSkipped = true;
+                return 0; // the finally block below still runs
+            }
+
+            setJobClasspath(job, cube.getConfig());
+            setupMapper();
+            setupReducer(output, reducerCount);
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id);
+            job.getConfiguration().set(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+            job.getConfiguration().set(BatchConstants.CFG_MAPRED_OUTPUT_COMPRESS, "false");
+
+            //8G of memory is enough for any global dict, because the input is sequential and the global dict is handled slice by slice
+            job.getConfiguration().set("mapreduce.reduce.memory.mb", "8500");
+            job.getConfiguration().set("mapred.reduce.child.java.opts", "-Xmx8g"); // NOTE(review): legacy property name; Hadoop 2+ calls it mapreduce.reduce.java.opts - confirm it is still honoured
+            //Copying the global dict to the working dir in GlobalDictHDFSStore may take a long time (maybe we could improve it)
+            //Waiting for the global dict lock may also take a long time.
+            //So the task timeout is set to 8 hours here
+            job.getConfiguration().set("mapreduce.task.timeout", "28800000");
+
+            //allow users to override the settings above for the UHC step; applied last, so these entries win
+            for (Map.Entry<String, String> entry : cube.getConfig().getUHCMRConfigOverride().entrySet()) {
+                job.getConfiguration().set(entry.getKey(), entry.getValue());
+            }
+
+            return waitForCompletion(job);
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    private void setupMapper() throws IOException {
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+        job.setMapperClass(UHCDictionaryMapper.class);
+        job.setMapOutputKeyClass(SelfDefineSortableKey.class);
+        job.setMapOutputValueClass(NullWritable.class);
+    }
+
+    private void setupReducer(Path output, int numberOfReducers) throws IOException {
+        job.setReducerClass(UHCDictionaryReducer.class);
+        job.setPartitionerClass(UHCDictionaryPartitioner.class);
+        job.setNumReduceTasks(numberOfReducers);
+
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
+        FileOutputFormat.setOutputPath(job, output);
+        job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+        //avoid creating zero-sized default output files
+        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    @Override
+    public boolean isSkipped() {
+        return isSkipped;
+    }
+
+    public static void main(String[] args) throws Exception {
+        UHCDictionaryJob job = new UHCDictionaryJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java
new file mode 100644
index 0000000..d9d7b60
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class UHCDictionaryMapper extends KylinMapper<NullWritable, Text, SelfDefineSortableKey, NullWritable> { // emits each input value prefixed with one byte holding its UHC column index
+    private static final Logger logger = LoggerFactory.getLogger(UHCDictionaryMapper.class);
+
+    protected int index; // position of this split's column in the getAllUHCColumns(...) list
+    protected DataType type; // data type of that column
+
+    protected Text outputKey = new Text();
+    private ByteBuffer tmpBuf; // reusable scratch buffer, replaced by a larger one in doMap when a value does not fit
+    private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        tmpBuf = ByteBuffer.allocate(4096);
+
+        Configuration conf = context.getConfiguration();
+        bindCurrentConfiguration(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        CubeInstance cube = CubeManager.getInstance(config).getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
+        List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cube.getDescriptor());
+
+        FileSplit fileSplit = (FileSplit) context.getInputSplit();
+        String colName = fileSplit.getPath().getParent().getName(); // name of the directory holding the split's file, compared with column identities below
+
+        for (int i = 0; i < uhcColumns.size(); i++) {
+            if (uhcColumns.get(i).getIdentity().equalsIgnoreCase(colName)) {
+                index = i;
+                break;
+            }
+        }
+        type = uhcColumns.get(index).getType(); // NOTE(review): if no column matched, index is still 0 and column 0 is used silently - confirm this cannot happen
+
+        //for debug
+        logger.info("column name: " + colName);
+        logger.info("index: " + index);
+        logger.info("type: " + type);
+    }
+
+    @Override
+    public void doMap(NullWritable key, Text value, Context context) throws IOException, InterruptedException {
+        tmpBuf.clear();
+        int size = value.getLength()+ 1; // value bytes plus the one-byte column index prefix
+        if (size >= tmpBuf.capacity()) {
+            tmpBuf = ByteBuffer.allocate(countNewSize(tmpBuf.capacity(), size));
+        }
+        tmpBuf.put(Bytes.toBytes(index)[3]); // 4th byte of the encoded int; presumably the low-order byte, which would limit this to 256 columns - TODO confirm
+        tmpBuf.put(value.getBytes(), 0, value.getLength());
+        outputKey.set(tmpBuf.array(), 0, tmpBuf.position());
+
+        sortableKey.init(outputKey, type);
+        context.write(sortableKey, NullWritable.get());
+    }
+
+    private int countNewSize(int oldSize, int dataSize) { // doubles oldSize at least once, then keeps doubling until the result is >= dataSize
+        int newSize = oldSize * 2;
+        while (newSize < dataSize) {
+            newSize = newSize * 2;
+        }
+        return newSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java
new file mode 100644
index 0000000..5e8ffa6
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.util.BytesUtil;
+
+public class UHCDictionaryPartitioner extends Partitioner<SelfDefineSortableKey, NullWritable> { // routes each key by the column-index byte that UHCDictionaryMapper writes first
+    @Override
+    public int getPartition(SelfDefineSortableKey skey, NullWritable value, int numReduceTasks) {
+        return BytesUtil.readUnsigned(skey.getText().getBytes(), 0, 1); // partition = first byte of the key read as unsigned; numReduceTasks is not consulted
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
new file mode 100644
index 0000000..ce9b792
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.IDictionaryBuilder;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer.DICT_FILE_POSTFIX;
+
+public class UHCDictionaryReducer extends KylinReducer<SelfDefineSortableKey, NullWritable, NullWritable, BytesWritable> { // each reduce task builds and writes the dictionary of one UHC column
+    private static final Logger logger = LoggerFactory.getLogger(UHCDictionaryReducer.class);
+
+    private IDictionaryBuilder builder;
+    private TblColRef col; // the single column handled by this reduce task
+
+    private MultipleOutputs mos; // created in doSetup, closed at the end of outputDict
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        Configuration conf = context.getConfiguration();
+        mos = new MultipleOutputs(context);
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        CubeDesc cubeDesc = cube.getDescriptor();
+        List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cubeDesc);
+
+        int taskId = context.getTaskAttemptID().getTaskID().getId();
+        col = uhcColumns.get(taskId); // the reduce task id doubles as the index into the UHC column list
+        logger.info("column name: " + col.getIdentity());
+
+        if (cube.getDescriptor().getShardByColumns().contains(col)) {
+            //shard-by column: builder chosen by column type, initialised without dictionary info or an HDFS dir
+            builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+            builder.init(null, 0, null);
+        } else {
+            //any other UHC column (global dictionary): builder class comes from the cube descriptor, initialised with the global dict base dir
+            String hdfsDir = conf.get(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR);
+            DictionaryInfo dictionaryInfo = new DictionaryInfo(col.getColumnDesc(), col.getDatatype());
+            String builderClass = cubeDesc.getDictionaryBuilderClass(col);
+            builder = (IDictionaryBuilder) ClassUtil.newInstance(builderClass);
+            builder.init(dictionaryInfo, 0, hdfsDir);
+        }
+    }
+
+    @Override
+    public void doReduce(SelfDefineSortableKey skey, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
+        Text key = skey.getText();
+        String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1); // skip the leading column-index byte added by the mapper
+        builder.addValue(value);
+    }
+
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        Dictionary<String> dict = builder.build(); // built once, after every key of this task has been added
+        outputDict(col, dict);
+    }
+
+    private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException {
+        // named-output base path: <column identity>/<column name> + DICT_FILE_POSTFIX
+        String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos);) {
+            outputStream.writeUTF(dict.getClass().getName()); // dictionary class name first, then the serialized dictionary
+            dict.write(outputStream);
+
+            mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName);
+        }
+        mos.close(); // NOTE(review): skipped if the write above throws; consider closing in a finally block
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 619bf99..55eb719 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -48,6 +48,8 @@ kylin.storage.url=hbase
 # Working folder in HDFS, make sure user has the right access to the hdfs directory
 kylin.env.hdfs-working-dir=/kylin
 
+kylin.env.zookeeper-connect-string=sandbox.hortonworks.com
+
 # HBase Cluster FileSystem, which serving hbase, format as hdfs://hbase-cluster:8020
 # Leave empty if hbase running on same cluster with hive and mapreduce
 #kylin.storage.hbase.cluster-fs=
@@ -55,6 +57,9 @@ kylin.env.hdfs-working-dir=/kylin
 
 kylin.engine.mr.reduce-input-mb=500
 
+kylin.engine.mr.uhc-config-override.mapreduce.reduce.memory.mb=500
+kylin.engine.mr.uhc-config-override.mapred.reduce.child.java.opts=-Xmx400M
+
 
 ### JOB ###
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
index df2ebf7..c578a57 100644
--- a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
@@ -72,7 +72,7 @@ public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase {
         finishLatch.await();
 
         GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder();
-        builder.init(dictionaryInfo, 0);
+        builder.init(dictionaryInfo, 0, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
         builder.addValue("success");
         Dictionary<String> dict = builder.build();
 
@@ -108,7 +108,7 @@ public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase {
                 GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder();
                 startLatch.countDown();
 
-                builder.init(dictionaryInfo, 0);
+                builder.init(dictionaryInfo, 0, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
                 for (int i = 0; i < count; i++) {
                     builder.addValue(prefix + i);
                 }


Mime
View raw message