carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-1433] Added Vectorized Reader for Presto Integration
Date Thu, 07 Sep 2017 13:07:47 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 0c519c425 -> 531dcd234


http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
new file mode 100644
index 0000000..e3985e0
--- /dev/null
+++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto
+
+import com.facebook.presto.spi.block.SliceArrayBlock
+import io.airlift.slice.{Slice, Slices}
+import io.airlift.slice.Slices._
+
+import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryChunksWrapper,
+DictionaryColumnUniqueIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+
+/**
+ * This is the class to decode dictionary encoded column data back to its original value.
+ */
+class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
+  // Per-column state, indexed by the column's position in the array passed to initialize().
+  // Entries for columns that are not dictionary-decoded stay null.
+  private var dictionaries: Array[Dictionary] = _
+  private var dataTypes: Array[DataType] = _
+  private var dictionarySliceArray: Array[SliceArrayBlock] = _
+
+  /**
+   * This initialization is done inside executor task
+   * for column dictionary involved in decoding.
+   *
+   * Records every column's data type and, for dictionary-decoded columns, fetches the
+   * forward dictionary from the cache and pre-builds its SliceArrayBlock.
+   *
+   * @param carbonColumns           column list
+   * @param absoluteTableIdentifier table identifier
+   */
+  override def initialize(carbonColumns: Array[CarbonColumn],
+      absoluteTableIdentifier: AbsoluteTableIdentifier): Unit = {
+
+    dictionaries = new Array[Dictionary](carbonColumns.length)
+    dataTypes = new Array[DataType](carbonColumns.length)
+    dictionarySliceArray = new Array[SliceArrayBlock](carbonColumns.length)
+
+    // The cache is requested with a constant type and the table's store path, so the request
+    // is loop-invariant; `lazy` avoids creating it when no column needs dictionary decoding.
+    lazy val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+      CacheProvider.getInstance
+        .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath)
+
+    carbonColumns.zipWithIndex.foreach { case (carbonColumn, index) =>
+      dataTypes(index) = carbonColumn.getDataType
+      if (isDictionaryDecoded(carbonColumn)) {
+        val dictionary: Dictionary = forwardDictionaryCache.get(
+          new DictionaryColumnUniqueIdentifier(
+            absoluteTableIdentifier.getCarbonTableIdentifier,
+            carbonColumn.getColumnIdentifier))
+        dictionaries(index) = dictionary
+        dictionarySliceArray(index) = createSliceArrayBlock(dictionary)
+      }
+    }
+  }
+
+  /**
+   * Whether a column's values must be decoded through a forward dictionary: it is
+   * dictionary-encoded, not direct-dictionary-encoded, and not a complex column.
+   */
+  private def isDictionaryDecoded(carbonColumn: CarbonColumn): Boolean = {
+    carbonColumn.hasEncoding(Encoding.DICTIONARY) &&
+      !carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+      !carbonColumn.isComplex
+  }
+
+  /**
+   * Function to create the SliceArrayBlock with dictionary Data.
+   *
+   * Slot 0 holds an empty slice. The i-th dictionary chunk (1-based) is stored at slot i, with
+   * the first chunk stored as null (presumably Carbon's null/default dictionary member —
+   * confirm). Every write therefore stays inside the array of size `chunks.getSize + 1`.
+   *
+   * @param dictionaryData forward dictionary of one column
+   * @return block with `chunks.getSize + 1` positions
+   */
+  private def createSliceArrayBlock(dictionaryData: Dictionary): SliceArrayBlock = {
+    val chunks: DictionaryChunksWrapper = dictionaryData.getDictionaryChunks
+    val sliceArray = new Array[Slice](chunks.getSize + 1)
+    // Initialize Slice Array with Empty Slice as per Presto's code
+    sliceArray(0) = Slices.EMPTY_SLICE
+    var count = 1
+    while (chunks.hasNext) {
+      val value: Array[Byte] = chunks.next
+      // Always write at `count`: writing the null marker at `count + 1` overflowed the array
+      // when the dictionary held a single chunk.
+      sliceArray(count) = if (count == 1) null else wrappedBuffer(value, 0, value.length)
+      count += 1
+    }
+    new SliceArrayBlock(sliceArray.length, sliceArray, true)
+  }
+
+  /** Row-wise reading is not supported by this read support; use convertColumn instead. */
+  override def readRow(data: Array[AnyRef]): T = {
+    throw new RuntimeException("UnSupported Method Call Convert Column Instead")
+  }
+
+  /**
+   * Decodes a whole column of values. For a dictionary-decoded column each value is treated
+   * as a surrogate key, looked up in the column's dictionary and converted as STRING data;
+   * other columns are returned unchanged.
+   *
+   * @param data     values of one column
+   * @param columnNo index of the column, as passed to initialize()
+   * @return the converted array, cast to `T`
+   */
+  def convertColumn(data: Array[AnyRef], columnNo: Int): T = {
+    val convertedData = Option(dictionaries(columnNo)) match {
+      case Some(dictionary) =>
+        data.map { value =>
+          DataTypeUtil.getDataBasedOnDataType(
+            dictionary.getDictionaryValueForKey(value.asInstanceOf[Int]), DataType.STRING)
+        }
+      case None => data
+    }
+    convertedData.asInstanceOf[T]
+  }
+
+  /**
+   * Function to get the SliceArrayBlock with dictionary Data
+   *
+   * @param columnNo index of the column, as passed to initialize()
+   * @return the pre-built block, or null if the column is not dictionary-decoded
+   */
+  def getSliceArrayBlock(columnNo: Int): SliceArrayBlock = {
+    dictionarySliceArray(columnNo)
+  }
+
+  /**
+   * to book keep the dictionary cache or update access count for each
+   * column involved during decode, to facilitate LRU cache policy if memory
+   * threshold is reached
+   */
+  override def close(): Unit = {
+    // dictionaries is still null if close() is called before initialize().
+    Option(dictionaries).foreach { loaded =>
+      loaded.filter(_ != null).foreach(dictionary => CarbonUtil.clearDictionaryCache(dictionary))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala
deleted file mode 100644
index a3244ae..0000000
--- a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.presto
-
-import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
-import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.DataType
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.util.path.CarbonStorePath
-
-/**
- * Decodes dictionary-encoded columns of a row back to their dictionary values.
- */
-class CarbonDictionaryDecodeReaderSupport[T] {
-
-  /**
-   * Loads the forward dictionary of every column accepted by dictChecker.
-   *
-   * @return one (data type, dictionary, column index) triple per dictionary-decoded column
-   */
-  def initialize(carbonColumns: Array[CarbonColumn],
-      absoluteTableIdentifier: AbsoluteTableIdentifier): Array[(DataType, Dictionary, Int)] = {
-
-    carbonColumns.zipWithIndex.filter(dictChecker(_)).map { carbonColumnWithIndex =>
-      val (carbonColumn, index) = carbonColumnWithIndex
-      val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-        CacheProvider.getInstance()
-          .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier
-            .getStorePath)
-      val dict: Dictionary = forwardDictionaryCache
-        .get(new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier,
-          carbonColumn.getColumnIdentifier,
-          carbonColumn.getColumnIdentifier.getDataType,
-          CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)))
-      (carbonColumn.getDataType, dict, index)
-    }
-  }
-
-  /**
-   * True when the column is dictionary-encoded, but neither direct-dictionary nor complex.
-   */
-  private def dictChecker(carbonColumWithIndex: (CarbonColumn, Int)): Boolean = {
-    val (carbonColumn, _) = carbonColumWithIndex
-    if (!carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumn.isComplex &&
-        carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
-      true
-    } else {
-      false
-    }
-  }
-
-  /**
-   * Replaces, in place, each dictionary column's surrogate key in `data` with its
-   * dictionary value.
-   *
-   * @return the same `data` array, mutated
-   */
-  def readRow(data: Array[Object],
-      dictionaries: Array[(DataType, Dictionary, Int)]): Array[Object] = {
-    dictionaries.foreach { (dictionary: (DataType, Dictionary, Int)) =>
-      val (_, dict, position) = dictionary
-      data(position) = dict.getDictionaryValueForKey(data(position).asInstanceOf[Int])
-    }
-    data
-  }
-
-}


Mime
View raw message