spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-22828) Data corruption happens when the same RDD is repeatedly used as the parent RDD of a custom RDD which reads each parent RDD in concurrent threads
Date Tue, 19 Dec 2017 19:22:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-22828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-22828:
------------------------------
    Target Version/s:   (was: 2.2.0)
              Labels:   (was: spark)

> Data corruption happens when the same RDD is repeatedly used as the parent RDD of a custom RDD which reads each parent RDD in concurrent threads
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22828
>                 URL: https://issues.apache.org/jira/browse/SPARK-22828
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Yongqin Xiao
>
> I have defined a custom RDD that:
> - computes its output from the input data using our traditional data transformation code. As an extreme example, this custom RDD can behave as a union, a joiner, etc.
> - takes one or more parent RDDs as input, where some or all of the parent RDDs can be the same RDD
> - reads the input parent RDDs in concurrent threads (i.e. reader threads)
> - computes the data in one or more transformation threads that run concurrently with the reader threads; a minimal sketch of this pattern follows the list
> - ...
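> To make the setup concrete, here is a hedged sketch of such an RDD (the class name MyRdd matches the pseudocode below, but everything else is illustrative, not the actual implementation): one reader thread per parent pushes rows into a shared queue, and the consuming task thread merges them. Passing the same RDD object twice in the parents makes two threads iterate that parent concurrently, which is the scenario where the corruption appears.
> {code:scala}
> import java.util.concurrent.LinkedBlockingQueue
>
> import scala.reflect.ClassTag
>
> import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
> import org.apache.spark.rdd.RDD
>
> // Illustrative sketch only: one reader thread per parent feeds a shared
> // queue; the transformation (task) thread consumes the merged stream.
> class MyRdd[T: ClassTag](parents: Seq[RDD[T]])
>   extends RDD[T](parents.head.sparkContext,
>                  parents.map(p => new OneToOneDependency(p))) {
>
>   override protected def getPartitions: Array[Partition] =
>     parents.head.partitions
>
>   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
>     val queue = new LinkedBlockingQueue[Option[T]](1024)
>     // One reader thread per parent; the same RDD appearing twice in
>     // `parents` means two threads iterate it concurrently.
>     parents.foreach { parent =>
>       val reader = new Thread(new Runnable {
>         override def run(): Unit = {
>           parent.iterator(split, context).foreach(row => queue.put(Some(row)))
>           queue.put(None) // end-of-stream marker for this reader
>         }
>       })
>       reader.setDaemon(true)
>       reader.start()
>     }
>     // Transformation side: merge rows until every reader has finished.
>     var remaining = parents.size
>     new Iterator[T] {
>       private var nextRow: Option[T] = advance()
>       private def advance(): Option[T] = {
>         while (remaining > 0) {
>           queue.take() match {
>             case some @ Some(_) => return some
>             case None           => remaining -= 1
>           }
>         }
>         None
>       }
>       override def hasNext: Boolean = nextRow.isDefined
>       override def next(): T = {
>         val row = nextRow.get
>         nextRow = advance()
>         row
>       }
>     }
>   }
> }
> {code}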
> In certain cases, we see {color:red}data being corrupted{color} when our reader threads read it in. The corruption happens when all of the following conditions are met:
> - Multiple parent RDDs of the custom RDD are actually the same RDD, e.g. a same-source union.
> The Scala code is roughly:
> {code:scala}
> val rdd1 = ... // some upstream RDD
> val customRdd = new MyRdd(rdd1, rdd1, ...)
> {code}
> - The parent RDD is not the result of repartitioning or sorting-within-partition.
> - There is no persistence on the shared parent RDD.
> - spark.sql.shuffle.partitions is set to 1. We saw corruption as well when the value was set to a small number like 2, which is also the source partition count.
> This data corruption happens even when the number of executors and cores is set to 1, meaning the corruption is not related to multiple partitions running concurrently.
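> For reference, a hypothetical way to set up the configuration described above (the builder values are assumptions for local reproduction, not taken from the report):
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> // Assumed local-mode settings: one core and one shuffle partition
> val spark = SparkSession.builder()
>   .master("local[1]")
>   .config("spark.sql.shuffle.partitions", "1")
>   .getOrCreate()
> {code}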
> Data corruption does not happen when any one of the following conditions is met:
> 1. Instead of passing the same parent RDD multiple times as input to my custom RDD, we do a select (of all columns) on that parent RDD and use a different select RDD for each input.
> The Scala code is roughly:
> {code:scala}
> val rdd1 = ... // some upstream dataset
> val customRdd = new MyRdd(rdd1.select($1, $2, ...), rdd1.select($1, $2, ...), ...)
> {code}
> 2. We persist the parent RDD:
> {code:scala}
> val rdd1 = ... // some upstream RDD
> rdd1.persist(...)
> val customRdd = new MyRdd(rdd1, rdd1, ...)
> {code}
> 3. We use a single thread to read the parent RDDs in the custom RDD implementation (see the sketch after this list).
> 4. We use our default value (100) for spark.sql.shuffle.partitions.
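> To illustrate workaround 3 with the hypothetical MyRdd sketched above, the concurrent readers can be replaced by sequential iteration in the task thread, so no two threads ever share a parent's iterator:
> {code:scala}
> // Single-threaded variant of compute(): read the parents one after
> // another in the task thread instead of spawning reader threads.
> override def compute(split: Partition, context: TaskContext): Iterator[T] =
>   parents.iterator.flatMap(parent => parent.iterator(split, context))
> {code}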




