Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 65A64200C4D for ; Wed, 5 Apr 2017 15:39:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 641C3160B94; Wed, 5 Apr 2017 13:39:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AE652160B91 for ; Wed, 5 Apr 2017 15:39:42 +0200 (CEST) Received: (qmail 98397 invoked by uid 500); 5 Apr 2017 13:39:41 -0000 Mailing-List: contact commits-help@carbondata.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.incubator.apache.org Delivered-To: mailing list commits@carbondata.incubator.apache.org Received: (qmail 98388 invoked by uid 99); 5 Apr 2017 13:39:41 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Apr 2017 13:39:41 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 604B7C002D for ; Wed, 5 Apr 2017 13:39:41 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.221 X-Spam-Level: X-Spam-Status: No, score=-4.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 89DLyb15rLYB for ; Wed, 5 Apr 2017 13:39:33 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 70C5B5FE44 for ; Wed, 5 Apr 2017 13:39:32 +0000 (UTC) Received: (qmail 97961 invoked by uid 99); 5 Apr 2017 13:39:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Apr 2017 13:39:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C5575DFDCD; Wed, 5 Apr 2017 13:39:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jackylk@apache.org To: commits@carbondata.incubator.apache.org Date: Wed, 05 Apr 2017 13:39:40 -0000 Message-Id: <4efb56805bc04009a7c35ef8008029e9@git.apache.org> In-Reply-To: <881d989231384a42b36ab1ed8b42221a@git.apache.org> References: <881d989231384a42b36ab1ed8b42221a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/13] incubator-carbondata git commit: refactor write step archived-at: Wed, 05 Apr 2017 13:39:44 -0000 refactor write step Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8cca0afc Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8cca0afc Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8cca0afc Branch: refs/heads/12-dev Commit: 8cca0afc5db16557146dfaa33e14c2823d895966 Parents: bd044c2 Author: jackylk Authored: Thu Mar 30 11:21:21 2017 +0530 Committer: ravipesala Committed: Wed Apr 5 14:36:47 2017 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 2 +- .../core/datastore/NodeMeasureDataStore.java | 32 -- ...ractHeavyCompressedDoubleArrayDataStore.java | 89 ----- ...yCompressedDoubleArrayDataInMemoryStore.java | 28 -- .../HeavyCompressedDoubleArrayDataStore.java | 57 ++++ .../core/util/CarbonMetadataUtil.java | 2 +- .../apache/carbondata/core/util/CarbonUtil.java | 2 +- .../carbondata/core/util/DataTypeUtil.java | 5 +- .../sort/unsafe/UnsafeCarbonRowPage.java | 6 +- .../holder/UnsafeSortTempFileChunkHolder.java | 2 +- .../merger/UnsafeIntermediateFileMerger.java | 2 +- .../sortdata/IntermediateFileMerger.java | 2 +- .../sortandgroupby/sortdata/SortDataRows.java | 2 +- .../sortdata/SortTempFileChunkHolder.java | 2 +- .../store/CarbonFactDataHandlerColumnar.java | 328 +++++++++++-------- .../processing/store/StoreFactory.java | 33 -- .../store/writer/CarbonFactDataWriter.java | 4 +- .../writer/v1/CarbonFactDataWriterImplV1.java | 13 +- .../writer/v3/CarbonFactDataWriterImplV3.java | 14 +- 19 files changed, 287 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 789c321..b82d53c 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -631,7 +631,7 @@ public final class CarbonCommonConstants { /** * DOUBLE_VALUE_MEASURE */ - public static final char SUM_COUNT_VALUE_MEASURE = 'n'; + public static final char DOUBLE_MEASURE = 'n'; /** * BYTE_VALUE_MEASURE */ http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java deleted file mode 100644 index 2f54847..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore; - -import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder; - -public interface NodeMeasureDataStore { - /** - * This method will be used to get the writable key array. - * writable measure data array will hold below information: - * - * total length will be 4 bytes for size + measure data array length - * - * @return writable array (compressed or normal) - */ - byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolderArray); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java deleted file mode 100644 index b274b21..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore.impl.data.compressed; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.NodeMeasureDataStore; -import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder; -import org.apache.carbondata.core.datastore.compression.WriterCompressModel; -import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder; -import org.apache.carbondata.core.util.ValueCompressionUtil; - -public abstract class AbstractHeavyCompressedDoubleArrayDataStore - implements NodeMeasureDataStore //NodeMeasureDataStore -{ - - private LogService LOGGER = - LogServiceFactory.getLogService(AbstractHeavyCompressedDoubleArrayDataStore.class.getName()); - - /** - * values. - */ - protected ValueCompressionHolder[] values; - - /** - * compressionModel. - */ - protected WriterCompressModel compressionModel; - - /** - * type - */ - private char[] type; - - /** - * AbstractHeavyCompressedDoubleArrayDataStore constructor. - * - * @param compressionModel - */ - public AbstractHeavyCompressedDoubleArrayDataStore(WriterCompressModel compressionModel) { - this.compressionModel = compressionModel; - if (null != compressionModel) { - this.type = compressionModel.getType(); - values = - new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length]; - } - } - - // this method first invokes encoding routine to encode the data chunk, - // followed by invoking compression routine for preparing the data chunk for writing. - @Override public byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolder) { - byte[][] returnValue = new byte[values.length][]; - for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) { - values[i] = compressionModel.getValueCompressionHolder()[i]; - if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE - && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) { - // first perform encoding of the data chunk - values[i].setValue( - ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i]) - .getCompressedValues(compressionModel.getCompressionFinders()[i], dataHolder[i], - compressionModel.getMaxValue()[i], - compressionModel.getMantissa()[i])); - } else { - values[i].setValue(dataHolder[i].getWritableByteArrayValues()); - } - values[i].compress(); - returnValue[i] = values[i].getCompressedData(); - } - - return returnValue; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java deleted file mode 100644 index a484b8f..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore.impl.data.compressed; - -import org.apache.carbondata.core.datastore.compression.WriterCompressModel; - -public class HeavyCompressedDoubleArrayDataInMemoryStore - extends AbstractHeavyCompressedDoubleArrayDataStore { - - public HeavyCompressedDoubleArrayDataInMemoryStore(WriterCompressModel compressionModel) { - super(compressionModel); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java new file mode 100644 index 0000000..d3d67fd --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.impl.data.compressed; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder; +import org.apache.carbondata.core.datastore.compression.WriterCompressModel; +import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder; +import org.apache.carbondata.core.util.ValueCompressionUtil; + +public class HeavyCompressedDoubleArrayDataStore { + + // this method first invokes encoding routine to encode the data chunk, + // followed by invoking compression routine for preparing the data chunk for writing. + public static byte[][] encodeMeasureDataArray( + WriterCompressModel compressionModel, + CarbonWriteDataHolder[] dataHolder) { + char[] type = compressionModel.getType(); + ValueCompressionHolder[] values = + new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length]; + byte[][] returnValue = new byte[values.length][]; + for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) { + values[i] = compressionModel.getValueCompressionHolder()[i]; + if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE + && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) { + // first perform encoding of the data chunk + values[i].setValue( + ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i]) + .getCompressedValues(compressionModel.getCompressionFinders()[i], dataHolder[i], + compressionModel.getMaxValue()[i], + compressionModel.getMantissa()[i])); + } else { + values[i].setValue(dataHolder[i].getWritableByteArrayValues()); + } + values[i].compress(); + returnValue[i] = values[i].getCompressedData(); + } + + return returnValue; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java index f134f0c..e60d675 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java @@ -824,7 +824,7 @@ public class CarbonMetadataUtil { public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) { ByteBuffer buffer = null; - if (valueEncoderMeta.getType() == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { + if (valueEncoderMeta.getType() == CarbonCommonConstants.DOUBLE_MEASURE) { buffer = ByteBuffer.allocate( (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE + 3); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index bf8c03b..a442087 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1427,7 +1427,7 @@ public final class CarbonUtil { ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta(); valueEncoderMeta.setType(measureType); switch (measureType) { - case CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE: + case CarbonCommonConstants.DOUBLE_MEASURE: valueEncoderMeta.setMaxValue(buffer.getDouble()); valueEncoderMeta.setMinValue(buffer.getDouble()); valueEncoderMeta.setUniqueValue(buffer.getDouble()); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java index 10411b0..e437405 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java @@ -152,10 +152,13 @@ public final class DataTypeUtil { case LONG: return CarbonCommonConstants.BIG_INT_MEASURE; default: - return CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE; + return CarbonCommonConstants.DOUBLE_MEASURE; } } + // bytes of 0 in BigDecimal + public static final byte[] zeroBigDecimalBytes = bigDecimalToByte(BigDecimal.valueOf(0)); + /** * This method will convert a big decimal value to bytes * http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java index e468028..e682263 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java @@ -112,7 +112,7 @@ public class UnsafeCarbonRowPage { for (int mesCount = 0; mesCount < measureSize; mesCount++) { Object value = row[mesCount + dimensionSize]; if (null != value) { - if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { + if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) { Double val = (Double) value; CarbonUnsafe.unsafe.putDouble(baseObject, address + size, val); size += 8; @@ -183,7 +183,7 @@ public class UnsafeCarbonRowPage { for (int mesCount = 0; mesCount < measureSize; mesCount++) { if (isSet(nullSetWords, mesCount)) { - if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { + if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) { Double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size); size += 8; rowToFill[dimensionSize + mesCount] = val; @@ -254,7 +254,7 @@ public class UnsafeCarbonRowPage { for (int mesCount = 0; mesCount < measureSize; mesCount++) { if (isSet(nullSetWords, mesCount)) { - if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { + if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) { double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size); size += 8; stream.writeDouble(val); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java index 60f259e..de2b874 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java @@ -324,7 +324,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { for (int mesCount = 0; mesCount < measureCount; mesCount++) { if (UnsafeCarbonRowPage.isSet(words, mesCount)) { - if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { + if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) { row[dimensionCount + mesCount] = stream.readDouble(); } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) { row[dimensionCount + mesCount] = stream.readLong(); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java index 7862a95..e52dc8a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java @@ -310,7 +310,7 @@ public class UnsafeIntermediateFileMerger implements Callable { for (int mesCount = 0; mesCount < measureSize; mesCount++) { Object value = row[mesCount + dimensionSize]; if (null != value) { - if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { + if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) { Double val = (Double) value; rowData.putDouble(size, val); size += 8; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java index 0ac2d5c..5487593 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java @@ -341,7 +341,7 @@ public class IntermediateFileMerger implements Callable { if (aggType[counter] == CarbonCommonConstants.BYTE_VALUE_MEASURE) { Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row); stream.writeDouble(val); - } else if (aggType[counter] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { + } else if (aggType[counter] == CarbonCommonConstants.DOUBLE_MEASURE) { Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row); stream.writeDouble(val); } else if (aggType[counter] == CarbonCommonConstants.BIG_INT_MEASURE) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java index 9b5a850..3a7a579 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java @@ -285,7 +285,7 @@ public class SortDataRows { Object value = row[mesCount + dimColCount]; if (null != value) { stream.write((byte) 1); - if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { + if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) { Double val = (Double) value; stream.writeDouble(val); } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java index ae01404..b4ccc6f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java @@ -332,7 +332,7 @@ public class SortTempFileChunkHolder implements Comparable dataRows) - throws CarbonDataWriterException { - Object[] max = new Object[measureCount]; - Object[] min = new Object[measureCount]; - int[] decimal = new int[measureCount]; - Object[] uniqueValue = new Object[measureCount]; - // to store index of the measure columns which are null - BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount); - for (int i = 0; i < max.length; i++) { - if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) { - max[i] = Long.MIN_VALUE; - } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { - max[i] = -Double.MAX_VALUE; - } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { - max[i] = new BigDecimal(0.0); - } else { - max[i] = 0.0; + /** statics for one blocklet/page */ + class Statistics { + /** min and max value of the measures */ + Object[] min, max; + + /** + * the unique value is the non-exist value in the row, + * and will be used as storage key for null values of measures + */ + Object[] uniqueValue; + + /** decimal count of the measures */ + int[] decimal; + + Statistics(int measureCount) { + max = new Object[measureCount]; + min = new Object[measureCount]; + uniqueValue = new Object[measureCount]; + decimal = new int[measureCount]; + for (int i = 0; i < measureCount; i++) { + if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) { + max[i] = Long.MIN_VALUE; + min[i] = Long.MAX_VALUE; + uniqueValue[i] = Long.MIN_VALUE; + } else if (type[i] == CarbonCommonConstants.DOUBLE_MEASURE) { + max[i] = Double.MIN_VALUE; + min[i] = Double.MAX_VALUE; + uniqueValue[i] = Double.MIN_VALUE; + } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { + max[i] = new BigDecimal(Double.MIN_VALUE); + min[i] = new BigDecimal(Double.MAX_VALUE); + uniqueValue[i] = new BigDecimal(Double.MIN_VALUE); + } else { + max[i] = 0.0; + min[i] = 0.0; + uniqueValue[i] = 0.0; + } + decimal[i] = 0; } } - for (int i = 0; i < min.length; i++) { - if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) { - min[i] = Long.MAX_VALUE; - uniqueValue[i] = Long.MIN_VALUE; - } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { - min[i] = Double.MAX_VALUE; - uniqueValue[i] = Double.MIN_VALUE; - } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { - min[i] = new BigDecimal(Double.MAX_VALUE); - uniqueValue[i] = new BigDecimal(Double.MIN_VALUE); - } else { - min[i] = 0.0; - uniqueValue[i] = 0.0; + + /** + * update the statistics for the input row + */ + void update(int[] msrIndex, Object[] row, boolean compactionFlow) { + // Update row level min max + for (int i = 0; i < msrIndex.length; i++) { + int count = msrIndex[i]; + if (row[count] != null) { + if (type[count] == CarbonCommonConstants.DOUBLE_MEASURE) { + double value = (double) row[count]; + double maxVal = (double) max[count]; + double minVal = (double) min[count]; + max[count] = (maxVal > value ? max[count] : value); + min[count] = (minVal < value ? min[count] : value); + int num = getDecimalCount(value); + decimal[count] = (decimal[count] > num ? decimal[count] : num); + uniqueValue[count] = (double) min[count] - 1; + } else if (type[count] == CarbonCommonConstants.BIG_INT_MEASURE) { + long value = (long) row[count]; + long maxVal = (long) max[count]; + long minVal = (long) min[count]; + max[count] = (maxVal > value ? max[count] : value); + min[count] = (minVal < value ? min[count] : value); + uniqueValue[count] = (long) min[count] - 1; + } else if (type[count] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { + byte[] buff = null; + // in compaction flow the measure with decimal type will come as spark decimal. + // need to convert it to byte array. + if (compactionFlow) { + BigDecimal bigDecimal = ((Decimal) row[count]).toJavaBigDecimal(); + buff = DataTypeUtil.bigDecimalToByte(bigDecimal); + } else { + buff = (byte[]) row[count]; + } + BigDecimal value = DataTypeUtil.byteToBigDecimal(buff); + decimal[count] = value.scale(); + BigDecimal val = (BigDecimal) min[count]; + uniqueValue[count] = (val.subtract(new BigDecimal(1.0))); + } + } } } - for (int i = 0; i < decimal.length; i++) { - decimal[i] = 0; - } + } + class IndexKey { + byte[] currentMDKey = null; + byte[][] currentNoDictionaryKey = null; byte[] startKey = null; byte[] endKey = null; byte[][] noDictStartKey = null; byte[][] noDictEndKey = null; - CarbonWriteDataHolder[] dataHolder = initialiseDataHolder(dataRows.size()); - CarbonWriteDataHolder keyDataHolder = initialiseKeyBlockHolder(dataRows.size()); - CarbonWriteDataHolder noDictionaryKeyDataHolder = null; + + /** update all keys based on the input row */ + void update(Object[] row, boolean firstRow) { + currentMDKey = (byte[]) row[mdKeyIndex]; + if (noDictionaryCount > 0 || complexIndexMap.size() > 0) { + currentNoDictionaryKey = (byte[][]) row[mdKeyIndex - 1]; + } + if (firstRow) { + startKey = currentMDKey; + noDictStartKey = currentNoDictionaryKey; + } + endKey = currentMDKey; + noDictEndKey = currentNoDictionaryKey; + } + } + + /** generate the NodeHolder from the input rows */ + private NodeHolder processDataRows(List dataRows) + throws CarbonDataWriterException { + // to store index of the measure columns which are null + BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount); + // statistics for one blocklet/page + Statistics stats = new Statistics(measureCount); + IndexKey keys = new IndexKey(); + + // initialize measureHolder, mdKeyHolder and noDictionaryHolder, these three Holders + // are the input for final encoding + CarbonWriteDataHolder[] measureHolder = initialiseDataHolder(dataRows.size()); + CarbonWriteDataHolder mdKeyHolder = initialiseKeyBlockHolder(dataRows.size()); + CarbonWriteDataHolder noDictionaryHolder = null; if ((noDictionaryCount + complexColCount) > 0) { - noDictionaryKeyDataHolder = initialiseKeyBlockHolderForNonDictionary(dataRows.size()); + noDictionaryHolder = initialiseKeyBlockHolderForNonDictionary(dataRows.size()); } + // loop on the input rows, fill measureHolder, mdKeyHolder and noDictionaryHolder for (int count = 0; count < dataRows.size(); count++) { Object[] row = dataRows.get(count); - byte[] mdKey = (byte[]) row[this.mdKeyIndex]; - byte[][] noDictionaryKey = null; - if (noDictionaryCount > 0 || complexIndexMap.size() > 0) { - noDictionaryKey = (byte[][]) row[this.mdKeyIndex - 1]; - } - ByteBuffer byteBuffer = null; - byte[] b = null; - if (count == 0) { - startKey = mdKey; - noDictStartKey = noDictionaryKey; - } - endKey = mdKey; - noDictEndKey = noDictionaryKey; - // add to key store - if (mdKey.length > 0) { - keyDataHolder.setWritableByteArrayValueByIndex(count, mdKey); + keys.update(row, (count == 0)); + if (keys.currentMDKey.length > 0) { + mdKeyHolder.setWritableByteArrayValueByIndex(count, keys.currentMDKey); } - // for storing the byte [] for high card. if (noDictionaryCount > 0 || complexIndexMap.size() > 0) { - noDictionaryKeyDataHolder.setWritableNonDictByteArrayValueByIndex(count, noDictionaryKey); + noDictionaryHolder.setWritableNonDictByteArrayValueByIndex(count, + keys.currentNoDictionaryKey); } + fillMeasureHolder(row, count, measureHolder, nullValueIndexBitSet); + stats.update(otherMeasureIndex, row, compactionFlow); + stats.update(customMeasureIndex, row, compactionFlow); + } - for (int k = 0; k < otherMeasureIndex.length; k++) { - if (type[otherMeasureIndex[k]] == CarbonCommonConstants.BIG_INT_MEASURE) { - if (null == row[otherMeasureIndex[k]]) { - nullValueIndexBitSet[otherMeasureIndex[k]].set(count); - dataHolder[otherMeasureIndex[k]].setWritableLongValueByIndex(count, 0L); - } else { - dataHolder[otherMeasureIndex[k]] - .setWritableLongValueByIndex(count, row[otherMeasureIndex[k]]); - } + // generate encoded byte array for 3 holders + // for measure columns: encode and compress the measureHolder + WriterCompressModel compressionModel = + ValueCompressionUtil.getWriterCompressModel( + stats.max, stats.min, stats.decimal, stats.uniqueValue, type, new byte[measureCount]); + byte[][] encodedMeasureArray = + HeavyCompressedDoubleArrayDataStore.encodeMeasureDataArray( + compressionModel, measureHolder); + + // for mdkey and noDictionary, it is already in bytes, just get the array from holder + byte[][] mdKeyArray = mdKeyHolder.getByteArrayValues(); + byte[][][] noDictionaryArray = null; + if ((noDictionaryCount + complexColCount) > 0) { + noDictionaryArray = noDictionaryHolder.getNonDictByteArrayValues(); + } + + // create NodeHolder using these encoded byte arrays + NodeHolder nodeHolder = + createNodeHolderObjectWithOutKettle( + encodedMeasureArray, mdKeyArray, noDictionaryArray, dataRows.size(), + keys.startKey, keys.endKey, compressionModel, keys.noDictStartKey, keys.noDictEndKey, + nullValueIndexBitSet); + LOGGER.info("Number Of records processed: " + dataRows.size()); + return nodeHolder; + } + + private void fillMeasureHolder(Object[] row, int count, CarbonWriteDataHolder[] measureHolder, + BitSet[] nullValueIndexBitSet) { + for (int k = 0; k < otherMeasureIndex.length; k++) { + if (type[otherMeasureIndex[k]] == CarbonCommonConstants.BIG_INT_MEASURE) { + if (null == row[otherMeasureIndex[k]]) { + nullValueIndexBitSet[otherMeasureIndex[k]].set(count); + measureHolder[otherMeasureIndex[k]].setWritableLongValueByIndex(count, 0L); } else { - if (null == row[otherMeasureIndex[k]]) { - nullValueIndexBitSet[otherMeasureIndex[k]].set(count); - dataHolder[otherMeasureIndex[k]].setWritableDoubleValueByIndex(count, 0.0); - } else { - dataHolder[otherMeasureIndex[k]] - .setWritableDoubleValueByIndex(count, row[otherMeasureIndex[k]]); - } + measureHolder[otherMeasureIndex[k]] + .setWritableLongValueByIndex(count, row[otherMeasureIndex[k]]); } - } - calculateMaxMin(max, min, decimal, otherMeasureIndex, row); - for (int i = 0; i < customMeasureIndex.length; i++) { - if (null == row[customMeasureIndex[i]] - && type[customMeasureIndex[i]] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { - BigDecimal val = BigDecimal.valueOf(0); - b = DataTypeUtil.bigDecimalToByte(val); - nullValueIndexBitSet[customMeasureIndex[i]].set(count); + } else { + if (null == row[otherMeasureIndex[k]]) { + nullValueIndexBitSet[otherMeasureIndex[k]].set(count); + measureHolder[otherMeasureIndex[k]].setWritableDoubleValueByIndex(count, 0.0); } else { - if (this.compactionFlow) { - BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal(); - b = DataTypeUtil.bigDecimalToByte(bigDecimal); - } else { - b = (byte[]) row[customMeasureIndex[i]]; - } + measureHolder[otherMeasureIndex[k]] + .setWritableDoubleValueByIndex(count, row[otherMeasureIndex[k]]); } - BigDecimal value = DataTypeUtil.byteToBigDecimal(b); - String[] bigdVals = value.toPlainString().split("\\."); - long[] bigDvalue = new long[2]; - if (bigdVals.length == 2) { - bigDvalue[0] = Long.parseLong(bigdVals[0]); - BigDecimal bd = new BigDecimal(CarbonCommonConstants.POINT + bigdVals[1]); - bigDvalue[1] = (long) (bd.doubleValue() * Math.pow(10, value.scale())); + } + } + ByteBuffer byteBuffer = null; + byte[] measureBytes = null; + for (int i = 0; i < customMeasureIndex.length; i++) { + if (null == row[customMeasureIndex[i]] + && type[customMeasureIndex[i]] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { + measureBytes = DataTypeUtil.zeroBigDecimalBytes; + nullValueIndexBitSet[customMeasureIndex[i]].set(count); + } else { + if (this.compactionFlow) { + BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal(); + measureBytes = DataTypeUtil.bigDecimalToByte(bigDecimal); } else { - bigDvalue[0] = Long.parseLong(bigdVals[0]); + measureBytes = (byte[]) row[customMeasureIndex[i]]; } - byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE); - byteBuffer.putInt(b.length); - byteBuffer.put(b); - byteBuffer.flip(); - b = byteBuffer.array(); - dataHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, b); } - calculateMaxMin(max, min, decimal, customMeasureIndex, row); - } - calculateUniqueValue(min, uniqueValue); - byte[][] byteArrayValues = keyDataHolder.getByteArrayValues().clone(); - byte[][][] noDictionaryValueHolder = null; - if ((noDictionaryCount + complexColCount) > 0) { - noDictionaryValueHolder = noDictionaryKeyDataHolder.getNonDictByteArrayValues(); + byteBuffer = ByteBuffer.allocate(measureBytes.length + + CarbonCommonConstants.INT_SIZE_IN_BYTE); + byteBuffer.putInt(measureBytes.length); + byteBuffer.put(measureBytes); + byteBuffer.flip(); + measureBytes = byteBuffer.array(); + measureHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, measureBytes); } - WriterCompressModel compressionModel = ValueCompressionUtil - .getWriterCompressModel(max, min, decimal, uniqueValue, type, new byte[max.length]); - byte[][] writableMeasureDataArray = - StoreFactory.createDataStore(compressionModel).getWritableMeasureDataArray(dataHolder) - .clone(); - NodeHolder nodeHolder = - getNodeHolderObject(writableMeasureDataArray, byteArrayValues, dataRows.size(), - startKey, endKey, compressionModel, noDictionaryValueHolder, noDictStartKey, - noDictEndKey, nullValueIndexBitSet); - LOGGER.info("Number Of records processed: " + dataRows.size()); - return nodeHolder; } - private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal, - byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal, - WriterCompressModel compressionModel, byte[][][] noDictionaryData, - byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey, BitSet[] nullValueIndexBitSet) + private NodeHolder createNodeHolderObjectWithOutKettle(byte[][] measureArray, byte[][] mdKeyArray, + byte[][][] noDictionaryArray, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal, + WriterCompressModel compressionModel, byte[][] noDictionaryStartKey, + byte[][] noDictionaryEndKey, BitSet[] nullValueIndexBitSet) throws CarbonDataWriterException { byte[][][] noDictionaryColumnsData = null; List> colsAndValues = new ArrayList>(); @@ -636,19 +706,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { colsAndValues.add(new ArrayList()); } int noOfColumn = colGrpModel.getNoOfColumnStore(); - DataHolder[] dataHolders = getDataHolders(noOfColumn, byteArrayValues.length); - for (int i = 0; i < byteArrayValues.length; i++) { - byte[][] splitKey = columnarSplitter.splitKey(byteArrayValues[i]); + DataHolder[] dataHolders = getDataHolders(noOfColumn, mdKeyArray.length); + for (int i = 0; i < mdKeyArray.length; i++) { + byte[][] splitKey = columnarSplitter.splitKey(mdKeyArray[i]); for (int j = 0; j < splitKey.length; j++) { dataHolders[j].addData(splitKey[j], i); } } if (noDictionaryCount > 0 || complexIndexMap.size() > 0) { - noDictionaryColumnsData = new byte[noDictionaryCount][noDictionaryData.length][]; - for (int i = 0; i < noDictionaryData.length; i++) { + noDictionaryColumnsData = new byte[noDictionaryCount][noDictionaryArray.length][]; + for (int i = 0; i < noDictionaryArray.length; i++) { int complexColumnIndex = primitiveDimLens.length + noDictionaryCount; - byte[][] splitKey = noDictionaryData[i]; + byte[][] splitKey = noDictionaryArray[i]; int complexTypeIndex = 0; for (int j = 0; j < splitKey.length; j++) { @@ -754,7 +824,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryEndKey); } return this.dataWriter - .buildDataNodeHolder(blockStorage, dataHolderLocal, entryCountLocal, startkeyLocal, + .buildDataNodeHolder(blockStorage, measureArray, entryCountLocal, startkeyLocal, endKeyLocal, compressionModel, composedNonDictStartKey, composedNonDictEndKey, nullValueIndexBitSet); } @@ -914,7 +984,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { for (int i = 0; i < msrIndex.length; i++) { int count = msrIndex[i]; if (row[count] != null) { - if (type[count] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { + if (type[count] == CarbonCommonConstants.DOUBLE_MEASURE) { double value = (double) row[count]; double maxVal = (double) max[count]; double minVal = (double) min[count]; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java deleted file mode 100644 index 0097483..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.store; - -import org.apache.carbondata.core.datastore.NodeMeasureDataStore; -import org.apache.carbondata.core.datastore.compression.WriterCompressModel; -import org.apache.carbondata.core.datastore.impl.data.compressed.HeavyCompressedDoubleArrayDataInMemoryStore; - -public final class StoreFactory { - - private StoreFactory() { - } - - public static NodeMeasureDataStore createDataStore(WriterCompressModel compressionModel) { - return new HeavyCompressedDoubleArrayDataInMemoryStore(compressionModel); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java index 227f92b..c8f740b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java @@ -31,7 +31,7 @@ public interface CarbonFactDataWriter { * file format * .... * - * @param dataArray measure array + * @param measureArray measure array * @param entryCount number of entries * @param startKey start key of leaf * @param endKey end key of leaf @@ -40,7 +40,7 @@ public interface CarbonFactDataWriter { * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem */ - NodeHolder buildDataNodeHolder(IndexStorage[] keyStorageArray, byte[][] dataArray, + NodeHolder buildDataNodeHolder(IndexStorage[] keyStorageArray, byte[][] measureArray, int entryCount, byte[] startKey, byte[] endKey, WriterCompressModel compressionModel, byte[] noDictionaryStartKey, byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet) throws CarbonDataWriterException; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java index cf5311c..64077e2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java @@ -47,9 +47,10 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter { } @Override - public NodeHolder buildDataNodeHolder(IndexStorage[] keyStorageArray, byte[][] dataArray, - int entryCount, byte[] startKey, byte[] endKey, WriterCompressModel compressionModel, - byte[] noDictionaryStartKey, byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet) + public NodeHolder buildDataNodeHolder(IndexStorage[] keyStorageArray, + byte[][] measureArray, int entryCount, byte[] startKey, byte[] endKey, + WriterCompressModel compressionModel, byte[] noDictionaryStartKey, + byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet) throws CarbonDataWriterException { // if there are no NO-Dictionary column present in the table then // set the empty byte array @@ -143,13 +144,13 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter { int[] msrLength = new int[dataWriterVo.getMeasureCount()]; // calculate the total size required for all the measure and get the // each measure size - for (int i = 0; i < dataArray.length; i++) { - currentMsrLenght = dataArray[i].length; + for (int i = 0; i < measureArray.length; i++) { + currentMsrLenght = measureArray[i].length; totalMsrArrySize += currentMsrLenght; msrLength[i] = currentMsrLenght; } NodeHolder holder = new NodeHolder(); - holder.setDataArray(dataArray); + holder.setDataArray(measureArray); holder.setKeyArray(keyBlockData); holder.setMeasureNullValueIndex(nullValueIndexBitSet); // end key format will be * be written in carbon data file */ @Override public NodeHolder buildDataNodeHolder(IndexStorage[] keyStorageArray, - byte[][] dataArray, int entryCount, byte[] startKey, byte[] endKey, + byte[][] measureArray, int entryCount, byte[] startKey, byte[] endKey, WriterCompressModel compressionModel, byte[] noDictionaryStartKey, byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet) throws CarbonDataWriterException { // if there are no NO-Dictionary column present in the table then @@ -113,8 +113,8 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter byte[][] dimensionMinValue = new byte[keyStorageArray.length][]; byte[][] dimensionMaxValue = new byte[keyStorageArray.length][]; - byte[][] measureMinValue = new byte[dataArray.length][]; - byte[][] measureMaxValue = new byte[dataArray.length][]; + byte[][] measureMinValue = new byte[measureArray.length][]; + byte[][] measureMaxValue = new byte[measureArray.length][]; byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount); boolean[] colGrpBlock = new boolean[keyStorageArray.length]; @@ -137,7 +137,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter colGrpBlock[i] = true; } } - for (int i = 0; i < dataArray.length; i++) { + for (int i = 0; i < measureArray.length; i++) { measureMaxValue[i] = CarbonMetadataUtil .getByteValueForMeasure(compressionModel.getMaxValue()[i], dataWriterVo.getSegmentProperties().getMeasures().get(i).getDataType()); @@ -176,13 +176,13 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter int[] msrLength = new int[dataWriterVo.getMeasureCount()]; // calculate the total size required for all the measure and get the // each measure size - for (int i = 0; i < dataArray.length; i++) { - currentMsrLenght = dataArray[i].length; + for (int i = 0; i < measureArray.length; i++) { + currentMsrLenght = measureArray[i].length; totalMsrArrySize += currentMsrLenght; msrLength[i] = currentMsrLenght; } NodeHolder holder = new NodeHolder(); - holder.setDataArray(dataArray); + holder.setDataArray(measureArray); holder.setKeyArray(keyBlockData); holder.setMeasureNullValueIndex(nullValueIndexBitSet); // end key format will be