kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject [17/27] incubator-kylin git commit: KYLIN-943 update InMemCubeBuilder to support TopN
Date Tue, 29 Sep 2015 01:04:08 GMT
KYLIN-943 update InMemCubeBuilder to support TopN


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d916ab48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d916ab48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d916ab48

Branch: refs/heads/2.x-staging
Commit: d916ab48c2a48558cb808eb2d97bca82942c3510
Parents: 2d14bee
Author: shaofengshi <shaofengshi@apache.org>
Authored: Tue Sep 22 10:51:39 2015 +0800
Committer: shaofengshi <shaofengshi@apache.org>
Committed: Tue Sep 29 09:01:53 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/BuildCubeWithEngineTest.java      |  2 -
 .../cube/inmemcubing/InMemCubeBuilder.java      | 49 ++++++++++++++++++--
 .../engine/mr/steps/InMemCuboidMapper.java      | 10 ++++
 .../apache/kylin/engine/spark/SparkCubing.java  | 10 ++++
 .../test_kylin_cube_without_slr_desc.json       | 14 ------
 ...t_kylin_cube_without_slr_left_join_desc.json | 14 ------
 .../test_case_data/sandbox/kylin_job_conf.xml   |  4 ++
 7 files changed, 69 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d916ab48/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 71b5d11..da6917c 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -194,10 +194,8 @@ public class BuildCubeWithEngineTest {
         f.setTimeZone(TimeZone.getTimeZone("GMT"));
         long date1 = 0;
         long date2 = f.parse("2013-01-01").getTime();
-        long date3 = f.parse("2022-01-01").getTime();
         List<String> result = Lists.newArrayList();
         result.add(buildSegment("test_kylin_cube_topn", date1, date2));
-        result.add(buildSegment("test_kylin_cube_topn", date2, date3));
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d916ab48/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 2536250..3b426b9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.kylin.cube.inmemcubing;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -36,6 +37,7 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dict.Dictionary;
@@ -84,7 +86,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     private CuboidResult baseResult;
     private Object[] totalSumForSanityCheck;
-    private ICuboidCollector resultCollector;
+    private ICuboidCollector resultCollector;    
+    private Map<Integer, Dictionary<String>> topNDisplayColDictMap;
+
 
     public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
         super(cubeDesc, dictionaryMap);
@@ -106,8 +110,25 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             measureIndexMap.put(measureDesc.getName(), i);
         }
         this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
+
+        initTopNDisplayColDictionaryMap();
     }
 
+    private void initTopNDisplayColDictionaryMap() {
+        topNDisplayColDictMap = Maps.newHashMap();
+        for (int measureIdx = 0; measureIdx < cubeDesc.getMeasures().size(); measureIdx++) {
+            MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
+            FunctionDesc func = measureDesc.getFunction();
+            if (func.isTopN()) {
+                int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
+                int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
+                TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
+                Dictionary<String> dictionary = (Dictionary<String>)dictionaryMap.get(displayCol);
+                topNDisplayColDictMap.put(displayColIdx, dictionary);
+            }
+        }
+    }
+    
     private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
         GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
 
@@ -513,6 +534,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         GTInfo info;
         GTRecord record;
         BlockingQueue<List<String>> input;
+        ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
 
         public InputConverter(GTInfo info, BlockingQueue<List<String>> input) {
             this.info = info;
@@ -597,11 +619,30 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
                 Object value = null;
                 int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
                 FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction();
-                if (function.isCount() || function.isHolisticCountDistinct()) {
+                if (flatTableIdx == null) {
+                    value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
+                }
+                else if (function.isCount() || function.isHolisticCountDistinct()) {
                     // note for holistic count distinct, this value will be ignored
                     value = ONE;
-                } else if (flatTableIdx == null) {
-                    value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
+                } else if (function.isTopN()) {
+                    // encode the key column with dict, and get the counter column;
+                    int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
+                    Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex);
+                    int keyColEncoded = displayColDict.getIdFromValue(row.get(keyColIndex));
+                    valueBuf.clear();
+                    valueBuf.putInt(displayColDict.getSizeOfId());
+                    valueBuf.putInt(keyColEncoded);
+                    if (flatTableIdx.length == 1) {
+                        // only displayCol, use 1.0 as counter
+                        valueBuf.putDouble(1.0);
+                    } else {
+                        // get the counter column value
+                        valueBuf.putDouble(Double.valueOf(row.get(flatTableIdx[0])));
+                    }
+
+                    value = measureCodec.getSerializer(i).valueOf(valueBuf.array());
+
                 } else if (flatTableIdx.length == 1) {
                     value = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d916ab48/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 93918cc..e69b8a3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -29,6 +29,7 @@ import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -77,6 +78,15 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
                 }
             }
         }
+        
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            if (measureDesc.getFunction().isTopN()) {
+                List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
+                TblColRef col = colRefs.get(colRefs.size() - 1);
+                dictionaryMap.put(col, cubeSegment.getDictionary(col));
+            }
+        }
+        
 
         DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
         ExecutorService executorService = Executors.newSingleThreadExecutor();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d916ab48/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index ab08bef..dd1fd99 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -314,6 +314,16 @@ public class SparkCubing extends AbstractSparkApplication {
                 }
             }
         }
+        
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            if (measureDesc.getFunction().isTopN()) {
+                List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
+                TblColRef col = colRefs.get(colRefs.size() - 1);
+                dictionaryMap.put(col, cubeSegment.getDictionary(col));
+            }
+        }
+
+        
         final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d916ab48/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
index d9e895a..76675f5 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
@@ -152,19 +152,6 @@
     },
     {
       "id": 7,
-      "name": "SELLER_CNT_LONG",
-      "function": {
-        "expression": "COUNT_DISTINCT",
-        "parameter": {
-          "type": "column",
-          "value": "SELLER_ID"
-        },
-        "returntype": "bigint"
-      },
-      "dependent_measure_ref": "SELLER_CNT_HLL"
-    },
-    {
-      "id": 8,
       "name": "SELLER_FORMAT_CNT",
       "function": {
         "expression": "COUNT_DISTINCT",
@@ -258,7 +245,6 @@
               "gmv_min",
               "gmv_max",
               "trans_cnt",
-              "seller_cnt_long",
               "item_count_sum"
             ]
           }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d916ab48/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
index db19c7b..d3be1c9 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
@@ -152,19 +152,6 @@
     },
     {
       "id": 7,
-      "name": "SELLER_CNT_LONG",
-      "function": {
-        "expression": "COUNT_DISTINCT",
-        "parameter": {
-          "type": "column",
-          "value": "SELLER_ID"
-        },
-        "returntype": "bigint"
-      },
-      "dependent_measure_ref": "SELLER_CNT_HLL"
-    },
-    {
-      "id": 8,
       "name": "SELLER_FORMAT_CNT",
       "function": {
         "expression": "COUNT_DISTINCT",
@@ -258,7 +245,6 @@
               "gmv_min",
               "gmv_max",
               "trans_cnt",
-              "seller_cnt_long",
               "item_count_sum"
             ]
           }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d916ab48/examples/test_case_data/sandbox/kylin_job_conf.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin_job_conf.xml b/examples/test_case_data/sandbox/kylin_job_conf.xml
index 69def6e..c851565 100644
--- a/examples/test_case_data/sandbox/kylin_job_conf.xml
+++ b/examples/test_case_data/sandbox/kylin_job_conf.xml
@@ -10,6 +10,10 @@
         </description>
     </property>
 
+    <property>
+        <name>mapreduce.map.maxattempts</name>
+        <value>2</value>
+    </property>
 
     <!--
     <property>


Mime
View raw message