spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-7002) Persist on RDD fails the second time if the action is called on a child RDD without showing a FAILED message
Date Mon, 20 Apr 2015 21:56:59 GMT

    [ https://issues.apache.org/jira/browse/SPARK-7002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14503761#comment-14503761
] 

Sean Owen commented on SPARK-7002:
----------------------------------

The shuffle data is a sort of hidden, second type of caching that goes on. I don't know how
much it's supposed to be exposed. My hunch is that if there's already an easy API to access
this info, go ahead and propose adding it to the debug string; but if it's not otherwise easily
accounted for, it may not be worth adding. It's good to know that there is logic to what
is happening, at least, rather than a bug.
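
The observed behavior can be made concrete with a toy model in plain Scala (none of this is Spark's actual code; the store names, keys, and helper methods below are invented for illustration). The two ingredients are: persist() only *marks* an RDD, and data is written only when a partition is actually computed; and an action on a post-shuffle child reads the stage's shuffle output directly, so the parent is never recomputed and the re-persist never materializes.

```scala
// Toy model of the SPARK-7002 scenario (illustrative only, not Spark internals).
object ShuffleReuseSketch {
  import scala.collection.mutable

  // Explicitly persisted blocks (persist(DISK_ONLY)) vs. shuffle map output.
  private val diskStore    = mutable.Map.empty[String, Seq[(Int, Int)]]
  private val shuffleFiles = mutable.Map.empty[Int, Seq[(Int, Int)]]

  private var persistMarked = false

  // Computing rdd2's partitions is the only point where persist takes effect.
  private def computeRdd2(): Seq[(Int, Int)] = {
    val data = Seq(1, 2, 3).map(x => (x, x + 1))
    if (persistMarked) diskStore("rdd_2") = data
    data
  }

  // An action on rdd3 reads the stage's shuffle output if it already exists;
  // only on a miss does it run the parent stage (and hence computeRdd2).
  private def collectRdd3(): Seq[(Int, Int)] =
    shuffleFiles.getOrElseUpdate(0, computeRdd2())

  /** Returns (block on disk after first action, block on disk after re-persist + second action). */
  def run(): (Boolean, Boolean) = {
    diskStore.clear(); shuffleFiles.clear(); persistMarked = false

    persistMarked = true      // rdd2.persist(DISK_ONLY): just a mark
    collectRdd3()             // rdd3.collect(): computes rdd2, writes block + shuffle files
    val afterFirst = diskStore.contains("rdd_2")

    diskStore.remove("rdd_2") // rdd2.unpersist(): drops only the explicit block
    persistMarked = true      // rdd2.persist(DISK_ONLY) again: still just a mark
    collectRdd3()             // rdd3.collect(): shuffle-file hit, rdd2 never recomputed
    val afterSecond = diskStore.contains("rdd_2")

    (afterFirst, afterSecond)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this sketch the second run() step reproduces the report's point 2): the persist mark is set, but because the shuffle output satisfies the child's action, the parent's iterator never runs and nothing lands in the block store.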

> Persist on RDD fails the second time if the action is called on a child RDD without showing a FAILED message
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-7002
>                 URL: https://issues.apache.org/jira/browse/SPARK-7002
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.0
>         Environment: Platform: Power8
> OS: Ubuntu 14.10
> Java: java-8-openjdk-ppc64el
>            Reporter: Tom Hubregtsen
>            Priority: Minor
>              Labels: disk, persist, unpersist
>
> The major issue is: persist on an RDD fails the second time if the action is called on a child RDD, without showing a FAILED message. This is pointed out at 2).
> Next to this: toDebugString on a child RDD does not show that the parent RDD is [Disk Serialized 1x Replicated]. This is pointed out at 1).
> Note: I am persisting to disk (DISK_ONLY) to validate whether the RDD is physically stored, as I did not want to rely solely on a missing line in .toDebugString (see comments in the trace).
> {code}
> scala> val rdd1 = sc.parallelize(List(1,2,3))
> scala> val rdd2 = rdd1.map(x => (x,x+1))
> scala> val rdd3 = rdd2.reduceByKey( (x,y) => x+y)
> scala> import org.apache.spark.storage.StorageLevel
> scala> rdd2.persist(StorageLevel.DISK_ONLY)
> scala> rdd3.collect()
> scala> rdd2.toDebugString
> res4: String = 
> (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
>   |        CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
>   |   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
> scala> rdd3.toDebugString
> res5: String = 
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
>   +-(100) MapPartitionsRDD[1] at map at <console>:23 []
>       |       CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
>       |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // 1) rdd3 does not show that the other RDDs are [Disk Serialized 1x Replicated], but the data is on disk. This is verified by:
> // a) the line starting with CachedPartitions
> // b) a find in spark_local_dir: "find . -name "*" | grep rdd" returns "./spark-b39bcf9b-e7d7-4284-bdd2-1be7ac3cacef/blockmgr-4f4c0b1c-b47a-4972-b364-7179ea6e0873/1f/rdd_4_*", where * ranges over the partition numbers
> scala> rdd2.unpersist()
> scala> rdd2.toDebugString
> res8: String = 
> (100) MapPartitionsRDD[1] at map at <console>:23 []
>   |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
> scala> rdd3.toDebugString
> res9: String = 
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
>   +-(100) MapPartitionsRDD[1] at map at <console>:23 []
>       |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // successfully unpersisted, also not visible on disk
> scala> rdd2.persist(StorageLevel.DISK_ONLY)
> scala> rdd3.collect()
> scala> rdd2.toDebugString
> res18: String = 
> (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
>   |   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
> scala> rdd3.toDebugString
> res19: String = 
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
>   +-(100) MapPartitionsRDD[1] at map at <console>:23 []
>       |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // 2) The data is not visible on disk through the find command previously mentioned, and is also not mentioned in the toDebugString (no line starting with CachedPartitions, even though [Disk Serialized 1x Replicated] is mentioned). It does work when you call the action on the actual RDD:
> scala> rdd2.collect()
> scala> rdd2.toDebugString
> res21: String = 
> (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
>   |        CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
>   |   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
> scala> rdd3.toDebugString
> res22: String = 
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
>   +-(100) MapPartitionsRDD[1] at map at <console>:23 []
>       |       CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
>       |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // Data appears on disk again (using the find command previously mentioned), and the line with CachedPartitions is back in the .toDebugString
> {code}
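
As a side note on the verification method used in the report: what the find command matches can be sketched against a simulated directory layout (all directory and file names below are invented for illustration; real ones are per-application UUIDs like spark-*/blockmgr-*/). Blocks written by persist(DISK_ONLY) appear as rdd_<rddId>_<partition>, while the shuffle's own on-disk state, which survives unpersist(), appears as shuffle_* files.

```shell
# Simulated spark.local.dir layout (names illustrative, not from a real run).
dir=$(mktemp -d)
mkdir -p "$dir/blockmgr-demo/1f"
touch "$dir/blockmgr-demo/1f/rdd_1_0"            # block from persist(DISK_ONLY)
touch "$dir/blockmgr-demo/1f/shuffle_0_0_0.data" # map output kept by the shuffle
# The report's check for persisted blocks:
find "$dir" -name 'rdd_*'
# The shuffle's hidden on-disk state, which unpersist() does not touch:
find "$dir" -name 'shuffle_*'
rm -rf "$dir"
```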



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


