spark-issues mailing list archives

From Hanna Mäki (JIRA) <j...@apache.org>
Subject [jira] [Created] (SPARK-17377) Joining Datasets read and aggregated from a partitioned Parquet file gives wrong results
Date Fri, 02 Sep 2016 10:00:28 GMT
Hanna Mäki created SPARK-17377:
----------------------------------

             Summary: Joining Datasets read and aggregated from a partitioned Parquet file gives wrong results
                 Key: SPARK-17377
                 URL: https://issues.apache.org/jira/browse/SPARK-17377
             Project: Spark
          Issue Type: Bug
    Affects Versions: 2.0.0
            Reporter: Hanna Mäki


Reproduction: 

1) Read two Datasets from a partitioned Parquet file, each with a different filter condition on the partitioning column
2) Group each Dataset by a column and aggregate it
3) Join the aggregated Datasets on the group-by column
4) In the joined Dataset, the aggregated values from the right Dataset have been replaced with the aggregated values from the left Dataset

The issue reproduces only when the input Parquet file is partitioned.

Example: 

val dataPath = "/your/data/path/"

case class InputData(id: Int, value: Int, filterColumn: Int)

val inputDS = Seq(
  InputData(1, 1, 1), InputData(2, 2, 1), InputData(3, 3, 1), InputData(4, 4, 1),
  InputData(1, 10, 2), InputData(2, 20, 2), InputData(3, 30, 2), InputData(4, 40, 2)
).toDS()

inputDS.show
+---+-----+------------+
| id|value|filterColumn|
+---+-----+------------+
|  1|    1|           1|
|  2|    2|           1|
|  3|    3|           1|
|  4|    4|           1|
|  1|   10|           2|
|  2|   20|           2|
|  3|   30|           2|
|  4|   40|           2|
+---+-----+------------+

inputDS.write.partitionBy("filterColumn").parquet(dataPath)

val dataDF = spark.read.parquet(dataPath)

case class LeftClass(id: Int, aggLeft: Long)

case class RightClass(id: Int, aggRight: Long)

val leftDS = dataDF.filter("filterColumn = 1").groupBy("id").agg(sum("value") as "aggLeft").as[LeftClass]

val rightDS = dataDF.filter("filterColumn = 2").groupBy("id").agg(sum("value") as "aggRight").as[RightClass]

leftDS.show
+---+-------+
| id|aggLeft|
+---+-------+
|  1|      1|
|  3|      3|
|  4|      4|
|  2|      2|
+---+-------+

rightDS.show
+---+--------+
| id|aggRight|
+---+--------+
|  1|      10|
|  3|      30|
|  4|      40|
|  2|      20|
+---+--------+


val joinedDS = leftDS.join(rightDS, "id")
joinedDS.show
+---+-------+--------+
| id|aggLeft|aggRight|
+---+-------+--------+
|  1|      1|       1|
|  3|      3|       3|
|  4|      4|       4|
|  2|      2|       2|
+---+-------+--------+
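
For comparison, the result the join should produce can be worked out from the same eight input rows without Spark. The following is only a sketch using plain Scala collections; the object name ExpectedJoin and the helper sumById are hypothetical and not part of the original reproduction. It pairs each id's sum from filterColumn = 1 with its sum from filterColumn = 2, which is what aggRight should contain in the joined output above.

```scala
// Plain Scala collections only; no Spark needed.
// ExpectedJoin and sumById are illustrative names, not from the report.
object ExpectedJoin {
  case class InputData(id: Int, value: Int, filterColumn: Int)

  // Same rows as inputDS in the reproduction.
  val input: Seq[InputData] = Seq(
    InputData(1, 1, 1), InputData(2, 2, 1), InputData(3, 3, 1), InputData(4, 4, 1),
    InputData(1, 10, 2), InputData(2, 20, 2), InputData(3, 30, 2), InputData(4, 40, 2))

  // Sum of `value` per id, restricted to one filterColumn value.
  def sumById(rows: Seq[InputData], filter: Int): Map[Int, Long] =
    rows.filter(_.filterColumn == filter)
      .groupBy(_.id)
      .map { case (id, group) => id -> group.map(_.value.toLong).sum }

  val left: Map[Int, Long] = sumById(input, 1)   // corresponds to aggLeft
  val right: Map[Int, Long] = sumById(input, 2)  // corresponds to aggRight

  // Inner join on id, sorted by id for stable output.
  val expected: Seq[(Int, Long, Long)] =
    left.keys.toSeq.sorted.flatMap(id => right.get(id).map(r => (id, left(id), r)))

  def main(args: Array[String]): Unit =
    expected.foreach(println)
}
```

Running ExpectedJoin.main prints (1,1,10), (2,2,20), (3,3,30) and (4,4,40), so the aggRight column in joinedDS.show should read 10, 30, 40, 20 for ids 1, 3, 4, 2 rather than repeating the aggLeft values.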




