spark-issues mailing list archives

From "David Vrba (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-24399) Reused Exchange is used where it should not be
Date Sun, 27 May 2018 13:42:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-24399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Vrba updated SPARK-24399:
-------------------------------
    Description: 
ReusedExchange produces a wrong result. Here is the code to reproduce the issue:
{code:java}
 
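import org.apache.spark.sql.functions.{sum, lit}
import org.apache.spark.sql.expressions.Window
// toDF and the $"..." column syntax need these implicits;
// spark-shell imports them automatically, so this line only matters elsewhere
import spark.implicits._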


val row1 = (1, 3, 4, 50)
val row2 = (2, 2, 2, 250)
val row3 = (3, 2, 4, 250)
val row4 = (4, 3, 1, 350)
val data = Seq(row1, row2, row3, row4)


val df = data.toDF("id", "pFilter", "secondFilter", "metricToAgg").cache()

val w = Window.partitionBy($"id")

val firstUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
  .filter($"activity_sum" > 50)
  .filter($"pFilter".isin(2, 3))
  .agg(sum($"metricToAgg"))
  .withColumn("t", lit("first_union_part"))

val secondUnionPart = df.withColumn("activity_sum",sum("metricToAgg").over(w))
  .filter($"activity_sum" > 50)
  .filter($"secondFilter".isin(2, 3))
  .agg(sum($"metricToAgg"))
  .withColumn("t", lit("second_union_part"))

val finalDF = firstUnionPart.union(secondUnionPart)
finalDF.show()

+----------------+-----------------+
|sum(metricToAgg)|                t|
+----------------+-----------------+
|             850| first_union_part|
|             850|second_union_part|
+----------------+-----------------+
{code}
 

The second row is wrong: it should be 250, not 850, as you can see by showing the two
union parts separately:
{code:java}
firstUnionPart.show() 
+----------------+----------------+ 
|sum(metricToAgg)|               t| 
+----------------+----------------+ 
|             850|first_union_part| 
+----------------+----------------+

secondUnionPart.show()
+----------------+-----------------+
|sum(metricToAgg)|                t|
+----------------+-----------------+
|             250|second_union_part|
+----------------+-----------------+{code}
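
To make the expected values easy to check by hand, here is a minimal sketch that redoes the two branches with plain Scala collections instead of Spark. The names ExpectedSums, Rec and branchSum are made up for this illustration and are not part of Spark or of the reproduction code. The window sum is emulated by a per-id total, which is my assumption about what Window.partitionBy($"id") computes on this data.

```scala
// Plain Scala collections only; no Spark is needed to run this.
object ExpectedSums {
  // Hypothetical record type mirroring the columns used in the report.
  final case class Rec(id: Int, pFilter: Int, secondFilter: Int, metricToAgg: Int)

  // Same four rows as in the reproduction code.
  val data: Seq[Rec] = Seq(
    Rec(1, 3, 4, 50),
    Rec(2, 2, 2, 250),
    Rec(3, 2, 4, 250),
    Rec(4, 3, 1, 350)
  )

  // Emulates sum("metricToAgg").over(Window.partitionBy($"id")):
  // every row is paired with the total of metricToAgg for its id.
  val withActivitySum: Seq[(Rec, Int)] = {
    val totalById: Map[Int, Int] =
      data.groupBy(_.id).map { case (id, rows) => id -> rows.map(_.metricToAgg).sum }
    data.map(r => (r, totalById(r.id)))
  }

  // Emulates .filter(activity_sum > 50).filter(<column> isin (2, 3)).agg(sum(metricToAgg)).
  def branchSum(filterCol: Rec => Int): Int =
    withActivitySum
      .filter { case (_, activitySum) => activitySum > 50 }
      .filter { case (r, _) => Set(2, 3).contains(filterCol(r)) }
      .map { case (r, _) => r.metricToAgg }
      .sum

  val firstExpected: Int = branchSum(_.pFilter)       // filter on pFilter
  val secondExpected: Int = branchSum(_.secondFilter) // filter on secondFilter
}
```

Under these assumptions ExpectedSums.firstExpected is 850 (rows 2, 3 and 4 pass the pFilter check) and ExpectedSums.secondExpected is 250 (only row 2 passes the secondFilter check). Row 1 is dropped in both branches because its activity_sum is 50, which is not greater than 50.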
 

The ReusedExchange replaced part of the query plan in the second branch of the union with
the plan from the first branch, as you can see in the output of explain().

I did some inspection, and it appears that both sub-plans have the same canonicalized plan,
which is why the ReusedExchange takes place. But I don't think they should canonicalize to the
same plan: according to the notes in the source code, only plans that evaluate to the same
result may share a canonicalized plan. The two sub-plans in this query produce, in principle,
different results, because the second branch of the union filters on a different column than
the first branch.
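
For anyone who wants to repeat this inspection, here is a rough sketch of one way to do it. It assumes Spark 2.3.x on the classpath, where the executed plan can be walked directly. The object name InspectReusedExchange and the helper branch are made up for this illustration, and col(...) is used in place of the $"..." syntax so the filter column can be passed as a string. It counts the ReusedExchange nodes in the union plan and asks each pair of Exchange nodes from the two branches whether Spark considers them to have the same result.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum}

// Hypothetical helper object; not part of Spark or of the original report.
object InspectReusedExchange {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-24399-inspect")
      .getOrCreate()
    import spark.implicits._

    // Same data and column names as in the failing reproduction.
    val df = Seq((1, 3, 4, 50), (2, 2, 2, 250), (3, 2, 4, 250), (4, 3, 1, 350))
      .toDF("id", "pFilter", "secondFilter", "metricToAgg")
      .cache()
    val w = Window.partitionBy($"id")

    // Builds one branch of the union; only the filter column and the label differ.
    def branch(filterCol: String, label: String) =
      df.withColumn("activity_sum", sum("metricToAgg").over(w))
        .filter($"activity_sum" > 50)
        .filter(col(filterCol).isin(2, 3))
        .agg(sum($"metricToAgg"))
        .withColumn("t", lit(label))

    val first = branch("pFilter", "first_union_part")
    val second = branch("secondFilter", "second_union_part")

    // Count the ReusedExchange nodes the planner put into the union plan.
    val unionPlan = first.union(second).queryExecution.executedPlan
    val reused = unionPlan.collect { case r: ReusedExchangeExec => r }
    println(s"ReusedExchange nodes in the union plan: ${reused.size}")

    // Compare the Exchange nodes of the two branches pairwise.
    // sameResult compares the canonicalized forms of the two plans.
    val firstExchanges = first.queryExecution.executedPlan.collect { case e: Exchange => e }
    val secondExchanges = second.queryExecution.executedPlan.collect { case e: Exchange => e }
    for (a <- firstExchanges; b <- secondExchanges)
      println(s"${a.nodeName} vs ${b.nodeName}: sameResult = ${a.sameResult(b)}")

    spark.stop()
  }
}
```

If any pair reports sameResult = true even though the branches filter on different columns, that would match what I describe above. As a further check, setting the internal option spark.sql.exchange.reuse to false before building the plans should, as far as I know, keep the planner from inserting ReusedExchange nodes, which gives a quick way to see whether the reuse is what changes the result.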

 

An interesting thing happens when we rename the second column from "pFilter" to "kFilter".
In this case the query works fine and produces the correct result, as you can see here:
{code:java}
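import org.apache.spark.sql.functions.{sum, lit}
import org.apache.spark.sql.expressions.Window
// toDF and the $"..." column syntax need these implicits;
// spark-shell imports them automatically, so this line only matters elsewhere
import spark.implicits._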


val row1 = (1, 3, 4, 50)
val row2 = (2, 2, 2, 250)
val row3 = (3, 2, 4, 250)
val row4 = (4, 3, 1, 350)
val data = Seq(row1, row2, row3, row4)


val df = data.toDF("id", "kFilter", "secondFilter", "metricToAgg").cache()

val w = Window.partitionBy($"id")

val firstUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
  .filter($"activity_sum" > 50)
  .filter($"kFilter".isin(2, 3))
  .agg(sum($"metricToAgg"))
  .withColumn("t", lit("first_union_part"))

val secondUnionPart = df.withColumn("activity_sum",sum("metricToAgg").over(w))
  .filter($"activity_sum" > 50)
  .filter($"secondFilter".isin(2, 3))
  .agg(sum($"metricToAgg"))
  .withColumn("t", lit("second_union_part"))

val finalDF = firstUnionPart.union(secondUnionPart)

finalDF.show()

+----------------+-----------------+
|sum(metricToAgg)|                t|
+----------------+-----------------+
|             850| first_union_part|
|             250|second_union_part|
+----------------+-----------------+{code}
 

The result is now correct, and the only thing we changed is the name of one column. The
ReusedExchange does not happen here, and I checked that the canonicalized plans now really differ.

 

The key points to reproduce this bug are:
 # Use a union (or some other operator with multiple branches)
 # Use cache() so that the plan contains an InMemoryTableScan
 # Use an operator that forces an Exchange in the plan (in this case a window function call)
 # Use column names that fall in a specific alphabetical order

 

 


> Reused Exchange is used where it should not be
> ----------------------------------------------
>
>                 Key: SPARK-24399
>                 URL: https://issues.apache.org/jira/browse/SPARK-24399
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: David Vrba
>            Priority: Critical
>              Labels: correctness
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

