spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Sylvain Zimmer (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-16740) joins.LongToUnsafeRowMap crashes with NegativeArraySizeException
Date Tue, 26 Jul 2016 19:55:20 GMT

    [ https://issues.apache.org/jira/browse/SPARK-16740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15394437#comment-15394437
] 

Sylvain Zimmer commented on SPARK-16740:
----------------------------------------

I'm not a Scala expert but from a quick review of the code, it appears that it's easy to overflow
the {{range}} variable in {{optimize()}}:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L608

In my example, that would yield
{{ scala> val range = 4661454128115150227L - (-5543241376386463808L)
range: Long = -8242048569207937581
}}

Maybe we should add {{range >= 0}} as a condition of doing that optimization?

> joins.LongToUnsafeRowMap crashes with NegativeArraySizeException
> ----------------------------------------------------------------
>
>                 Key: SPARK-16740
>                 URL: https://issues.apache.org/jira/browse/SPARK-16740
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core, SQL
>    Affects Versions: 2.0.0
>            Reporter: Sylvain Zimmer
>
> Hello,
> Here is a crash in Spark SQL joins, with a minimal reproducible test case. Interestingly,
it only seems to happen when reading Parquet data (I added a {{crash = True}} variable to
show it)
> This is an {{left_outer}} example, but it also crashes with a regular {{inner}} join.
> {code}
> import os
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> schema1 = SparkTypes.StructType([
>     SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True)
> ])
> schema2 = SparkTypes.StructType([
>     SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True)
> ])
> # Valid Long values (-9223372036854775808 < -5543241376386463808 , 4661454128115150227
< 9223372036854775807)
> data1 = [(4661454128115150227,), (-5543241376386463808,)]
> data2 = [(650460285, )]
> df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1)
> df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2)
> crash = True
> if crash:
>     os.system("rm -rf /tmp/sparkbug")
>     df1.write.parquet("/tmp/sparkbug/vertex")
>     df2.write.parquet("/tmp/sparkbug/edge")
>     df1 = sqlc.read.load("/tmp/sparkbug/vertex")
>     df2 = sqlc.read.load("/tmp/sparkbug/edge")
> result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer")
> # Should print [Row(id2=650460285, id1=None)]
> print result_df.collect()
> {code}
> When ran with {{spark-submit}}, the final {{collect()}} call crashes with this:
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling o61.collectToPython.
> : org.apache.spark.SparkException: Exception thrown in awaitResult:
> 	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
> 	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
> 	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> 	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
> 	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
> 	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
> 	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
> 	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
> 	at org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225)
> 	at org.apache.spark.sql.execution.BatchedDataSourceScanExec.doProduce(ExistingRDD.scala:328)
> 	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
> 	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> 	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
> 	at org.apache.spark.sql.execution.BatchedDataSourceScanExec.produce(ExistingRDD.scala:225)
> 	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
> 	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
> 	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> 	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
> 	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:309)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:347)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
> 	at org.apache.spark.sql.Dataset.javaToPython(Dataset.scala:2507)
> 	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2513)
> 	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
> 	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> 	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
> 	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2512)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 	at py4j.Gateway.invoke(Gateway.java:280)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.GatewayConnection.run(GatewayConnection.java:211)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NegativeArraySizeException
> 	at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.optimize(HashedRelation.scala:619)
> 	at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:806)
> 	at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:105)
> 	at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:816)
> 	at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:812)
> 	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:90)
> 	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:72)
> 	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
> 	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
> 	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	... 1 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message