spark-commits mailing list archives

From yh...@apache.org
Subject spark git commit: [SPARK-15231][SQL] Document the semantic of saveAsTable and insertInto and don't drop columns silently
Date Wed, 11 May 2016 06:55:00 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a8637f4ac -> 2d3c69a02


[SPARK-15231][SQL] Document the semantic of saveAsTable and insertInto and don't drop columns silently

## What changes were proposed in this pull request?

This PR adds documentation about the different behaviors of `insertInto` and `saveAsTable`, and throws an exception when the user tries to add too many columns using `saveAsTable` with append mode.
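
For illustration, a minimal spark-shell sketch of the two resolution modes (condensed from the scaladoc examples added in the diff below; the table name `t1` is just an example):

```scala
Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")

// insertInto resolves columns by position, ignoring names: stored as (3, 4)
Seq((3, 4)).toDF("j", "i").write.insertInto("t1")

// saveAsTable with append resolves columns by name: stored as (4, 3)
Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("t1")
```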

## How was this patch tested?

Unit tests added in this PR.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13013 from zsxwing/SPARK-15231.

(cherry picked from commit 875ef764280428acd095aec1834fee0ddad08611)
Signed-off-by: Yin Huai <yhuai@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d3c69a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d3c69a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d3c69a0

Branch: refs/heads/branch-2.0
Commit: 2d3c69a0221cbf8a24c82b623b48edcf4e879730
Parents: a8637f4
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Tue May 10 23:53:55 2016 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Tue May 10 23:54:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 36 +++++++++++++++-
 .../command/createDataSourceTables.scala        |  5 +++
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 43 ++++++++++++++++++++
 3 files changed, 82 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2d3c69a0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index da9d254..a9e8329 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -361,6 +361,23 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * Inserts the content of the [[DataFrame]] to the specified table. It requires that
    * the schema of the [[DataFrame]] is the same as the schema of the table.
    *
+   * Note: Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based
+   * resolution. For example:
+   *
+   * {{{
+   *    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
+   *    scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
+   *    scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
+   *    scala> sql("select * from t1").show
+   *    +---+---+
+   *    |  i|  j|
+   *    +---+---+
+   *    |  5|  6|
+   *    |  3|  4|
+   *    |  1|  2|
+   *    +---+---+
+   * }}}
+   *
    * Because it inserts data to an existing table, format or options will be ignored.
    *
    * @since 1.4.0
@@ -454,8 +471,23 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * save mode, specified by the `mode` function (default to throwing an exception).
    * When `mode` is `Overwrite`, the schema of the [[DataFrame]] does not need to be
    * the same as that of the existing table.
-   * When `mode` is `Append`, the schema of the [[DataFrame]] need to be
-   * the same as that of the existing table, and format or options will be ignored.
+   *
+   * When `mode` is `Append`, if there is an existing table, we will use the format and options of
+   * the existing table. The column order in the schema of the [[DataFrame]] doesn't need to be same
+   * as that of the existing table. Unlike `insertInto`, `saveAsTable` will use the column names to
+   * find the correct column positions. For example:
+   *
+   * {{{
+   *    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
+   *    scala> Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("t1")
+   *    scala> sql("select * from t1").show
+   *    +---+---+
+   *    |  i|  j|
+   *    +---+---+
+   *    |  1|  2|
+   *    |  4|  3|
+   *    +---+---+
+   * }}}
    *
   * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input
   * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC

http://git-wip-us.apache.org/repos/asf/spark/blob/2d3c69a0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 1494341..3525111 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -192,6 +192,11 @@ case class CreateDataSourceTableAsSelectCommand(
           EliminateSubqueryAliases(
             sessionState.catalog.lookupRelation(tableIdent)) match {
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
+              if (query.schema.size != l.schema.size) {
+                throw new AnalysisException(
+                  s"The column number of the existing schema[${l.schema}] " +
+                    s"doesn't match the data schema[${query.schema}]'s")
+              }
               existingSchema = Some(l.schema)
             case o =>
               throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
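
With this guard, appending a DataFrame whose column count differs from the existing table's now fails fast instead of silently dropping columns. A minimal sketch of the new behavior (the table name `t` is hypothetical; the message text follows the check above):

```scala
Seq((1, 2)).toDF("i", "j").write.saveAsTable("t")
Seq((3, 4, 5)).toDF("i", "j", "k").write.mode("append").saveAsTable("t")
// => org.apache.spark.sql.AnalysisException: The column number of the existing
//    schema[...] doesn't match the data schema[...]'s
```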

http://git-wip-us.apache.org/repos/asf/spark/blob/2d3c69a0/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b2a80e7..676fbd0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1038,6 +1038,49 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
+  test("saveAsTable[append]: the column order doesn't matter") {
+    withTable("saveAsTable_column_order") {
+      Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_column_order")
+      Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("saveAsTable_column_order")
+      checkAnswer(
+        table("saveAsTable_column_order"),
+        Seq((1, 2), (4, 3)).toDF("i", "j"))
+    }
+  }
+
+  test("saveAsTable[append]: mismatch column names") {
+    withTable("saveAsTable_mismatch_column_names") {
+      Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_mismatch_column_names")
+      val e = intercept[AnalysisException] {
+        Seq((3, 4)).toDF("i", "k")
+          .write.mode("append").saveAsTable("saveAsTable_mismatch_column_names")
+      }
+      assert(e.getMessage.contains("cannot resolve"))
+    }
+  }
+
+  test("saveAsTable[append]: too many columns") {
+    withTable("saveAsTable_too_many_columns") {
+      Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_too_many_columns")
+      val e = intercept[AnalysisException] {
+        Seq((3, 4, 5)).toDF("i", "j", "k")
+          .write.mode("append").saveAsTable("saveAsTable_too_many_columns")
+      }
+      assert(e.getMessage.contains("doesn't match"))
+    }
+  }
+
+  test("saveAsTable[append]: less columns") {
+    withTable("saveAsTable_less_columns") {
+      Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_less_columns")
+      val e = intercept[AnalysisException] {
+        Seq((4)).toDF("j")
+          .write.mode("append").saveAsTable("saveAsTable_less_columns")
+      }
+      assert(e.getMessage.contains("doesn't match"))
+    }
+  }
+
   test("SPARK-15025: create datasource table with path with select") {
     withTempPath { dir =>
       withTable("t") {

