spark-issues mailing list archives

From "Apache Spark (JIRA)" <>
Subject [jira] [Assigned] (SPARK-21964) Enable splitting the Aggregate (on Expand) into a number of Aggregates for grouping analytics
Date Sat, 09 Sep 2017 13:51:02 GMT


Apache Spark reassigned SPARK-21964:

    Assignee: Apache Spark

> Enable splitting the Aggregate (on Expand) into a number of Aggregates for grouping analytics
> --------------------------------------------------------------------------------------------
>                 Key: SPARK-21964
>                 URL:
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.0.0, 2.0.2, 2.1.0, 2.1.1, 2.2.0
>            Reporter: Feng Zhu
>            Assignee: Apache Spark
>         Attachments: before.png, OOMRetry.png, Union.png
> In current versions, Spark SQL implements grouping analytics clauses (i.e., cube, rollup
and grouping sets) as a single Aggregate operator on a single Expand operator. With this implementation,
we can read the table only once.
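The single-Aggregate-on-single-Expand plan can be sketched in miniature. The following is a toy Python model of the assumed Expand semantics (one output copy of every input row per grouping set, with non-grouped columns nulled and a grouping id attached), not Spark's actual implementation; all names are illustrative:

```python
from itertools import combinations

def cube_grouping_sets(columns):
    """All 2^d subsets of the d grouping columns (CUBE semantics)."""
    return [set(c) for r in range(len(columns) + 1)
            for c in combinations(columns, r)]

def expand(rows, columns):
    """Duplicate each input row once per grouping set, nulling absent columns."""
    out = []
    for gid, gset in enumerate(cube_grouping_sets(columns)):
        for row in rows:
            projected = {c: (row[c] if c in gset else None) for c in columns}
            projected["gid"] = gid          # distinguishes grouping sets downstream
            projected["measure"] = row["measure"]
            out.append(projected)
    return out

rows = [{"a": 1, "b": "x", "measure": 10},
        {"a": 2, "b": "y", "measure": 20}]
expanded = expand(rows, ["a", "b"])
# A 2-column cube has 2^2 = 4 grouping sets, so 2 input rows become 8.
print(len(expanded))  # 8
```

A single downstream Aggregate then groups `expanded` by `(a, b, gid)`, which is why the table is read only once but the shuffle input grows by a factor of 2^d.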
> However, for many scenarios (e.g., high-dimensional cubes), the Expand operator is too
"heavy" with a large number of projections, resulting in a vast shuffle write.
> In our production environment, we have encountered various such cases, leading to poor
performance or even OOM errors in direct buffer memory. We demonstrate the issue with the
following real-world query over a 6-dimensional cube.
> {code:sql}
> SELECT CASE WHEN grouping(iFrom) = 1 THEN -1 ELSE iFrom END AS iFrom
>       ,CASE WHEN grouping(iSrcId) = 1 THEN -1 ELSE iSrcId END AS iSrcId
>       ,CASE WHEN grouping(sgametype) = 1 THEN '-1' ELSE sgametype END AS sgametype
>       ,CASE WHEN grouping(iOperId) = 1 THEN -1 ELSE iOperId END AS iOperId
>       ,CASE WHEN grouping(igameid) = 1 THEN -1 ELSE igameid END AS igameid
>       ,CASE WHEN grouping(iacttypeid) = 1 THEN -1 ELSE iacttypeid END AS iacttypeid
>       ,SUM(iclickcnt) AS iclickcnt
> FROM p_day_advert
> WHERE  statedate = 20170810
> GROUP BY iFrom, iSrcId, sgametype, iOperId, igameid, iacttypeid WITH CUBE
> {code}
> For such a query, the Expand operator generates 64 (i.e., 2^6) projections. Though
the query reads only about 3GB of data, it produces about 250GB of shuffle write. In our
environment, the first stage takes about 2 hours.
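As a rough sanity check on those numbers (illustrative arithmetic only, not taken from Spark's source):

```python
# A d-dimensional CUBE yields one projection per subset of the d grouping
# columns, i.e. 2^d projections in the Expand output.
dims = 6
projections = 2 ** dims
print(projections)  # 64

# Each input row feeding the shuffle is therefore replicated ~2^d times;
# ~3GB of input scaled by 64 is the same order of magnitude as the
# ~250GB shuffle write observed (compression and row-width changes
# account for the difference).
input_gb = 3
print(input_gb * projections)  # 192
```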
> (screenshot attached)
> The second stage easily hits an OOM error unless we enlarge some configurations.
> (screenshot attached)
> Therefore, we propose another option, which enables splitting the heavyweight
Aggregate into a number of lightweight Aggregates, one per grouping set. In effect, it implements
the grouping analytics as a Union and executes the aggregates one by one. Though it reads the
data multiple times, we can still achieve higher overall performance. With this implementation,
the query completes in about 20 minutes, with each aggregation taking 1~4 minutes.
> (screenshot attached)
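The proposed Union-of-aggregates rewrite can be sketched on toy data. This is a minimal Python model of the assumed semantics (one lightweight GROUP BY per grouping set, results unioned), not the actual patch; column names and helpers are hypothetical:

```python
from itertools import combinations
from collections import defaultdict

ALL_COLS = ("a", "b")  # toy stand-ins for the six cube dimensions

def grouping_sets(columns):
    """All subsets of the grouping columns, as for CUBE."""
    return [tuple(c) for r in range(len(columns) + 1)
            for c in combinations(columns, r)]

def aggregate(rows, keys):
    """SUM(measure) GROUP BY keys; columns outside keys come out as None (NULL)."""
    sums = defaultdict(int)
    for row in rows:
        sums[tuple(row[k] for k in keys)] += row["measure"]
    return {tuple(key[keys.index(c)] if c in keys else None
                  for c in ALL_COLS): s
            for key, s in sums.items()}

rows = [{"a": 1, "b": "x", "measure": 10},
        {"a": 1, "b": "y", "measure": 20},
        {"a": 2, "b": "x", "measure": 30}]

# Union-of-aggregates: one lightweight pass per grouping set instead of
# one heavyweight Aggregate over a 2^d-times-larger Expand output.
union_result = {}
for gset in grouping_sets(ALL_COLS):
    union_result.update(aggregate(rows, gset))

print(union_result[(None, None)])  # 60: the grand total
print(union_result[(1, None)])     # 30: SUM(measure) for a = 1
```

Each per-grouping-set aggregate shuffles only its own group-by keys, which is why the individual stages stay small even though the table is scanned once per grouping set.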

This message was sent by Atlassian JIRA
