spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result
Date Mon, 15 Jan 2018 11:47:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326153#comment-16326153 ]

Sean Owen commented on SPARK-23076:
-----------------------------------

That sounds serious, although if true, I would expect a lot of stuff to fail.

Where are you calling cache by the way?

Maybe CC [~cloud_fan]

> When we call cache() on ShuffleRowRDD, we will get an error result
> ------------------------------------------------------------------
>
>                 Key: SPARK-23076
>                 URL: https://issues.apache.org/jira/browse/SPARK-23076
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: zhoukang
>            Priority: Major
>         Attachments: shufflerowrdd-cache.png
>
>
> For query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:10000/> select * from csv_demo limit 3;
>  +----------+------+
> | _c0      | _c1  |
> +----------+------+
> | Joe      | 20   |
> | Tom      | 30   |
> | Hyukjin  | 25   |
> +----------+------+
>  However, when we call cache() on ShuffledRowRDD (or on an RDD that depends on ShuffledRowRDD within the same stage):
>   !shufflerowrdd-cache.png!
> then the result is wrong:
>  0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
>  +----------+------+
> | _c0      | _c1  |
> +----------+------+
> | Hyukjin  | 25   |
> | Hyukjin  | 25   |
> | Hyukjin  | 25   |
> +----------+------+
>  This happens because the UnsafeRow instances produced by ShuffledRowRDD#compute reuse the same underlying byte buffer.
> To demonstrate, modify UnsafeRow.toString() to also print the backing object and offset:
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = new StringBuilder("[");
>   for (int i = 0; i < sizeInBytes; i += 8) {
>     if (i != 0) build.append(',');
>     build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
>   }
>   build.append("," + baseObject + "," + baseOffset + ']'); // Print baseObject and baseOffset here
>   return build.toString();
> }
> {code}
> {code:java}
> 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,656f4a,3032,[B@6225ec90,16]
> 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,6d6f54,3033,[B@6225ec90,16]
> 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000007,2000000002,6e696a6b757948,3532,[B@6225ec90,16]
> {code}
> All three rows report the same backing array ([B@6225ec90). We can fix this by adding a config and copying each UnsafeRow read by the ShuffledRowRDD iterator when the config is enabled, like below:
> {code:java}
> reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map { x =>
>   if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
>       && x._2.isInstanceOf[UnsafeRow]) {
>     x._2.asInstanceOf[UnsafeRow].copy()
>   } else {
>     x._2
>   }
> }
> {code}
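To see the failure mode in isolation, here is a minimal, self-contained Java sketch (the MutableRow class and rows() helper are illustrative inventions, not Spark APIs) of how materializing an iterator that reuses one mutable object yields N references to the last element, and how a per-element copy() — the same idea as the fix above — restores correctness:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Minimal illustration of the buffer-reuse pitfall (not Spark code):
// a deserializer-style iterator that returns the SAME mutable object
// for every element, like ShuffledRowRDD reusing one UnsafeRow buffer.
public class ReuseDemo {
    static final class MutableRow {
        String name;
        int age;
        MutableRow copy() {
            MutableRow c = new MutableRow();
            c.name = name;
            c.age = age;
            return c;
        }
        @Override public String toString() { return "[" + name + "|" + age + "]"; }
    }

    // One shared row instance, mutated before each next().
    static Iterator<MutableRow> rows() {
        List<String[]> data = Arrays.asList(
            new String[]{"Joe", "20"},
            new String[]{"Tom", "30"},
            new String[]{"Hyukjin", "25"});
        Iterator<String[]> it = data.iterator();
        MutableRow shared = new MutableRow();
        return new Iterator<MutableRow>() {
            public boolean hasNext() { return it.hasNext(); }
            public MutableRow next() {
                String[] d = it.next();
                shared.name = d[0];
                shared.age = Integer.parseInt(d[1]);
                return shared;   // same object every time
            }
        };
    }

    public static void main(String[] args) {
        // Caching without copy(): every slot ends up holding the last value.
        List<MutableRow> wrong = new ArrayList<>();
        rows().forEachRemaining(wrong::add);
        System.out.println(wrong);  // [[Hyukjin|25], [Hyukjin|25], [Hyukjin|25]]

        // Copying each element first (the proposed fix) preserves all rows.
        List<MutableRow> right = new ArrayList<>();
        rows().forEachRemaining(r -> right.add(r.copy()));
        System.out.println(right);  // [[Joe|20], [Tom|30], [Hyukjin|25]]
    }
}
```

The wrong output mirrors the duplicated `Hyukjin|25` rows in the report above: every cached slot points at the single shared buffer, which last held the final row.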



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
