spark-issues mailing list archives

From "Jamie Hutton (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-17061) Incorrect results returned following a join of two datasets and a map step where total number of columns >100
Date Mon, 15 Aug 2016 15:52:20 GMT

    [ https://issues.apache.org/jira/browse/SPARK-17061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421169#comment-15421169 ]

Jamie Hutton commented on SPARK-17061:
--------------------------------------

Apologies for setting the priority to Blocker. I won't use that again.

Is the above definitely the same issue as the one you marked as a duplicate? That one seems
to be slightly different, as it relates to persist and 200 columns. Did you manage to run the
above test case on a 2.0.1 codebase, and did it work?

> Incorrect results returned following a join of two datasets and a map step where total number of columns >100
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-17061
>                 URL: https://issues.apache.org/jira/browse/SPARK-17061
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0
>            Reporter: Jamie Hutton
>            Priority: Blocker
>
> We have hit a consistent bug that occurs when a dataset has more than 100 columns. I am
> raising it as a blocker because Spark returns the WRONG results rather than raising an error,
> leading to data integrity issues.
> I have put together the following test case, which shows the issue (it will run in
> spark-shell). In this example I am joining a dataset with lots of fields onto another dataset.
>
> The join works fine, and if you show the dataset you get the expected result. However,
> if you run a map step over the dataset, you end up with a strange result where the sequence
> in the right-hand dataset now contains only the last value, repeated.
> Whilst this test may seem a rather contrived example, what we are doing here is a very
> standard analytical pattern. My original code was designed to:
>  - take a dataset of child records
>  - groupByKey up to the parent: giving a Dataset of (ParentID, Seq[Children])
>  - join the children onto the parent by parentID: giving ((Parent),(ParentID,Seq[Children]))
>  - map over the result to give a tuple of (Parent,Seq[Children])
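
The four steps above can be sketched with plain Scala collections standing in for Datasets. This is only an illustration of the shape of the pattern, not the original code: Parent, Child, their fields, and the sample values are hypothetical, and an Option stands in for the nullable right-hand side of a left join.

```scala
// Hypothetical record types; the original report does not show them.
case class Parent(parentId: Int, label: String)
case class Child(parentId: Int, name: String)

object PatternSketch {
  val parents: Seq[Parent] = Seq(Parent(1, "p1"), Parent(2, "p2"))

  // Step 1: a collection of child records.
  val children: Seq[Child] =
    Seq(Child(1, "Jamie"), Child(1, "Ian"), Child(1, "Dave"), Child(1, "Will"))

  // Step 2: group the children up to the parent key: (ParentID, Seq[Children]).
  val grouped: Map[Int, Seq[Child]] = children.groupBy(_.parentId)

  // Step 3: left join onto the parent by parentId: ((Parent), (ParentID, Seq[Children])).
  // None plays the role of the missing right side for parents without children.
  val joined: Seq[(Parent, Option[(Int, Seq[Child])])] =
    parents.map(p => (p, grouped.get(p.parentId).map(cs => (p.parentId, cs))))

  // Step 4: map over the result to give (Parent, Seq[Children]).
  val result: Seq[(Parent, Seq[Child])] =
    joined.map { case (p, right) => (p, right.map(_._2).getOrElse(Seq.empty)) }
}
```

With Datasets, step 4 is the map step that triggers the wrong result described in this issue.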
> Notes:
> - The issue is resolved by having fewer fields: as soon as we go <= 100 the integrity
> issue goes away. Try removing one of the fields from BigCaseClass below.
> - The issue arises based on the total number of fields in the resulting dataset.
> Below I have a small case class and a big case class, but two case classes of 50 fields
> each would give the same issue.
> - The issue occurs where the case class being joined on (on the right) contains a case
> class type (here Seq[Name]). It doesn't occur if you have a Seq[String].
> - If I go back to an RDD for the map step after the join I can work around the issue,
> but I lose all the benefits of datasets.
> Scala code test case:
>   case class Name(name: String)
>   case class SmallCaseClass (joinkey: Integer, names: Seq[Name])    
>   case class BigCaseClass(
>     field1: Integer, field2: Integer, field3: Integer, field4: Integer, field5: Integer,
>     field6: Integer, field7: Integer, field8: Integer, field9: Integer, field10: Integer,
>     field11: Integer, field12: Integer, field13: Integer, field14: Integer, field15: Integer,
>     field16: Integer, field17: Integer, field18: Integer, field19: Integer, field20: Integer,
>     field21: Integer, field22: Integer, field23: Integer, field24: Integer, field25: Integer,
>     field26: Integer, field27: Integer, field28: Integer, field29: Integer, field30: Integer,
>     field31: Integer, field32: Integer, field33: Integer, field34: Integer, field35: Integer,
>     field36: Integer, field37: Integer, field38: Integer, field39: Integer, field40: Integer,
>     field41: Integer, field42: Integer, field43: Integer, field44: Integer, field45: Integer,
>     field46: Integer, field47: Integer, field48: Integer, field49: Integer, field50: Integer,
>     field51: Integer, field52: Integer, field53: Integer, field54: Integer, field55: Integer,
>     field56: Integer, field57: Integer, field58: Integer, field59: Integer, field60: Integer,
>     field61: Integer, field62: Integer, field63: Integer, field64: Integer, field65: Integer,
>     field66: Integer, field67: Integer, field68: Integer, field69: Integer, field70: Integer,
>     field71: Integer, field72: Integer, field73: Integer, field74: Integer, field75: Integer,
>     field76: Integer, field77: Integer, field78: Integer, field79: Integer, field80: Integer,
>     field81: Integer, field82: Integer, field83: Integer, field84: Integer, field85: Integer,
>     field86: Integer, field87: Integer, field88: Integer, field89: Integer, field90: Integer,
>     field91: Integer, field92: Integer, field93: Integer, field94: Integer, field95: Integer,
>     field96: Integer, field97: Integer, field98: Integer, field99: Integer)
>       
>     val bigCC=Seq(BigCaseClass(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
>       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
>       40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62,
>       63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85,
>       86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99))
>     
>     val smallCC=Seq(SmallCaseClass(1,Seq(
>         Name("Jamie"), 
>         Name("Ian"),
>         Name("Dave"),
>         Name("Will")
>         )))
>     
>     
>     val bigCCDS = spark.createDataset(spark.sparkContext.parallelize(bigCC))
>     val smallCCDS = spark.createDataset(spark.sparkContext.parallelize(smallCC))
>     
>     val joined_test=bigCCDS.as("A").joinWith(smallCCDS.as("B"),  $"A.field1"===$"B.joinkey", "LEFT")
>     
>     /*This next step is fine - it shows all 4 names:
>      * [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
>      * [1,WrappedArray([Jamie], [Ian], [Dave], [Will])]
>      * */
>     joined_test.show(false)
>     
>     /*This one ends up repeating Will - I did the simplest map step possible here
>      * [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
>      * [1,WrappedArray([Will], [Will], [Will], [Will])]
>      * */
>     joined_test.map(identity).show(false)
>     
>     /*This one works because we have fewer than 100 fields:
>      * [Jamie], [Ian], [Dave], [Will]*/
>     joined_test.map(_._2).show(false)
>     
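
The RDD workaround mentioned in the notes might look roughly like the sketch below. It is not self-contained: it assumes the spark-shell session from the test case above, with joined_test already defined, and the name viaRdd and the choice to keep only field1 next to the names are illustrative assumptions rather than part of the original code.

```scala
// Assumes the spark-shell session from the test case above, where joined_test
// is a Dataset[(BigCaseClass, SmallCaseClass)].
val viaRdd = joined_test.rdd   // drop down to an RDD of the joined pairs
  .map { case (big, small) =>
    // With a LEFT join the right side can be null when there is no match,
    // so wrap it in Option before reading names.
    (big.field1, Option(small).map(_.names).getOrElse(Seq.empty))
  }

// Print the mapped rows on the driver to check the names by eye.
viaRdd.collect().foreach(println)
```

As the notes say, this gives up the benefits of the Dataset API for the map step, so it is a stopgap rather than a fix.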



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
