Return-Path: X-Original-To: apmail-kylin-commits-archive@minotaur.apache.org Delivered-To: apmail-kylin-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BF43218B26 for ; Fri, 15 May 2015 03:06:37 +0000 (UTC) Received: (qmail 7669 invoked by uid 500); 15 May 2015 03:06:37 -0000 Delivered-To: apmail-kylin-commits-archive@kylin.apache.org Received: (qmail 7644 invoked by uid 500); 15 May 2015 03:06:37 -0000 Mailing-List: contact commits-help@kylin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.incubator.apache.org Delivered-To: mailing list commits@kylin.incubator.apache.org Received: (qmail 7598 invoked by uid 99); 15 May 2015 03:06:37 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 May 2015 03:06:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 161221A2CA8 for ; Fri, 15 May 2015 03:06:37 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id T8qXykMa-HHS for ; Fri, 15 May 2015 03:06:24 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id AAA91256D5 for ; Fri, 15 May 2015 03:06:18 +0000 (UTC) Received: (qmail 2150 invoked by uid 99); 15 May 2015 03:06:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 May 2015 03:06:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8924DE3A5E; Fri, 15 May 2015 03:06:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shaofengshi@apache.org To: commits@kylin.incubator.apache.org Date: Fri, 15 May 2015 03:06:43 -0000 Message-Id: <01be07a482a64fb392cfc697cb9da135@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [31/50] [abbrv] incubator-kylin git commit: KYLIN-728 KYLIN-728 Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0d87e8f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0d87e8f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0d87e8f6 Branch: refs/heads/streaming-localdict Commit: 0d87e8f689e42d5a21330815eca6b2f83afb3f02 Parents: 876ac60 Author: qianhao.zhou Authored: Tue May 12 13:59:54 2015 +0800 Committer: qianhao.zhou Committed: Tue May 12 13:59:54 2015 +0800 ---------------------------------------------------------------------- .../kylin/common/util/BytesSerializer.java | 6 +- .../org/apache/kylin/common/util/SSHClient.java | 5 +- .../kylin/cube/cuboid/CuboidScheduler.java | 14 +- .../apache/kylin/dict/DictionarySerializer.java | 54 +++++ .../invertedindex/model/IIKeyValueCodec.java | 31 +-- .../job/hadoop/cubev2/InMemCubeBuilder.java | 197 +++++++++++++------ .../kylin/job/hadoop/cubev2/InMemCuboidJob.java | 2 +- .../hadoop/cubev2/MapContextGTRecordWriter.java | 2 +- .../metadata/serializer/DataTypeSerializer.java | 32 +-- .../kylin/storage/cube/CubeCodeSystem.java | 1 + .../storage/cube/CubeHBaseReadonlyStore.java | 14 +- .../kylin/storage/gridtable/GTBuilder.java | 1 - .../kylin/storage/gridtable/GTComboStore.java | 112 +++++++++++ .../apache/kylin/storage/gridtable/GTInfo.java | 10 +- .../storage/gridtable/GTInvertedIndex.java | 4 +- .../kylin/storage/gridtable/GTRawScanner.java | 24 +-- .../kylin/storage/gridtable/GTRecord.java | 4 +- .../kylin/storage/gridtable/GTRowBlock.java | 19 +- .../storage/gridtable/GTSampleCodeSystem.java | 5 +- .../kylin/storage/gridtable/IGTScanner.java | 6 +- .../kylin/storage/gridtable/IGTStore.java | 18 +- .../apache/kylin/storage/gridtable/ScanKey.java | 34 ++++ .../storage/gridtable/diskstore/FileSystem.java | 22 +++ .../gridtable/diskstore/GTDiskStore.java | 160 +++++++++++++++ .../gridtable/diskstore/HadoopFileSystem.java | 88 +++++++++ .../gridtable/diskstore/LocalFileSystem.java | 60 ++++++ .../gridtable/memstore/GTSimpleMemStore.java | 49 +++-- .../apache/kylin/storage/util/SizeOfUtil.java | 21 ++ .../invertedindex/IIStreamBuilder.java | 6 +- 29 files changed, 803 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java b/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java index 1d1f5ae..26342f5 100644 --- a/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java +++ b/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java @@ -26,10 +26,10 @@ import java.nio.ByteBuffer; */ public interface BytesSerializer { - public static final int SERIALIZE_BUFFER_SIZE = 65536; + int SERIALIZE_BUFFER_SIZE = 65536; - abstract public void serialize(T value, ByteBuffer out); + void serialize(T value, ByteBuffer out); - abstract public T deserialize(ByteBuffer in); + T deserialize(ByteBuffer in); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/common/src/main/java/org/apache/kylin/common/util/SSHClient.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/util/SSHClient.java b/common/src/main/java/org/apache/kylin/common/util/SSHClient.java index ccaabf0..32eb72a 100644 --- a/common/src/main/java/org/apache/kylin/common/util/SSHClient.java +++ b/common/src/main/java/org/apache/kylin/common/util/SSHClient.java @@ -49,6 +49,7 @@ public class SSHClient { public SSHClient(String hostname, int port, String username, String password) { this.hostname = hostname; this.username = username; + this.port = port; if (password != null && new File(password).exists()) { this.identityPath = new File(password).getAbsolutePath(); this.password = null; @@ -324,7 +325,7 @@ public class SSHClient { throw ee; } if (timeout < 0) - throw new Exception("Remote commmand not finished within " + timeoutSeconds + " seconds."); + throw new Exception("Remote command not finished within " + timeoutSeconds + " seconds."); } channel.disconnect(); session.disconnect(); @@ -340,7 +341,7 @@ public class SSHClient { jsch.addIdentity(identityPath); } - Session session = jsch.getSession(username, hostname, 22); + Session session = jsch.getSession(username, hostname, port); if (password != null) { session.setPassword(password); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java ---------------------------------------------------------------------- diff --git a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java index 6f64116..07be092 100644 --- a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java +++ b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java @@ -22,11 +22,7 @@ package org.apache.kylin.cube.cuboid; * @author George Song (ysong1) * */ -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import org.apache.kylin.cube.model.CubeDesc; @@ -38,21 +34,21 @@ public class CuboidScheduler { private final CubeDesc cubeDef; private final int size; private final long max; - private final Map> cache; + private final Map> cache; public CuboidScheduler(CubeDesc cube) { this.cubeDef = cube; this.size = cube.getRowkey().getRowKeyColumns().length; this.max = (long) Math.pow(2, size) - 1; - this.cache = new ConcurrentHashMap>(); + this.cache = new ConcurrentHashMap>(); } - public Collection getSpanningCuboid(long cuboid) { + public List getSpanningCuboid(long cuboid) { if (cuboid > max || cuboid < 0) { throw new IllegalArgumentException("Cuboid " + cuboid + " is out of scope 0-" + max); } - Collection result = cache.get(cuboid); + List result = cache.get(cuboid); if (result != null) { return result; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java new file mode 100644 index 0000000..4b61591 --- /dev/null +++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java @@ -0,0 +1,54 @@ +package org.apache.kylin.dict; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.kylin.common.util.ClassUtil; + +import java.io.*; + +/** + * Created by qianzhou on 5/5/15. + */ +public final class DictionarySerializer { + + private DictionarySerializer() {} + + public static Dictionary deserialize(InputStream inputStream) { + try { + final DataInputStream dataInputStream = new DataInputStream(inputStream); + final String type = dataInputStream.readUTF(); + final Dictionary dictionary = ClassUtil.forName(type, Dictionary.class).newInstance(); + dictionary.readFields(dataInputStream); + return dictionary; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static Dictionary deserialize(ImmutableBytesWritable dictBytes) { + return deserialize(new ByteArrayInputStream(dictBytes.get(), dictBytes.getOffset(), dictBytes.getLength())); + } + + public static void serialize(Dictionary dict, OutputStream outputStream) { + try { + DataOutputStream out = new DataOutputStream(outputStream); + out.writeUTF(dict.getClass().getName()); + dict.write(out); + out.flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static ImmutableBytesWritable serialize(Dictionary dict) { + try { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + out.writeUTF(dict.getClass().getName()); + dict.write(out); + return new ImmutableBytesWritable(baos.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java index 8dbaed7..b236879 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java @@ -23,8 +23,8 @@ import com.google.common.collect.Lists; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.util.Array; import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.dict.DictionarySerializer; import org.apache.kylin.invertedindex.index.ColumnValueContainer; import org.apache.kylin.invertedindex.index.CompressedValueContainer; import org.apache.kylin.invertedindex.index.Slice; @@ -32,7 +32,6 @@ import org.apache.kylin.invertedindex.index.TableRecordInfoDigest; import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec; import org.apache.kylin.metadata.model.DataType; -import java.io.*; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -73,31 +72,7 @@ public class IIKeyValueCodec implements KeyValueCodec { if (dictionary == null) { return new IIRow(key, value, new ImmutableBytesWritable(BytesUtil.EMPTY_BYTE_ARRAY)); } else { - return new IIRow(key, value, serialize(dictionary)); - } - } - - private static Dictionary deserialize(ImmutableBytesWritable dictBytes) { - try { - final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(dictBytes.get(), dictBytes.getOffset(), dictBytes.getLength())); - final String type = dataInputStream.readUTF(); - final Dictionary dictionary = ClassUtil.forName(type, Dictionary.class).newInstance(); - dictionary.readFields(dataInputStream); - return dictionary; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private static ImmutableBytesWritable serialize(Dictionary dict) { - try { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(baos); - out.writeUTF(dict.getClass().getName()); - dict.write(out); - return new ImmutableBytesWritable(baos.toByteArray()); - } catch (IOException e) { - throw new RuntimeException(e); + return new IIRow(key, value, DictionarySerializer.serialize(dictionary)); } } @@ -221,7 +196,7 @@ public class IIKeyValueCodec implements KeyValueCodec { } else { final ImmutableBytesWritable dictBytes = row.getDictionary(); if (dictBytes.getLength() != 0) { - final Dictionary dictionary = deserialize(dictBytes); + final Dictionary dictionary = DictionarySerializer.deserialize(dictBytes); CompressedValueContainer c = new CompressedValueContainer(dictionary.getSizeOfId(), dictionary.getMaxId() - dictionary.getMinId() + 1, 0); c.fromBytes(row.getValue()); valueContainers[curCol] = c; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java index 29cdc9a..87ad2d3 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java @@ -38,6 +38,7 @@ import com.google.common.collect.Maps; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.Pair; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.util.StringUtils; import org.apache.kylin.common.hll.HyperLogLogPlusCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; @@ -51,10 +52,9 @@ import org.apache.kylin.metadata.measure.MeasureCodec; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.serializer.DataTypeSerializer; import org.apache.kylin.storage.cube.CubeGridTable; import org.apache.kylin.storage.gridtable.*; -import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore; +import org.apache.kylin.storage.util.SizeOfUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +69,7 @@ import java.util.concurrent.BlockingQueue; @SuppressWarnings("rawtypes") public class InMemCubeBuilder implements Runnable { + private static final double AGGREGATION_CACHE_FACTOR = 3; private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class); private BlockingQueue> queue; @@ -78,14 +79,11 @@ public class InMemCubeBuilder implements Runnable { private Map> dictionaryMap = null; private CubeJoinedFlatTableDesc intermediateTableDesc; private MeasureCodec measureCodec; - private int measureNumber; private String[] metricsAggrFuncs = null; private Map dependentMeasures = null; // key: index of Measure which depends on another measure; value: index of Measure which is depended on; public static final LongWritable ONE = new LongWritable(1l); protected IGTRecordWriter gtRecordWriter; - private GridTable baseCuboidGT; - private DataTypeSerializer[] serializers; /** @@ -95,30 +93,31 @@ public class InMemCubeBuilder implements Runnable { * @param gtRecordWriter */ public InMemCubeBuilder(BlockingQueue> queue, CubeInstance cube, Map> dictionaryMap, IGTRecordWriter gtRecordWriter) { + if (dictionaryMap == null || dictionaryMap.isEmpty()) { + throw new IllegalArgumentException(); + } this.queue = queue; this.desc = cube.getDescriptor(); this.cuboidScheduler = new CuboidScheduler(desc); this.dictionaryMap = dictionaryMap; this.gtRecordWriter = gtRecordWriter; - baseCuboidId = Cuboid.getBaseCuboidId(desc); - - intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null); - measureCodec = new MeasureCodec(desc.getMeasures()); - measureNumber = desc.getMeasures().size(); - - dependentMeasures = Maps.newHashMap(); + this.baseCuboidId = Cuboid.getBaseCuboidId(desc); + this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null); + this.measureCodec = new MeasureCodec(desc.getMeasures()); - Map measureIndexMap = new HashMap(); + Map measureIndexMap = Maps.newHashMap(); List metricsAggrFuncsList = Lists.newArrayList(); - for (int i = 0, n = desc.getMeasures().size(); i < n; i++) { + final int measureCount = desc.getMeasures().size(); + for (int i = 0; i < measureCount; i++) { MeasureDesc measureDesc = desc.getMeasures().get(i); metricsAggrFuncsList.add(measureDesc.getFunction().getExpression()); measureIndexMap.put(desc.getMeasures().get(i).getName(), i); } - metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]); + this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]); - for (int i = 0; i < measureNumber; i++) { + this.dependentMeasures = Maps.newHashMap(); + for (int i = 0; i < measureCount; i++) { String depMsrRef = desc.getMeasures().get(i).getDependentMeasureRef(); if (depMsrRef != null) { int index = measureIndexMap.get(depMsrRef); @@ -126,21 +125,19 @@ public class InMemCubeBuilder implements Runnable { } } - if (dictionaryMap == null || dictionaryMap.isEmpty()) - throw new IllegalArgumentException(); } - private GridTable newGridTableByCuboidID(long cuboidID) { + private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) { GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap); - GTSimpleMemStore store = new GTSimpleMemStore(info); + GTComboStore store = new GTComboStore(info, memStore); GridTable gridTable = new GridTable(info, store); return gridTable; } - private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException { + private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId, boolean inMem) throws IOException { logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId); - Pair columnBitSets = getDimensionAndMetricColumBitSet(parentCuboidId); + Pair columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId); BitSet parentDimensions = columnBitSets.getFirst(); BitSet measureColumns = columnBitSets.getSecond(); BitSet childDimensions = (BitSet) parentDimensions.clone(); @@ -160,14 +157,14 @@ public class InMemCubeBuilder implements Runnable { mask = mask >> 1; } - return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns); + return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns, inMem); } - private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException { + private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns, boolean inMem) throws IOException { GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null); IGTScanner scanner = gridTable.scan(req); - GridTable newGridTable = newGridTableByCuboidID(cuboidId); + GridTable newGridTable = newGridTableByCuboidID(cuboidId, inMem); GTBuilder builder = newGridTable.rebuild(); BitSet allNeededColumns = new BitSet(); @@ -181,7 +178,7 @@ public class InMemCubeBuilder implements Runnable { try { BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality()); for (Integer i : dependentMeasures.keySet()) { - dependentMetrics.set((allNeededColumns.cardinality() - measureNumber + dependentMeasures.get(i))); + dependentMetrics.set((allNeededColumns.cardinality() - desc.getMeasures().size() + dependentMeasures.get(i))); } Object[] hllObjects = new Object[dependentMeasures.keySet().size()]; @@ -197,13 +194,13 @@ public class InMemCubeBuilder implements Runnable { for (Integer i : dependentMeasures.keySet()) { for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) { - if (c == allNeededColumns.cardinality() - measureNumber + dependentMeasures.get(i)) { + if (c == allNeededColumns.cardinality() - desc.getMeasures().size() + dependentMeasures.get(i)) { assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed byteBuffer.clear(); BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer); byteArray.set(byteBuffer.array(), 0, byteBuffer.position()); - newRecord.set(allNeededColumns.cardinality() - measureNumber + i, byteArray); + newRecord.set(allNeededColumns.cardinality() - desc.getMeasures().size() + i, byteArray); } } @@ -219,16 +216,16 @@ public class InMemCubeBuilder implements Runnable { return newGridTable; } - private Pair getDimensionAndMetricColumBitSet(long cuboidId) { + private Pair getDimensionAndMetricColumnBitSet(long cuboidId) { BitSet bitSet = BitSet.valueOf(new long[]{cuboidId}); BitSet dimension = new BitSet(); dimension.set(0, bitSet.cardinality()); BitSet metrics = new BitSet(); - metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureNumber); + metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.desc.getMeasures().size()); return new Pair(dimension, metrics); } - private Object[] buildKey(List row, DataTypeSerializer[] serializers) { + private Object[] buildKey(List row) { int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length; Object[] key = new Object[keySize]; @@ -280,7 +277,8 @@ public class InMemCubeBuilder implements Runnable { @Override public void run() { try { - createBaseCuboidGT(); + logger.info("Create base cuboid " + baseCuboidId); + final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, true); GTBuilder baseGTBuilder = baseCuboidGT.rebuild(); final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo()); @@ -339,7 +337,7 @@ public class InMemCubeBuilder implements Runnable { } }; - Pair dimensionMetricsBitSet = getDimensionAndMetricColumBitSet(baseCuboidId); + Pair dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId); GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null); IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req); @@ -352,8 +350,18 @@ public class InMemCubeBuilder implements Runnable { aggregationScanner.close(); logger.info("Base cuboid has " + counter + " rows;"); - if (counter > 0) - createNDCuboidGT(null, -1l, baseCuboidId); + SimpleGridTableTree tree = new SimpleGridTableTree(); + tree.data = baseCuboidGT; + tree.id = baseCuboidId; + tree.parent = null; + if (counter > 0) { + List children = cuboidScheduler.getSpanningCuboid(baseCuboidId); + Collections.sort(children); + for (Long childId : children) { + createNDCuboidGT(tree, baseCuboidId, childId); + } + } + baseCuboidGT.getStore().drop(); } catch (IOException e) { logger.error("Fail to build cube", e); @@ -364,7 +372,7 @@ public class InMemCubeBuilder implements Runnable { private void buildGTRecord(List row, GTRecord record) { - Object[] dimensions = buildKey(row, serializers); + Object[] dimensions = buildKey(row); Object[] metricsValues = buildValue(row); Object[] recordValues = new Object[dimensions.length + metricsValues.length]; System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length); @@ -372,43 +380,84 @@ public class InMemCubeBuilder implements Runnable { record.setValues(recordValues); } - private void createBaseCuboidGT() throws IOException { - - logger.info("Create base cuboid " + baseCuboidId); - Cuboid baseCuboid = Cuboid.findById(this.desc, baseCuboidId); - serializers = new DataTypeSerializer[baseCuboid.getColumns().size()]; - - for (int i = 0; i < baseCuboid.getColumns().size(); i++) { - serializers[i] = DataTypeSerializer.create(baseCuboid.getColumns().get(i).getType()); + private boolean checkMemory(long threshold) { + final long freeMemory = Runtime.getRuntime().freeMemory(); + logger.info("available memory:" + (freeMemory>>10) + " KB"); + if (freeMemory >= threshold) { + logger.info("no need to flush to disk"); + return true; + } else { + return false; } - - this.baseCuboidGT = newGridTableByCuboidID(baseCuboidId); } + private boolean gc(TreeNode parentNode) { + final long parentCuboidMem = SizeOfUtil.deepSizeOf(parentNode.data.getStore()); + long threshold = (long) (parentCuboidMem * (AGGREGATION_CACHE_FACTOR + 1)); + logger.info((threshold >> 10) + " KB is needed to create " + parentNode.id + "'s child"); + if (checkMemory(threshold)) { + return true; + } + final List> gridTables = parentNode.getAncestorList(); + for (TreeNode gridTable : gridTables) { + logger.info("wait 10 seconds for gc"); + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + logger.error("this should not happen", e); + } + if (checkMemory(threshold)) { + return true; + } else { + logger.info("memory is low, try to select one node to flush to disk from:" + StringUtils.join(",", gridTables)); + final IGTStore store = gridTable.data.getStore(); + assert store instanceof GTComboStore; + if (store.memoryUsage() > 0) { + logger.info("cuboid id:" + gridTable.id + " selected, memory used:" + (SizeOfUtil.deepSizeOf(store)>>10) + " KB"); + long t = System.currentTimeMillis(); + ((GTComboStore) store).switchToDiskStore(); + logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms"); + } + } + } + logger.info("no store has been flushed to disk"); + return true; + } - private void createNDCuboidGT(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException { + private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException { - GridTable thisCuboid; long startTime = System.currentTimeMillis(); - if (parentCuboidId < 0) { - thisCuboid = this.baseCuboidGT; - } else { - thisCuboid = aggregateCuboid(parentCuboid, parentCuboidId, cuboidId); + assert parentNode.data.getStore() instanceof GTComboStore; + if (parentNode.data.getStore().memoryUsage() <= 0) { + long t = System.currentTimeMillis(); + ((GTComboStore) parentNode.data.getStore()).switchToMemStore(); + logger.info("switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms"); } - logger.info("Cuboid " + cuboidId + " build takes (second): " + (System.currentTimeMillis() - startTime) / 1000); - - ArrayList children = (ArrayList) cuboidScheduler.getSpanningCuboid(cuboidId); - Collections.sort(children); // sort cuboids - for (Long childId : children) { - createNDCuboidGT(thisCuboid, cuboidId, childId); + boolean inMem = gc(parentNode); + GridTable currentCuboid = aggregateCuboid(parentNode.data, parentCuboidId, cuboidId, inMem); + SimpleGridTableTree node = new SimpleGridTableTree(); + node.parent = parentNode; + node.data = currentCuboid; + node.id = cuboidId; + parentNode.children.add(node); + + logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms"); + + List children = cuboidScheduler.getSpanningCuboid(cuboidId); + if (!children.isEmpty()) { + Collections.sort(children); // sort cuboids + for (Long childId : children) { + createNDCuboidGT(node, cuboidId, childId); + } } - startTime = System.currentTimeMillis(); //output the grid table - outputGT(cuboidId, thisCuboid); - logger.info("Cuboid" + cuboidId + " output takes (second) " + (System.currentTimeMillis() - startTime) / 1000); + outputGT(cuboidId, currentCuboid); + currentCuboid.getStore().drop(); + parentNode.children.remove(node); + logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms"); } @@ -419,4 +468,30 @@ public class InMemCubeBuilder implements Runnable { this.gtRecordWriter.write(cuboidId, record); } } + + private static class TreeNode { + T data; + long id; + TreeNode parent; + List> children = Lists.newArrayList(); + + List> getAncestorList() { + ArrayList> result = Lists.newArrayList(); + TreeNode parent = this.parent; + while (parent != null) { + result.add(parent); + parent = parent.parent; + } + return Lists.reverse(result); + } + + @Override + public String toString() { + return id + ""; + } + } + + private static class SimpleGridTableTree extends TreeNode {} + + } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java index fc165ea..db690b9 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java @@ -104,7 +104,7 @@ public class InMemCuboidJob extends AbstractHadoopJob { // set job configuration job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); - long timeout = 1000*60*60l; // 1 hour + long timeout = 1000*60*60L; // 1 hour job.getConfiguration().set("mapred.task.timeout", String.valueOf(timeout)); Configuration conf = HBaseConfiguration.create(getConf()); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java index 41237d7..3ba80d1 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java @@ -72,7 +72,7 @@ public class MapContextGTRecordWriter implements IGTRecordWriter { try { mapContext.write(outputKey, outputValue); } catch (InterruptedException e) { - throw new IOException(e); + throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java index ac6f409..739cde4 100644 --- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java +++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java @@ -20,8 +20,11 @@ package org.apache.kylin.metadata.serializer; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; +import java.util.Map; +import com.google.common.collect.Maps; import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.metadata.model.DataType; @@ -31,20 +34,23 @@ import org.apache.kylin.metadata.model.DataType; */ abstract public class DataTypeSerializer implements BytesSerializer { - final static HashMap> implementations = new HashMap>(); + final static Map> implementations; static { - implementations.put("varchar", StringSerializer.class); - implementations.put("decimal", BigDecimalSerializer.class); - implementations.put("double", DoubleSerializer.class); - implementations.put("float", DoubleSerializer.class); - implementations.put("bigint", LongSerializer.class); - implementations.put("long", LongSerializer.class); - implementations.put("integer", LongSerializer.class); - implementations.put("int", LongSerializer.class); - implementations.put("smallint", LongSerializer.class); - implementations.put("date", DateTimeSerializer.class); - implementations.put("datetime", DateTimeSerializer.class); - implementations.put("timestamp", DateTimeSerializer.class); + HashMap> impl = Maps.newHashMap(); + impl.put("varchar", StringSerializer.class); + impl.put("decimal", BigDecimalSerializer.class); + impl.put("double", DoubleSerializer.class); + impl.put("float", DoubleSerializer.class); + impl.put("bigint", LongSerializer.class); + impl.put("long", LongSerializer.class); + impl.put("integer", LongSerializer.class); + impl.put("int", LongSerializer.class); + impl.put("smallint", LongSerializer.class); + impl.put("date", DateTimeSerializer.class); + impl.put("datetime", DateTimeSerializer.class); + impl.put("timestamp", DateTimeSerializer.class); + implementations = Collections.unmodifiableMap(impl); + } public static DataTypeSerializer create(String dataType) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java index 70567e5..9b7550f 100644 --- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java +++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java @@ -8,6 +8,7 @@ import java.util.Map; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.dict.Dictionary; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java index f721148..0784587 100644 --- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java +++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java @@ -49,13 +49,8 @@ public class CubeHBaseReadonlyStore implements IGTStore { } @Override - public GTInfo getInfo() { - return info; - } - - @Override - public String getStorageDescription() { - return cubeSeg.toString(); + public long memoryUsage() { + return 0; } @Override @@ -127,6 +122,11 @@ public class CubeHBaseReadonlyStore implements IGTStore { }; } + @Override + public void drop() throws IOException { + throw new UnsupportedOperationException(); + } + private Scan buildScan(ByteArray pkStart, ByteArray pkEnd, List> selectedColumns) { Scan scan = new Scan(); scan.setCaching(SCAN_CACHE); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java index 7195e7f..7552ab3 100644 --- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java @@ -51,7 +51,6 @@ public class GTBuilder implements Closeable, Flushable { blockWriter.readyForFlush(); storeWriter.write(block); writtenRowBlockCount++; - if (block.isFull()) { blockWriter.clearForNext(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java new file mode 100644 index 0000000..c7d0c2b --- /dev/null +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java @@ -0,0 +1,112 @@ +package org.apache.kylin.storage.gridtable; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.storage.gridtable.diskstore.GTDiskStore; +import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.BitSet; + +/** + * Created by qianzhou on 5/6/15. + */ +public class GTComboStore implements IGTStore { + + private static final Logger logger = LoggerFactory.getLogger(GTComboStore.class); + + private final GTInfo gtInfo; + + private void convert(IGTStore input, IGTStore output) throws IOException { + final IGTStoreScanner scanner = input.scan(ScanKey.makeScanKey(gtInfo, new GTRecord(gtInfo)), ScanKey.makeScanKey(gtInfo, new GTRecord(gtInfo)), null, null); + final IGTStoreWriter writer = output.rebuild(-1); + while (scanner.hasNext()) { + writer.write(scanner.next()); + } + } + + private GTDiskStore gtDiskStore; + private GTSimpleMemStore gtSimpleMemStore; + + public GTComboStore(GTInfo gtInfo) { + this(gtInfo, true); + } + + public GTComboStore(GTInfo gtInfo, boolean useMemStore) { + this.gtInfo = gtInfo; + if (useMemStore) { + this.gtSimpleMemStore = new GTSimpleMemStore(gtInfo); + } else { + this.gtDiskStore = new GTDiskStore(gtInfo); + } + } + + private IGTStore getCurrent() { + if (gtSimpleMemStore != null) { + return gtSimpleMemStore; + } else { + return gtDiskStore; + } + } + + public void switchToMemStore() { + try { + if (gtSimpleMemStore == null) { + gtSimpleMemStore = new GTSimpleMemStore(gtInfo); + convert(gtDiskStore, gtSimpleMemStore); + gtDiskStore.drop(); + gtDiskStore = null; + } + } catch (IOException e) { + logger.error("fail to switch to mem store", e); + throw new RuntimeException(e); + } + logger.info("switch to mem store"); + } + + public void switchToDiskStore() { + try { + if (gtDiskStore == null) { + gtDiskStore = new GTDiskStore(gtInfo); + convert(gtSimpleMemStore, gtDiskStore); + gtSimpleMemStore.drop(); + gtSimpleMemStore = null; + } + } catch (IOException e) { + logger.error("fail to switch to disk store", e); + throw new RuntimeException(e); + } + logger.info("switch to disk store"); + } + + @Override + public long memoryUsage() { + return getCurrent().memoryUsage(); + } + + @Override + public IGTStoreWriter rebuild(int shard) throws IOException { + return getCurrent().rebuild(shard); + } + + @Override + public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException { + return getCurrent().append(shard, fillLast); + } + + @Override + public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException { + return getCurrent().scan(pkStart, pkEnd, selectedColBlocks, additionalPushDown); + } + + @Override + public void drop() throws IOException { + if (gtSimpleMemStore != null) { + gtSimpleMemStore.drop(); + } + if (gtDiskStore != null) { + gtDiskStore.drop(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java index 954e464..fdabb60 100644 --- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java @@ -17,7 +17,6 @@ public class GTInfo { IGTCodeSystem codeSystem; // column schema - int nColumns; DataType[] colTypes; BitSet colAll; BitSet colPreferIndex; @@ -37,7 +36,7 @@ public class GTInfo { } public int getColumnCount() { - return nColumns; + return colTypes.length; } public DataType getColumnType(int i) { @@ -74,7 +73,7 @@ public class GTInfo { public int getMaxColumnLength() { int max = 0; - for (int i = 0; i < nColumns; i++) + for (int i = 0; i < colTypes.length; i++) max = Math.max(max, codeSystem.maxCodeLength(i)); return max; } @@ -95,7 +94,7 @@ public class GTInfo { public TblColRef colRef(int i) { if (colRefs == null) { - colRefs = new TblColRef[nColumns]; + colRefs = new TblColRef[colTypes.length]; } if (colRefs[i] == null) { colRefs[i] = GTUtil.tblColRef(i, colTypes[i].toString()); @@ -124,7 +123,7 @@ public class GTInfo { private void validateColumnBlocks() { colAll = new BitSet(); - colAll.flip(0, nColumns); + colAll.flip(0, colTypes.length); if (colBlocks == null) { colBlocks = new BitSet[2]; @@ -185,7 +184,6 @@ public class GTInfo { /** required */ public Builder setColumns(DataType... colTypes) { - info.nColumns = colTypes.length; info.colTypes = colTypes; return this; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java index 7c6abec..2756659 100644 --- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java @@ -34,7 +34,7 @@ public class GTInvertedIndex { this.colPreferIndex = info.colPreferIndex; this.colBlocks = info.selectColumnBlocks(colPreferIndex); - index = new GTInvertedIndexOfColumn[info.nColumns]; + index = new GTInvertedIndexOfColumn[info.getColumnCount()]; for (int i = colPreferIndex.nextSetBit(0); i >= 0; i = colPreferIndex.nextSetBit(i + 1)) { index[i] = new GTInvertedIndexOfColumn(info.codeSystem.getFilterCodeSystem()); } @@ -43,7 +43,7 @@ public class GTInvertedIndex { public void add(GTRowBlock block) { @SuppressWarnings("unchecked") - Set[] distinctValues = new Set[info.nColumns]; + Set[] distinctValues = new Set[info.getColumnCount()]; for (int i = colPreferIndex.nextSetBit(0); i >= 0; i = colPreferIndex.nextSetBit(i + 1)) { distinctValues[i] = new HashSet(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java index 895ccf3..ff97cd5 100644 --- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java @@ -24,34 +24,14 @@ public class GTRawScanner implements IGTScanner { public GTRawScanner(GTInfo info, IGTStore store, GTScanRequest req) throws IOException { this.info = info; - ByteArray start = makeScanKey(req.getPkStart()); - ByteArray end = makeScanKey(req.getPkEnd()); + ByteArray start = ScanKey.makeScanKey(info, req.getPkStart()); + ByteArray end = ScanKey.makeScanKey(info, req.getPkEnd()); this.selectedColBlocks = info.selectColumnBlocks(req.getColumns()); this.storeScanner = store.scan(start, end, selectedColBlocks, req); this.oneRecord = new GTRecord(info); } - private ByteArray makeScanKey(GTRecord rec) { - int firstPKCol = info.primaryKey.nextSetBit(0); - if (rec == null || rec.cols[firstPKCol].array() == null) - return null; - - BitSet selectedColumns = new BitSet(); - int len = 0; - for (int i = info.primaryKey.nextSetBit(0); i >= 0; i = info.primaryKey.nextSetBit(i + 1)) { - if (rec.cols[i].array() == null) { - break; - } - selectedColumns.set(i); - len += rec.cols[i].length(); - } - - ByteArray buf = ByteArray.allocate(len); - rec.exportColumns(selectedColumns, buf); - return buf; - } - @Override public GTInfo getInfo() { return info; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java index 8516f05..2a38731 100644 --- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java @@ -16,7 +16,7 @@ public class GTRecord implements Comparable { public GTRecord(GTInfo info) { this.info = info; - this.cols = new ByteArray[info.nColumns]; + this.cols = new ByteArray[info.getColumnCount()]; for (int i = 0; i < cols.length; i++) this.cols[i] = new ByteArray(); this.maskForEqualHashComp = info.colAll; @@ -55,7 +55,7 @@ public class GTRecord implements Comparable { /** decode and return the values of this record */ public Object[] getValues() { - return getValues(info.colAll, new Object[info.nColumns]); + return getValues(info.colAll, new Object[info.getColumnCount()]); } /** decode and return the values of this record */ http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java index 7f79948..ec24da6 100644 --- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java @@ -1,5 +1,7 @@ package org.apache.kylin.storage.gridtable; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.BitSet; @@ -54,7 +56,7 @@ public class GTRowBlock { public Writer getWriter() { return new Writer(); } - + public class Writer { ByteBuffer[] cellBlockBuffers; @@ -181,6 +183,21 @@ public class GTRowBlock { return len; } + public void export(DataOutputStream dataOutputStream) throws IOException { + dataOutputStream.writeInt(seqId); + dataOutputStream.writeInt(nRows); + export(dataOutputStream, primaryKey); + for (ByteArray cb : cellBlocks) { + export(dataOutputStream, cb); + } + } + + public void export(DataOutputStream dataOutputStream, ByteArray array) throws IOException { + dataOutputStream.writeInt(array.length()); + dataOutputStream.write(array.array(), array.offset(), array.length()); + } + + /** write data to given buffer, like serialize */ public void export(ByteBuffer buf) { buf.putInt(seqId); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java index cb8698c..9c758fa 100644 --- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java @@ -3,6 +3,7 @@ package org.apache.kylin.storage.gridtable; import java.nio.ByteBuffer; import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.metadata.filter.IFilterCodeSystem; import org.apache.kylin.metadata.measure.MeasureAggregator; @@ -30,8 +31,8 @@ public class GTSampleCodeSystem implements IGTCodeSystem { public void init(GTInfo info) { this.info = info; - this.serializers = new DataTypeSerializer[info.nColumns]; - for (int i = 0; i < info.nColumns; i++) { + this.serializers = new DataTypeSerializer[info.getColumnCount()]; + for (int i = 0; i < info.getColumnCount(); i++) { this.serializers[i] = DataTypeSerializer.create(info.colTypes[i]); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java index 285a301..3d3c3c8 100644 --- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java @@ -4,10 +4,10 @@ import java.io.Closeable; public interface IGTScanner extends Iterable, Closeable { - public GTInfo getInfo(); + GTInfo getInfo(); - public int getScannedRowCount(); + int getScannedRowCount(); - public int getScannedRowBlockCount(); + int getScannedRowBlockCount(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java index f5eb077..0152571 100644 --- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java @@ -8,22 +8,22 @@ import java.util.Iterator; import org.apache.kylin.common.util.ByteArray; public interface IGTStore { + + long memoryUsage(); - public GTInfo getInfo(); - - public String getStorageDescription(); - - public IGTStoreWriter rebuild(int shard) throws IOException; + IGTStoreWriter rebuild(int shard) throws IOException; - public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException; + IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException; - public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException; + IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException; + + void drop() throws IOException; - public interface IGTStoreWriter extends Closeable { + interface IGTStoreWriter extends Closeable { void write(GTRowBlock block) throws IOException; } - public interface IGTStoreScanner extends Iterator, Closeable { + interface IGTStoreScanner extends Iterator, Closeable { } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java new file mode 100644 index 0000000..5c0c436 --- /dev/null +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java @@ -0,0 +1,34 @@ +package org.apache.kylin.storage.gridtable; + +import org.apache.kylin.common.util.ByteArray; + +import java.util.BitSet; + +/** + * Created by qianzhou on 5/6/15. + */ +public final class ScanKey { + + private ScanKey() { + } + + static ByteArray makeScanKey(GTInfo info, GTRecord rec) { + int firstPKCol = info.primaryKey.nextSetBit(0); + if (rec == null || rec.cols[firstPKCol].array() == null) + return null; + + BitSet selectedColumns = new BitSet(); + int len = 0; + for (int i = info.primaryKey.nextSetBit(0); i >= 0; i = info.primaryKey.nextSetBit(i + 1)) { + if (rec.cols[i].array() == null) { + break; + } + selectedColumns.set(i); + len += rec.cols[i].length(); + } + + ByteArray buf = ByteArray.allocate(len); + rec.exportColumns(selectedColumns, buf); + return buf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java new file mode 100644 index 0000000..2ab2c7e --- /dev/null +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java @@ -0,0 +1,22 @@ +package org.apache.kylin.storage.gridtable.diskstore; + +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Created by qianzhou on 5/4/15. + */ +interface FileSystem { + + boolean checkExistence(String path); + + boolean delete(String path); + + boolean createDirectory(String path); + + boolean createFile(String path); + + OutputStream getWriter(String path); + + InputStream getReader(String path); +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java new file mode 100644 index 0000000..f48fce3 --- /dev/null +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java @@ -0,0 +1,160 @@ +package org.apache.kylin.storage.gridtable.diskstore; + +import com.google.common.base.Preconditions; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.storage.gridtable.GTInfo; +import org.apache.kylin.storage.gridtable.GTRowBlock; +import org.apache.kylin.storage.gridtable.GTScanRequest; +import org.apache.kylin.storage.gridtable.IGTStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.UUID; + +/** + * Created by qianzhou on 5/4/15. + */ +public class GTDiskStore implements IGTStore { + + private static final Logger logger = LoggerFactory.getLogger(GTDiskStore.class); + + private final String identifier; + private final FileSystem fileSystem; + private final DiskStoreWriter writer; + private final GTInfo gtInfo; + + public GTDiskStore(GTInfo gtInfo) { + this.gtInfo = gtInfo; + this.fileSystem = new LocalFileSystem(); + this.identifier = generateIdentifier(fileSystem); + logger.info("disk store created, identifier:" + identifier); + this.writer = new DiskStoreWriter(fileSystem.getWriter(getRowBlockFile(identifier))); + } + + private String generateIdentifier(FileSystem fs) { + while (true) { + String identifier = UUID.randomUUID().toString(); + final String path = getRootDirectory(identifier); + if (fs.createDirectory(path)) { + return identifier; + } + } + } + + private String getRootDirectory(String identifier) { + return "/tmp/kylin/gtdiskstore/" + identifier; + } + + private String getRowBlockFile(String identifier) { + return getRootDirectory(identifier) + "/rowblock"; + } + + private class DiskStoreWriter implements IGTStoreWriter { + + private final DataOutputStream outputStream; + + DiskStoreWriter(OutputStream outputStream) { + this.outputStream = new DataOutputStream(outputStream); + } + + @Override + public void write(GTRowBlock block) throws IOException { + final int blockSize = block.exportLength(); + outputStream.writeInt(blockSize); + block.export(outputStream); + outputStream.flush(); + } + + @Override + public void close() throws IOException { + outputStream.close(); + } + } + + @Override + public long memoryUsage() { + return 0; + } + + @Override + public IGTStoreWriter rebuild(int shard) throws IOException { + return writer; + } + + @Override + public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException { + return writer; + } + + private class DiskStoreScanner implements IGTStoreScanner { + + private final DataInputStream inputStream; + private int blockSize = 0; + + DiskStoreScanner(InputStream inputStream) { + this.inputStream = new DataInputStream(inputStream); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } + + @Override + public boolean hasNext() { + try { + blockSize = inputStream.readInt(); + return blockSize > 0; + } catch (EOFException e) { + return false; + } catch (IOException e) { + logger.error("input stream fail", e); + throw new RuntimeException(e); + } + } + + @Override + public GTRowBlock next() { + GTRowBlock block = new GTRowBlock(gtInfo); + ByteBuffer buffer = ByteBuffer.allocate(blockSize); + int count = blockSize; + while (count > 0) { + try { + count -= inputStream.read(buffer.array(), buffer.position(), buffer.remaining()); + } catch (IOException e) { + logger.error("input stream fail", e); + throw new RuntimeException(e); + } + } + Preconditions.checkArgument(count == 0, "invalid read count:" + count + " block size:" + blockSize); + block.load(buffer); + return block; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + @Override + public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException { + return new DiskStoreScanner(fileSystem.getReader(getRowBlockFile(identifier))); + } + + @Override + public void drop() throws IOException { + try { + writer.close(); + } catch (Exception e) { + logger.error("error to close writer", e); + } + fileSystem.delete(getRowBlockFile(identifier)); + fileSystem.delete(getRootDirectory(identifier)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java new file mode 100644 index 0000000..e1efd1b --- /dev/null +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java @@ -0,0 +1,88 @@ +package org.apache.kylin.storage.gridtable.diskstore; + +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.util.HadoopUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Created by qianzhou on 5/6/15. + */ +public class HadoopFileSystem implements FileSystem { + + private static final Logger logger = LoggerFactory.getLogger(HadoopFileSystem.class); + + final org.apache.hadoop.fs.FileSystem fileSystem; + + public HadoopFileSystem() { + try { + fileSystem = org.apache.hadoop.fs.FileSystem.get(HadoopUtil.getCurrentConfiguration()); + } catch (IOException e) { + logger.error("error construct HadoopFileSystem", e); + throw new RuntimeException(e); + } + } + @Override + public boolean checkExistence(String path) { + try { + return fileSystem.exists(new Path(path)); + } catch (IOException e) { + logger.error("error checkExistence, path:" + path, e); + throw new RuntimeException(e); + } + } + + @Override + public boolean delete(String path) { + try { + return fileSystem.delete(new Path(path), true); + } catch (IOException e) { + logger.error("error delete, path:" + path, e); + throw new RuntimeException(e); + } + } + + @Override + public boolean createDirectory(String path) { + try { + return fileSystem.mkdirs(new Path(path)); + } catch (IOException e) { + logger.error("error createDirectory, path:" + path, e); + throw new RuntimeException(e); + } + } + + @Override + public boolean createFile(String path) { + try { + return fileSystem.createNewFile(new Path(path)); + } catch (IOException e) { + logger.error("error createFile, path:" + path, e); + throw new RuntimeException(e); + } + } + + @Override + public OutputStream getWriter(String path) { + try { + return fileSystem.create(new Path(path)); + } catch (IOException e) { + logger.error("error getWriter, path:" + path, e); + throw new RuntimeException(e); + } + } + + @Override + public InputStream getReader(String path) { + try { + return fileSystem.open(new Path(path)); + } catch (IOException e) { + logger.error("error getReader, path:" + path, e); + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java new file mode 100644 index 0000000..1c14e3f --- /dev/null +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java @@ -0,0 +1,60 @@ +package org.apache.kylin.storage.gridtable.diskstore; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; + +/** + * Created by qianzhou on 5/4/15. + */ +class LocalFileSystem implements FileSystem { + + private static Logger logger = LoggerFactory.getLogger(LocalFileSystem.class); + @Override + public boolean checkExistence(String path) { + return new File(path).exists(); + } + + @Override + public boolean delete(String path) { + return new File(path).delete(); + } + + @Override + public boolean createDirectory(String path) { + return new File(path).mkdirs(); + } + + @Override + public boolean createFile(String path) { + try { + return new File(path).createNewFile(); + } catch (IOException e) { + logger.warn("create file failed:" + path, e); + return false; + } + } + + @Override + public OutputStream getWriter(String path) { + try { + return new FileOutputStream(path); + } catch (FileNotFoundException e) { + //should not happen + logger.error("path:" + path + " nout found"); + throw new RuntimeException(e); + } + } + + @Override + public InputStream getReader(String path) { + try { + return new FileInputStream(path); + } catch (FileNotFoundException e) { + //should not happen + logger.error("path:" + path + " nout found"); + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java index 32c7f36..329c048 100644 --- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java @@ -1,44 +1,41 @@ package org.apache.kylin.storage.gridtable.memstore; -import java.io.IOException; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Iterator; -import java.util.List; - +import com.google.common.collect.Lists; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.storage.gridtable.GTInfo; import org.apache.kylin.storage.gridtable.GTRowBlock; import org.apache.kylin.storage.gridtable.GTScanRequest; import org.apache.kylin.storage.gridtable.IGTStore; +import java.io.IOException; +import java.util.BitSet; +import java.util.Iterator; +import java.util.List; + public class GTSimpleMemStore implements IGTStore { - final GTInfo info; final List rowBlockList; public GTSimpleMemStore(GTInfo info) { - this.info = info; - this.rowBlockList = new ArrayList(); + this.rowBlockList = Lists.newLinkedList(); if (info.isShardingEnabled()) throw new UnsupportedOperationException(); } @Override - public GTInfo getInfo() { - return info; - } - - @Override - public String getStorageDescription() { - return this.toString(); + public long memoryUsage() { + if (rowBlockList.size() == 0) { + return 0; + } else { + return rowBlockList.get(0).exportLength() * Long.valueOf(rowBlockList.size()); + } } @Override public IGTStoreWriter rebuild(int shard) { rowBlockList.clear(); - return new Writer(); + return new Writer(rowBlockList); } @Override @@ -47,10 +44,16 @@ public class GTSimpleMemStore implements IGTStore { GTRowBlock last = rowBlockList.get(rowBlockList.size() - 1); fillLast.copyFrom(last); } - return new Writer(); + return new Writer(rowBlockList); } - private class Writer implements IGTStoreWriter { + private static class Writer implements IGTStoreWriter { + + private final List rowBlockList; + + Writer(List rowBlockList) { + this.rowBlockList = rowBlockList; + } @Override public void close() throws IOException { } @@ -66,7 +69,7 @@ public class GTSimpleMemStore implements IGTStore { rowBlockList.add(copy); } } - }; + } @Override public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) { @@ -95,4 +98,10 @@ public class GTSimpleMemStore implements IGTStore { }; } + @Override + public void drop() throws IOException { + //will there be any concurrent issue? If yes, ArrayList should be replaced + rowBlockList.clear(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java b/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java new file mode 100644 index 0000000..d390415 --- /dev/null +++ b/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java @@ -0,0 +1,21 @@ +package org.apache.kylin.storage.util; + +import net.sf.ehcache.pool.sizeof.ReflectionSizeOf; + +/** + * Created by qianzhou on 5/11/15. + */ +public final class SizeOfUtil { + + private SizeOfUtil(){} + + private static final ReflectionSizeOf DEFAULT_SIZE_OF = new ReflectionSizeOf(); + + public static final long deepSizeOf(Object obj) { + return DEFAULT_SIZE_OF.deepSizeOf(Integer.MAX_VALUE, true, obj).getCalculated(); + } + + public static final long sizeOf(Object obj) { + return DEFAULT_SIZE_OF.sizeOf(obj); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java index 773425b..7c1d435 100644 --- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java +++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java @@ -71,10 +71,6 @@ public class IIStreamBuilder extends StreamBuilder { private StreamingManager streamingManager; public IIStreamBuilder(BlockingQueue queue, String streaming, String hTableName, IIDesc iiDesc, int shard) { - this(queue, streaming, hTableName, iiDesc, shard, true); - } - - public IIStreamBuilder(BlockingQueue queue, String streaming, String hTableName, IIDesc iiDesc, int shard, boolean useLocalDict) { super(queue, iiDesc.getSliceSize()); this.streaming = streaming; this.desc = iiDesc; @@ -86,7 +82,7 @@ public class IIStreamBuilder extends StreamBuilder { logger.error("cannot open htable name:" + hTableName, e); throw new RuntimeException("cannot open htable name:" + hTableName, e); } - this.sliceBuilder = new SliceBuilder(desc, (short) shard, useLocalDict); + this.sliceBuilder = new SliceBuilder(desc, (short) shard, iiDesc.isUseLocalDictionary()); this.streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()); }