spark-issues mailing list archives

From "Ben (JIRA)" <>
Subject [jira] [Commented] (SPARK-22563) Spark row_number() deterministic generation and materialization as a checkpoint
Date Mon, 20 Nov 2017 15:34:01 GMT


Ben commented on SPARK-22563:

The example is what I would logically expect, but in reality the values in the ID column are
not preserved; they get mixed up.

> Spark row_number() deterministic generation and materialization as a checkpoint
> -------------------------------------------------------------------------------
>                 Key: SPARK-22563
>                 URL:
>             Project: Spark
>          Issue Type: Question
>          Components: PySpark, Shuffle, SQL
>    Affects Versions: 2.1.0
>            Reporter: Ben
> I have a large and complex DataFrame with nested structures in Spark 2.1.0 (pySpark)
and I want to add an ID column to it. The way I did it was to add a column like this:
> {code}
> df= df.selectExpr('*','row_number() OVER (PARTITION BY File ORDER BY NULL) AS ID')
> {code}
> So it goes e.g. from this:
> {code}
> File     A        B
> a.txt    valA1    [valB11,valB12]
> a.txt    valA2    [valB21,valB22]
> {code}
> to this:
> {code}
> File     A         B                 ID
> a.txt    valA1    [valB11,valB12]    1
> a.txt    valA2    [valB21,valB22]    2
> {code}
> After I add this column, I don't immediately trigger a materialization in Spark, but
I first branch the DataFrame to a new variable:
> {code}
> dfOutput = df.select('A','ID')
> {code}
> with only columns A and ID and I write {{dfOutput}} to Hive, so I get e.g. *Table1*:
> {code}
> A        ID
> valA1    1
> valA2    2
> {code}
> So far so good. Then I continue using {{df}} for further transformations, namely I explode
some of the nested arrays in the columns and drop the original, like this:
> {code}
> df = df.withColumn('Bexpl',explode('B')).drop('B')
> {code}
> and I get this:
> {code}
> File     A        Bexpl      ID
> a.txt    valA1    valB11     1
> a.txt    valA1    valB12     1
> a.txt    valA2    valB21     2
> a.txt    valA2    valB22     2
> {code}
> and output other tables from it, sometimes after creating a second ID column since there
are more rows from the exploded arrays. E.g. I create *Table2*:
> {code}
> df= df.selectExpr('*','row_number() OVER (PARTITION BY File ORDER BY NULL) AS ID2')
> {code}
> to get:
> {code}
> File     A        Bexpl      ID    ID2
> a.txt    valA1    valB11     1     1
> a.txt    valA1    valB12     1     2
> a.txt    valA2    valB21     2     3
> a.txt    valA2    valB22     2     4
> {code}
> and output as earlier:
> {code}
> dfOutput2 = df.select('Bexpl','ID','ID2')
> {code}
> to get:
> {code}
> Bexpl      ID    ID2
> valB11     1     1
> valB12     1     2
> valB21     2     3
> valB22     2     4
> {code}
> I would expect that the values of the first ID column remain the same and match the data
for each row from the point that this column was created. This would allow me to keep a relation
between *Table1* created from {{dfOutput}} and subsequent tables from {{df}}, like {{dfOutput2}}
and the resulting *Table2*.
> The problem is that ID and ID2 are not as shown in the example above, but mixed up,
and I'm trying to find out why. My guess is that the values of the first ID column are
not deterministic because {{df}} is not materialized before branching to {{dfOutput}}. So
when the data is actually materialized while saving the table from {{dfOutput}}, the rows are
shuffled and the IDs differ from those in the data that is materialized at a later point from {{df}},
as in {{dfOutput2}}. I am not sure, however, so my questions are:
> 1. Is my assumption correct, that IDs are generated differently for the different branches
although I add the column before branching?
> 2. Would materializing the DataFrame before branching to {{dfOutput}} (e.g. through {{df.cache().count()}})
ensure a fixed ID column which I can later branch however I want from {{df}}, so that I can
use this as a checkpoint?
> 3. If not, how can I solve this?
> I would appreciate any help, or at least a quick confirmation, because I can't test it properly.
Spark would shuffle the data only if it doesn't have enough memory, and reaching that point
would mean loading a lot of data, which in turn takes a long time and may still produce coincidentally
good results (I already tried with smaller datasets).
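
The effect suspected above can be illustrated without Spark. The following is a minimal pure-Python sketch, not Spark code: the helper {{number_rows}} and the sample rows are hypothetical and only mimic numbering rows after sorting. It assumes that the two branches may see the same rows in different arrival orders. Under a constant sort key (analogous to ORDER BY NULL) the IDs then follow arrival order, while a key that is unique per row gives the same IDs in both runs.

```python
def number_rows(rows, key):
    """Assign 1-based row numbers after a stable sort by `key` (hypothetical helper)."""
    ordered = sorted(rows, key=key)
    return {row: i for i, row in enumerate(ordered, start=1)}

# The same two rows arriving in different orders, as might happen when each
# branch of a lazily evaluated plan is recomputed independently (assumption).
run1 = ["valA1", "valA2"]
run2 = ["valA2", "valA1"]

# Constant key: all rows tie, so the stable sort keeps arrival order
# and the assigned IDs depend on that order.
ids_const_1 = number_rows(run1, key=lambda r: 0)
ids_const_2 = number_rows(run2, key=lambda r: 0)

# Unique key: the ordering, and therefore the IDs, no longer depend on arrival order.
ids_unique_1 = number_rows(run1, key=lambda r: r)
ids_unique_2 = number_rows(run2, key=lambda r: r)

print(ids_const_1 == ids_const_2)    # IDs differ between the two runs
print(ids_unique_1 == ids_unique_2)  # IDs agree between the two runs
```

If the data has a column, or a combination of columns, that is unique within each File, ordering the window by it instead of NULL would be the analogous change in the original expression. Whether such a column exists in this DataFrame is not stated in the report.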
