From dev-return-13470-archive-asf-public=cust-asf.ponee.io@kylin.apache.org Fri Jul 27 03:48:19 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 53970180621 for ; Fri, 27 Jul 2018 03:48:18 +0200 (CEST) Received: (qmail 8641 invoked by uid 500); 27 Jul 2018 01:48:17 -0000 Mailing-List: contact dev-help@kylin.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.apache.org Delivered-To: mailing list dev@kylin.apache.org Received: (qmail 8630 invoked by uid 99); 27 Jul 2018 01:48:17 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Jul 2018 01:48:17 +0000 From: GitBox To: dev@kylin.apache.org Subject: [GitHub] shaofengshi closed pull request #172: KYLIN-3453 Improve cube size estimation for topn, count distinct Message-ID: <153265609642.14950.7510586557477137162.gitbox@gitbox.apache.org> Date: Fri, 27 Jul 2018 01:48:16 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit shaofengshi closed pull request #172: KYLIN-3453 Improve cube size estimation for topn,count distinct URL: https://github.com/apache/kylin/pull/172 This is a PR merged from a forked repository. 
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 637502ef05..025a982bdf 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -426,7 +426,6 @@ public double getExtTableSnapshotLocalCacheMaxSizeGB() { return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200")); } - // ============================================================================ // CUBE // ============================================================================ @@ -449,7 +448,11 @@ public double getJobCuboidSizeMemHungryRatio() { } public double getJobCuboidSizeCountDistinctRatio() { - return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.05")); + return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.5")); + } + + public double getJobCuboidSizeTopNRatio() { + return Double.parseDouble(getOptional("kylin.cube.size-estimate-topn-ratio", "0.5")); } public String getCubeAlgorithm() { @@ -872,7 +875,7 @@ public int getSqoopMapperNum() { public Map getSqoopConfigOverride() { return getPropertiesByPrefix("kylin.source.jdbc.sqoop-config-override."); } - + public String getJdbcSourceFieldDelimiter() { return getOptional("kylin.source.jdbc.field-delimiter", "|"); } @@ -1223,11 +1226,11 @@ public boolean isSparkSanityCheckEnabled() { public Boolean isEnumerableRulesEnabled() { return Boolean.parseBoolean(getOptional("kylin.query.calcite.enumerable-rules-enabled", "false")); } - + public boolean isReduceExpressionsRulesEnabled() { return 
Boolean.parseBoolean(getOptional("kylin.query.calcite.reduce-rules-enabled", "true")); } - + public boolean isConvertCreateTableToWith() { return Boolean.valueOf(getOptional("kylin.query.convert-create-table-to-with", "false")); } @@ -1328,12 +1331,13 @@ public int getBadQueryHistoryNum() { public int getBadQueryDefaultAlertingSeconds() { return Integer.parseInt(getOptional("kylin.query.badquery-alerting-seconds", "90")); } + public double getBadQueryDefaultAlertingCoefficient() { return Double.parseDouble(getOptional("kylin.query.timeout-seconds-coefficient", "0.5")); } public int getBadQueryDefaultDetectIntervalSeconds() { - int time =(int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout + int time = (int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout if (time == 0) { time = 60; // 60 sec } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java index 1c138761f3..29a25e9b5b 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java @@ -30,6 +30,7 @@ private static final int IS_RESULT_FLAG = 1; private static final int RESULT_SIZE = 12; + private static final int DEFAULT_MAX_SIZE = 1024; // called by reflection public BitmapSerializer(DataType type) { @@ -85,6 +86,19 @@ public int getStorageBytesEstimate() { return 8 * 1024; } + @Override + protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) { + // MappeableArrayContainer DEFAULT_MAX_SIZE = 4096 + if (averageNumOfElementsInCounter < DEFAULT_MAX_SIZE) { + // 8 = 4 + 4 for SERIAL_COOKIE_NO_RUNCONTAINER + size + // size * 8 = 2 * size + 2 * size + 4 * size as keys + values Cardinality + startOffsets + // size * 8 for values array + return 8 + 
averageNumOfElementsInCounter * 16; + } else { + return getStorageBytesEstimate(); + } + } + @Override public boolean supportDirectReturnResult() { return true; diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java index 98bc5cf772..9310864916 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java @@ -80,4 +80,16 @@ public int getStorageBytesEstimate() { return current().maxLength(); } + @Override + protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) { + int registerIndexSize = current().getRegisterIndexSize(); + int m = 1 << precision; + if (!current().isDense((int) averageNumOfElementsInCounter) + || averageNumOfElementsInCounter < (m - 5) / (1 + registerIndexSize)) { + // 5 = 1 + 4 for scheme and size + // size * (getRegisterIndexSize + 1) + return 5 + averageNumOfElementsInCounter * (registerIndexSize + 1); + } + return getStorageBytesEstimate(); + } } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java index b79346503e..80bbb2a9c1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java @@ -78,7 +78,7 @@ public HLLCounter(int p, RegisterType type) { } } - private boolean isDense(int size) { + public boolean isDense(int size) { double over = OVERFLOW_FACTOR * m; return size > (int) over; } @@ -358,7 +358,7 @@ public int maxLength() { return 1 + m; } - private int getRegisterIndexSize() { + public int getRegisterIndexSize() { return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17 } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java index 77a69cf935..eff510f8d0 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java @@ -61,12 +61,12 @@ public int peekLength(ByteBuffer in) { @Override public int maxLength() { - return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8), 1024 * 1024); // use at least 1M + return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter(), 1024 * 1024); // use at least 1M } @Override public int getStorageBytesEstimate() { - return precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8); + return precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter(); } @Override @@ -107,4 +107,17 @@ public void serialize(TopNCounter value, ByteBuffer out) { return counter; } + @Override + protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) { + if (averageNumOfElementsInCounter < precision * TopNCounter.EXTRA_SPACE_RATE) { + return averageNumOfElementsInCounter * storageBytesEstimatePerCounter() + 12; + } else { + return getStorageBytesEstimate(); + } + } + + private int storageBytesEstimatePerCounter() { + return (scale + 8); + } + } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 3c054a3283..6b8934abb4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -55,6 +55,7 @@ import org.apache.kylin.cube.kv.RowKeyEncoder; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.measure.hllc.HLLCounter; +import org.apache.kylin.measure.topn.TopNMeasureType; import org.apache.kylin.metadata.datatype.DataType; import 
org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -79,6 +80,7 @@ final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge final Map cuboidRowEstimatesHLL; final CuboidScheduler cuboidScheduler; + final long sourceRowCount; public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig); @@ -94,7 +96,7 @@ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, RawResource resource = store.getResource(statsKey); if (resource == null) throw new IllegalStateException("Missing resource at " + statsKey); - + File tmpSeqFile = writeTmpSeqFile(resource.inputStream); Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath())); @@ -107,6 +109,7 @@ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber(); this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio(); this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap(); + this.sourceRowCount = cubeStatsResult.getSourceRecordCount(); } /** @@ -129,6 +132,7 @@ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber(); this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio(); this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap(); + this.sourceRowCount = cubeStatsResult.getSourceRecordCount(); } private File writeTmpSeqFile(InputStream inputStream) throws IOException { @@ -158,7 +162,7 @@ public int getSamplingPercentage() { // return map of Cuboid ID => MB public Map getCuboidSizeMap() { - return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL()); + return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount); } public double estimateCubeSize() { 
@@ -184,7 +188,8 @@ public double getMapperOverlapRatioOfFirstBuild() { return cuboidRowCountMap; } - public static Map getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map rowCountMap) { + public static Map getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map rowCountMap, + long sourceRowCount) { final CubeDesc cubeDesc = cubeSegment.getCubeDesc(); final List rowkeyColumnSize = Lists.newArrayList(); final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc); @@ -199,7 +204,7 @@ public double getMapperOverlapRatioOfFirstBuild() { Map sizeMap = Maps.newHashMap(); for (Map.Entry entry : rowCountMap.entrySet()) { sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), - baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize)); + baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount)); } return sizeMap; } @@ -210,7 +215,7 @@ public double getMapperOverlapRatioOfFirstBuild() { * @return the cuboid size in M bytes */ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, - long baseCuboidId, long baseCuboidCount, List rowKeyColumnLength) { + long baseCuboidId, long baseCuboidCount, List rowKeyColumnLength, long sourceRowCount) { int rowkeyLength = cubeSegment.getRowKeyPreambleSize(); KylinConfig kylinConf = cubeSegment.getConfig(); @@ -228,12 +233,21 @@ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cu int normalSpace = rowkeyLength; int countDistinctSpace = 0; double percentileSpace = 0; + int topNSpace = 0; for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) { + if (rowCount == 0) + break; DataType returnType = measureDesc.getFunction().getReturnDataType(); if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_COUNT_DISTINCT)) { - countDistinctSpace += returnType.getStorageBytesEstimate(); + long estimateDistinctCount = sourceRowCount / rowCount; + estimateDistinctCount = 
estimateDistinctCount == 0 ? 1L : estimateDistinctCount; + countDistinctSpace += returnType.getStorageBytesEstimate(estimateDistinctCount); } else if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_PERCENTILE)) { percentileSpace += returnType.getStorageBytesEstimate(baseCuboidCount * 1.0 / rowCount); + } else if (measureDesc.getFunction().getExpression().equals(TopNMeasureType.FUNC_TOP_N)) { + long estimateTopNCount = sourceRowCount / rowCount; + estimateTopNCount = estimateTopNCount == 0 ? 1L : estimateTopNCount; + topNSpace += returnType.getStorageBytesEstimate(estimateTopNCount); } else { normalSpace += returnType.getStorageBytesEstimate(); } @@ -241,9 +255,11 @@ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cu double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio(); double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio(); + double cuboidSizeTopNRatio = kylinConf.getJobCuboidSizeTopNRatio(); + double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio - + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount) - / (1024L * 1024L); + + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount + + 1.0 * topNSpace * rowCount * cuboidSizeTopNRatio) / (1024L * 1024L); return ret; } @@ -351,6 +367,7 @@ private static String formatDouble(double input) { public static class CubeStatsResult { private int percentage = 100; private double mapperOverlapRatio = 0; + private long sourceRecordCount = 0; private int mapperNumber = 0; private Map counterMap = Maps.newHashMap(); @@ -367,6 +384,8 @@ public CubeStatsResult(Path path, int precision) throws IOException { mapperOverlapRatio = Bytes.toDouble(value.getBytes()); } else if (key.get() == -2) { mapperNumber = Bytes.toInt(value.getBytes()); + } else if (key.get() == -3) { + sourceRecordCount = Bytes.toLong(value.getBytes()); } else if (key.get() > 0) { HLLCounter 
hll = new HLLCounter(precision); ByteArray byteArray = new ByteArray(value.getBytes()); @@ -392,6 +411,10 @@ public int getMapperNumber() { public Map getCounterMap() { return Collections.unmodifiableMap(counterMap); } + + public long getSourceRecordCount() { + return sourceRecordCount; + } } public static void main(String[] args) throws IOException { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java index f50a4beb97..c3d6042b1d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java @@ -40,14 +40,15 @@ public static void writeCuboidStatistics(Configuration conf, Path outputPath, // Map cuboidHLLMap, int samplingPercentage) throws IOException { - writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0); + writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0); } public static void writeCuboidStatistics(Configuration conf, Path outputPath, // - Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException { + Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio, + long sourceRecordCoun) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber, - mapperOverlapRatio); + mapperOverlapRatio, sourceRecordCoun); } //Be care of that the file name for partial cuboid statistics should start with BatchConstants.CFG_OUTPUT_STATISTICS, @@ -57,12 +58,12 @@ public static void writePartialCuboidStatistics(Configuration conf, Path outputP int shard) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_OUTPUT_STATISTICS + "_" + shard); 
writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber, - mapperOverlapRatio); + mapperOverlapRatio, 0); } private static void writeCuboidStatisticsInner(Configuration conf, Path outputFilePath, // - Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) - throws IOException { + Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio, + long sourceRecordCount) throws IOException { List allCuboids = Lists.newArrayList(); allCuboids.addAll(cuboidHLLMap.keySet()); Collections.sort(allCuboids); @@ -80,6 +81,9 @@ private static void writeCuboidStatisticsInner(Configuration conf, Path outputFi // sampling percentage at key 0 writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage))); + // flat table source_count at key -3 + writer.append(new LongWritable(-3), new BytesWritable(Bytes.toBytes(sourceRecordCount))); + for (long i : allCuboids) { valueBuf.clear(); cuboidHLLMap.get(i).writeRegisters(valueBuf); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index b532360f36..1f79539f6d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -122,12 +122,15 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio totalRowsBeforeMerge); } double mapperOverlapRatio = grantTotal == 0 ? 
0 : (double) totalRowsBeforeMerge / grantTotal; + CubingJob cubingJob = (CubingJob) getManager() + .getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); + long sourceRecordCount = cubingJob.findSourceRecordCount(); CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage, - mapperNumber, mapperOverlapRatio); + mapperNumber, mapperOverlapRatio, sourceRecordCount); Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); logger.info(newSegment + " stats saved to hdfs " + statisticsFile); - + FSDataInputStream is = fs.open(statisticsFile); try { // put the statistics to metadata store @@ -135,8 +138,6 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio rs.putResource(resPath, is, System.currentTimeMillis()); logger.info(newSegment + " stats saved to resource " + resPath); - CubingJob cubingJob = (CubingJob) getManager() - .getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment); StatisticsDecisionUtil.optimizeCubingPlan(newSegment); } finally { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services