spark-issues mailing list archives

From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-5775) GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table
Date Fri, 27 Feb 2015 00:25:04 GMT

    [ https://issues.apache.org/jira/browse/SPARK-5775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14339457#comment-14339457 ]

Apache Spark commented on SPARK-5775:
-------------------------------------

User 'yhuai' has created a pull request for this issue:
https://github.com/apache/spark/pull/4798

> GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-5775
>                 URL: https://issues.apache.org/jira/browse/SPARK-5775
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.2.1
>            Reporter: Ayoub Benali
>            Assignee: Cheng Lian
>            Priority: Blocker
>              Labels: hivecontext, nested, parquet, partition
>
> Using the "LOAD" sql command in Hive context to load parquet files into a partitioned
table causes exceptions during query time. 
> The bug requires the table to have a column of *type Array of struct* and to be *partitioned*.

> The example bellow shows how to reproduce the bug and you can see that if the table is
not partitioned the query works fine. 
> {noformat}
> scala> val data1 = """{"data_array":[{"field1":1,"field2":2}]}"""
> scala> val data2 = """{"data_array":[{"field1":3,"field2":4}]}"""
> scala> val jsonRDD = sc.makeRDD(data1 :: data2 :: Nil)
> scala> val schemaRDD = hiveContext.jsonRDD(jsonRDD)
> scala> schemaRDD.printSchema
> root
>  |-- data_array: array (nullable = true)
>  |    |-- element: struct (containsNull = false)
>  |    |    |-- field1: integer (nullable = true)
>  |    |    |-- field2: integer (nullable = true)
> scala> hiveContext.sql("create external table if not exists partitioned_table(data_array
ARRAY <STRUCT<field1: INT, field2: INT>>) Partitioned by (date STRING) STORED
AS PARQUET Location 'hdfs://****/partitioned_table'")
> scala> hiveContext.sql("create external table if not exists none_partitioned_table(data_array
ARRAY <STRUCT<field1: INT, field2: INT>>) STORED AS PARQUET Location 'hdfs://****/none_partitioned_table'")
> scala> schemaRDD.saveAsParquetFile("hdfs://****/tmp_data_1")
> scala> schemaRDD.saveAsParquetFile("hdfs://****/tmp_data_2")
> scala> hiveContext.sql("LOAD DATA INPATH 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_1'
INTO TABLE partitioned_table PARTITION(date='2015-02-12')")
> scala> hiveContext.sql("LOAD DATA INPATH 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_2'
INTO TABLE none_partitioned_table")
> scala> hiveContext.sql("select data.field1 from none_partitioned_table LATERAL VIEW
explode(data_array) nestedStuff AS data").collect
> res23: Array[org.apache.spark.sql.Row] = Array([1], [3])
> scala> hiveContext.sql("select data.field1 from partitioned_table LATERAL VIEW explode(data_array)
nestedStuff AS data").collect
> 15/02/12 16:21:03 INFO ParseDriver: Parsing command: select data.field1 from partitioned_table
LATERAL VIEW explode(data_array) nestedStuff AS data
> 15/02/12 16:21:03 INFO ParseDriver: Parse Completed
> 15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(260661) called with curMem=0, maxMem=280248975
> 15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated
size 254.6 KB, free 267.0 MB)
> 15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(28615) called with curMem=260661,
maxMem=280248975
> 15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory
(estimated size 27.9 KB, free 267.0 MB)
> 15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:51990
(size: 27.9 KB, free: 267.2 MB)
> 15/02/12 16:21:03 INFO BlockManagerMaster: Updated info of block broadcast_18_piece0
> 15/02/12 16:21:03 INFO SparkContext: Created broadcast 18 from NewHadoopRDD at ParquetTableOperations.scala:119
> 15/02/12 16:21:03 INFO FileInputFormat: Total input paths to process : 3
> 15/02/12 16:21:03 INFO ParquetInputFormat: Total input paths to process : 3
> 15/02/12 16:21:03 INFO FilteringParquetRowInputFormat: Using Task Side Metadata Split
Strategy
> 15/02/12 16:21:03 INFO SparkContext: Starting job: collect at SparkPlan.scala:84
> 15/02/12 16:21:03 INFO DAGScheduler: Got job 12 (collect at SparkPlan.scala:84) with
3 output partitions (allowLocal=false)
> 15/02/12 16:21:03 INFO DAGScheduler: Final stage: Stage 13(collect at SparkPlan.scala:84)
> 15/02/12 16:21:03 INFO DAGScheduler: Parents of final stage: List()
> 15/02/12 16:21:03 INFO DAGScheduler: Missing parents: List()
> 15/02/12 16:21:03 INFO DAGScheduler: Submitting Stage 13 (MappedRDD[111] at map at SparkPlan.scala:84),
which has no missing parents
> 15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(7632) called with curMem=289276,
maxMem=280248975
> 15/02/12 16:21:03 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated
size 7.5 KB, free 267.0 MB)
> 15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(4230) called with curMem=296908,
maxMem=280248975
> 15/02/12 16:21:03 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory
(estimated size 4.1 KB, free 267.0 MB)
> 15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:51990
(size: 4.1 KB, free: 267.2 MB)
> 15/02/12 16:21:03 INFO BlockManagerMaster: Updated info of block broadcast_19_piece0
> 15/02/12 16:21:03 INFO SparkContext: Created broadcast 19 from broadcast at DAGScheduler.scala:838
> 15/02/12 16:21:03 INFO DAGScheduler: Submitting 3 missing tasks from Stage 13 (MappedRDD[111]
at map at SparkPlan.scala:84)
> 15/02/12 16:21:03 INFO TaskSchedulerImpl: Adding task set 13.0 with 3 tasks
> 15/02/12 16:21:03 INFO TaskSetManager: Starting task 0.0 in stage 13.0 (TID 48, *****,
NODE_LOCAL, 1640 bytes)
> 15/02/12 16:21:03 INFO TaskSetManager: Starting task 1.0 in stage 13.0 (TID 49, *****,
NODE_LOCAL, 1641 bytes)
> 15/02/12 16:21:03 INFO TaskSetManager: Starting task 2.0 in stage 13.0 (TID 50, *****,
NODE_LOCAL, 1640 bytes)
> 15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:39729
(size: 4.1 KB, free: 133.6 MB)
> 15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:48213
(size: 4.1 KB, free: 133.6 MB)
> 15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:45394
(size: 4.1 KB, free: 133.6 MB)
> 15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:39729
(size: 27.9 KB, free: 133.6 MB)
> 15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:48213
(size: 27.9 KB, free: 133.6 MB)
> 15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:45394
(size: 27.9 KB, free: 133.6 MB)
> 15/02/12 16:21:04 WARN TaskSetManager: Lost task 0.0 in stage 13.0 (TID 48, *****): java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
>   at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:147)
>   at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:144)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>   at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>   at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>   at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.1 in stage 13.0 (TID 51, *****,
NODE_LOCAL, 1640 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.0 in stage 13.0 (TID 49) on executor
*****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow
cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate
1]
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.1 in stage 13.0 (TID 52, *****,
NODE_LOCAL, 1641 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.1 in stage 13.0 (TID 51) on executor
*****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow
cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate
2]
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.2 in stage 13.0 (TID 53, *****,
NODE_LOCAL, 1640 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Finished task 2.0 in stage 13.0 (TID 50) in 405
ms on ***** (1/3)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.1 in stage 13.0 (TID 52) on executor
*****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow
cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate
3]
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.2 in stage 13.0 (TID 54, *****,
NODE_LOCAL, 1641 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.2 in stage 13.0 (TID 53) on executor
*****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow
cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate
4]
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.3 in stage 13.0 (TID 55, *****,
NODE_LOCAL, 1640 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.2 in stage 13.0 (TID 54) on executor
*****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow
cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate
5]
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.3 in stage 13.0 (TID 56, *****,
NODE_LOCAL, 1641 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.3 in stage 13.0 (TID 55) on executor
*****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow
cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate
6]
> 15/02/12 16:21:04 ERROR TaskSetManager: Task 0 in stage 13.0 failed 4 times; aborting
job
> 15/02/12 16:21:04 INFO TaskSchedulerImpl: Cancelling stage 13
> 15/02/12 16:21:04 INFO TaskSchedulerImpl: Stage 13 was cancelled
> 15/02/12 16:21:04 INFO DAGScheduler: Job 12 failed: collect at SparkPlan.scala:84, took
0.556942 s
> 15/02/12 16:21:04 WARN TaskSetManager: Lost task 1.3 in stage 13.0 (TID 56, *****): TaskKilled
(killed intentionally)
> 15/02/12 16:21:04 INFO TaskSchedulerImpl: Removed TaskSet 13.0, whose tasks have all
completed, from pool 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0
failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 55, *****): java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
>   at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:147)
>   at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:144)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>   at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>   at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>   at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>   at scala.Option.foreach(Option.scala:236)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
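For anyone trying to reproduce this without access to the reporter's cluster, the transcript above boils down to the minimal spark-shell session below. This is only an illustrative sketch: the /tmp/spark5775 paths are placeholders (any writable HDFS or local directory should work), and it assumes a Spark 1.2.x build with Hive support, with hiveContext being the shell's HiveContext as in the transcript.

{noformat}
scala> val jsonRDD = sc.makeRDD("""{"data_array":[{"field1":1,"field2":2}]}""" :: """{"data_array":[{"field1":3,"field2":4}]}""" :: Nil)
scala> val schemaRDD = hiveContext.jsonRDD(jsonRDD)           // infer the nested array<struct> schema from JSON
scala> schemaRDD.saveAsParquetFile("/tmp/spark5775/staging")  // placeholder staging directory for the Parquet files
scala> hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS partitioned_table(data_array ARRAY<STRUCT<field1: INT, field2: INT>>) PARTITIONED BY (date STRING) STORED AS PARQUET LOCATION '/tmp/spark5775/partitioned_table'")
scala> hiveContext.sql("LOAD DATA INPATH '/tmp/spark5775/staging' INTO TABLE partitioned_table PARTITION(date='2015-02-12')")
scala> hiveContext.sql("SELECT data.field1 FROM partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data").collect
{noformat}

On 1.2.1 the final collect fails with the ClassCastException shown above, while the same query against an identical but unpartitioned table returns Array([1], [3]).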



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

