spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "StanZhai (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-19471) [SQL]A confusing NullPointerException when creating table
Date Mon, 06 Feb 2017 04:27:41 GMT
StanZhai created SPARK-19471:
--------------------------------

             Summary: [SQL]A confusing NullPointerException when creating table
                 Key: SPARK-19471
                 URL: https://issues.apache.org/jira/browse/SPARK-19471
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.0
            Reporter: StanZhai
            Priority: Critical


After upgrading our Spark from 1.6.2 to 2.1.0, I encounter a confusing NullPointerException
when creating table under Spark 2.1.0, but the problem does not exists in Spark 1.6.1. 

Environment: Hive 1.2.1, Hadoop 2.6.4 

==================== Code ==================== 
// spark is an instance of HiveContext 
// merge is a Hive UDF 
val df = spark.sql("SELECT merge(field_a, null) AS new_a, field_b AS new_b FROM tb_1 group
by field_a, field_b") 
df.createTempView("tb_temp") 
spark.sql("create table tb_result stored as parquet as " + 
  "SELECT new_a" + 
  "FROM tb_temp" + 
  "LEFT JOIN `tb_2` ON " + 
  "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), concat('GrLSRwZE_', cast((rand()
* 200) AS int)), (`tb_temp`.`new_b`)) = `tb_2`.`fka6862f17`") 

==================== Physical Plan ==================== 
*Project [new_a] 
+- *BroadcastHashJoin [if (((new_b = ) || isnull(new_b))) concat(GrLSRwZE_, cast(cast((_nondeterministic
* 200.0) as int) as string)) else new_b], [fka6862f17], LeftOuter, BuildRight 
   :- HashAggregate(keys=[field_a, field_b], functions=[], output=[new_a, new_b, _nondeterministic])

   :  +- Exchange(coordinator ) hashpartitioning(field_a, field_b, 180), coordinator[target
post-shuffle partition size: 1024880] 
   :     +- *HashAggregate(keys=[field_a, field_b], functions=[], output=[field_a, field_b])

   :        +- *FileScan parquet bdp.tb_1[field_a,field_b] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_1, PartitionFilters: [], PushedFilters:
[], ReadSchema: struct 
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])) 
      +- *Project [fka6862f17] 
         +- *FileScan parquet bdp.tb_2[fka6862f17] Batched: true, Format: Parquet, Location:
InMemoryFileIndex[hdfs://hdcluster/data/tb_2, PartitionFilters: [], PushedFilters: [], ReadSchema:
struct 

What does '*' mean before HashAggregate? 

==================== Exception ==================== 
org.apache.spark.SparkException: Task failed while writing rows 
... 
java.lang.NullPointerException 
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown
Source) 
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
Source) 
        at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$3.apply(AggregationIterator.scala:260)

        at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$3.apply(AggregationIterator.scala:259)

        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:392)

        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:79)

        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source) 
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)

        at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:252)

        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:199)

        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:197)

        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)

        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:202)

        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$4.apply(FileFormatWriter.scala:138)

        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$4.apply(FileFormatWriter.scala:137)

        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
        at org.apache.spark.scheduler.Task.run(Task.scala:99) 
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        at java.lang.Thread.run(Thread.java:745) 

I also found that when I changed my code as follow: 

spark.sql("create table tb_result stored as parquet as " + 
  "SELECT new_b" + 
  "FROM tb_temp" + 
  "LEFT JOIN `tb_2` ON " + 
  "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), concat('GrLSRwZE_', cast((rand()
* 200) AS int)), (`tb_temp`.`new_b`)) = `tb_2`.`fka6862f17`") 

or 

spark.sql("create table tb_result stored as parquet as " + 
  "SELECT new_a" + 
  "FROM tb_temp" + 
  "LEFT JOIN `tb_2` ON " + 
  "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), concat('GrLSRwZE_', cast((200)
AS int)), (`tb_temp`.`new_b`)) = `tb_2`.`fka6862f17`") 

will not have this problem. 

== Physical Plan of select new_b ... == 
*Project [new_b] 
+- *BroadcastHashJoin [if (((new_b = ) || isnull(new_b))) concat(GrLSRwZE_, cast(cast((_nondeterministic
* 200.0) as int) as string)) else new_b], [fka6862f17], LeftOuter, BuildRight 
   :- *HashAggregate(keys=[field_a, field_b], functions=[], output=[new_b, _nondeterministic])

   :  +- Exchange(coordinator ) hashpartitioning(field_a, field_b, 180), coordinator[target
post-shuffle partition size: 1024880] 
   :     +- *HashAggregate(keys=[field_a, field_b], functions=[], output=[field_a, field_b])

   :        +- *FileScan parquet bdp.tb_1[field_a,field_b] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_1, PartitionFilters: [], PushedFilters:
[], ReadSchema: struct 
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])) 
      +- *Project [fka6862f17] 
         +- *FileScan parquet bdp.tb_2[fka6862f17] Batched: true, Format: Parquet, Location:
InMemoryFileIndex[hdfs://hdcluster/data/tb_2, PartitionFilters: [], PushedFilters: [], ReadSchema:
struct 

Difference is `HashAggregate(keys=[field_a, field_b], functions=[], output=[new_b, _nondeterministic])`
has a '*' char before it. 

It looks like something wrong with WholeStageCodegen when combine HiveUDF + rand() + group
by + join. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message