Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8F582200CF0 for ; Thu, 7 Sep 2017 15:07:53 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8E5231609BB; Thu, 7 Sep 2017 13:07:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AD6411609A7 for ; Thu, 7 Sep 2017 15:07:52 +0200 (CEST) Received: (qmail 8675 invoked by uid 500); 7 Sep 2017 13:07:49 -0000 Mailing-List: contact commits-help@carbondata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.apache.org Delivered-To: mailing list commits@carbondata.apache.org Received: (qmail 8631 invoked by uid 99); 7 Sep 2017 13:07:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Sep 2017 13:07:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7AA62F55C9; Thu, 7 Sep 2017 13:07:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chenliang613@apache.org To: commits@carbondata.apache.org Date: Thu, 07 Sep 2017 13:07:47 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] carbondata git commit: [CARBONDATA-1433] Added Vectorized Reader for Presto Integration archived-at: Thu, 07 Sep 2017 13:07:53 -0000 Repository: carbondata Updated Branches: refs/heads/master 0c519c425 -> 531dcd234 http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala new file mode 100644 index 0000000..e3985e0 --- /dev/null +++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.presto + +import com.facebook.presto.spi.block.SliceArrayBlock +import io.airlift.slice.{Slice, Slices} +import io.airlift.slice.Slices._ + +import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} +import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryChunksWrapper, +DictionaryColumnUniqueIdentifier} +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.datatype.DataType +import org.apache.carbondata.core.metadata.encoder.Encoding +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn +import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil} +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport + +/** + * This is the class to decode dictionary encoded column data back to its original value. + */ +class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] { + private var dictionaries: Array[Dictionary] = _ + private var dataTypes: Array[DataType] = _ + private var dictionarySliceArray: Array[SliceArrayBlock] = _ + + /** + * This initialization is done inside executor task + * for column dictionary involved in decoding. + * + * @param carbonColumns column list + * @param absoluteTableIdentifier table identifier + */ + + override def initialize(carbonColumns: Array[CarbonColumn], + absoluteTableIdentifier: AbsoluteTableIdentifier) { + + dictionaries = new Array[Dictionary](carbonColumns.length) + dataTypes = new Array[DataType](carbonColumns.length) + dictionarySliceArray = new Array[SliceArrayBlock](carbonColumns.length) + + carbonColumns.zipWithIndex.foreach { + case (carbonColumn, index) => if (carbonColumn.hasEncoding(Encoding.DICTIONARY) && + !carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY) && + !carbonColumn.isComplex) { + val cacheProvider: CacheProvider = CacheProvider.getInstance + val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] = + cacheProvider + .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath) + dataTypes(index) = carbonColumn.getDataType + dictionaries(index) = forwardDictionaryCache + .get(new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier + .getCarbonTableIdentifier, carbonColumn.getColumnIdentifier)) + dictionarySliceArray(index) = createSliceArrayBlock(dictionaries(index)) + + } + else { + dataTypes(index) = carbonColumn.getDataType + } + } + + } + + /** + * Function to create the SliceArrayBlock with dictionary Data + * + * @param dictionaryData + * @return + */ + private def createSliceArrayBlock(dictionaryData: Dictionary): SliceArrayBlock = { + val chunks: DictionaryChunksWrapper = dictionaryData.getDictionaryChunks + val sliceArray = new Array[Slice](chunks.getSize + 1) + // Initialize Slice Array with Empty Slice as per Presto's code + sliceArray(0) = (Slices.EMPTY_SLICE) + var count = 1 + while (chunks.hasNext) { + { + val value: Array[Byte] = chunks.next + if (count == 1) { + sliceArray(count + 1) = null + } + else { + sliceArray(count) = wrappedBuffer(value, 0, value.length) + } + count += 1 + } + } + new SliceArrayBlock(sliceArray.length, sliceArray, true) + } + + override def readRow(data: Array[AnyRef]): T = { + throw new RuntimeException("UnSupported Method Call Convert Column Instead") + } + + def convertColumn(data: Array[AnyRef], columnNo: Int): T = { + val convertedData = if (Option(dictionaries(columnNo)).isDefined) { + data.map { value => + DataTypeUtil + .getDataBasedOnDataType(dictionaries(columnNo) + .getDictionaryValueForKey(value.asInstanceOf[Int]), DataType.STRING) + } + } else { + data + } + convertedData.asInstanceOf[T] + } + + /** + * Function to get the SliceArrayBlock with dictionary Data + * + * @param columnNo + * @return + */ + def getSliceArrayBlock(columnNo: Int): SliceArrayBlock = { + dictionarySliceArray(columnNo) + } + + /** + * to book keep the dictionary cache or update access count for each + * column involved during decode, to facilitate LRU cache policy if memory + * threshold is reached + */ + override def close() { + dictionaries + .foreach(dictionary => if (Option(dictionary).isDefined) { + CarbonUtil + .clearDictionaryCache(dictionary) + }) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala deleted file mode 100644 index a3244ae..0000000 --- a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.presto - -import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} -import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.datatype.DataType -import org.apache.carbondata.core.metadata.encoder.Encoding -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn -import org.apache.carbondata.core.util.path.CarbonStorePath - -class CarbonDictionaryDecodeReaderSupport[T] { - - def initialize(carbonColumns: Array[CarbonColumn], - absoluteTableIdentifier: AbsoluteTableIdentifier): Array[(DataType, Dictionary, Int)] = { - - carbonColumns.zipWithIndex.filter(dictChecker(_)).map { carbonColumnWithIndex => - val (carbonColumn, index) = carbonColumnWithIndex - val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] = - CacheProvider.getInstance() - .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier - .getStorePath) - val dict: Dictionary = forwardDictionaryCache - .get(new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier, - carbonColumn.getColumnIdentifier, - carbonColumn.getColumnIdentifier.getDataType, - CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier))) - (carbonColumn.getDataType, dict, index) - } - } - - private def dictChecker(carbonColumWithIndex: (CarbonColumn, Int)): Boolean = { - val (carbonColumn, _) = carbonColumWithIndex - if (!carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumn.isComplex && - carbonColumn.hasEncoding(Encoding.DICTIONARY)) { - true - } else { - false - } - } - - def readRow(data: Array[Object], - dictionaries: Array[(DataType, Dictionary, Int)]): Array[Object] = { - dictionaries.foreach { (dictionary: (DataType, Dictionary, Int)) => - val (_, dict, position) = dictionary - data(position) = dict.getDictionaryValueForKey(data(position).asInstanceOf[Int]) - } - data - } - -}