kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [05/27] kylin git commit: KYLIN-2217 Reducers build dictionaries locally
Date Fri, 02 Dec 2016 13:18:58 GMT
KYLIN-2217 Reducers build dictionaries locally

Signed-off-by: Li Yang <liyang@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1af08e4b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1af08e4b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1af08e4b

Branch: refs/heads/KYLIN-1875
Commit: 1af08e4b8875d33bfc5dd124fed72d6042456c32
Parents: b1b90ad
Author: xiefan46 <958034172@qq.com>
Authored: Wed Nov 23 09:48:55 2016 +0800
Committer: Li Yang <liyang@apache.org>
Committed: Wed Nov 30 15:31:58 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   9 +-
 .../java/org/apache/kylin/cube/CubeManager.java |   3 +-
 .../kylin/cube/cli/DictionaryGeneratorCLI.java  |  25 ++-
 .../apache/kylin/dict/DictionaryManager.java    |  17 +-
 .../apache/kylin/dict/DictionaryProvider.java   |  28 ++++
 .../dict/DictionaryReducerLocalGenerator.java   | 156 +++++++++++++++++++
 .../dict/IDictionaryReducerLocalBuilder.java    |  31 ++++
 .../kylin/dict/DictionaryProviderTest.java      | 109 +++++++++++++
 .../storage/translate/ColumnValueRange.java     |   2 +-
 .../engine/mr/steps/CreateDictionaryJob.java    |  44 +++++-
 .../mr/steps/FactDistinctColumnsReducer.java    | 123 +++++++++++++--
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  |  54 +++----
 .../storage/hbase/cube/v1/CubeStorageQuery.java |   6 +-
 13 files changed, 547 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7dcc771..766c04d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -719,7 +719,14 @@ abstract public class KylinConfigBase implements Serializable {
 
     //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
     public int getUHCReducerCount() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "3"));
+        return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "1"));
+    }
+
+    public boolean isReducerLocalBuildDict() {
+        if (getUHCReducerCount() != 1) {
+            return false;
+        }
+        return Boolean.parseBoolean(getOptional("kylin.engine.mr.reducer-local-build-dict", "true"));
     }
 
     public String getYarnStatusCheckUrl() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index b4422d2..119a21a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -213,6 +213,7 @@ public class CubeManager implements IRealizationProvider {
         return result;
     }
 
+
     public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, ReadableTable inpTable) throws IOException {
         CubeDesc cubeDesc = cubeSeg.getCubeDesc();
         if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
@@ -221,6 +222,7 @@ public class CubeManager implements IRealizationProvider {
         String builderClass = cubeDesc.getDictionaryBuilderClass(col);
         DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(cubeDesc.getModel(), col, inpTable, builderClass);
 
+
         saveDictionaryInfo(cubeSeg, col, dictInfo);
         return dictInfo;
     }
@@ -266,7 +268,6 @@ public class CubeManager implements IRealizationProvider {
         } catch (IOException e) {
             throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, e);
         }
-
         return (Dictionary<String>) info.getDictionaryObject();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index a6aeb96..a4e1df0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -22,11 +22,13 @@ import java.io.IOException;
 import java.util.Set;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.DictionaryProvider;
 import org.apache.kylin.dict.DistinctColumnValuesProvider;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
@@ -44,21 +46,30 @@ public class DictionaryGeneratorCLI {
 
     private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
 
-    public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
+    public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         CubeSegment segment = cube.getSegmentById(segmentID);
 
-        processSegment(config, segment, factTableValueProvider);
+        processSegment(config, segment, factTableValueProvider, dictProvider);
     }
 
-    private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
+    private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
         CubeManager cubeMgr = CubeManager.getInstance(config);
 
         // dictionary
         for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
             logger.info("Building dictionary for " + col);
             ReadableTable inpTable = decideInputTable(cubeSeg.getModel(), col, factTableValueProvider);
