spark-reviews mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [spark] rdblue commented on a change in pull request #25507: [SPARK-28667][SQL] Support InsertInto through the V2SessionCatalog
Date Thu, 29 Aug 2019 21:15:01 GMT
rdblue commented on a change in pull request #25507: [SPARK-28667][SQL] Support InsertInto through the V2SessionCatalog
URL: https://github.com/apache/spark/pull/25507#discussion_r319274871
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/InsertIntoTests.scala
 ##########
 @@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * A collection of "INSERT INTO" tests that can be run through the SQL or DataFrameWriter
+ * APIs. Extending test suites can implement the `doInsert` method to run the insert through
+ * either API.
+ *
+ * @param supportsDynamicOverwrite Whether the Table implementations used in the test suite
+ *                                 support dynamic partition overwrites. If they do, we will
+ *                                 check for the success of the operations. If not, then we
+ *                                 will check that we failed with the right error message.
+ * @param includeSQLOnlyTests Certain INSERT INTO behavior can be achieved purely through
+ *                            SQL, e.g. static or dynamic partition overwrites. This flag
+ *                            should be set to true if we would like to test these cases.
+ */
+abstract class InsertIntoTests(
+    override protected val supportsDynamicOverwrite: Boolean,
+    override protected val includeSQLOnlyTests: Boolean) extends InsertIntoSQLOnlyTests {
+
+  import testImplicits._
+
+  /**
+   * Inserts data into a table. Implementations may run the insert through SQL
+   * ("INSERT INTO") or through the DataFrameWriter API (`df.write.insertInto`).
+   */
+  protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode = null): Unit
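+  // An illustrative sketch only (not prescribed by this patch) of one possible
+  // DataFrameWriter-based implementation, where a null `mode` means a plain append:
+  //
+  //   override protected def doInsert(
+  //       tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
+  //     val writer = if (mode != null) insert.write.mode(mode) else insert.write
+  //     writer.insertInto(tableName)
+  //   }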
+
+  test("insertInto: append") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    doInsert(t1, df)
+    verifyTable(t1, df)
+  }
+
+  test("insertInto: append by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
+
+    doInsert(t1, dfr)
+    verifyTable(t1, df)
+  }
+
+  test("insertInto: append partitioned table") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      doInsert(t1, df)
+      verifyTable(t1, df)
+    }
+  }
+
+  test("insertInto: overwrite non-partitioned table") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    val df2 = Seq((4L, "d"), (5L, "e"), (6L, "f")).toDF("id", "data")
+    doInsert(t1, df)
+    doInsert(t1, df2, SaveMode.Overwrite)
+    verifyTable(t1, df2)
+  }
+
+  test("insertInto: overwrite partitioned table in static mode") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
+      val t1 = s"${catalogAndNamespace}tbl"
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+      val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+      doInsert(t1, init)
+
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      doInsert(t1, df, SaveMode.Overwrite)
+      verifyTable(t1, df)
+    }
+  }
+
+  test("insertInto: overwrite partitioned table in static mode by position") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
+      val t1 = s"${catalogAndNamespace}tbl"
+      withTable(t1) {
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+        val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+        doInsert(t1, init)
+
+        val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
+        doInsert(t1, dfr, SaveMode.Overwrite)
+
+        val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+        verifyTable(t1, df)
+      }
+    }
+  }
+
+  test("insertInto: fails when missing a column") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    val exc = intercept[AnalysisException] {
+      doInsert(t1, df)
+    }
+
+    verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", "missing"))
+    val tableName = if (catalogAndNamespace.isEmpty) s"default.$t1" else t1
+    assert(exc.getMessage.contains(s"Cannot write to '$tableName', not enough data columns"))
+  }
+
+  test("insertInto: fails when an extra column is present") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+      val df = Seq((1L, "a", "mango")).toDF("id", "data", "fruit")
+      val exc = intercept[AnalysisException] {
+        doInsert(t1, df)
+      }
+
+      verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
+      val tableName = if (catalogAndNamespace.isEmpty) s"default.$t1" else t1
+      assert(exc.getMessage.contains(s"Cannot write to '$tableName', too many data columns"))
+    }
+  }
+
+  dynamicOverwriteTest("insertInto: overwrite partitioned table in dynamic mode") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+      val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+      doInsert(t1, init)
+
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      doInsert(t1, df, SaveMode.Overwrite)
+
+      verifyTable(t1, df.union(sql("SELECT 4L, 'keep'")))
+    }
+  }
+
+  dynamicOverwriteTest("insertInto: overwrite partitioned table in dynamic mode by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+      val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+      doInsert(t1, init)
+
+      val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
+      doInsert(t1, dfr, SaveMode.Overwrite)
+
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "keep")).toDF("id", "data")
+      verifyTable(t1, df)
+    }
+  }
+}
+
+private[v2] trait InsertIntoSQLOnlyTests
+  extends QueryTest
+  with SharedSparkSession
+  with BeforeAndAfter {
+
+  import testImplicits._
+
+  /** Check that the results in `tableName` match the `expected` DataFrame. */
+  protected def verifyTable(tableName: String, expected: DataFrame): Unit
+
+  protected val v2Format: String
+  protected val catalogAndNamespace: String
+
+  /**
+   * Whether dynamic partition overwrites are supported by the `Table` definitions used in the
+   * test suites. Tables that leverage the V1 Write interface do not support dynamic partition
+   * overwrites.
+   */
+  protected val supportsDynamicOverwrite: Boolean
+
+  /**
+   * Whether to include the SQL-specific tests in this trait within the extending test suite.
+   */
+  protected val includeSQLOnlyTests: Boolean
+
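+  // Creates a temp view with rows (1L, "a"), (2L, "b"), (3L, "c") as (id, data) and passes
+  // its name to `testFn`; `withTable` and `withTempView` drop the table and view afterwards.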
+  private def withTableAndData(tableName: String)(testFn: String => Unit): Unit = {
+    withTable(tableName) {
+      val viewName = "tmp_view"
+      val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
+      df.createOrReplaceTempView(viewName)
+      withTempView(viewName) {
+        testFn(viewName)
+      }
+    }
+  }
+
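+  // Registers a test that runs `f` with the partition-overwrite mode set to DYNAMIC. If the
+  // table implementation does not support dynamic overwrites, `f` is expected to throw an
+  // AnalysisException containing "Table does not support dynamic overwrite"; completing
+  // without one fails the test.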
+  protected def dynamicOverwriteTest(testName: String)(f: => Unit): Unit = {
+    test(testName) {
+      try {
+        withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+          f
+        }
+        if (!supportsDynamicOverwrite) {
+          fail("Expected failure from test, because the table doesn't support dynamic overwrites")
+        }
+      } catch {
+        case a: AnalysisException if !supportsDynamicOverwrite =>
+          assert(a.getMessage.contains("Table does not support dynamic overwrite"))
+      }
+    }
+  }
+
+  if (includeSQLOnlyTests) {
 
 Review comment:
   Same here. It would be better to use assume.
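
  For illustration, a minimal sketch of that suggestion (the test name below is hypothetical):
  rather than guarding test registration behind the `includeSQLOnlyTests` flag, register the
  tests unconditionally and cancel them at run time with ScalaTest's `assume`, so disabled
  tests are reported as canceled instead of being silently omitted:

      test("insertInto: some SQL-only behavior") {  // hypothetical test name
        assume(includeSQLOnlyTests, "SQL-only INSERT INTO tests are disabled for this suite")
        // ... unchanged test body ...
      }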

