Return-Path: X-Original-To: apmail-kylin-commits-archive@minotaur.apache.org Delivered-To: apmail-kylin-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5E114171E4 for ; Wed, 2 Dec 2015 03:15:34 +0000 (UTC) Received: (qmail 32484 invoked by uid 500); 2 Dec 2015 03:15:34 -0000 Delivered-To: apmail-kylin-commits-archive@kylin.apache.org Received: (qmail 32454 invoked by uid 500); 2 Dec 2015 03:15:34 -0000 Mailing-List: contact commits-help@kylin.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.apache.org Delivered-To: mailing list commits@kylin.apache.org Received: (qmail 32440 invoked by uid 99); 2 Dec 2015 03:15:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Dec 2015 03:15:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C1E53E08B3; Wed, 2 Dec 2015 03:15:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: liyang@apache.org To: commits@kylin.apache.org Date: Wed, 02 Dec 2015 03:15:34 -0000 Message-Id: <74aa3cbdb1b74dfbae7e55e72e0b6dfa@git.apache.org> In-Reply-To: <333cc6991504463eac937bbba0f5a020@git.apache.org> References: <333cc6991504463eac937bbba0f5a020@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] kylin git commit: KYLIN-976 Extract Rewrite related methdos KYLIN-976 Extract Rewrite related methdos Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/63f376a5 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/63f376a5 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/63f376a5 Branch: refs/heads/KYLIN-976 Commit: 63f376a5aaea84fa66e02ef27b50f11c008790b8 Parents: 6c85ead Author: Li, Yang Authored: Tue Dec 1 11:39:31 2015 +0800 Committer: Li, Yang Committed: Tue Dec 1 11:39:31 2015 +0800 ---------------------------------------------------------------------- .../kylin/cube/CubeCapabilityChecker.java | 2 +- .../org/apache/kylin/cube/model/CubeDesc.java | 2 +- .../apache/kylin/measure/MeasureIngester.java | 6 +- .../org/apache/kylin/measure/MeasureType.java | 10 +- .../kylin/measure/basic/BasicMeasureType.java | 11 ++ .../kylin/measure/basic/BigDecimalIngester.java | 5 - .../kylin/measure/basic/DoubleIngester.java | 5 - .../kylin/measure/basic/LongIngester.java | 5 - .../kylin/measure/hllc/HLLCMeasureType.java | 16 +- .../measure/hllc/HLLDistinctCountAggFunc.java | 153 +++++++++++++++++++ .../kylin/measure/topn/TopNMeasureType.java | 10 ++ .../kylin/metadata/model/FunctionDesc.java | 39 +++-- .../mr/steps/MergeCuboidFromStorageMapper.java | 2 +- .../engine/mr/steps/MergeCuboidMapper.java | 2 +- .../kylin/query/relnode/OLAPAggregateRel.java | 13 +- .../apache/kylin/query/schema/OLAPTable.java | 10 +- .../query/sqlfunc/HLLDistinctCountAggFunc.java | 152 ------------------ 17 files changed, 237 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java index 0c1b3c9..624bb0b 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java @@ -191,7 +191,7 @@ public class CubeCapabilityChecker { if (unmatchedDimensions.isEmpty() && unmatchedAggregations.isEmpty()) break; - MeasureType measureType = MeasureType.create(measure.getFunction()); + MeasureType measureType = measure.getFunction().getMeasureType(); if (measureType instanceof BasicMeasureType) continue; http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 3e8ee13..050aef2 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -819,7 +819,7 @@ public class CubeDesc extends RootPersistentEntity { } for (MeasureDesc measure : measures) { - MeasureType aggrType = MeasureType.create(measure.getFunction()); + MeasureType aggrType = measure.getFunction().getMeasureType(); result.addAll(aggrType.getColumnsNeedDictionary(measure)); } return result; http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java index 9c7b406..bc387fe 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java @@ -28,7 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef; abstract public class MeasureIngester { public static MeasureIngester create(MeasureDesc measure) { - return MeasureType.create(measure.getFunction()).newIngester(); + return measure.getFunction().getMeasureType().newIngester(); } public static MeasureIngester[] create(Collection measures) { @@ -42,5 +42,7 @@ abstract public class MeasureIngester { abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map> dictionaryMap); - abstract public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map> oldDicts, Map> newDicts); + public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map> oldDicts, Map> newDicts) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java index 8e7de6f..0891d1e 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -45,10 +45,6 @@ abstract public class MeasureType { factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNMeasureFactory()); } - public static MeasureType create(FunctionDesc function) { - return create(function.getExpression(), function.getReturnType()); - } - public static MeasureType create(String funcName, String dataType) { funcName = funcName.toUpperCase(); dataType = dataType.toLowerCase(); @@ -102,6 +98,12 @@ abstract public class MeasureType { * Query * ---------------------------------------------------------------------------- */ + // TODO support user defined calcite aggr function + + abstract public boolean needRewrite(); + + abstract public Class getRewriteAggregationFunctionClass(); + /* ============================================================================ * Storage * ---------------------------------------------------------------------------- */ http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java index f314870..a6b36bb 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java @@ -115,5 +115,16 @@ public class BasicMeasureType extends MeasureType { private boolean isMin() { return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName); } + + @Override + public boolean needRewrite() { + return !isSum(); + } + + @Override + public Class getRewriteAggregationFunctionClass() { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java index ea1495c..721ba00 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java @@ -38,9 +38,4 @@ public class BigDecimalIngester extends MeasureIngester { else return new BigDecimal(values[0]); } - - @Override - public BigDecimal reEncodeDictionary(BigDecimal value, MeasureDesc measureDesc, Map> oldDict, Map> newDict) { - throw new UnsupportedOperationException(); - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java index aaa754a..70ca727 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java @@ -43,9 +43,4 @@ public class DoubleIngester extends MeasureIngester { l.set(Double.parseDouble(values[0])); return l; } - - @Override - public DoubleMutable reEncodeDictionary(DoubleMutable value, MeasureDesc measureDesc, Map> oldDict, Map> newDict) { - throw new UnsupportedOperationException(); - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java index bdc1704..2547162 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java @@ -43,9 +43,4 @@ public class LongIngester extends MeasureIngester { l.set(Long.parseLong(values[0])); return l; } - - @Override - public LongMutable reEncodeDictionary(LongMutable value, MeasureDesc measureDesc, Map> oldDict, Map> newDict) { - throw new UnsupportedOperationException(); - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java index 4a73478..fd71c00 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java @@ -73,11 +73,6 @@ public class HLLCMeasureType extends MeasureType { hllc.add(v == null ? "__nUlL__" : v); return hllc; } - - @Override - public HyperLogLogPlusCounter reEncodeDictionary(HyperLogLogPlusCounter value, MeasureDesc measureDesc, Map> oldDict, Map> newDict) { - throw new UnsupportedOperationException(); - } }; } @@ -89,4 +84,15 @@ public class HLLCMeasureType extends MeasureType { return new LDCAggregator(); } + @Override + public boolean needRewrite() { + return true; + } + + @Override + public Class getRewriteAggregationFunctionClass() { + // TODO Auto-generated method stub + return null; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java new file mode 100644 index 0000000..b00da5f --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure.hllc; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author xjiang + */ +public class HLLDistinctCountAggFunc { + + private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class); + + public static HyperLogLogPlusCounter init() { + return null; + } + + public static HyperLogLogPlusCounter initAdd(Object v) { + if (v instanceof Long) { // holistic case + long l = (Long) v; + return new FixedValueHLLCMockup(l); + } else { + HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; + return new HyperLogLogPlusCounter(c); + } + } + + public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) { + if (v instanceof Long) { // holistic case + long l = (Long) v; + if (counter == null) { + return new FixedValueHLLCMockup(l); + } else { + if (!(counter instanceof FixedValueHLLCMockup)) + throw new IllegalStateException("counter is not FixedValueHLLCMockup"); + + ((FixedValueHLLCMockup) counter).set(l); + return counter; + } + } else { + HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; + if (counter == null) { + return new HyperLogLogPlusCounter(c); + } else { + counter.merge(c); + return counter; + } + } + } + + public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) { + return add(counter0, counter1); + } + + public static long result(HyperLogLogPlusCounter counter) { + return counter == null ? 0L : counter.getCountEstimate(); + } + + @SuppressWarnings("serial") + private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter { + + private Long value = null; + + FixedValueHLLCMockup(long value) { + this.value = value; + } + + public void set(long value) { + if (this.value == null) { + this.value = value; + } else { + long oldValue = Math.abs(this.value.longValue()); + long take = Math.max(oldValue, value); + logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take); + this.value = -take; // make it obvious that this value is wrong + } + } + + @Override + public void clear() { + this.value = null; + } + + @Override + protected void add(long hash) { + throw new UnsupportedOperationException(); + } + + @Override + public void merge(HyperLogLogPlusCounter another) { + throw new UnsupportedOperationException(); + } + + @Override + public long getCountEstimate() { + return value; + } + + @Override + public void writeRegisters(ByteBuffer out) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void readRegisters(ByteBuffer in) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (value ^ (value >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (getClass() != obj.getClass()) + return false; + FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj; + if (!value.equals(other.value)) + return false; + return true; + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java index d6c5a6f..77319be 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java @@ -167,4 +167,14 @@ public class TopNMeasureType extends MeasureType { return null; } + @Override + public boolean needRewrite() { + return false; + } + + @Override + public Class getRewriteAggregationFunctionClass() { + return null; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java index 0c36873..17debda 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java @@ -21,6 +21,7 @@ package org.apache.kylin.metadata.model; import java.util.ArrayList; import java.util.Collection; +import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.datatype.DataType; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -51,6 +52,7 @@ public class FunctionDesc { private String returnType; private DataType returnDataType; + private MeasureType measureType; private boolean isDimensionAsMetric = false; public void init(TableDesc factTable) { @@ -77,6 +79,23 @@ public class FunctionDesc { setReturnType(colRefs.get(0).getDatatype()); } } + + public MeasureType getMeasureType() { + if (isDimensionAsMetric) + return null; + + if (measureType == null) { + measureType = MeasureType.create(getExpression(), getReturnType()); + } + return measureType; + } + + public boolean needRewrite() { + if (isDimensionAsMetric) + return false; + + return getMeasureType().needRewrite(); + } public String getRewriteFieldName() { if (isSum()) { @@ -88,14 +107,19 @@ public class FunctionDesc { } } - public boolean needRewrite() { - return !isSum() && !isDimensionAsMetric() && !isTopN(); + public DataType getRewriteFieldType() { + if (isCountDistinct() || isTopN()) + return DataType.ANY; + else if (isSum() || isMax() || isMin()) + return parameter.getColRefs().get(0).getType(); + else + return returnDataType; } public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) { ColumnDesc fakeCol = new ColumnDesc(); fakeCol.setName(getRewriteFieldName()); - fakeCol.setDatatype(getSQLType().toString()); + fakeCol.setDatatype(getRewriteFieldType().toString()); if (isCount()) fakeCol.setNullable(false); fakeCol.init(sourceTable); @@ -179,15 +203,6 @@ public class FunctionDesc { return count; } - public DataType getSQLType() { - if (isCountDistinct() || isTopN()) - return DataType.ANY; - else if (isSum() || isMax() || isMin()) - return parameter.getColRefs().get(0).getType(); - else - return returnDataType; - } - public String getReturnType() { return returnType; } http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java index b4682dd..bb0c073 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java @@ -122,7 +122,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper(i, measureType.newIngester())); } http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index 4fc7236..401237b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -124,7 +124,7 @@ public class MergeCuboidMapper extends KylinMapper { dictMeasures = Lists.newArrayList(); for (int i = 0; i < measureDescs.size(); i++) { MeasureDesc measureDesc = measureDescs.get(i); - MeasureType measureType = MeasureType.create(measureDesc.getFunction()); + MeasureType measureType = measureDesc.getFunction().getMeasureType(); if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) { dictMeasures.add(new Pair(i, measureType.newIngester())); } http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index cbc0c56..9aa70ca 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ -59,7 +59,6 @@ import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.query.sqlfunc.HLLDistinctCountAggFunc; import com.google.common.base.Preconditions; @@ -301,10 +300,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { // rebuild function RelDataType fieldType = aggCall.getType(); SqlAggFunction newAgg = aggCall.getAggregation(); - if (func.isCountDistinct()) { - newAgg = createHyperLogLogAggFunction(fieldType); - } else if (func.isCount()) { + if (func.isCount()) { newAgg = SqlStdOperatorTable.SUM0; + } else if (func.getMeasureType().getRewriteAggregationFunctionClass() != null) { + newAgg = createCustomAggFunction(fieldType, func.getMeasureType().getRewriteAggregationFunctionClass()); } // rebuild aggregate call @@ -312,10 +311,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { return newAggCall; } - private SqlAggFunction createHyperLogLogAggFunction(RelDataType returnType) { + private SqlAggFunction createCustomAggFunction(RelDataType returnType, Class customAggFuncClz) { RelDataTypeFactory typeFactory = getCluster().getTypeFactory(); - SqlIdentifier sqlIdentifier = new SqlIdentifier("HLL_COUNT", new SqlParserPos(1, 1)); - AggregateFunction aggFunction = AggregateFunctionImpl.create(HLLDistinctCountAggFunc.class); + SqlIdentifier sqlIdentifier = new SqlIdentifier(customAggFuncClz.getSimpleName(), new SqlParserPos(1, 1)); + AggregateFunction aggFunction = AggregateFunctionImpl.create(customAggFuncClz); List argTypes = new ArrayList(); List typeFamilies = new ArrayList(); for (FunctionParameter o : aggFunction.getParameters()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java index 94a91bf..a8789ea 100644 --- a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java +++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java @@ -184,11 +184,11 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab HashSet updateColumns = Sets.newHashSet(); for (MeasureDesc m : mgr.listEffectiveMeasures(olapSchema.getProjectName(), sourceTable.getIdentity())) { if (m.getFunction().isSum()) { - FunctionDesc functionDesc = m.getFunction(); - if (functionDesc.getReturnDataType() != functionDesc.getSQLType() && // - functionDesc.getReturnDataType().isBigInt() && // - functionDesc.getSQLType().isIntegerFamily()) { - updateColumns.add(functionDesc.getParameter().getColRefs().get(0).getColumnDesc()); + FunctionDesc func = m.getFunction(); + if (func.getReturnDataType() != func.getRewriteFieldType() && // + func.getReturnDataType().isBigInt() && // + func.getRewriteFieldType().isIntegerFamily()) { + updateColumns.add(func.getParameter().getColRefs().get(0).getColumnDesc()); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java deleted file mode 100644 index 7881c42..0000000 --- a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.query.sqlfunc; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author xjiang - */ -public class HLLDistinctCountAggFunc { - - private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class); - - public static HyperLogLogPlusCounter init() { - return null; - } - - public static HyperLogLogPlusCounter initAdd(Object v) { - if (v instanceof Long) { // holistic case - long l = (Long) v; - return new FixedValueHLLCMockup(l); - } else { - HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; - return new HyperLogLogPlusCounter(c); - } - } - - public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) { - if (v instanceof Long) { // holistic case - long l = (Long) v; - if (counter == null) { - return new FixedValueHLLCMockup(l); - } else { - if (!(counter instanceof FixedValueHLLCMockup)) - throw new IllegalStateException("counter is not FixedValueHLLCMockup"); - - ((FixedValueHLLCMockup) counter).set(l); - return counter; - } - } else { - HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; - if (counter == null) { - return new HyperLogLogPlusCounter(c); - } else { - counter.merge(c); - return counter; - } - } - } - - public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) { - return add(counter0, counter1); - } - - public static long result(HyperLogLogPlusCounter counter) { - return counter == null ? 0L : counter.getCountEstimate(); - } - - private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter { - - private Long value = null; - - FixedValueHLLCMockup(long value) { - this.value = value; - } - - public void set(long value) { - if (this.value == null) { - this.value = value; - } else { - long oldValue = Math.abs(this.value.longValue()); - long take = Math.max(oldValue, value); - logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take); - this.value = -take; // make it obvious that this value is wrong - } - } - - @Override - public void clear() { - this.value = null; - } - - @Override - protected void add(long hash) { - throw new UnsupportedOperationException(); - } - - @Override - public void merge(HyperLogLogPlusCounter another) { - throw new UnsupportedOperationException(); - } - - @Override - public long getCountEstimate() { - return value; - } - - @Override - public void writeRegisters(ByteBuffer out) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void readRegisters(ByteBuffer in) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + (int) (value ^ (value >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (!super.equals(obj)) - return false; - if (getClass() != obj.getClass()) - return false; - FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj; - if (!value.equals(other.value)) - return false; - return true; - } - } - -}