KYLIN-976 Extract Rewrite related methods
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/63f376a5
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/63f376a5
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/63f376a5
Branch: refs/heads/KYLIN-976
Commit: 63f376a5aaea84fa66e02ef27b50f11c008790b8
Parents: 6c85ead
Author: Li, Yang <yangli9@ebay.com>
Authored: Tue Dec 1 11:39:31 2015 +0800
Committer: Li, Yang <yangli9@ebay.com>
Committed: Tue Dec 1 11:39:31 2015 +0800
----------------------------------------------------------------------
.../kylin/cube/CubeCapabilityChecker.java | 2 +-
.../org/apache/kylin/cube/model/CubeDesc.java | 2 +-
.../apache/kylin/measure/MeasureIngester.java | 6 +-
.../org/apache/kylin/measure/MeasureType.java | 10 +-
.../kylin/measure/basic/BasicMeasureType.java | 11 ++
.../kylin/measure/basic/BigDecimalIngester.java | 5 -
.../kylin/measure/basic/DoubleIngester.java | 5 -
.../kylin/measure/basic/LongIngester.java | 5 -
.../kylin/measure/hllc/HLLCMeasureType.java | 16 +-
.../measure/hllc/HLLDistinctCountAggFunc.java | 153 +++++++++++++++++++
.../kylin/measure/topn/TopNMeasureType.java | 10 ++
.../kylin/metadata/model/FunctionDesc.java | 39 +++--
.../mr/steps/MergeCuboidFromStorageMapper.java | 2 +-
.../engine/mr/steps/MergeCuboidMapper.java | 2 +-
.../kylin/query/relnode/OLAPAggregateRel.java | 13 +-
.../apache/kylin/query/schema/OLAPTable.java | 10 +-
.../query/sqlfunc/HLLDistinctCountAggFunc.java | 152 ------------------
17 files changed, 237 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 0c1b3c9..624bb0b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -191,7 +191,7 @@ public class CubeCapabilityChecker {
if (unmatchedDimensions.isEmpty() && unmatchedAggregations.isEmpty())
break;
- MeasureType measureType = MeasureType.create(measure.getFunction());
+ MeasureType measureType = measure.getFunction().getMeasureType();
if (measureType instanceof BasicMeasureType)
continue;
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 3e8ee13..050aef2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -819,7 +819,7 @@ public class CubeDesc extends RootPersistentEntity {
}
for (MeasureDesc measure : measures) {
- MeasureType aggrType = MeasureType.create(measure.getFunction());
+ MeasureType aggrType = measure.getFunction().getMeasureType();
result.addAll(aggrType.getColumnsNeedDictionary(measure));
}
return result;
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
index 9c7b406..bc387fe 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
abstract public class MeasureIngester<V> {
public static MeasureIngester<?> create(MeasureDesc measure) {
- return MeasureType.create(measure.getFunction()).newIngester();
+ return measure.getFunction().getMeasureType().newIngester();
}
public static MeasureIngester<?>[] create(Collection<MeasureDesc> measures)
{
@@ -42,5 +42,7 @@ abstract public class MeasureIngester<V> {
abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef,
Dictionary<String>> dictionaryMap);
- abstract public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef,
Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts);
+ public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>>
oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 8e7de6f..0891d1e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -45,10 +45,6 @@ abstract public class MeasureType {
factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNMeasureFactory());
}
- public static MeasureType create(FunctionDesc function) {
- return create(function.getExpression(), function.getReturnType());
- }
-
public static MeasureType create(String funcName, String dataType) {
funcName = funcName.toUpperCase();
dataType = dataType.toLowerCase();
@@ -102,6 +98,12 @@ abstract public class MeasureType {
* Query
* ---------------------------------------------------------------------------- */
+ // TODO support user defined calcite aggr function
+
+ abstract public boolean needRewrite();
+
+ abstract public Class<?> getRewriteAggregationFunctionClass();
+
/* ============================================================================
* Storage
* ---------------------------------------------------------------------------- */
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index f314870..a6b36bb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -115,5 +115,16 @@ public class BasicMeasureType extends MeasureType {
private boolean isMin() {
return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
}
+
+ @Override
+ public boolean needRewrite() {
+ return !isSum();
+ }
+
+ @Override
+ public Class<?> getRewriteAggregationFunctionClass() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
index ea1495c..721ba00 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
@@ -38,9 +38,4 @@ public class BigDecimalIngester extends MeasureIngester<BigDecimal>
{
else
return new BigDecimal(values[0]);
}
-
- @Override
- public BigDecimal reEncodeDictionary(BigDecimal value, MeasureDesc measureDesc, Map<TblColRef,
Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict)
{
- throw new UnsupportedOperationException();
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
index aaa754a..70ca727 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -43,9 +43,4 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable>
{
l.set(Double.parseDouble(values[0]));
return l;
}
-
- @Override
- public DoubleMutable reEncodeDictionary(DoubleMutable value, MeasureDesc measureDesc,
Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>>
newDict) {
- throw new UnsupportedOperationException();
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
index bdc1704..2547162 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -43,9 +43,4 @@ public class LongIngester extends MeasureIngester<LongMutable> {
l.set(Long.parseLong(values[0]));
return l;
}
-
- @Override
- public LongMutable reEncodeDictionary(LongMutable value, MeasureDesc measureDesc, Map<TblColRef,
Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict)
{
- throw new UnsupportedOperationException();
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index 4a73478..fd71c00 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -73,11 +73,6 @@ public class HLLCMeasureType extends MeasureType {
hllc.add(v == null ? "__nUlL__" : v);
return hllc;
}
-
- @Override
- public HyperLogLogPlusCounter reEncodeDictionary(HyperLogLogPlusCounter value,
MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef,
Dictionary<String>> newDict) {
- throw new UnsupportedOperationException();
- }
};
}
@@ -89,4 +84,15 @@ public class HLLCMeasureType extends MeasureType {
return new LDCAggregator();
}
+ @Override
+ public boolean needRewrite() {
+ return true;
+ }
+
+ @Override
+ public Class<?> getRewriteAggregationFunctionClass() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
new file mode 100644
index 0000000..b00da5f
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xjiang
+ */
+public class HLLDistinctCountAggFunc {
+
+ private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class);
+
+ public static HyperLogLogPlusCounter init() {
+ return null;
+ }
+
+ public static HyperLogLogPlusCounter initAdd(Object v) {
+ if (v instanceof Long) { // holistic case
+ long l = (Long) v;
+ return new FixedValueHLLCMockup(l);
+ } else {
+ HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
+ return new HyperLogLogPlusCounter(c);
+ }
+ }
+
+ public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) {
+ if (v instanceof Long) { // holistic case
+ long l = (Long) v;
+ if (counter == null) {
+ return new FixedValueHLLCMockup(l);
+ } else {
+ if (!(counter instanceof FixedValueHLLCMockup))
+ throw new IllegalStateException("counter is not FixedValueHLLCMockup");
+
+ ((FixedValueHLLCMockup) counter).set(l);
+ return counter;
+ }
+ } else {
+ HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
+ if (counter == null) {
+ return new HyperLogLogPlusCounter(c);
+ } else {
+ counter.merge(c);
+ return counter;
+ }
+ }
+ }
+
+ public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1)
{
+ return add(counter0, counter1);
+ }
+
+ public static long result(HyperLogLogPlusCounter counter) {
+ return counter == null ? 0L : counter.getCountEstimate();
+ }
+
+ @SuppressWarnings("serial")
+ private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter {
+
+ private Long value = null;
+
+ FixedValueHLLCMockup(long value) {
+ this.value = value;
+ }
+
+ public void set(long value) {
+ if (this.value == null) {
+ this.value = value;
+ } else {
+ long oldValue = Math.abs(this.value.longValue());
+ long take = Math.max(oldValue, value);
+ logger.warn("Error to aggregate holistic count distinct, old value " + oldValue
+ ", new value " + value + ", taking " + take);
+ this.value = -take; // make it obvious that this value is wrong
+ }
+ }
+
+ @Override
+ public void clear() {
+ this.value = null;
+ }
+
+ @Override
+ protected void add(long hash) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void merge(HyperLogLogPlusCounter another) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getCountEstimate() {
+ return value;
+ }
+
+ @Override
+ public void writeRegisters(ByteBuffer out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void readRegisters(ByteBuffer in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + (int) (value ^ (value >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (!super.equals(obj))
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj;
+ if (!value.equals(other.value))
+ return false;
+ return true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index d6c5a6f..77319be 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -167,4 +167,14 @@ public class TopNMeasureType extends MeasureType {
return null;
}
+ @Override
+ public boolean needRewrite() {
+ return false;
+ }
+
+ @Override
+ public Class<?> getRewriteAggregationFunctionClass() {
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 0c36873..17debda 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.model;
import java.util.ArrayList;
import java.util.Collection;
+import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.datatype.DataType;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -51,6 +52,7 @@ public class FunctionDesc {
private String returnType;
private DataType returnDataType;
+ private MeasureType measureType;
private boolean isDimensionAsMetric = false;
public void init(TableDesc factTable) {
@@ -77,6 +79,23 @@ public class FunctionDesc {
setReturnType(colRefs.get(0).getDatatype());
}
}
+
+ public MeasureType getMeasureType() {
+ if (isDimensionAsMetric)
+ return null;
+
+ if (measureType == null) {
+ measureType = MeasureType.create(getExpression(), getReturnType());
+ }
+ return measureType;
+ }
+
+ public boolean needRewrite() {
+ if (isDimensionAsMetric)
+ return false;
+
+ return getMeasureType().needRewrite();
+ }
public String getRewriteFieldName() {
if (isSum()) {
@@ -88,14 +107,19 @@ public class FunctionDesc {
}
}
- public boolean needRewrite() {
- return !isSum() && !isDimensionAsMetric() && !isTopN();
+ public DataType getRewriteFieldType() {
+ if (isCountDistinct() || isTopN())
+ return DataType.ANY;
+ else if (isSum() || isMax() || isMin())
+ return parameter.getColRefs().get(0).getType();
+ else
+ return returnDataType;
}
public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
ColumnDesc fakeCol = new ColumnDesc();
fakeCol.setName(getRewriteFieldName());
- fakeCol.setDatatype(getSQLType().toString());
+ fakeCol.setDatatype(getRewriteFieldType().toString());
if (isCount())
fakeCol.setNullable(false);
fakeCol.init(sourceTable);
@@ -179,15 +203,6 @@ public class FunctionDesc {
return count;
}
- public DataType getSQLType() {
- if (isCountDistinct() || isTopN())
- return DataType.ANY;
- else if (isSum() || isMax() || isMin())
- return parameter.getColRefs().get(0).getType();
- else
- return returnDataType;
- }
-
public String getReturnType() {
return returnType;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index b4682dd..bb0c073 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -122,7 +122,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object,
Object, By
dictMeasures = Lists.newArrayList();
for (int i = 0; i < measureDescs.size(); i++) {
MeasureDesc measureDesc = measureDescs.get(i);
- MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+ MeasureType measureType = measureDesc.getFunction().getMeasureType();
if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 4fc7236..401237b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -124,7 +124,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text,
Text> {
dictMeasures = Lists.newArrayList();
for (int i = 0; i < measureDescs.size(); i++) {
MeasureDesc measureDesc = measureDescs.get(i);
- MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+ MeasureType measureType = measureDesc.getFunction().getMeasureType();
if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index cbc0c56..9aa70ca 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -59,7 +59,6 @@ import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.query.sqlfunc.HLLDistinctCountAggFunc;
import com.google.common.base.Preconditions;
@@ -301,10 +300,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
// rebuild function
RelDataType fieldType = aggCall.getType();
SqlAggFunction newAgg = aggCall.getAggregation();
- if (func.isCountDistinct()) {
- newAgg = createHyperLogLogAggFunction(fieldType);
- } else if (func.isCount()) {
+ if (func.isCount()) {
newAgg = SqlStdOperatorTable.SUM0;
+ } else if (func.getMeasureType().getRewriteAggregationFunctionClass() != null) {
+ newAgg = createCustomAggFunction(fieldType, func.getMeasureType().getRewriteAggregationFunctionClass());
}
// rebuild aggregate call
@@ -312,10 +311,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
return newAggCall;
}
- private SqlAggFunction createHyperLogLogAggFunction(RelDataType returnType) {
+ private SqlAggFunction createCustomAggFunction(RelDataType returnType, Class<?>
customAggFuncClz) {
RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
- SqlIdentifier sqlIdentifier = new SqlIdentifier("HLL_COUNT", new SqlParserPos(1,
1));
- AggregateFunction aggFunction = AggregateFunctionImpl.create(HLLDistinctCountAggFunc.class);
+ SqlIdentifier sqlIdentifier = new SqlIdentifier(customAggFuncClz.getSimpleName(),
new SqlParserPos(1, 1));
+ AggregateFunction aggFunction = AggregateFunctionImpl.create(customAggFuncClz);
List<RelDataType> argTypes = new ArrayList<RelDataType>();
List<SqlTypeFamily> typeFamilies = new ArrayList<SqlTypeFamily>();
for (FunctionParameter o : aggFunction.getParameters()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
index 94a91bf..a8789ea 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
@@ -184,11 +184,11 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab
HashSet<ColumnDesc> updateColumns = Sets.newHashSet();
for (MeasureDesc m : mgr.listEffectiveMeasures(olapSchema.getProjectName(), sourceTable.getIdentity()))
{
if (m.getFunction().isSum()) {
- FunctionDesc functionDesc = m.getFunction();
- if (functionDesc.getReturnDataType() != functionDesc.getSQLType() &&
//
- functionDesc.getReturnDataType().isBigInt() && //
- functionDesc.getSQLType().isIntegerFamily()) {
- updateColumns.add(functionDesc.getParameter().getColRefs().get(0).getColumnDesc());
+ FunctionDesc func = m.getFunction();
+ if (func.getReturnDataType() != func.getRewriteFieldType() && //
+ func.getReturnDataType().isBigInt() && //
+ func.getRewriteFieldType().isIntegerFamily()) {
+ updateColumns.add(func.getParameter().getColRefs().get(0).getColumnDesc());
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
deleted file mode 100644
index 7881c42..0000000
--- a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.sqlfunc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- */
-public class HLLDistinctCountAggFunc {
-
- private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class);
-
- public static HyperLogLogPlusCounter init() {
- return null;
- }
-
- public static HyperLogLogPlusCounter initAdd(Object v) {
- if (v instanceof Long) { // holistic case
- long l = (Long) v;
- return new FixedValueHLLCMockup(l);
- } else {
- HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
- return new HyperLogLogPlusCounter(c);
- }
- }
-
- public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) {
- if (v instanceof Long) { // holistic case
- long l = (Long) v;
- if (counter == null) {
- return new FixedValueHLLCMockup(l);
- } else {
- if (!(counter instanceof FixedValueHLLCMockup))
- throw new IllegalStateException("counter is not FixedValueHLLCMockup");
-
- ((FixedValueHLLCMockup) counter).set(l);
- return counter;
- }
- } else {
- HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
- if (counter == null) {
- return new HyperLogLogPlusCounter(c);
- } else {
- counter.merge(c);
- return counter;
- }
- }
- }
-
- public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1)
{
- return add(counter0, counter1);
- }
-
- public static long result(HyperLogLogPlusCounter counter) {
- return counter == null ? 0L : counter.getCountEstimate();
- }
-
- private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter {
-
- private Long value = null;
-
- FixedValueHLLCMockup(long value) {
- this.value = value;
- }
-
- public void set(long value) {
- if (this.value == null) {
- this.value = value;
- } else {
- long oldValue = Math.abs(this.value.longValue());
- long take = Math.max(oldValue, value);
- logger.warn("Error to aggregate holistic count distinct, old value " + oldValue
+ ", new value " + value + ", taking " + take);
- this.value = -take; // make it obvious that this value is wrong
- }
- }
-
- @Override
- public void clear() {
- this.value = null;
- }
-
- @Override
- protected void add(long hash) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void merge(HyperLogLogPlusCounter another) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getCountEstimate() {
- return value;
- }
-
- @Override
- public void writeRegisters(ByteBuffer out) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void readRegisters(ByteBuffer in) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + (int) (value ^ (value >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (!super.equals(obj))
- return false;
- if (getClass() != obj.getClass())
- return false;
- FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj;
- if (!value.equals(other.value))
- return false;
- return true;
- }
- }
-
-}
|