spark-issues mailing list archives

From "Imran Rashid (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-21165) Fail to write into partitioned hive table due to attribute reference not working with cast on partition column
Date Wed, 21 Jun 2017 20:04:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-21165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Imran Rashid updated SPARK-21165:
---------------------------------
    Description: 
A simple "insert into ... select" involving partitioned hive tables fails.  Here's a simpler
repro which doesn't involve hive at all -- this succeeds on 2.1.1, but fails on 2.2.0-rc5:

{noformat}
spark.sql("""SET hive.exec.dynamic.partition.mode=nonstrict""")
spark.sql("""DROP TABLE IF EXISTS src""")
spark.sql("""DROP TABLE IF EXISTS dest""")
spark.sql("""
CREATE TABLE src (first string, word string)
  PARTITIONED BY (length int)
""")

spark.sql("""
INSERT INTO src PARTITION(length) VALUES
  ('a', 'abc', 3),
  ('b', 'bcde', 4),
  ('c', 'cdefg', 5)
""")

spark.sql("""
  CREATE TABLE dest (word string, length int)
    PARTITIONED BY (first string)
""")

spark.sql("""
  INSERT INTO TABLE dest PARTITION(first) SELECT word, length, cast(first as string) as first FROM src
""")
{noformat}

The exception is

{noformat}
17/06/21 14:25:53 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 10, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: first#74
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:49)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:43)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:884)
        at org.apache.spark.sql.execution.SparkPlan.newOrdering(SparkPlan.scala:363)
        at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:63)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:102)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Couldn't find first#74 in [word#76,length#77,first#75]
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 40 more
{noformat}
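
The {{Couldn't find first#74 in [word#76,length#77,first#75]}} line is the telling part: the attribute that the generated sort ordering refers to has the same name as one in the child's output, but a different expression id, so binding fails.  A minimal sketch of that failure mode against Catalyst's internal API (internal and version-dependent, so treat the exact signatures as an assumption for 2.2.x):

{noformat}
import org.apache.spark.sql.catalyst.expressions._  // also brings the Seq[Attribute] -> AttributeSeq implicit into scope
import org.apache.spark.sql.types.StringType

// Two attributes that both print as first#N, but with different exprIds --
// mimicking the first#74 vs first#75 mismatch in the stack trace above.
val stale = AttributeReference("first", StringType)()
val fresh = AttributeReference("first", StringType)()

// Binding resolves by exprId, not by name, so this throws
// java.lang.RuntimeException: Couldn't find first#N in [first#M]
BindReferences.bindReference(stale, Seq(fresh))
{noformat}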

The key to making this fail is the {{cast(first as string) as first}}.  The same cast on any other column is harmless; it only triggers the failure on {{first}}, the partition column of the destination table.
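
For contrast, neither of the following variants triggers the failure -- selecting the partition column directly, or applying the same cast pattern to a non-partition column (the second variant follows from the observation above rather than from a separately verified run):

{noformat}
// succeeds: the partition column is selected directly, with no cast
spark.sql("""
  INSERT INTO TABLE dest PARTITION(first) SELECT word, length, first FROM src
""")

// also succeeds: the same cast-and-alias pattern on a non-partition column
spark.sql("""
  INSERT INTO TABLE dest PARTITION(first)
  SELECT cast(word as string) as word, length, first FROM src
""")
{noformat}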

Here's the explain plan from 2.2.0:
{noformat}
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
+- 'Project ['word, 'length, cast('first as string) AS first#85]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
InsertIntoHiveTable CatalogTable(
Database: default
Table: dest
Owner: irashid
Created: Wed Jun 21 14:25:13 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`first`]
Schema: root
-- word: string (nullable = true)
-- length: integer (nullable = true)
-- first: string (nullable = true)
), Map(first -> None), false, false
   +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
      +- SubqueryAlias src
         +- CatalogRelation CatalogTable(
Database: default
Table: src
Owner: irashid
Created: Wed Jun 21 14:25:11 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Statistics: 9223372036854775807 bytes
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`length`]
Schema: root
-- first: string (nullable = true)
-- word: string (nullable = true)
-- length: integer (nullable = true)
), [first#88, word#89], [length#90]

== Optimized Logical Plan ==
InsertIntoHiveTable CatalogTable(
Database: default
Table: dest
Owner: irashid
Created: Wed Jun 21 14:25:13 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`first`]
Schema: root
-- word: string (nullable = true)
-- length: integer (nullable = true)
-- first: string (nullable = true)
), Map(first -> None), false, false
   +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
      +- SubqueryAlias src
         +- CatalogRelation CatalogTable(
Database: default
Table: src
Owner: irashid
Created: Wed Jun 21 14:25:11 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Statistics: 9223372036854775807 bytes
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`length`]
Schema: root
-- first: string (nullable = true)
-- word: string (nullable = true)
-- length: integer (nullable = true)
), [first#88, word#89], [length#90]

== Physical Plan ==
ExecutedCommand
   +- InsertIntoHiveTable CatalogTable(
Database: default
Table: dest
Owner: irashid
Created: Wed Jun 21 14:25:13 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`first`]
Schema: root
-- word: string (nullable = true)
-- length: integer (nullable = true)
-- first: string (nullable = true)
), Map(first -> None), false, false
         +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
            +- SubqueryAlias src
               +- CatalogRelation CatalogTable(
Database: default
Table: src
Owner: irashid
Created: Wed Jun 21 14:25:11 CDT 2017
Last Access: Wed Dec 31 18:00:00 CST 1969
Type: MANAGED
Provider: hive
Properties: [serialization.format=1]
Statistics: 9223372036854775807 bytes
Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Partition Provider: Catalog
Partition Columns: [`length`]
Schema: root
-- first: string (nullable = true)
-- word: string (nullable = true)
-- length: integer (nullable = true)
), [first#88, word#89], [length#90]
{noformat}

And from 2.1.1:
{noformat}
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), OverwriteOptions(false,Map()), false
+- 'Project ['word, 'length, cast('first as string) AS first#55]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
+- Project [word#60, length#58, cast(first#59 as string) AS first#55]
   +- MetastoreRelation default, src

== Optimized Logical Plan ==
InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
+- Project [word#60, length#58, first#59]
   +- MetastoreRelation default, src

== Physical Plan ==
InsertIntoHiveTable MetastoreRelation default, dest, Map(first -> None), false, false
+- HiveTableScan [word#60, length#58, first#59], MetastoreRelation default, src
{noformat}
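
The interesting difference is in the optimized plans: 2.1.1 simplifies the redundant {{cast(first#59 as string)}} back to a plain {{first#59}} reference, so the attribute ids stay consistent end to end, while 2.2.0 keeps the cast and introduces a fresh id ({{first#85}}), which is presumably what the attribute binding later trips over.  The plans can be captured without executing the insert via {{EXPLAIN EXTENDED}} (a sketch; the exact output formatting differs between versions):

{noformat}
// prints the parsed, analyzed, optimized, and physical plans
spark.sql("""
  EXPLAIN EXTENDED
  INSERT INTO TABLE dest PARTITION(first)
  SELECT word, length, cast(first as string) as first FROM src
""").collect().foreach(row => println(row.getString(0)))
{noformat}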

While this example query is somewhat contrived, this is really important because if you try to do the same thing where {{src}} was created by Hive, the query fails with the same error.  In that case, the explain plan looks like this (a sketch of the Hive-side DDL is given after the plan):

{noformat}
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
+- 'Project ['word, 'length, 'first]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
   +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
      +- Project [word#49, length#50, first#48]
         +- SubqueryAlias src
            +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]

== Optimized Logical Plan ==
InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
   +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
      +- Project [word#49, length#50, first#48]
         +- SubqueryAlias src
            +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]

== Physical Plan ==
ExecutedCommand
   +- InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
         +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
            +- Project [word#49, length#50, first#48]
               +- SubqueryAlias src
                  +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
{noformat}
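
For concreteness, the "created by Hive" variant of {{src}} corresponds to DDL along these lines, run from the Hive CLI or beeline rather than through Spark (a hypothetical reconstruction from the schema and serde shown in the plan, not the reporter's exact statements):

{noformat}
-- run in the Hive shell; LazySimpleSerDe is Hive's default text serde
CREATE DATABASE IF NOT EXISTS my_test;
USE my_test;

CREATE TABLE src (first STRING, word STRING)
  PARTITIONED BY (length INT);

CREATE TABLE dest (word STRING, length INT)
  PARTITIONED BY (first STRING);
{noformat}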


> Fail to write into partitioned hive table due to attribute reference not working with cast on partition column
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21165
>                 URL: https://issues.apache.org/jira/browse/SPARK-21165
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Imran Rashid
>            Priority: Blocker
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

