spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cloud-fan <...@git.apache.org>
Subject [GitHub] spark pull request #12313: [SPARK-14543] [SQL] Improve InsertIntoTable colum...
Date Thu, 02 Jun 2016 17:32:12 GMT
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12313#discussion_r65583506
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
---
    @@ -284,8 +284,128 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton
with Bef
           val data = (1 to 10).map(i => (i.toLong, s"data-$i")).toDF("id", "data")
     
           val logical = InsertIntoTable(spark.table("partitioned").logicalPlan,
    -        Map("part" -> None), data.logicalPlan, overwrite = false, ifNotExists = false)
    +        Map("part" -> None), data.logicalPlan, overwrite = false, ifNotExists = false,
    +        Map("matchByName" -> "true"))
           assert(!logical.resolved, "Should not resolve: missing partition data")
         }
       }
    +
    +  test("Insert unnamed expressions by position") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
    +
    +      val expected = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else
"odd"))
    +          .toDF("id", "data", "part")
    +      val data = expected.select("id", "part")
    +
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      // should be able to insert an expression when NOT mapping columns by name
    +      spark.table("source").selectExpr("id", "part", "CONCAT('data-', id)")
    +          .write.insertInto("partitioned")
    +      checkAnswer(sql("SELECT * FROM partitioned"), expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Insert expression by name") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
    +
    +      val expected = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else
"odd"))
    +          .toDF("id", "data", "part")
    +      val data = expected.select("id", "part")
    +
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      intercept[AnalysisException] {
    +        // also a problem when mapping by name
    +        spark.table("source").selectExpr("id", "part", "CONCAT('data-', id)")
    +            .write.option("matchByName", true).insertInto("partitioned")
    +      }
    +
    +      // should be able to insert an expression using AS when mapping columns by name
    +      spark.table("source").selectExpr("id", "part", "CONCAT('data-', id) as data")
    +          .write.option("matchByName", true).insertInto("partitioned")
    +      checkAnswer(sql("SELECT * FROM partitioned"), expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Reject missing columns") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
    +
    +      intercept[AnalysisException] {
    +        spark.table("source").write.insertInto("partitioned")
    +      }
    +
    +      intercept[AnalysisException] {
    +        // also a problem when mapping by name
    +        spark.table("source").write.option("matchByName", true).insertInto("partitioned")
    +      }
    +    }
    +  }
    +
    +  test("Reject extra columns") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, data string, extra string, part string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
    +
    +      intercept[AnalysisException] {
    +        spark.table("source").write.insertInto("partitioned")
    +      }
    +
    +      val data = (1 to 10)
    +          .map(i => (i, s"data-$i", s"${i * i}", if ((i % 2) == 0) "even" else "odd"))
    +          .toDF("id", "data", "extra", "part")
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      spark.table("source").write.option("matchByName", true).insertInto("partitioned")
    +
    +      val expected = data.select("id", "data", "part")
    +      checkAnswer(sql("SELECT * FROM partitioned"), expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Ignore names when writing by position") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string, data string)") // part, data
transposed
    +      sql("CREATE TABLE destination (id bigint, data string, part string)")
    +
    +      val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else
"odd"))
    +          .toDF("id", "data", "part")
    +
    +      // write into the reordered table by name
    +      data.write.option("matchByName", true).insertInto("source")
    +      checkAnswer(sql("SELECT id, data, part FROM source"), data.collect().toSeq)
    +
    +      val expected = data.select($"id", $"part" as "data", $"data" as "part")
    +
    +      // this produces a warning, but writes src.part -> dest.data and src.data ->
dest.part
    +      spark.table("source").write.insertInto("destination")
    +      checkAnswer(sql("SELECT id, data, part FROM destination"), expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Reorder columns by name") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (data string, part string, id bigint)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
    +
    +      val data = (1 to 10).map(i => (s"data-$i", if ((i % 2) == 0) "even" else "odd",
i))
    +          .toDF("data", "part", "id")
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      spark.table("source").write.option("matchByName", true).insertInto("partitioned")
    --- End diff --
    
    I'm not sure if I understand the previous discussion between you and @yhuai, but it looks
like @yhuai suggested that we should only allow position-based resolution for `insertInto`
and add name-based resolution in a follow-up PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message