spark-issues mailing list archives

From "Kay Ousterhout (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-5360) For CoGroupedRDD, rdds for narrow dependencies and shuffle handles are included twice in serialized task
Date Wed, 21 Jan 2015 21:57:34 GMT

     [ https://issues.apache.org/jira/browse/SPARK-5360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kay Ousterhout updated SPARK-5360:
----------------------------------
    Description: 
CoGroupPartition, part of CoGroupedRDD, includes references to each RDD that the CoGroupedRDD
narrowly depends on, and a reference to the ShuffleHandle.  The partition is serialized separately
from the RDD, so when the RDD and partition arrive on the worker, the references in the partition
and in the RDD no longer point to the same object.

This is a relatively minor performance issue (the closure can be 2x larger than it needs to
be because the RDDs and shuffle handles are serialized twice, once with the RDD and once with the
partition; see numbers below), but it is more annoying as a developer issue (which is where I ran
into it): if any state is stored in the RDD or ShuffleHandle on the worker side, subtle bugs can
appear because the references to the RDD / ShuffleHandle held by the RDD and by the partition
point to separate objects.  I'm not sure whether this is enough of a potential future problem to
justify changing this old and central part of the code, so I'm hoping to get input from others here.
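
As a minimal, self-contained sketch of the reference splitting (the class names below are
illustrative stand-ins, not Spark's own), two objects that share a reference keep sharing it when
serialized in one stream, but end up with independent copies when serialized in separate streams,
which is exactly the situation for the RDD and its partitions in a task:

import java.io._

object ReferenceSplitDemo {
  class Handle(var state: Int) extends Serializable        // stands in for a ShuffleHandle
  class FakeRdd(val handle: Handle) extends Serializable   // stands in for the CoGroupedRDD
  class FakePart(val handle: Handle) extends Serializable  // stands in for a CoGroupPartition

  // Serialize and immediately deserialize an object graph with plain Java serialization.
  def roundTrip[T <: Serializable](obj: T): T = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(obj)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val handle = new Handle(0)
    val rdd = new FakeRdd(handle)
    val part = new FakePart(handle)

    // Serialized together in one stream: the shared reference survives.
    val (rdd1, part1) = roundTrip((rdd, part))
    println(rdd1.handle eq part1.handle)   // true

    // Serialized separately, like the task's RDD and partition: two independent copies.
    val rdd2 = roundTrip(rdd)
    val part2 = roundTrip(part)
    println(rdd2.handle eq part2.handle)   // false
    rdd2.handle.state = 42                 // state set through one reference...
    println(part2.handle.state)            // ...is invisible through the other: prints 0
  }
}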

I did some simple experiments to see how much this affects closure size.  For this example:

$ val a = sc.parallelize(1 to 10).map((_, 1))
$ val b = sc.parallelize(1 to 2).map(x => (x, 2*x))
$ a.cogroup(b).collect()

the closure was 1902 bytes with current Spark, and 1129 bytes after my change.  The difference
comes from eliminating duplicate serialization of the shuffle handle.
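
(A rough way to poke at where those bytes live from the shell; to be clear, this is not how the
numbers above were measured, it just Java-serializes one of the CoGroupPartitions directly to show
that the partition on its own already drags in the shuffle metadata that is also reachable from
the RDD:)

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Approximate serialized size of an object graph using plain Java serialization.
def serializedSize(obj: AnyRef): Int = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(obj)
  out.close()
  buf.size()
}

val cogrouped = a.cogroup(b)
// Accessing partitions builds the CoGroupPartitions, including their shuffle handle references.
println(serializedSize(cogrouped.partitions(0)))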

For this example:
$ val sortedA = a.sortByKey()
$ val sortedB = b.sortByKey()
$ sortedA.cogroup(sortedB).collect()

the closure was 3491 bytes with current Spark, and 1333 bytes after my change. Here, the difference
comes from eliminating duplicate serialization of the two RDDs for the narrow dependencies.
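
To spell out why whole parent RDDs show up in this case: CoGroupedRDD uses a narrow dependency
for a parent whose partitioner matches the partitioner chosen for the cogroup (possible here
because both inputs were just range-partitioned by sortByKey), and it is the parents behind those
narrow dependencies whose RDD references get carried in each CoGroupPartition on top of the
references already inside the CoGroupedRDD itself. Which dependencies came out narrow can be
checked from the shell:

$ sortedA.cogroup(sortedB).dependencies.foreach(d => println(d.getClass.getSimpleName))

Each OneToOneDependency printed corresponds to a parent RDD that currently gets serialized both
with the CoGroupedRDD and with every CoGroupPartition.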

The ShuffleHandle includes the ShuffleDependency, so this difference will get larger if a
ShuffleDependency includes a serializer, a key ordering, or an aggregator (all set to None
by default).  However, the difference is not affected by the size of the function the user
specifies, which (based on my understanding) is typically the source of large task closures.
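
(Those three fields can be inspected from the shell; the field names below are my reading of the
ShuffleDependency constructor, so treat them as an assumption to double-check rather than a
documented API:)

a.cogroup(b).dependencies.foreach {
  case d: org.apache.spark.ShuffleDependency[_, _, _] =>
    println((d.serializer, d.keyOrdering, d.aggregator))  // expected: (None, None, None) for a plain cogroup
  case _ =>                                               // narrow dependencies carry no shuffle metadata
}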

  was: (the previous description, identical to the one above except that it lacked the final paragraph about the ShuffleDependency contents)


> For CoGroupedRDD, rdds for narrow dependencies and shuffle handles are included twice in serialized task
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5360
>                 URL: https://issues.apache.org/jira/browse/SPARK-5360
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.2.0
>            Reporter: Kay Ousterhout
>            Assignee: Kay Ousterhout
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

