spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [4/4] spark git commit: [SPARK-15543][SQL] Rename DefaultSources to make them more self-describing
Date Thu, 26 May 2016 06:54:31 GMT
[SPARK-15543][SQL] Rename DefaultSources to make them more self-describing

## What changes were proposed in this pull request?
This patch renames the various DefaultSource classes to make their names more self-describing. The name "DefaultSource" dates from the days when we did not have a good way to specify short names.

They are now named:
- LibSVMFileFormat
- CSVFileFormat
- JdbcRelationProvider
- JsonFileFormat
- ParquetFileFormat
- TextFileFormat

Backward compatibility is maintained through aliasing.

## How was this patch tested?
Updated relevant test cases too.

Author: Reynold Xin <rxin@databricks.com>

Closes #13311 from rxin/SPARK-15543.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/361ebc28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/361ebc28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/361ebc28

Branch: refs/heads/master
Commit: 361ebc282b2d09dc6dcf21419a53c5c617b1b6bd
Parents: dfc9fc0
Author: Reynold Xin <rxin@databricks.com>
Authored: Wed May 25 23:54:24 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Wed May 25 23:54:24 2016 -0700

----------------------------------------------------------------------
 ....apache.spark.sql.sources.DataSourceRegister |   2 +-
 .../spark/ml/source/libsvm/LibSVMRelation.scala |   8 +-
 project/MimaExcludes.scala                      |   4 +-
 ....apache.spark.sql.sources.DataSourceRegister |  10 +-
 .../spark/sql/execution/ExistingRDD.scala       |   2 +-
 .../sql/execution/datasources/DataSource.scala  |  45 +-
 .../datasources/csv/CSVFileFormat.scala         | 185 ++++
 .../datasources/csv/DefaultSource.scala         | 187 ----
 .../datasources/jdbc/DefaultSource.scala        |  55 --
 .../datasources/jdbc/JdbcRelationProvider.scala |  55 ++
 .../datasources/json/JSONRelation.scala         | 198 ----
 .../datasources/json/JsonFileFormat.scala       | 198 ++++
 .../datasources/parquet/ParquetFileFormat.scala | 923 +++++++++++++++++++
 .../datasources/parquet/ParquetRelation.scala   | 923 -------------------
 .../datasources/text/DefaultSource.scala        | 141 ---
 .../datasources/text/TextFileFormat.scala       | 141 +++
 .../execution/datasources/json/JsonSuite.scala  |   4 +-
 .../parquet/ParquetSchemaSuite.scala            |  12 +-
 .../sql/sources/ResolvedDataSourceSuite.scala   |  18 +-
 .../streaming/DataFrameReaderWriterSuite.scala  | 542 -----------
 .../sql/streaming/FileStreamSinkSuite.scala     |   4 +-
 .../test/DataFrameReaderWriterSuite.scala       | 541 +++++++++++
 ....apache.spark.sql.sources.DataSourceRegister |   2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  18 +-
 .../spark/sql/hive/orc/OrcFileFormat.scala      | 375 ++++++++
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 371 --------
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala |   2 +-
 27 files changed, 2498 insertions(+), 2468 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index f632dd6..a865cbe 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1 @@
-org.apache.spark.ml.source.libsvm.DefaultSource
+org.apache.spark.ml.source.libsvm.LibSVMFileFormat

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 5ba768d..64ebf0c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -90,7 +90,7 @@ private[libsvm] class LibSVMOutputWriter(
  *     .load("data/mllib/sample_libsvm_data.txt")
  *
  *   // Java
- *   DataFrame df = spark.read().format("libsvm")
+ *   Dataset<Row> df = spark.read().format("libsvm")
  *     .option("numFeatures", "780")
  *     .load("data/mllib/sample_libsvm_data.txt");
  * }}}
