spark-reviews mailing list archives

From liancheng <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-14206][SQL] buildReader() implementatio...
Date Mon, 28 Mar 2016 16:51:53 GMT
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12002#discussion_r57595359
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala ---
    @@ -91,6 +96,70 @@ class DefaultSource extends FileFormat with DataSourceRegister {
         new CSVOutputWriterFactory(csvOptions)
       }
     
    +  override def buildReader(
    +      sqlContext: SQLContext,
    +      physicalSchema: StructType,
    +      partitionSchema: StructType,
    +      dataSchema: StructType,
    +      filters: Seq[Filter],
    +      options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
    +    val csvOptions = new CSVOptions(options)
    +    val headers = dataSchema.fields.map(_.name)
    +
    +    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
    +    val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
    +
    +    (file: PartitionedFile) => {
    +      val fileSplit = {
    +        val filePath = new Path(new URI(file.filePath))
    +        new FileSplit(filePath, file.start, file.length, Array.empty)
    +      }
    +
    +      val hadoopAttemptContext = {
    +        val conf = broadcastedConf.value.value
    +        val attemptID = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
    +        new TaskAttemptContextImpl(conf, attemptID)
    +      }
    +
    +      val reader = new LineRecordReader()
    +      reader.initialize(fileSplit, hadoopAttemptContext)
    +
    +      val lineIterator = new RecordReaderIterator(reader).map { line =>
    +        new String(line.getBytes, 0, line.getLength, csvOptions.charset)
    +      }
    +
    +      // Skips the header line of each file if the `header` option is set to true.
    +      // TODO What if the first partitioned file consists of only comments and empty lines?
    --- End diff --
    
    Unlike the RDD-based code path, we no longer have a global view of all the input data here, which makes filtering out the header line difficult. This handles only the simplest and most common case: a valid header can be found in the first block of any given input file. If that first block contains only comments and empty lines, this branch fails. I haven't found a good solution for this yet.
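    
    For concreteness, here is a minimal sketch of the heuristic described above (not the exact code in this PR): only the task whose split starts at byte offset 0 of a file tries to drop the header, and `isCommentOrBlank` is a hypothetical helper standing in for the option-aware comment/blank-line checks.
    
        import org.apache.spark.sql.execution.datasources.PartitionedFile
    
        // Drop the header only in the task that reads the first block of a file
        // (file.start == 0); tasks reading later splits pass their lines through.
        def skipHeader(
            file: PartitionedFile,
            lines: Iterator[String],
            headerFlag: Boolean,
            isCommentOrBlank: String => Boolean): Iterator[String] = {
          if (headerFlag && file.start == 0) {
            // Skip leading comments and blank lines, then drop the header itself.
            lines.dropWhile(isCommentOrBlank).drop(1)
          } else {
            lines
          }
        }
    
    Note that this sketch exhibits exactly the failure mode mentioned above: if the first block holds only comments and blank lines, `dropWhile` consumes the whole block, and the real header in a later block is then parsed as an ordinary data row by another task.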


