spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cloud-fan <...@git.apache.org>
Subject [GitHub] spark pull request #19651: [SPARK-20682][SPARK-15474][SPARK-21791] Add new O...
Date Sat, 04 Nov 2017 16:34:01 GMT
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19651#discussion_r148935336
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
---
    @@ -39,3 +58,134 @@ private[sql] object OrcFileFormat {
         names.foreach(checkFieldName)
       }
     }
    +
    +class DefaultSource extends OrcFileFormat
    +
    +/**
    + * New ORC File Format based on Apache ORC 1.4.1 and above.
    + */
    +class OrcFileFormat
    +  extends FileFormat
    +  with DataSourceRegister
    +  with Serializable {
    +
    +  override def shortName(): String = "orc"
    +
    +  override def toString: String = "ORC_1.4"
    +
    +  override def hashCode(): Int = getClass.hashCode()
    +
    +  override def equals(other: Any): Boolean = other.isInstanceOf[OrcFileFormat]
    +
    +  override def inferSchema(
    +      sparkSession: SparkSession,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType] = {
    +    OrcUtils.readSchema(sparkSession, files)
    +  }
    +
    +  override def prepareWrite(
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
    +
    +    val conf = job.getConfiguration
    +
    +    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.getSchemaString(dataSchema))
    +
    +    conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
    +
    +    conf.asInstanceOf[JobConf]
    +      .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
    +
    +    new OutputWriterFactory {
    +      override def newInstance(
    +          path: String,
    +          dataSchema: StructType,
    +          context: TaskAttemptContext): OutputWriter = {
    +        new OrcOutputWriter(path, dataSchema, context)
    +      }
    +
    +      override def getFileExtension(context: TaskAttemptContext): String = {
    +        val compressionExtension: String = {
    +          val name = context.getConfiguration.get(COMPRESS.getAttribute)
    +          OrcOptions.extensionsForCompressionCodecNames.getOrElse(name, "")
    +        }
    +
    +        compressionExtension + ".orc"
    +      }
    +    }
    +  }
    +
    +  override def isSplitable(
    +      sparkSession: SparkSession,
    +      options: Map[String, String],
    +      path: Path): Boolean = {
    +    true
    +  }
    +
    +  override def buildReaderWithPartitionValues(
    --- End diff --
    
    we should override `buildReader` and return `GenericInternalRow` here. Then the parent
class will merge the partition values and output `UnsafeRow`. This is what the current `OrcFileFormat`
does, so let's keep that behavior for now.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message