spark-issues mailing list archives

From "Josh Rosen (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-19091) Implement more accurate statistics for LogicalRDD when child is a mapped ParallelCollectionRDD
Date Fri, 06 Jan 2017 01:02:58 GMT

     [ https://issues.apache.org/jira/browse/SPARK-19091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-19091:
-------------------------------
    Description: 


The Catalyst optimizer uses LogicalRDD to represent scans of existing RDDs. In general,
it's hard to produce size estimates for arbitrary RDDs, which is why the current implementation
simply estimates these relations' sizes using the default size (see the TODO at
https://github.com/apache/spark/blob/f5d18af6a8a0b9f8c2e9677f9d8ae1712eb701c6/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala#L174).
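
(Editor's illustration, not part of the original report: the fallback estimate can be observed from a
user session. The accessor names below match the Spark 2.1-era API, where a logical plan exposed
{{statistics}}; later versions renamed it to {{stats}}, so treat this as a sketch rather than a
version-exact recipe.)

{code:scala}
// Sketch only: observe the optimizer's size estimate for a tiny parallelize()d DataFrame.
import org.apache.spark.sql.SparkSession

object InspectDefaultSizeEstimate {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("size-estimate").getOrCreate()
    import spark.implicits._

    // A tiny parallelized data set: its real size is at most a few hundred bytes.
    val tiny = spark.sparkContext.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "name")

    // The leaf node for the existing RDD falls back to spark.sql.defaultSizeInBytes,
    // so the reported size is typically enormous compared to the real data size.
    println(tiny.queryExecution.optimizedPlan.statistics.sizeInBytes)

    spark.stop()
  }
}
{code}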

In the special case where data has been parallelized with {{sc.parallelize()}}, we wind up
with a ParallelCollectionRDD whose number of elements is known. When we construct a LogicalRDD
plan node in {{SparkSession.createDataFrame()}}, we will typically be using an RDD which is a
one-to-one transformation of that parallel collection RDD (e.g. mapping an encoder to convert
case classes to internal rows). LogicalRDD's statistics method can therefore pattern-match on
the case where a MappedPartitionsRDD sits immediately on top of a ParallelCollectionRDD, walk
up the RDD parent chain to determine the number of elements, and combine that count with the
schema and a conservative "fudge factor" to produce an over-estimate of the LogicalRDD's size
that is more accurate than the default.
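
(Editor's sketch of the proposed estimate, not part of the original report and not the actual Spark
implementation. RDD class names are matched as strings because {{MapPartitionsRDD}}, which the
description calls MappedPartitionsRDD, and {{ParallelCollectionRDD}} are private[spark]; the element
count is passed in explicitly because reading it from the parallelized collection needs internal
access, and the fudge factor of 4 is an arbitrary placeholder.)

{code:scala}
// Sketch of the proposed LogicalRDD size estimate (assumptions noted above).
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType

object MappedParallelCollectionSizeEstimate {

  /** True when `rdd` is a one-to-one map stacked directly on a parallelized local collection. */
  def isMappedParallelCollection(rdd: RDD[_]): Boolean = {
    val parent = rdd.dependencies.headOption.map(_.rdd)
    rdd.getClass.getSimpleName == "MapPartitionsRDD" &&
      parent.exists(_.getClass.getSimpleName == "ParallelCollectionRDD")
  }

  /** Deliberate over-estimate: known row count x per-row default size x fudge factor. */
  def estimatedSizeInBytes(numRows: Long, schema: StructType, fudgeFactor: Long = 4L): BigInt = {
    // DataType.defaultSize gives a conservative per-column width in bytes.
    val perRowSize = schema.map(_.dataType.defaultSize.toLong).sum.max(1L)
    BigInt(numRows) * perRowSize * fudgeFactor
  }
}
{code}

Inside Spark itself, the pattern check and the element count would live in LogicalRDD's statistics
method, falling back to the default size whenever the RDD does not have this shape.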

I believe this will help us avoid pathologically bad plans when joining tiny parallelize()d
data sets against huge tables (for example, missing a broadcast-join opportunity because the
tiny side is assumed to be enormous), and I have a production use case of my own that would
have benefited directly from such an optimization.


> Implement more accurate statistics for LogicalRDD when child is a mapped ParallelCollectionRDD
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19091
>                 URL: https://issues.apache.org/jira/browse/SPARK-19091
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Josh Rosen
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