@@ -105,9 +105,13 @@ private[libsvm] class LibSVMOutputWriter(
  *  - "vectorType": feature vector type, "sparse" (default) or "dense".
  *
  *  @see [[https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/ LIBSVM datasets]]
+ *
+ * Note that this class is public for documentation purposes. Please don't use this class directly.
+ * Rather, use the data source API as illustrated above.
  */
+// If this is moved or renamed, please update DataSource's backwardCompatibilityMap.
 @Since("1.6.0")
-class DefaultSource extends FileFormat with DataSourceRegister {
+class LibSVMFileFormat extends FileFormat with DataSourceRegister {
 
   @Since("1.6.0")
   override def shortName(): String = "libsvm"

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4e99a09..08c575a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -68,7 +68,9 @@ object MimaExcludes {
         // SPARK-13664 Replace HadoopFsRelation with FileFormat
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.source.libsvm.LibSVMRelation"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelationProvider"),
-        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache")
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache"),
+        // SPARK-15543 Rename DefaultSources to make them more self-describing
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.source.libsvm.DefaultSource")
       ) ++ Seq(
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory"),

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index ef92557..9f8bb5d 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,6 +1,6 @@
-org.apache.spark.sql.execution.datasources.csv.DefaultSource
-org.apache.spark.sql.execution.datasources.jdbc.DefaultSource
-org.apache.spark.sql.execution.datasources.json.DefaultSource
-org.apache.spark.sql.execution.datasources.parquet.DefaultSource
-org.apache.spark.sql.execution.datasources.text.DefaultSource
+org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
+org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index ec23a9c..412f5fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
-import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d0853f6..dfe0647 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,6 +30,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -74,15 +78,34 @@ case class DataSource(
   lazy val sourceInfo = sourceSchema()
 
   /** A map to maintain backward compatibility in case we move data sources around. */
-  private val backwardCompatibilityMap = Map(
-    "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
-    "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName,
-    "com.databricks.spark.csv" -> classOf[csv.DefaultSource].getCanonicalName
-  )
+  private val backwardCompatibilityMap: Map[String, String] = {
+    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
+    val json = classOf[JsonFileFormat].getCanonicalName
+    val parquet = classOf[ParquetFileFormat].getCanonicalName
+    val csv = classOf[CSVFileFormat].getCanonicalName
+    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
+    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+
+    Map(
+      "org.apache.spark.sql.jdbc" -> jdbc,
+      "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
+      "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
+      "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
+      "org.apache.spark.sql.json" -> json,
+      "org.apache.spark.sql.json.DefaultSource" -> json,
+      "org.apache.spark.sql.execution.datasources.json" -> json,
+      "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
+      "org.apache.spark.sql.parquet" -> parquet,
+      "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
+      "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
+      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
+      "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
+      "org.apache.spark.sql.hive.orc" -> orc,
+      "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
+      "org.apache.spark.ml.source.libsvm" -> libsvm,
+      "com.databricks.spark.csv" -> csv
+    )
+  }
 
   /**
    * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
@@ -188,7 +211,7 @@ case class DataSource(
           throw new IllegalArgumentException("'path' is not specified")
         })
         val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
-        val isTextSource = providingClass == classOf[text.DefaultSource]
+        val isTextSource = providingClass == classOf[text.TextFileFormat]
         // If the schema inference is disabled, only text sources require schema to be specified
         if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
           throw new IllegalArgumentException(
@@ -229,7 +252,7 @@ case class DataSource(
     providingClass.newInstance() match {
       case s: StreamSinkProvider => s.createSink(sparkSession.sqlContext, options, partitionColumns)
 
-      case parquet: parquet.DefaultSource =>
+      case parquet: parquet.ParquetFileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
new file mode 100644
index 0000000..4d36b76
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import java.nio.charset.{Charset, StandardCharsets}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * Provides access to CSV data from pure SQL statements.
+ */
+class CSVFileFormat extends FileFormat with DataSourceRegister {
+
+  override def shortName(): String = "csv"
+
+  override def toString: String = "CSV"
+
+  override def hashCode(): Int = getClass.hashCode()
+
+  override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]
+
+  override def inferSchema(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    val csvOptions = new CSVOptions(options)
+
+    // TODO: Move filtering.
+    val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString)
+    val rdd = baseRdd(sparkSession, csvOptions, paths)
+    val firstLine = findFirstLine(csvOptions, rdd)
+    val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine)
+
+    val header = if (csvOptions.headerFlag) {
+      firstRow.zipWithIndex.map { case (value, index) =>
+        if (value == null || value.isEmpty || value == csvOptions.nullValue) s"_c$index" else value
+      }
+    } else {
+      firstRow.zipWithIndex.map { case (value, index) => s"_c$index" }
+    }
+
+    val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths)
+    val schema = if (csvOptions.inferSchemaFlag) {
+      CSVInferSchema.infer(parsedRdd, header, csvOptions)
+    } else {
+      // By default fields are assumed to be StringType
+      val schemaFields = header.map { fieldName =>
+        StructField(fieldName.toString, StringType, nullable = true)
+      }
+      StructType(schemaFields)
+    }
+    Some(schema)
+  }
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    verifySchema(dataSchema)
+    val conf = job.getConfiguration
+    val csvOptions = new CSVOptions(options)
+    csvOptions.compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
+    }
+
+    new CSVOutputWriterFactory(csvOptions)
+  }
+
+  override def buildReader(
+      sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+    val csvOptions = new CSVOptions(options)
+    val headers = requiredSchema.fields.map(_.name)
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    (file: PartitionedFile) => {
+      val lineIterator = {
+        val conf = broadcastedHadoopConf.value.value
+        new HadoopFileLinesReader(file, conf).map { line =>
+          new String(line.getBytes, 0, line.getLength, csvOptions.charset)
+        }
+      }
+
+      CSVRelation.dropHeaderLine(file, lineIterator, csvOptions)
+
+      val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers)
+      val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions)
+      tokenizedIterator.flatMap(parser(_).toSeq)
+    }
+  }
+
+  private def baseRdd(
+      sparkSession: SparkSession,
+      options: CSVOptions,
+      inputPaths: Seq[String]): RDD[String] = {
+    readText(sparkSession, options, inputPaths.mkString(","))
+  }
+
+  private def tokenRdd(
+      sparkSession: SparkSession,
+      options: CSVOptions,
+      header: Array[String],
+      inputPaths: Seq[String]): RDD[Array[String]] = {
+    val rdd = baseRdd(sparkSession, options, inputPaths)
+    // Make sure firstLine is materialized before sending to executors
+    val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null
+    CSVRelation.univocityTokenizer(rdd, header, firstLine, options)
+  }
+
+  /**
+   * Returns the first line of the first non-empty file in path
+   */
+  private def findFirstLine(options: CSVOptions, rdd: RDD[String]): String = {
+    if (options.isCommentSet) {
+      val comment = options.comment.toString
+      rdd.filter { line =>
+        line.trim.nonEmpty && !line.startsWith(comment)
+      }.first()
+    } else {
+      rdd.filter { line =>
+        line.trim.nonEmpty
+      }.first()
+    }
+  }
+
+  private def readText(
+      sparkSession: SparkSession,
+      options: CSVOptions,
+      location: String): RDD[String] = {
+    if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
+      sparkSession.sparkContext.textFile(location)
+    } else {
+      val charset = options.charset
+      sparkSession.sparkContext
+        .hadoopFile[LongWritable, Text, TextInputFormat](location)
+        .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
+    }
+  }
+
+  private def verifySchema(schema: StructType): Unit = {
+    schema.foreach { field =>
+      field.dataType match {
+        case _: ArrayType | _: MapType | _: StructType =>
+          throw new UnsupportedOperationException(
+            s"CSV data source does not support ${field.dataType.simpleString} data type.")
+        case _ =>
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
deleted file mode 100644
index 057bde1..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.csv
-
-import java.nio.charset.{Charset, StandardCharsets}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapreduce._
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
-
-/**
- * Provides access to CSV data from pure SQL statements.
- */
-class DefaultSource extends FileFormat with DataSourceRegister {
-
-  override def shortName(): String = "csv"
-
-  override def toString: String = "CSV"
-
-  override def hashCode(): Int = getClass.hashCode()
-
-  override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
-
-  override def inferSchema(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Option[StructType] = {
-    val csvOptions = new CSVOptions(options)
-
-    // TODO: Move filtering.
-    val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString)
-    val rdd = baseRdd(sparkSession, csvOptions, paths)
-    val firstLine = findFirstLine(csvOptions, rdd)
-    val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine)
-
-    val header = if (csvOptions.headerFlag) {
-      firstRow.zipWithIndex.map { case (value, index) =>
-        if (value == null || value.isEmpty || value == csvOptions.nullValue) s"_c$index" else value
-      }
-    } else {
-      firstRow.zipWithIndex.map { case (value, index) => s"_c$index" }
-    }
-
-    val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths)
-    val schema = if (csvOptions.inferSchemaFlag) {
-      CSVInferSchema.infer(parsedRdd, header, csvOptions)
-    } else {
-      // By default fields are assumed to be StringType
-      val schemaFields = header.map { fieldName =>
-        StructField(fieldName.toString, StringType, nullable = true)
-      }
-      StructType(schemaFields)
-    }
-    Some(schema)
-  }
-
-  override def prepareWrite(
-      sparkSession: SparkSession,
-      job: Job,
-      options: Map[String, String],
-      dataSchema: StructType): OutputWriterFactory = {
-    verifySchema(dataSchema)
-    val conf = job.getConfiguration
-    val csvOptions = new CSVOptions(options)
-    csvOptions.compressionCodec.foreach { codec =>
-      CompressionCodecs.setCodecConfiguration(conf, codec)
-    }
-
-    new CSVOutputWriterFactory(csvOptions)
-  }
-
-  override def buildReader(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    val csvOptions = new CSVOptions(options)
-    val headers = requiredSchema.fields.map(_.name)
-
-    val broadcastedHadoopConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-
-    (file: PartitionedFile) => {
-      val lineIterator = {
-        val conf = broadcastedHadoopConf.value.value
-        new HadoopFileLinesReader(file, conf).map { line =>
-          new String(line.getBytes, 0, line.getLength, csvOptions.charset)
-        }
-      }
-
-      CSVRelation.dropHeaderLine(file, lineIterator, csvOptions)
-
-      val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers)
-      val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions)
-      tokenizedIterator.flatMap(parser(_).toSeq)
-    }
-  }
-
-  private def baseRdd(
-      sparkSession: SparkSession,
-      options: CSVOptions,
-      inputPaths: Seq[String]): RDD[String] = {
-    readText(sparkSession, options, inputPaths.mkString(","))
-  }
-
-  private def tokenRdd(
-      sparkSession: SparkSession,
-      options: CSVOptions,
-      header: Array[String],
-      inputPaths: Seq[String]): RDD[Array[String]] = {
-    val rdd = baseRdd(sparkSession, options, inputPaths)
-    // Make sure firstLine is materialized before sending to executors
-    val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null
-    CSVRelation.univocityTokenizer(rdd, header, firstLine, options)
-  }
-
-  /**
-   * Returns the first line of the first non-empty file in path
-   */
-  private def findFirstLine(options: CSVOptions, rdd: RDD[String]): String = {
-    if (options.isCommentSet) {
-      val comment = options.comment.toString
-      rdd.filter { line =>
-        line.trim.nonEmpty && !line.startsWith(comment)
-      }.first()
-    } else {
-      rdd.filter { line =>
-        line.trim.nonEmpty
-      }.first()
-    }
-  }
-
-  private def readText(
-      sparkSession: SparkSession,
-      options: CSVOptions,
-      location: String): RDD[String] = {
-    if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
-      sparkSession.sparkContext.textFile(location)
-    } else {
-      val charset = options.charset
-      sparkSession.sparkContext
-        .hadoopFile[LongWritable, Text, TextInputFormat](location)
-        .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
-    }
-  }
-
-  private def verifySchema(schema: StructType): Unit = {
-    schema.foreach { field =>
-      field.dataType match {
-        case _: ArrayType | _: MapType | _: StructType =>
-          throw new UnsupportedOperationException(
-            s"CSV data source does not support ${field.dataType.simpleString} data type.")
-        case _ =>
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
deleted file mode 100644
index 6609e5d..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.jdbc
-
-import java.util.Properties
-
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
-
-class DefaultSource extends RelationProvider with DataSourceRegister {
-
-  override def shortName(): String = "jdbc"
-
-  /** Returns a new base relation with the given parameters. */
-  override def createRelation(
-      sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
-    val jdbcOptions = new JDBCOptions(parameters)
-    if (jdbcOptions.partitionColumn != null
-      && (jdbcOptions.lowerBound == null
-        || jdbcOptions.upperBound == null
-        || jdbcOptions.numPartitions == null)) {
-      sys.error("Partitioning incompletely specified")
-    }
-
-    val partitionInfo = if (jdbcOptions.partitionColumn == null) {
-      null
-    } else {
-      JDBCPartitioningInfo(
-        jdbcOptions.partitionColumn,
-        jdbcOptions.lowerBound.toLong,
-        jdbcOptions.upperBound.toLong,
-        jdbcOptions.numPartitions.toInt)
-    }
-    val parts = JDBCRelation.columnPartition(partitionInfo)
-    val properties = new Properties() // Additional properties that we will pass to getConnection
-    parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-    JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
new file mode 100644
index 0000000..106ed1d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import java.util.Properties
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
+
+class JdbcRelationProvider extends RelationProvider with DataSourceRegister {
+
+  override def shortName(): String = "jdbc"
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    val jdbcOptions = new JDBCOptions(parameters)
+    if (jdbcOptions.partitionColumn != null
+      && (jdbcOptions.lowerBound == null
+        || jdbcOptions.upperBound == null
+        || jdbcOptions.numPartitions == null)) {
+      sys.error("Partitioning incompletely specified")
+    }
+
+    val partitionInfo = if (jdbcOptions.partitionColumn == null) {
+      null
+    } else {
+      JDBCPartitioningInfo(
+        jdbcOptions.partitionColumn,
+        jdbcOptions.lowerBound.toLong,
+        jdbcOptions.upperBound.toLong,
+        jdbcOptions.numPartitions.toInt)
+    }
+    val parts = JDBCRelation.columnPartition(partitionInfo)
+    val properties = new Properties() // Additional properties that we will pass to getConnection
+    parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
+    JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
deleted file mode 100644
index 4c97abe..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import java.io.CharArrayWriter
-
-import com.fasterxml.jackson.core.JsonFactory
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
-import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
-import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
-
-class DefaultSource extends FileFormat with DataSourceRegister {
-
-  override def shortName(): String = "json"
-
-  override def inferSchema(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Option[StructType] = {
-    if (files.isEmpty) {
-      None
-    } else {
-      val parsedOptions: JSONOptions = new JSONOptions(options)
-      val columnNameOfCorruptRecord =
-        parsedOptions.columnNameOfCorruptRecord
-          .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-      val jsonFiles = files.filterNot { status =>
-        val name = status.getPath.getName
-        name.startsWith("_") || name.startsWith(".")
-      }.toArray
-
-      val jsonSchema = InferSchema.infer(
-        createBaseRdd(sparkSession, jsonFiles),
-        columnNameOfCorruptRecord,
-        parsedOptions)
-      checkConstraints(jsonSchema)
-
-      Some(jsonSchema)
-    }
-  }
-
-  override def prepareWrite(
-      sparkSession: SparkSession,
-      job: Job,
-      options: Map[String, String],
-      dataSchema: StructType): OutputWriterFactory = {
-    val conf = job.getConfiguration
-    val parsedOptions: JSONOptions = new JSONOptions(options)
-    parsedOptions.compressionCodec.foreach { codec =>
-      CompressionCodecs.setCodecConfiguration(conf, codec)
-    }
-
-    new OutputWriterFactory {
-      override def newInstance(
-          path: String,
-          bucketId: Option[Int],
-          dataSchema: StructType,
-          context: TaskAttemptContext): OutputWriter = {
-        new JsonOutputWriter(path, bucketId, dataSchema, context)
-      }
-    }
-  }
-
-  override def buildReader(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    val broadcastedHadoopConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-
-    val parsedOptions: JSONOptions = new JSONOptions(options)
-    val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
-      .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-
-    (file: PartitionedFile) => {
-      val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString)
-
-      JacksonParser.parseJson(
-        lines,
-        requiredSchema,
-        columnNameOfCorruptRecord,
-        parsedOptions)
-    }
-  }
-
-  private def createBaseRdd(
-      sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus]): RDD[String] = {
-    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
-    val conf = job.getConfiguration
-
-    val paths = inputPaths.map(_.getPath)
-
-    if (paths.nonEmpty) {
-      FileInputFormat.setInputPaths(job, paths: _*)
-    }
-
-    sparkSession.sparkContext.hadoopRDD(
-      conf.asInstanceOf[JobConf],
-      classOf[TextInputFormat],
-      classOf[LongWritable],
-      classOf[Text]).map(_._2.toString) // get the text line
-  }
-
-  /** Constraints to be imposed on schema to be stored. */
-  private def checkConstraints(schema: StructType): Unit = {
-    if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
-      val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
-          s"cannot save to JSON format")
-    }
-  }
-
-  override def toString: String = "JSON"
-
-  override def hashCode(): Int = getClass.hashCode()
-
-  override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
-}
-
-private[json] class JsonOutputWriter(
-    path: String,
-    bucketId: Option[Int],
-    dataSchema: StructType,
-    context: TaskAttemptContext)
-  extends OutputWriter with Logging {
-
-  private[this] val writer = new CharArrayWriter()
-  // create the Generator without separator inserted between 2 records
-  private[this] val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
-  private[this] val result = new Text()
-
-  private val recordWriter: RecordWriter[NullWritable, Text] = {
-    new TextOutputFormat[NullWritable, Text]() {
-      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-        val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
-        val taskAttemptId = context.getTaskAttemptID
-        val split = taskAttemptId.getTaskID.getId
-        val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension")
-      }
-    }.getRecordWriter(context)
-  }
-
-  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
-
-  override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    JacksonGenerator(dataSchema, gen)(row)
-    gen.flush()
-
-    result.set(writer.toString)
-    writer.reset()
-
-    recordWriter.write(NullWritable.get(), result)
-  }
-
-  override def close(): Unit = {
-    gen.close()
-    recordWriter.close(context)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
new file mode 100644
index 0000000..35f2476
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.CharArrayWriter
+
+import com.fasterxml.jackson.core.JsonFactory
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
+import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
+import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+class JsonFileFormat extends FileFormat with DataSourceRegister {
+
+  override def shortName(): String = "json"
+
+  override def inferSchema(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    if (files.isEmpty) {
+      None
+    } else {
+      val parsedOptions: JSONOptions = new JSONOptions(options)
+      val columnNameOfCorruptRecord =
+        parsedOptions.columnNameOfCorruptRecord
+          .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+      val jsonFiles = files.filterNot { status =>
+        val name = status.getPath.getName
+        name.startsWith("_") || name.startsWith(".")
+      }.toArray
+
+      val jsonSchema = InferSchema.infer(
+        createBaseRdd(sparkSession, jsonFiles),
+        columnNameOfCorruptRecord,
+        parsedOptions)
+      checkConstraints(jsonSchema)
+
+      Some(jsonSchema)
+    }
+  }
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    val parsedOptions: JSONOptions = new JSONOptions(options)
+    parsedOptions.compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
+    }
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          bucketId: Option[Int],
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new JsonOutputWriter(path, bucketId, dataSchema, context)
+      }
+    }
+  }
+
+  override def buildReader(
+      sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
+      .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+    (file: PartitionedFile) => {
+      val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString)
+
+      JacksonParser.parseJson(
+        lines,
+        requiredSchema,
+        columnNameOfCorruptRecord,
+        parsedOptions)
+    }
+  }
+
+  private def createBaseRdd(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus]): RDD[String] = {
+    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
+    val conf = job.getConfiguration
+
+    val paths = inputPaths.map(_.getPath)
+
+    if (paths.nonEmpty) {
+      FileInputFormat.setInputPaths(job, paths: _*)
+    }
+
+    sparkSession.sparkContext.hadoopRDD(
+      conf.asInstanceOf[JobConf],
+      classOf[TextInputFormat],
+      classOf[LongWritable],
+      classOf[Text]).map(_._2.toString) // get the text line
+  }
+
+  /** Constraints to be imposed on schema to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
+    if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
+      val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => "\"" + x + "\""
+      }.mkString(", ")
+      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+          s"cannot save to JSON format")
+    }
+  }
+
+  override def toString: String = "JSON"
+
+  override def hashCode(): Int = getClass.hashCode()
+
+  override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
+}
+
+private[json] class JsonOutputWriter(
+    path: String,
+    bucketId: Option[Int],
+    dataSchema: StructType,
+    context: TaskAttemptContext)
+  extends OutputWriter with Logging {
+
+  private[this] val writer = new CharArrayWriter()
+  // create the Generator without separator inserted between 2 records
+  private[this] val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+  private[this] val result = new Text()
+
+  private val recordWriter: RecordWriter[NullWritable, Text] = {
+    new TextOutputFormat[NullWritable, Text]() {
+      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+        val configuration = context.getConfiguration
+        val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+        val taskAttemptId = context.getTaskAttemptID
+        val split = taskAttemptId.getTaskID.getId
+        val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension")
+      }
+    }.getRecordWriter(context)
+  }
+
+  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+
+  override protected[sql] def writeInternal(row: InternalRow): Unit = {
+    JacksonGenerator(dataSchema, gen)(row)
+    gen.flush()
+
+    result.set(writer.toString)
+    writer.reset()
+
+    recordWriter.write(NullWritable.get(), result)
+  }
+
+  override def close(): Unit = {
+    gen.close()
+    recordWriter.close(context)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message