spark-issues mailing list archives

From "Yael Aharon (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-13680) Java UDAF with more than one intermediate argument returns wrong results
Date Fri, 04 Mar 2016 16:42:40 GMT

    [ https://issues.apache.org/jira/browse/SPARK-13680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180120#comment-15180120 ]

Yael Aharon edited comment on SPARK-13680 at 3/4/16 4:42 PM:
-------------------------------------------------------------

I found the following in the Spark executor logs when running the MyDoubleAvg UDAF. Execution continued in spite of this exception:

java.lang.ClassCastException: org.apache.spark.sql.types.GenericArrayData cannot be cast to java.lang.Long
	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:41)
	at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getLong(rows.scala:247)
	at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply772_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$11.apply(AggregationIterator.scala:174)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$11.apply(AggregationIterator.scala:171)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.processCurrentSortedGroup(SortBasedAggregationIterator.scala:100)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:139)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:30)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:119)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:74)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)



> Java UDAF with more than one intermediate argument returns wrong results
> ------------------------------------------------------------------------
>
>                 Key: SPARK-13680
>                 URL: https://issues.apache.org/jira/browse/SPARK-13680
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.0
>         Environment: CDH 5.5.2
>            Reporter: Yael Aharon
>         Attachments: data.csv
>
>
> I am trying to incorporate the Java UDAF from https://github.com/apache/spark/blob/master/sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java
> into an SQL query.
> I registered the UDAF like this:
>   sqlContext.udf().register("myavg", new MyDoubleAvg());
> My SQL query is:
> SELECT AVG(seqi) AS `avg_seqi`, AVG(seqd) AS `avg_seqd`, AVG(ci) AS `avg_ci`, AVG(cd) AS `avg_cd`,
>        AVG(stdevd) AS `avg_stdevd`, AVG(stdevi) AS `avg_stdevi`,
>        MAX(seqi) AS `max_seqi`, MAX(seqd) AS `max_seqd`, MAX(ci) AS `max_ci`, MAX(cd) AS `max_cd`,
>        MAX(stdevd) AS `max_stdevd`, MAX(stdevi) AS `max_stdevi`,
>        MIN(seqi) AS `min_seqi`, MIN(seqd) AS `min_seqd`, MIN(ci) AS `min_ci`, MIN(cd) AS `min_cd`,
>        MIN(stdevd) AS `min_stdevd`, MIN(stdevi) AS `min_stdevi`,
>        SUM(seqi) AS `sum_seqi`, SUM(seqd) AS `sum_seqd`, SUM(ci) AS `sum_ci`, SUM(cd) AS `sum_cd`,
>        SUM(stdevd) AS `sum_stdevd`, SUM(stdevi) AS `sum_stdevi`,
>        myavg(seqd) AS `myavg_seqd`,
>        AVG(zero) AS `avg_zero`, AVG(nulli) AS `avg_nulli`, AVG(nulld) AS `avg_nulld`,
>        SUM(zero) AS `sum_zero`, SUM(nulli) AS `sum_nulli`, SUM(nulld) AS `sum_nulld`,
>        MAX(zero) AS `max_zero`, MAX(nulli) AS `max_nulli`, MAX(nulld) AS `max_nulld`,
>        COUNT(*) AS `count_all`, COUNT(nulli) AS `count_nulli`
> FROM mytable
> As soon as I add the UDAF myavg to the SQL, all the results become incorrect. When I remove the call to the UDAF, the results are correct.
> I was able to work around the issue by modifying the bufferSchema of the UDAF to use an array, along with the corresponding update and merge methods.
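
The workaround described above — packing the intermediate state into a single array-valued buffer column instead of several separately typed columns — can be sketched without Spark. The class below is a hypothetical, framework-free illustration: `double[]` stands in for the single array-typed buffer column, and the method names only mirror the initialize/update/merge/evaluate shape of a Spark aggregation function; none of this is the actual Spark API.

```java
import java.util.List;

// Framework-free sketch of the array-buffer workaround: the running sum and
// count live at fixed indices of ONE array, rather than in two separately
// typed buffer fields. Object names here are illustrative only.
public class ArrayBufferAvg {

    // buffer[0] = running sum, buffer[1] = running count
    public static double[] initialize() {
        return new double[] {0.0, 0.0};
    }

    // Fold one input value into the buffer, skipping nulls
    public static void update(double[] buffer, Double input) {
        if (input != null) {
            buffer[0] += input;
            buffer[1] += 1.0;
        }
    }

    // Combine two partial buffers (as Spark would across partitions)
    public static void merge(double[] buffer1, double[] buffer2) {
        buffer1[0] += buffer2[0];
        buffer1[1] += buffer2[1];
    }

    // Produce the final average, or null for an empty group
    public static Double evaluate(double[] buffer) {
        return buffer[1] == 0.0 ? null : buffer[0] / buffer[1];
    }

    // Convenience driver: run the full initialize/update/evaluate cycle
    public static Double average(List<Double> values) {
        double[] buf = initialize();
        for (Double v : values) {
            update(buf, v);
        }
        return evaluate(buf);
    }
}
```

Because all intermediate fields share one array column, the generated projection only ever reads a single buffer slot type, which is the property the reporter's workaround relies on.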



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

