kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [2/2] kylin git commit: KYLIN-976 Extract Rewrite related methods
Date Wed, 02 Dec 2015 03:15:34 GMT
KYLIN-976 Extract Rewrite related methods


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/63f376a5
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/63f376a5
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/63f376a5

Branch: refs/heads/KYLIN-976
Commit: 63f376a5aaea84fa66e02ef27b50f11c008790b8
Parents: 6c85ead
Author: Li, Yang <yangli9@ebay.com>
Authored: Tue Dec 1 11:39:31 2015 +0800
Committer: Li, Yang <yangli9@ebay.com>
Committed: Tue Dec 1 11:39:31 2015 +0800

----------------------------------------------------------------------
 .../kylin/cube/CubeCapabilityChecker.java       |   2 +-
 .../org/apache/kylin/cube/model/CubeDesc.java   |   2 +-
 .../apache/kylin/measure/MeasureIngester.java   |   6 +-
 .../org/apache/kylin/measure/MeasureType.java   |  10 +-
 .../kylin/measure/basic/BasicMeasureType.java   |  11 ++
 .../kylin/measure/basic/BigDecimalIngester.java |   5 -
 .../kylin/measure/basic/DoubleIngester.java     |   5 -
 .../kylin/measure/basic/LongIngester.java       |   5 -
 .../kylin/measure/hllc/HLLCMeasureType.java     |  16 +-
 .../measure/hllc/HLLDistinctCountAggFunc.java   | 153 +++++++++++++++++++
 .../kylin/measure/topn/TopNMeasureType.java     |  10 ++
 .../kylin/metadata/model/FunctionDesc.java      |  39 +++--
 .../mr/steps/MergeCuboidFromStorageMapper.java  |   2 +-
 .../engine/mr/steps/MergeCuboidMapper.java      |   2 +-
 .../kylin/query/relnode/OLAPAggregateRel.java   |  13 +-
 .../apache/kylin/query/schema/OLAPTable.java    |  10 +-
 .../query/sqlfunc/HLLDistinctCountAggFunc.java  | 152 ------------------
 17 files changed, 237 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 0c1b3c9..624bb0b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -191,7 +191,7 @@ public class CubeCapabilityChecker {
             if (unmatchedDimensions.isEmpty() && unmatchedAggregations.isEmpty())
                 break;
             
-            MeasureType measureType = MeasureType.create(measure.getFunction());
+            MeasureType measureType = measure.getFunction().getMeasureType();
             if (measureType instanceof BasicMeasureType)
                 continue;
             

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 3e8ee13..050aef2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -819,7 +819,7 @@ public class CubeDesc extends RootPersistentEntity {
         }
 
         for (MeasureDesc measure : measures) {
-            MeasureType aggrType = MeasureType.create(measure.getFunction());
+            MeasureType aggrType = measure.getFunction().getMeasureType();
             result.addAll(aggrType.getColumnsNeedDictionary(measure));
         }
         return result;

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
index 9c7b406..bc387fe 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
 abstract public class MeasureIngester<V> {
     
     public static MeasureIngester<?> create(MeasureDesc measure) {
-        return MeasureType.create(measure.getFunction()).newIngester();
+        return measure.getFunction().getMeasureType().newIngester();
     }
     
     public static MeasureIngester<?>[] create(Collection<MeasureDesc> measures) {
@@ -42,5 +42,7 @@ abstract public class MeasureIngester<V> {
 
     abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap);
     
-    abstract public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts);
+    public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 8e7de6f..0891d1e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -45,10 +45,6 @@ abstract public class MeasureType {
         factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNMeasureFactory());
     }
     
-    public static MeasureType create(FunctionDesc function) {
-        return create(function.getExpression(), function.getReturnType());
-    }
-
     public static MeasureType create(String funcName, String dataType) {
         funcName = funcName.toUpperCase();
         dataType = dataType.toLowerCase();
@@ -102,6 +98,12 @@ abstract public class MeasureType {
      * Query
      * ---------------------------------------------------------------------------- */
     
+    // TODO support user defined calcite aggr function
+    
+    abstract public boolean needRewrite();
+    
+    abstract public Class<?> getRewriteAggregationFunctionClass();
+    
     /* ============================================================================
      * Storage
      * ---------------------------------------------------------------------------- */

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index f314870..a6b36bb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -115,5 +115,16 @@ public class BasicMeasureType extends MeasureType {
     private boolean isMin() {
         return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
     }
+
+    @Override
+    public boolean needRewrite() {
+        return !isSum();
+    }
+
+    @Override
+    public Class<?> getRewriteAggregationFunctionClass() {
+        // TODO Auto-generated method stub
+        return null;
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
index ea1495c..721ba00 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
@@ -38,9 +38,4 @@ public class BigDecimalIngester extends MeasureIngester<BigDecimal> {
         else
             return new BigDecimal(values[0]);
     }
-
-    @Override
-    public BigDecimal reEncodeDictionary(BigDecimal value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
-        throw new UnsupportedOperationException();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
index aaa754a..70ca727 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -43,9 +43,4 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable> {
             l.set(Double.parseDouble(values[0]));
         return l;
     }
-
-    @Override
-    public DoubleMutable reEncodeDictionary(DoubleMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
-        throw new UnsupportedOperationException();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
index bdc1704..2547162 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -43,9 +43,4 @@ public class LongIngester extends MeasureIngester<LongMutable> {
             l.set(Long.parseLong(values[0]));
         return l;
     }
-
-    @Override
-    public LongMutable reEncodeDictionary(LongMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
-        throw new UnsupportedOperationException();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index 4a73478..fd71c00 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -73,11 +73,6 @@ public class HLLCMeasureType extends MeasureType {
                     hllc.add(v == null ? "__nUlL__" : v);
                 return hllc;
             }
-
-            @Override
-            public HyperLogLogPlusCounter reEncodeDictionary(HyperLogLogPlusCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
-                throw new UnsupportedOperationException();
-            }
         };
     }
 
@@ -89,4 +84,15 @@ public class HLLCMeasureType extends MeasureType {
             return new LDCAggregator();
     }
 
+    @Override
+    public boolean needRewrite() {
+        return true;
+    }
+
+    @Override
+    public Class<?> getRewriteAggregationFunctionClass() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
new file mode 100644
index 0000000..b00da5f
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xjiang
+ */
+public class HLLDistinctCountAggFunc {
+
+    private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class);
+
+    public static HyperLogLogPlusCounter init() {
+        return null;
+    }
+
+    public static HyperLogLogPlusCounter initAdd(Object v) {
+        if (v instanceof Long) { // holistic case
+            long l = (Long) v;
+            return new FixedValueHLLCMockup(l);
+        } else {
+            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
+            return new HyperLogLogPlusCounter(c);
+        }
+    }
+
+    public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) {
+        if (v instanceof Long) { // holistic case
+            long l = (Long) v;
+            if (counter == null) {
+                return new FixedValueHLLCMockup(l);
+            } else {
+                if (!(counter instanceof FixedValueHLLCMockup))
+                    throw new IllegalStateException("counter is not FixedValueHLLCMockup");
+
+                ((FixedValueHLLCMockup) counter).set(l);
+                return counter;
+            }
+        } else {
+            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
+            if (counter == null) {
+                return new HyperLogLogPlusCounter(c);
+            } else {
+                counter.merge(c);
+                return counter;
+            }
+        }
+    }
+
+    public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) {
+        return add(counter0, counter1);
+    }
+
+    public static long result(HyperLogLogPlusCounter counter) {
+        return counter == null ? 0L : counter.getCountEstimate();
+    }
+
+    @SuppressWarnings("serial")
+    private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter {
+
+        private Long value = null;
+
+        FixedValueHLLCMockup(long value) {
+            this.value = value;
+        }
+
+        public void set(long value) {
+            if (this.value == null) {
+                this.value = value;
+            } else {
+                long oldValue = Math.abs(this.value.longValue());
+                long take = Math.max(oldValue, value);
+                logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take);
+                this.value = -take; // make it obvious that this value is wrong
+            }
+        }
+
+        @Override
+        public void clear() {
+            this.value = null;
+        }
+
+        @Override
+        protected void add(long hash) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void merge(HyperLogLogPlusCounter another) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getCountEstimate() {
+            return value;
+        }
+
+        @Override
+        public void writeRegisters(ByteBuffer out) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void readRegisters(ByteBuffer in) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = super.hashCode();
+            result = prime * result + (int) (value ^ (value >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (!super.equals(obj))
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj;
+            if (!value.equals(other.value))
+                return false;
+            return true;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index d6c5a6f..77319be 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -167,4 +167,14 @@ public class TopNMeasureType extends MeasureType {
             return null;
     }
 
+    @Override
+    public boolean needRewrite() {
+        return false;
+    }
+
+    @Override
+    public Class<?> getRewriteAggregationFunctionClass() {
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 0c36873..17debda 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.model;
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -51,6 +52,7 @@ public class FunctionDesc {
     private String returnType;
 
     private DataType returnDataType;
+    private MeasureType measureType;
     private boolean isDimensionAsMetric = false;
 
     public void init(TableDesc factTable) {
@@ -77,6 +79,23 @@ public class FunctionDesc {
             setReturnType(colRefs.get(0).getDatatype());
         }
     }
+    
+    public MeasureType getMeasureType() {
+        if (isDimensionAsMetric)
+            return null;
+        
+        if (measureType == null) {
+            measureType = MeasureType.create(getExpression(), getReturnType());
+        }
+        return measureType;
+    }
+
+    public boolean needRewrite() {
+        if (isDimensionAsMetric)
+            return false;
+        
+        return getMeasureType().needRewrite();
+    }
 
     public String getRewriteFieldName() {
         if (isSum()) {
@@ -88,14 +107,19 @@ public class FunctionDesc {
         }
     }
 
-    public boolean needRewrite() {
-        return !isSum() && !isDimensionAsMetric() && !isTopN();
+    public DataType getRewriteFieldType() {
+        if (isCountDistinct() || isTopN())
+            return DataType.ANY;
+        else if (isSum() || isMax() || isMin())
+            return parameter.getColRefs().get(0).getType();
+        else
+            return returnDataType;
     }
 
     public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
         ColumnDesc fakeCol = new ColumnDesc();
         fakeCol.setName(getRewriteFieldName());
-        fakeCol.setDatatype(getSQLType().toString());
+        fakeCol.setDatatype(getRewriteFieldType().toString());
         if (isCount())
             fakeCol.setNullable(false);
         fakeCol.init(sourceTable);
@@ -179,15 +203,6 @@ public class FunctionDesc {
         return count;
     }
     
-    public DataType getSQLType() {
-        if (isCountDistinct() || isTopN())
-            return DataType.ANY;
-        else if (isSum() || isMax() || isMin())
-            return parameter.getColRefs().get(0).getType();
-        else
-            return returnDataType;
-    }
-
     public String getReturnType() {
         return returnType;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index b4682dd..bb0c073 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -122,7 +122,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         dictMeasures = Lists.newArrayList();
         for (int i = 0; i < measureDescs.size(); i++) {
             MeasureDesc measureDesc = measureDescs.get(i);
-            MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+            MeasureType measureType = measureDesc.getFunction().getMeasureType();
             if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
                 dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 4fc7236..401237b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -124,7 +124,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         dictMeasures = Lists.newArrayList();
         for (int i = 0; i < measureDescs.size(); i++) {
             MeasureDesc measureDesc = measureDescs.get(i);
-            MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+            MeasureType measureType = measureDesc.getFunction().getMeasureType();
             if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
                 dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index cbc0c56..9aa70ca 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -59,7 +59,6 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.query.sqlfunc.HLLDistinctCountAggFunc;
 
 import com.google.common.base.Preconditions;
 
@@ -301,10 +300,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         // rebuild function
         RelDataType fieldType = aggCall.getType();
         SqlAggFunction newAgg = aggCall.getAggregation();
-        if (func.isCountDistinct()) {
-            newAgg = createHyperLogLogAggFunction(fieldType);
-        } else if (func.isCount()) {
+        if (func.isCount()) {
             newAgg = SqlStdOperatorTable.SUM0;
+        } else if (func.getMeasureType().getRewriteAggregationFunctionClass() != null) {
+            newAgg = createCustomAggFunction(fieldType, func.getMeasureType().getRewriteAggregationFunctionClass());
         }
 
         // rebuild aggregate call
@@ -312,10 +311,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         return newAggCall;
     }
 
-    private SqlAggFunction createHyperLogLogAggFunction(RelDataType returnType) {
+    private SqlAggFunction createCustomAggFunction(RelDataType returnType, Class<?> customAggFuncClz) {
         RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
-        SqlIdentifier sqlIdentifier = new SqlIdentifier("HLL_COUNT", new SqlParserPos(1, 1));
-        AggregateFunction aggFunction = AggregateFunctionImpl.create(HLLDistinctCountAggFunc.class);
+        SqlIdentifier sqlIdentifier = new SqlIdentifier(customAggFuncClz.getSimpleName(), new SqlParserPos(1, 1));
+        AggregateFunction aggFunction = AggregateFunctionImpl.create(customAggFuncClz);
         List<RelDataType> argTypes = new ArrayList<RelDataType>();
         List<SqlTypeFamily> typeFamilies = new ArrayList<SqlTypeFamily>();
         for (FunctionParameter o : aggFunction.getParameters()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
index 94a91bf..a8789ea 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
@@ -184,11 +184,11 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab
         HashSet<ColumnDesc> updateColumns = Sets.newHashSet();
         for (MeasureDesc m : mgr.listEffectiveMeasures(olapSchema.getProjectName(), sourceTable.getIdentity())) {
             if (m.getFunction().isSum()) {
-                FunctionDesc functionDesc = m.getFunction();
-                if (functionDesc.getReturnDataType() != functionDesc.getSQLType() && //
-                        functionDesc.getReturnDataType().isBigInt() && //
-                        functionDesc.getSQLType().isIntegerFamily()) {
-                    updateColumns.add(functionDesc.getParameter().getColRefs().get(0).getColumnDesc());
+                FunctionDesc func = m.getFunction();
+                if (func.getReturnDataType() != func.getRewriteFieldType() && //
+                        func.getReturnDataType().isBigInt() && //
+                        func.getRewriteFieldType().isIntegerFamily()) {
+                    updateColumns.add(func.getParameter().getColRefs().get(0).getColumnDesc());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
deleted file mode 100644
index 7881c42..0000000
--- a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.sqlfunc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- */
-public class HLLDistinctCountAggFunc {
-
-    private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class);
-
-    public static HyperLogLogPlusCounter init() {
-        return null;
-    }
-
-    public static HyperLogLogPlusCounter initAdd(Object v) {
-        if (v instanceof Long) { // holistic case
-            long l = (Long) v;
-            return new FixedValueHLLCMockup(l);
-        } else {
-            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
-            return new HyperLogLogPlusCounter(c);
-        }
-    }
-
-    public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) {
-        if (v instanceof Long) { // holistic case
-            long l = (Long) v;
-            if (counter == null) {
-                return new FixedValueHLLCMockup(l);
-            } else {
-                if (!(counter instanceof FixedValueHLLCMockup))
-                    throw new IllegalStateException("counter is not FixedValueHLLCMockup");
-
-                ((FixedValueHLLCMockup) counter).set(l);
-                return counter;
-            }
-        } else {
-            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
-            if (counter == null) {
-                return new HyperLogLogPlusCounter(c);
-            } else {
-                counter.merge(c);
-                return counter;
-            }
-        }
-    }
-
-    public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) {
-        return add(counter0, counter1);
-    }
-
-    public static long result(HyperLogLogPlusCounter counter) {
-        return counter == null ? 0L : counter.getCountEstimate();
-    }
-
-    private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter {
-
-        private Long value = null;
-
-        FixedValueHLLCMockup(long value) {
-            this.value = value;
-        }
-
-        public void set(long value) {
-            if (this.value == null) {
-                this.value = value;
-            } else {
-                long oldValue = Math.abs(this.value.longValue());
-                long take = Math.max(oldValue, value);
-                logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take);
-                this.value = -take; // make it obvious that this value is wrong
-            }
-        }
-
-        @Override
-        public void clear() {
-            this.value = null;
-        }
-
-        @Override
-        protected void add(long hash) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void merge(HyperLogLogPlusCounter another) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public long getCountEstimate() {
-            return value;
-        }
-
-        @Override
-        public void writeRegisters(ByteBuffer out) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void readRegisters(ByteBuffer in) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = super.hashCode();
-            result = prime * result + (int) (value ^ (value >>> 32));
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (!super.equals(obj))
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj;
-            if (!value.equals(other.value))
-                return false;
-            return true;
-        }
-    }
-
-}


Mime
View raw message