spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cloud-fan <...@git.apache.org>
Subject [GitHub] spark pull request #19651: [SPARK-20682][SPARK-15474][SPARK-21791] Add new O...
Date Sat, 11 Nov 2017 01:03:49 GMT
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19651#discussion_r150368108
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.orc
    +
    +import org.apache.hadoop.io._
    +import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
    +import org.apache.orc.storage.common.`type`.HiveDecimal
    +import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.datasources.orc.OrcUtils.getTypeDescription
    +import org.apache.spark.sql.types._
    +import org.apache.spark.unsafe.types.UTF8String
    +
    +private[orc] class OrcSerializer(dataSchema: StructType) {
    +
    +  private[this] lazy val orcStruct: OrcStruct =
    +    createOrcValue(dataSchema).asInstanceOf[OrcStruct]
    +
    +  private[this] val writableWrappers =
    +    dataSchema.fields.map(f => getWritableWrapper(f.dataType))
    +
    +  def serialize(row: InternalRow): OrcStruct = {
    +    convertInternalRowToOrcStruct(row, dataSchema, Some(writableWrappers), Some(orcStruct))
    +  }
    +
    +  /**
    +   * Return an ORC value object for the given Spark schema.
    +   */
    +  private[this] def createOrcValue(dataType: DataType) =
    +    OrcStruct.createValue(getTypeDescription(dataType))
    +
    +  /**
    +   * Convert Apache Spark InternalRow to Apache ORC OrcStruct.
    +   */
    +  private[this] def convertInternalRowToOrcStruct(
    +      row: InternalRow,
    +      schema: StructType,
    +      valueWrappers: Option[Seq[Any => Any]] = None,
    +      struct: Option[OrcStruct] = None): OrcStruct = {
    +    val wrappers =
    +      valueWrappers.getOrElse(schema.fields.map(_.dataType).map(getWritableWrapper).toSeq)
    +    val orcStruct = struct.getOrElse(createOrcValue(schema).asInstanceOf[OrcStruct])
    +
    +    for (schemaIndex <- 0 until schema.length) {
    +      val fieldType = schema(schemaIndex).dataType
    +      if (row.isNullAt(schemaIndex)) {
    +        orcStruct.setFieldValue(schemaIndex, null)
    +      } else {
    +        val field = row.get(schemaIndex, fieldType)
    +        val fieldValue = wrappers(schemaIndex)(field).asInstanceOf[WritableComparable[_]]
    +        orcStruct.setFieldValue(schemaIndex, fieldValue)
    +      }
    +    }
    +    orcStruct
    +  }
    +
    +  private[this] def withNullSafe(f: Any => Any): Any => Any = {
    +    input => if (input == null) null else f(input)
    +  }
    +
    +  /**
    +   * Builds a WritableComparable-return function ahead of time according to DataType
    +   * to avoid pattern matching and branching costs per row.
    +   */
    +  private[this] def getWritableWrapper(dataType: DataType): Any => Any = dataType match {
    +    case NullType => _ => null
    +
    +    case BooleanType => withNullSafe(o => new BooleanWritable(o.asInstanceOf[Boolean]))
    +
    +    case ByteType => withNullSafe(o => new ByteWritable(o.asInstanceOf[Byte]))
    --- End diff --
    
    we can apply the same technique: pass `SpecializedGetters` as a parameter to avoid boxing.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message