spark-commits mailing list archives

From: lix...@apache.org
Subject: spark git commit: [SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema
Date: Fri, 27 Oct 2017 00:59:15 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 2839280ad -> cb54f297a


[SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema

This is a regression introduced by #14207. Since Spark 2.1, we store the inferred schema when
creating the table, to avoid inferring the schema again at the read path. However, there is one
special case: columns that overlap between the data and partition schema. Such a table breaks the
assumptions about table schema, namely that there is no overlap between the data and partition
schema and that partition columns come at the end. As a result, in Spark 2.1 the table scan has an
incorrect schema that puts the partition columns at the end, and in Spark 2.2 the check added in
CatalogTable to validate the table schema fails for this case.

To fix this issue, a simple and safe approach is to fall back to the old behavior when overlapped
columns are detected, i.e. store an empty schema in the metastore and infer the schema at read time.
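
For illustration, here is a condensed sketch of the overlap scenario, adapted from the regression
test added in this patch (it assumes a SparkSession named `spark` with `spark.implicits._`
imported and a temporary directory `path: java.io.File`):

    import java.io.File

    // The data files themselves contain a column `p`, while the directory layout (p=1) also
    // makes `p` a partition column, so the data and partition schemas overlap.
    Seq((1, 1, 1), (1, 2, 1)).toDF("i", "p", "j")
      .write.mode("overwrite").parquet(new File(path, "p=1").getCanonicalPath)

    spark.sql(s"create table t using parquet options(path='${path.getCanonicalPath}')")

    // With the fallback in this patch, the schema is inferred at read time and keeps the data
    // column order instead of pushing `p` to the end.
    assert(spark.table("t").columns === Array("i", "p", "j"))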

Tested with a new regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19579 from cloud-fan/bug2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb54f297
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb54f297
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb54f297

Branch: refs/heads/branch-2.2
Commit: cb54f297ae52690e6162b2bab9a3940d38ff82f2
Parents: 2839280
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Thu Oct 26 17:39:53 2017 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Thu Oct 26 17:56:29 2017 -0700

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        | 35 +++++++++++++-----
 .../datasources/HadoopFsRelation.scala          | 25 ++++++++-----
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 16 +++++++++
 .../hive/HiveExternalCatalogVersionsSuite.scala | 38 +++++++++++++++-----
 4 files changed, 89 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb54f297/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2d89011..d05af89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 
 /**
  * A command used to create a data source table.
@@ -87,14 +88,32 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
-    val newTable = table.copy(
-      schema = dataSource.schema,
-      partitionColumnNames = partitionColumnNames,
-      // If metastore partition management for file source tables is enabled, we start off with
-      // partition provider hive, but no partitions in the metastore. The user has to call
-      // `msck repair table` to populate the table partitions.
-      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
-        sessionState.conf.manageFilesourcePartitions)
+    val newTable = dataSource match {
+      // Since Spark 2.1, we store the inferred schema of data source in metastore, to avoid
+      // inferring the schema again at read path. However if the data source has overlapped columns
+      // between data and partition schema, we can't store it in metastore as it breaks the
+      // assumption of table schema. Here we fallback to the behavior of Spark prior to 2.1, store
+      // empty schema in metastore and infer it at runtime. Note that this also means the new
+      // scalable partitioning handling feature(introduced at Spark 2.1) is disabled in this case.
+      case r: HadoopFsRelation if r.overlappedPartCols.nonEmpty =>
+        logWarning("It is not recommended to create a table with overlapped data and partition " +
+          "columns, as Spark cannot store a valid table schema and has to infer it at runtime, " +
+          "which hurts performance. Please check your data files and remove the partition " +
+          "columns in it.")
+        table.copy(schema = new StructType(), partitionColumnNames = Nil)
+
+      case _ =>
+        table.copy(
+          schema = dataSource.schema,
+          partitionColumnNames = partitionColumnNames,
+          // If metastore partition management for file source tables is enabled, we start off with
+          // partition provider hive, but no partitions in the metastore. The user has to call
+          // `msck repair table` to populate the table partitions.
+          tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
+            sessionState.conf.manageFilesourcePartitions)
+
+    }
+
    // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
     sessionState.catalog.createTable(newTable, ignoreIfExists = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/cb54f297/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index 9a08524..89d8a85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.Locale
+
 import scala.collection.mutable
 
 import org.apache.spark.sql.{SparkSession, SQLContext}
@@ -50,15 +52,22 @@ case class HadoopFsRelation(
 
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
-  val schema: StructType = {
-    val getColName: (StructField => String) =
-      if (sparkSession.sessionState.conf.caseSensitiveAnalysis) _.name else _.name.toLowerCase
-    val overlappedPartCols = mutable.Map.empty[String, StructField]
-    partitionSchema.foreach { partitionField =>
-      if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
-        overlappedPartCols += getColName(partitionField) -> partitionField
-      }
+  private def getColName(f: StructField): String = {
+    if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
+      f.name
+    } else {
+      f.name.toLowerCase(Locale.ROOT)
+    }
+  }
+
+  val overlappedPartCols = mutable.Map.empty[String, StructField]
+  partitionSchema.foreach { partitionField =>
+    if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
+      overlappedPartCols += getColName(partitionField) -> partitionField
     }
+  }
+
+  val schema: StructType = {
     StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
       partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cb54f297/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c6a6efd..3750551 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2646,4 +2646,20 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-22356: overlapped columns between data and partition schema in data source
tables") {
+    withTempPath { path =>
+      Seq((1, 1, 1), (1, 2, 1)).toDF("i", "p", "j")
+        .write.mode("overwrite").parquet(new File(path, "p=1").getCanonicalPath)
+      withTable("t") {
+        sql(s"create table t using parquet options(path='${path.getCanonicalPath}')")
+        // We should respect the column order in data schema.
+        assert(spark.table("t").columns === Array("i", "p", "j"))
+        checkAnswer(spark.table("t"), Row(1, 1, 1) :: Row(1, 1, 1) :: Nil)
+        // The DESC TABLE should report same schema as table scan.
+        assert(sql("desc t").select("col_name")
+          .as[String].collect().mkString(",").contains("i,p,j"))
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cb54f297/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 5f8c9d5..6859432 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -40,7 +40,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
   private val tmpDataDir = Utils.createTempDir(namePrefix = "test-data")
  // For local test, you can set `sparkTestingDir` to a static value like `/tmp/test-spark`, to
   // avoid downloading Spark of different versions in each run.
-  private val sparkTestingDir = Utils.createTempDir(namePrefix = "test-spark")
+  private val sparkTestingDir = new File("/tmp/test-spark")
   private val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
 
   override def afterAll(): Unit = {
@@ -77,35 +77,38 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
     super.beforeAll()
 
     val tempPyFile = File.createTempFile("test", ".py")
+    // scalastyle:off line.size.limit
     Files.write(tempPyFile.toPath,
       s"""
         |from pyspark.sql import SparkSession
+        |import os
         |
         |spark = SparkSession.builder.enableHiveSupport().getOrCreate()
         |version_index = spark.conf.get("spark.sql.test.version.index", None)
         |
         |spark.sql("create table data_source_tbl_{} using json as select 1 i".format(version_index))
         |
-        |spark.sql("create table hive_compatible_data_source_tbl_" + version_index + \\
-        |          " using parquet as select 1 i")
+        |spark.sql("create table hive_compatible_data_source_tbl_{} using parquet as select
1 i".format(version_index))
         |
         |json_file = "${genDataDir("json_")}" + str(version_index)
         |spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file)
-        |spark.sql("create table external_data_source_tbl_" + version_index + \\
-        |          "(i int) using json options (path '{}')".format(json_file))
+        |spark.sql("create table external_data_source_tbl_{}(i int) using json options (path
'{}')".format(version_index, json_file))
         |
         |parquet_file = "${genDataDir("parquet_")}" + str(version_index)
         |spark.range(1, 2).selectExpr("cast(id as int) as i").write.parquet(parquet_file)
-        |spark.sql("create table hive_compatible_external_data_source_tbl_" + version_index
+ \\
-        |          "(i int) using parquet options (path '{}')".format(parquet_file))
+        |spark.sql("create table hive_compatible_external_data_source_tbl_{}(i int) using
parquet options (path '{}')".format(version_index, parquet_file))
         |
         |json_file2 = "${genDataDir("json2_")}" + str(version_index)
         |spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file2)
-        |spark.sql("create table external_table_without_schema_" + version_index + \\
-        |          " using json options (path '{}')".format(json_file2))
+        |spark.sql("create table external_table_without_schema_{} using json options (path
'{}')".format(version_index, json_file2))
+        |
+        |parquet_file2 = "${genDataDir("parquet2_")}" + str(version_index)
+        |spark.range(1, 3).selectExpr("1 as i", "cast(id as int) as p", "1 as j").write.parquet(os.path.join(parquet_file2, "p=1"))
+        |spark.sql("create table tbl_with_col_overlap_{} using parquet options(path '{}')".format(version_index, parquet_file2))
         |
         |spark.sql("create view v_{} as select 1 i".format(version_index))
       """.stripMargin.getBytes("utf8"))
+    // scalastyle:on line.size.limit
 
     PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, index) =>
       val sparkHome = new File(sparkTestingDir, s"spark-$version")
@@ -153,6 +156,7 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
       .enableHiveSupport()
       .getOrCreate()
     spark = session
+    import session.implicits._
 
     testingVersions.indices.foreach { index =>
       Seq(
@@ -194,6 +198,22 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
 
       // test permanent view
       checkAnswer(sql(s"select i from v_$index"), Row(1))
+
+      // SPARK-22356: overlapped columns between data and partition schema in data source tables
+      val tbl_with_col_overlap = s"tbl_with_col_overlap_$index"
+      // For Spark 2.2.0 and 2.1.x, the behavior is different from Spark 2.0.
+      if (testingVersions(index).startsWith("2.1") || testingVersions(index) == "2.2.0") {
+        spark.sql("msck repair table " + tbl_with_col_overlap)
+        assert(spark.table(tbl_with_col_overlap).columns === Array("i", "j", "p"))
+        checkAnswer(spark.table(tbl_with_col_overlap), Row(1, 1, 1) :: Row(1, 1, 1) :: Nil)
+        assert(sql("desc " + tbl_with_col_overlap).select("col_name")
+          .as[String].collect().mkString(",").contains("i,j,p"))
+      } else {
+        assert(spark.table(tbl_with_col_overlap).columns === Array("i", "p", "j"))
+        checkAnswer(spark.table(tbl_with_col_overlap), Row(1, 1, 1) :: Row(1, 1, 1) :: Nil)
+        assert(sql("desc " + tbl_with_col_overlap).select("col_name")
+          .as[String].collect().mkString(",").contains("i,p,j"))
+      }
     }
   }
 }