-            cubeMgr.buildDictionary(cubeSeg, col, inpTable);
+            if (config.isReducerLocalBuildDict() && dictProvider != null) {
+                Dictionary<String> dict = dictProvider.getDictionary(col);
+                if (dict != null) {
+                    cubeMgr.saveDictionary(cubeSeg, col, inpTable, dict);
+                } else {
+                    cubeMgr.buildDictionary(cubeSeg, col, inpTable);
+                }
+            } else {
+                cubeMgr.buildDictionary(cubeSeg, col, inpTable);
+            }
         }
 
         // snapshot
@@ -68,19 +79,19 @@ public class DictionaryGeneratorCLI {
             if (cubeSeg.getModel().isLookupTable(table))
                 toSnapshot.add(table.getTableIdentity());
         }
-        
+
         for (String tableIdentity : toSnapshot) {
             logger.info("Building snapshot of " + tableIdentity);
             cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity);
         }
     }
-    
+
     private static ReadableTable decideInputTable(DataModelDesc model, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) {
         KylinConfig config = model.getConfig();
         DictionaryManager dictMgr = DictionaryManager.getInstance(config);
         TblColRef srcCol = dictMgr.decideSourceData(model, col);
         String srcTable = srcCol.getTable();
-        
+
         ReadableTable inpTable;
         if (model.isFactTable(srcTable)) {
             inpTable = factTableValueProvider.getDistinctValuesFor(srcCol);

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 6178234..0caef14 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -87,6 +87,7 @@ public class DictionaryManager {
     private KylinConfig config;
     private LoadingCache<String, DictionaryInfo> dictCache; // resource
 
+
     // path ==>
     // DictionaryInfo
 
@@ -275,10 +276,12 @@ public class DictionaryManager {
         return buildDictionary(model, col, inpTable, null);
     }
 
+
     public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, ReadableTable inpTable, String builderClass) throws IOException {
         if (inpTable.exists() == false)
             return null;
 
+
         logger.info("building dictionary for " + col);
 
         DictionaryInfo dictInfo = createDictionaryInfo(model, col, inpTable);
@@ -291,6 +294,12 @@ public class DictionaryManager {
         logger.info("Building dictionary object " + JsonUtil.writeValueAsString(dictInfo));
 
         Dictionary<String> dictionary;
+        dictionary = buildDictFromReadableTable(inpTable, dictInfo, builderClass, col);
+        return trySaveNewDict(dictionary, dictInfo);
+    }
+
+    private Dictionary<String> buildDictFromReadableTable(ReadableTable inpTable, DictionaryInfo dictInfo, String builderClass, TblColRef col) throws IOException {
+        Dictionary<String> dictionary;
         IDictionaryValueEnumerator columnValueEnumerator = null;
         try {
             columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), dictInfo.getSourceColumnIndex());
@@ -304,7 +313,7 @@ public class DictionaryManager {
             if (columnValueEnumerator != null)
                 columnValueEnumerator.close();
         }
-        return trySaveNewDict(dictionary, dictInfo);
+        return dictionary;
     }
 
     public DictionaryInfo saveDictionary(DataModelDesc model, TblColRef col, ReadableTable inpTable, Dictionary<String> dictionary) throws IOException {
@@ -336,19 +345,19 @@ public class DictionaryManager {
         // FK on fact table and join type is inner, use PK from lookup instead
         if (model.isFactTable(col.getTable()) == false)
             return col;
-        
+
         // find a lookup table that the col joins as FK
         for (TableRef lookup : model.getLookupTables()) {
             JoinDesc lookupJoin = model.getJoinByPKSide(lookup);
             int find = ArrayUtils.indexOf(lookupJoin.getForeignKeyColumns(), col);
             if (find < 0)
                 continue;
-        
+
             // make sure the joins are all inner up to the root
             if (isAllInnerJoinsToRoot(model, lookupJoin))
                 return lookupJoin.getPrimaryKeyColumns()[find];
         }
-        
+
         return col;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java
new file mode 100644
index 0000000..6387535
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Created by xiefan on 16-11-23.
+ */
+public interface DictionaryProvider {
+    public Dictionary<String> getDictionary(TblColRef col);
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java
new file mode 100644
index 0000000..35d379a
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.datatype.DataType;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+/**
+ * Created by xiefan on 16-11-16.
+ *
+ * TODO:sample,mergeDict
+ */
+public class DictionaryReducerLocalGenerator {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DictionaryReducerLocalGenerator.class);
+
+    private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
+
+    public static IDictionaryReducerLocalBuilder getBuilder(DataType dataType) {
+        Preconditions.checkNotNull(dataType, "dataType cannot be null");
+
+        IDictionaryReducerLocalBuilder builder;
+        if (dataType.isDateTimeFamily()) {
+            if (dataType.isDate())
+                builder = new DateDictBuilder();
+            else
+                builder = new TimeDictBuilder();
+        } else if (dataType.isNumberFamily()) {
+            builder = new NumberDictBuilder(0);
+        } else {
+            builder = new StringDictBuilder(0);
+        }
+        return builder;
+    }
+
+    private static class DateDictBuilder implements IDictionaryReducerLocalBuilder {
+
+        private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
+
+        private String matchPattern = null;
+
+        private boolean isRecognizeFormat = false;
+
+        private SimpleDateFormat sdf;
+
+        @Override
+        public Dictionary<String> build(int baseId) throws Exception {
+            if (isRecognizeFormat) {
+                return new DateStrDictionary(matchPattern, baseId);
+            } else {
+                throw new IllegalStateException("Date format not match");
+            }
+        }
+
+        @Override
+        public void addValue(String value) throws Exception {
+            if (matchPattern == null) { //init match pattern
+                for (String ptn : DATE_PATTERNS) {
+                    matchPattern = ptn;
+                    SimpleDateFormat sdf = new SimpleDateFormat(ptn);
+                    try {
+                        sdf.parse(value);
+                        isRecognizeFormat = true;
+                        break;
+                    } catch (ParseException e) {
+
+                    }
+                }
+                sdf = new SimpleDateFormat(matchPattern);
+            }
+            if (!isRecognizeFormat) {
+                throw new IllegalStateException("Date format not match");
+            }
+            try {
+                sdf.parse(value);
+            } catch (ParseException e) {
+                isRecognizeFormat = false;
+                logger.info("Unrecognized date value: " + value);
+            }
+        }
+
+    }
+
+    private static class TimeDictBuilder implements IDictionaryReducerLocalBuilder {
+
+        @Override
+        public Dictionary<String> build(int baseId) {
+            return new TimeStrDictionary();
+        }
+
+        @Override
+        public void addValue(String value) {
+
+        }
+
+    }
+
+    private static class StringDictBuilder implements IDictionaryReducerLocalBuilder {
+
+        private TrieDictionaryForestBuilder<String> builder;
+
+        public StringDictBuilder(int baseId) {
+            builder = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), 0);
+        }
+
+        @Override
+        public Dictionary<String> build(int baseId) {
+            return builder.build();
+        }
+
+        @Override
+        public void addValue(String value) {
+            builder.addValue(value);
+        }
+
+    }
+
+    public static class NumberDictBuilder implements IDictionaryReducerLocalBuilder {
+
+        private NumberDictionaryForestBuilder builder;
+
+        public NumberDictBuilder(int baseId) {
+            builder = new NumberDictionaryForestBuilder(baseId);
+        }
+
+        @Override
+        public Dictionary<String> build(int baseId) {
+            return builder.build();
+        }
+
+        @Override
+        public void addValue(String value) {
+            builder.addValue(value);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java
new file mode 100644
index 0000000..19b1d28
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Dictionary;
+
+/**
+ * Created by xiefan on 16-11-16.
+ */
+public interface IDictionaryReducerLocalBuilder {
+    Dictionary<String> build(int baseId) throws Exception;
+
+    void addValue(String value) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
new file mode 100644
index 0000000..0225737
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
@@ -0,0 +1,109 @@
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.Test;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.ParameterizedType;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Created by xiefan on 16-11-23.
+ */
+public class DictionaryProviderTest {
+
+    @Test
+    public void testReadWrite() throws Exception{
+        //string dict
+        Dictionary<String> dict = getDict(DataType.getType("string"),
+                Arrays.asList(new String[]{"a","b"}).iterator());
+        readWriteTest(dict);
+        //number dict
+        Dictionary<String> dict2 = getDict(DataType.getType("long"),
+                Arrays.asList(new String[]{"1","2"}).iterator());
+        readWriteTest(dict2);
+
+        //date dict
+        Dictionary<String> dict3 = getDict(DataType.getType("datetime"),
+                Arrays.asList(new String[]{"20161122","20161123"}).iterator());
+        readWriteTest(dict3);
+
+        //date dict
+        Dictionary<String> dict4 = getDict(DataType.getType("datetime"),
+                Arrays.asList(new String[]{"2016-11-22","2016-11-23"}).iterator());
+        readWriteTest(dict4);
+
+        //date dict
+        try {
+            Dictionary<String> dict5 = getDict(DataType.getType("date"),
+                    Arrays.asList(new String[]{"2016-11-22", "20161122"}).iterator());
+            readWriteTest(dict5);
+            fail("Date format not correct.Should throw exception");
+        }catch (IllegalStateException e){
+            //correct
+        }
+    }
+
+    @Test
+    public void testReadWriteTime(){
+        System.out.println(Long.MAX_VALUE);
+        System.out.println(Long.MIN_VALUE);
+    }
+
+
+    private Dictionary<String> getDict(DataType type, Iterator<String> values) throws Exception{
+        IDictionaryReducerLocalBuilder builder = DictionaryReducerLocalGenerator.getBuilder(type);
+        while(values.hasNext()){
+            builder.addValue(values.next());
+        }
+        return builder.build(0);
+    }
+
+    private void readWriteTest(Dictionary<String> dict) throws Exception{
+        final String path = "src/test/resources/dict/tmp_dict";
+        File f = new File(path);
+        f.deleteOnExit();
+        f.createNewFile();
+        String dictClassName = dict.getClass().getName();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+        out.writeUTF(dictClassName);
+        dict.write(out);
+        out.close();
+        //read dict
+        DataInputStream in = null;
+        Dictionary<String> dict2 = null;
+        try {
+            File f2 = new File(path);
+            in = new DataInputStream(new FileInputStream(f2));
+            String dictClassName2 = in.readUTF();
+            dict2 = (Dictionary<String>) ClassUtil.newInstance(dictClassName2);
+            dict2.readFields(in);
+        }catch(IOException e){
+            e.printStackTrace();
+        }finally {
+            if(in != null){
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        assertTrue(dict.equals(dict2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
index 0dc1afa..56b1106 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
@@ -168,7 +168,7 @@ public class ColumnValueRange {
 
     // remove invalid EQ/IN values and round start/end according to dictionary
     public void preEvaluateWithDict(Dictionary<String> dict) {
-        if (dict == null)
+        if (dict == null || dict.getSize() == 0)
             return;
 
         if (equalValues != null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 5d7cb21..63005f9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -19,15 +19,25 @@
 package org.apache.kylin.engine.mr.steps;
 
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
+import org.apache.kylin.dict.DictionaryProvider;
 import org.apache.kylin.dict.DistinctColumnValuesProvider;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.SortedColumnDFSFile;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.ReadableTable;
 
+import java.io.IOException;
+
 /**
  * @author ysong1
  */
@@ -48,13 +58,45 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
         final String segmentID = getOptionValue(OPTION_SEGMENT_ID);
         final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
 
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
 
         DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() {
             @Override
             public ReadableTable getDistinctValuesFor(TblColRef col) {
                 return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getName(), col.getType());
             }
+        }, new DictionaryProvider() {
+
+            @Override
+            public Dictionary<String> getDictionary(TblColRef col) {
+                if (!config.isReducerLocalBuildDict()) {
+                    return null;
+                }
+                FSDataInputStream is = null;
+                try {
+                    Path colDir = new Path(factColumnsInputPath, col.getName());
+                    Path outputFile = new Path(colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+                    Configuration conf = HadoopUtil.getCurrentConfiguration();
+                    FileSystem fs = HadoopUtil.getFileSystem(outputFile.getName());
+                    is = fs.open(outputFile);
+                    String dictClassName = is.readUTF();
+                    Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
+                    dict.readFields(is);
+                    logger.info("DictionaryProvider read dict form file : " + outputFile.getName());
+                    return dict;
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    return null;
+                } finally {
+                    if (is != null) {
+                        try {
+                            is.close();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            }
         });
 
         return returnCode;

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 6e24d61..5511626 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,14 +38,19 @@ import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryReducerLocalGenerator;
+import org.apache.kylin.dict.IDictionaryReducerLocalBuilder;
 import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsWriter;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,10 +80,19 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
 
     protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
 
+    //local build dict
+    private boolean isReducerLocalBuildDict;
+    private IDictionaryReducerLocalBuilder builder;
+    private FastDateFormat dateFormat;
+    private long timeMaxValue = Long.MIN_VALUE;
+    private long timeMinValue = Long.MAX_VALUE;
+    public static final String DICT_FILE_POSTFIX = ".RLD";
+    public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".PCI";
+    private boolean isPartitionCol = false;
+
     @Override
     protected void setup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
-
         Configuration conf = context.getConfiguration();
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
@@ -102,14 +118,36 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         } else if (collectStatistics && (taskId == numberOfTasks - 2)) {
             // partition col
             isStatistics = false;
+            isPartitionCol = true;
             col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
             colValues = Lists.newLinkedList();
+            DataType partitionColType = col.getType();
+            if (partitionColType.isDate()) {
+                dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
+            } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+                dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+            } else if (partitionColType.isStringFamily()) {
+                String partitionDateFormat = cubeDesc.getModel().getPartitionDesc().getPartitionDateFormat();
+                if (StringUtils.isEmpty(partitionDateFormat)) {
+                    partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+                }
+                dateFormat = DateFormat.getDateFormat(partitionDateFormat);
+            } else {
+                throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type");
+            }
         } else {
             // col
             isStatistics = false;
             col = columnList.get(ReducerIdToColumnIndex.get(taskId));
             colValues = Lists.newLinkedList();
         }
+
+        //local build dict
+        isReducerLocalBuildDict = config.isReducerLocalBuildDict();
+        if (col != null && isReducerLocalBuildDict) {
+            builder = DictionaryReducerLocalGenerator.getBuilder(col.getType());
+        }
+
     }
 
     private void initReducerIdToColumnIndex(KylinConfig config) throws IOException {
@@ -150,11 +188,26 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
                 }
             }
         } else {
-            colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
-            if (colValues.size() == 1000000) { //spill every 1 million
-                logger.info("spill values to disk...");
-                outputDistinctValues(col, colValues, context);
-                colValues.clear();
+            if (isReducerLocalBuildDict) {
+                String value = new String(key.getBytes(), 1, key.getLength() - 1);
+                //partition col
+                try {
+                    if (isPartitionCol) {
+                        long time = dateFormat.parse(value).getTime();
+                        timeMinValue = Math.min(timeMinValue, time);
+                        timeMaxValue = Math.max(timeMaxValue, time);
+                    }
+                    builder.addValue(value);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            } else {
+                colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
+                if (colValues.size() == 1000000) { //spill every 1 million
+                    logger.info("spill values to disk...");
+                    outputDistinctValues(col, colValues, context);
+                    colValues.clear();
+                }
             }
         }
 
@@ -191,12 +244,64 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         }
     }
 
+    private void outputDict(TblColRef col, Dictionary<String> dict, Context context) throws IOException {
+        final String fileName = col.getName() + DICT_FILE_POSTFIX;
+        FSDataOutputStream out = getOutputStream(context, fileName);
+        try {
+            String dictClassName = dict.getClass().getName();
+            out.writeUTF(dictClassName);
+            dict.write(out);
+            logger.info("reducer id is:+" + taskId + " colName:" + col.getName() + "  writing dict at file : " + fileName + "  dict class:" + dictClassName);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    private void outputPartitionInfo(Context context) throws IOException {
+        final String fileName = col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
+        FSDataOutputStream out = getOutputStream(context, fileName);
+        try {
+            out.writeLong(timeMinValue);
+            out.writeLong(timeMaxValue);
+            logger.info("write partition info for col : " + col.getName() + "  minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    private FSDataOutputStream getOutputStream(Context context, String outputFileName) throws IOException {
+        final Configuration conf = context.getConfiguration();
+        final FileSystem fs = FileSystem.get(conf);
+        final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
+        final Path colDir = new Path(outputPath, col.getName());
+        final Path outputFile = new Path(colDir, outputFileName);
+        FSDataOutputStream out = null;
+        if (!fs.exists(colDir)) {
+            fs.mkdirs(colDir);
+        }
+        fs.deleteOnExit(outputFile);
+        out = fs.create(outputFile);
+        return out;
+    }
+
     @Override
     protected void doCleanup(Context context) throws IOException, InterruptedException {
         if (isStatistics == false) {
-            if (colValues.size() > 0) {
-                outputDistinctValues(col, colValues, context);
-                colValues.clear();
+            if (isReducerLocalBuildDict) {
+                try {
+                    if (isPartitionCol) {
+                        outputPartitionInfo(context);
+                    }
+                    Dictionary<String> dict = builder.build(0);
+                    outputDict(col, dict, context);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            } else {
+                if (colValues.size() > 0) {
+                    outputDistinctValues(col, colValues, context);
+                    colValues.clear();
+                }
             }
         } else {
             //output the hll info;

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index eb06f07..977196c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -19,25 +19,23 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
-import java.text.ParseException;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.kylin.common.util.DateFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.SortedColumnDFSFile;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,39 +81,25 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
     private void updateTimeRange(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-        final DataType partitionColType = partitionCol.getType();
-        final FastDateFormat dateFormat;
-        if (partitionColType.isDate()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
-        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
-        } else if (partitionColType.isStringFamily()) {
-            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
-            if (StringUtils.isEmpty(partitionDateFormat)) {
-                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
-            }
-            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
-        } else {
-            throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type");
-        }
-
-        final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        //final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1);
-        final ReadableTable readableTable = new SortedColumnDFSFile(factDistinctPath + "/" + partitionCol.getName(), partitionCol.getType());
-        final ReadableTable.TableReader tableReader = readableTable.getReader();
+        final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+        Path colDir = new Path(factColumnsInputPath, partitionCol.getName());
+        Path outputFile = new Path(colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
+        FileSystem fs = HadoopUtil.getFileSystem(outputFile.getName());
+        FSDataInputStream is = null;
         long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE;
         try {
-            while (tableReader.next()) {
-                long time = dateFormat.parse(tableReader.getRow()[0]).getTime();
-                minValue = Math.min(minValue, time);
-                maxValue = Math.max(maxValue, time);
-            }
-        } catch (ParseException e) {
+            is = fs.open(outputFile);
+            long min = is.readLong();
+            long max = is.readLong();
+            minValue = Math.min(min, minValue);
+            maxValue = Math.max(max, maxValue);
+        } catch (IOException e) {
             throw new IOException(e);
         } finally {
-            IOUtils.closeQuietly(tableReader);
+            IOUtils.closeQuietly(is);
         }
-
+        logger.info("updateTimeRange step. minValue:" + minValue + " maxValue:" + maxValue);
         segment.setDateRangeStart(minValue);
         segment.setDateRangeEnd(maxValue);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1af08e4b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 9af0faf..02aa64a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -437,7 +437,11 @@ public class CubeStorageQuery implements IStorageQuery {
         // build row key range for each cube segment
         StringBuilder sb = new StringBuilder("hbasekeyrange trace: ");
         for (CubeSegment cubeSeg : segs) {
-
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) {
+                logger.info("Skip cube segment {} because its input record is 0", cubeSeg);
+                continue;
+            }
             // consider derived (lookup snapshot), filter on dimension may
             // differ per segment
             List<Collection<ColumnValueRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter, cubeSeg);


Mime
View raw message