spark-dev mailing list archives

From Zhan Zhang <>
Subject SparkPlan/Shuffle stage reuse with Dataset/DataFrame
Date Wed, 19 Oct 2016 01:04:32 GMT
Hi Folks,

We have some Dataset/DataFrame use cases that would benefit from reusing the
SparkPlan and shuffle stage.

Consider the following example. Because Catalyst optimizes the query and
generates the SparkPlan each time a query is executed, the underlying RDD
lineage is regenerated for dataset1 on every query. As a result, the shuffle
stage is executed multiple times.

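val dataset1 = dataset.groupBy(...).agg(...)
dataset1.createOrReplaceTempView("tmpTable")  // assumed: tmpTable is a view over dataset1
spark.sql("select * from tmpTable where condition").collect
spark.sql("select * from tmpTable where condition1").collect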

On the one hand, we get an optimized query plan; on the other hand, we
cannot reuse the data generated by the shuffle stage.

Currently, to reuse dataset1, we have to use persist to cache the data.
This helps, but it is sometimes not what we want because it has side effects.
For example, an executor that holds active cached data cannot be released,
even if it is idle and dynamic allocation is enabled.

In other words, we only want to reuse the shuffle data as much as possible
without caching in a long pipeline with multiple shuffle stages.
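
For reference, the persist-based workaround described above might look roughly
like the sketch below. The sample data, the column names (key, value, total)
and the object name PersistWorkaround are assumptions made up for illustration;
only persist, unpersist and the tmpTable view come from the discussion.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.sum

// Hypothetical example object; not part of any existing code base.
object PersistWorkaround {
  def run(spark: SparkSession): (Array[Row], Array[Row]) = {
    import spark.implicits._

    // Assumed sample input standing in for `dataset`.
    val dataset = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("key", "value")

    // The aggregation introduces a shuffle; persist() marks the result for caching.
    val dataset1 = dataset.groupBy("key").agg(sum("value").as("total")).persist()
    dataset1.createOrReplaceTempView("tmpTable")

    try {
      // Both queries are planned separately, but can read the cached data
      // instead of re-running the shuffle.
      val r1 = spark.sql("select * from tmpTable where total > 2").collect()
      val r2 = spark.sql("select * from tmpTable where key = 'b'").collect()
      (r1, r2)
    } finally {
      // Drop the cached blocks once they are no longer needed.
      dataset1.unpersist()
    }
  }
}
```

The explicit unpersist limits how long the cache is held, but while the cache
is active the side effect on executor release still applies.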

I am wondering whether it makes sense to add a new feature to Dataset/DataFrame
that works as a barrier and prevents query optimization from happening across
the barrier.

For example, in the above case, we want Catalyst to treat tmpTable as a
barrier and stop optimizing across it, so that we can reuse the underlying RDD
lineage of dataset1.

The prototype code to make this work is quite small, and we have tried it in
house with a new API, Dataset.cacheShuffle.
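
To illustrate the idea with existing APIs only, a barrier can be approximated
by round-tripping dataset1 through its RDD. This is a sketch under stated
assumptions, not the in-house cacheShuffle prototype, whose implementation is
not shown here. The sample data, column names and the object name
BarrierSketch are hypothetical.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.sum

// Hypothetical example object; NOT the Dataset.cacheShuffle prototype.
object BarrierSketch {
  def run(spark: SparkSession): (Array[Row], Array[Row]) = {
    import spark.implicits._

    // Assumed sample input standing in for `dataset`.
    val dataset = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("key", "value")
    val dataset1 = dataset.groupBy("key").agg(sum("value").as("total"))

    // Materialize the physical plan as an RDD once, then wrap that same RDD
    // in a new DataFrame. Catalyst sees the wrapped RDD as an opaque leaf, so
    // it does not optimize across it or regenerate the lineage per query.
    val barrier = spark.createDataFrame(dataset1.rdd, dataset1.schema)
    barrier.createOrReplaceTempView("tmpTable")

    // Both queries are built on the same RDD lineage, so the second job
    // should be able to reuse the shuffle output written by the first.
    val r1 = spark.sql("select * from tmpTable where total > 2").collect()
    val r2 = spark.sql("select * from tmpTable where key = 'b'").collect()
    (r1, r2)
  }
}
```

The trade-off is the one raised in this message: filters such as the where
clauses above can no longer be pushed below the aggregation, because
optimization stops at the wrapped RDD.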

But I would like some feedback from the community before opening a JIRA, since
in some sense it does stop the optimization early. Any comments?
