spark-issues mailing list archives

From "Feng Zhu (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-21964) Enable splitting the Aggregate (on Expand) into a number of Aggregates for grouping analytics
Date Sat, 09 Sep 2017 12:43:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-21964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Feng Zhu updated SPARK-21964:
-----------------------------
    Description: 
In current versions, Spark SQL implements grouping analytics clauses (i.e., cube, rollup, and
grouping sets) as a single Aggregate operator on top of a single Expand operator. With this
implementation, the table is read only once.

However, in many scenarios (e.g., high-dimensional cubes), the Expand operator is too "heavy":
it emits a large number of projections, resulting in a vast shuffle write.

In our production environment, we have encountered various such cases, leading to poor performance
or even OOM errors in direct buffer memory.

We demonstrate the issue with the following real-world query over a 6-dimensional cube.
{code:sql}
SELECT CASE WHEN grouping(iFrom) = 1 THEN -1 ELSE iFrom END AS iFrom
      ,CASE WHEN grouping(iSrcId) = 1 THEN -1 ELSE iSrcId END AS iSrcId
      ,CASE WHEN grouping(sgametype) = 1 THEN '-1' ELSE sgametype END AS sgametype
      ,CASE WHEN grouping(iOperId) = 1 THEN -1 ELSE iOperId END AS iOperId
      ,CASE WHEN grouping(igameid) = 1 THEN -1 ELSE igameid END AS igameid
      ,CASE WHEN grouping(iacttypeid) = 1 THEN -1 ELSE iacttypeid END AS iacttypeid
      ,SUM(iclickcnt) AS iclickcnt
FROM p_day_advert
WHERE  statedate = 20170810
GROUP BY iFrom, iSrcId, sgametype, iOperId, igameid, iacttypeid WITH CUBE
{code}

For such a query, the Expand operator generates 2^6 = 64 projections. Though the query reads
only about 3GB of data, it produces about 250GB of shuffle write. In our environment, the
first stage takes about 2 hours, and the second stage easily hits an OOM error unless we
enlarge some configurations.
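As a rough illustration (plain Python, not Spark internals), the projection count comes from
enumerating every subset of the grouping columns, which is what CUBE does for the 6 dimensions
in the query above:

```python
from itertools import combinations

# The six grouping columns from the query above.
dims = ["iFrom", "iSrcId", "sgametype", "iOperId", "igameid", "iacttypeid"]

# A CUBE expands each input row once per subset of the grouping columns,
# i.e. 2^n projections for n columns.
grouping_sets = [
    subset
    for r in range(len(dims) + 1)
    for subset in combinations(dims, r)
]

print(len(grouping_sets))  # 2^6 = 64
```

Since every input row is replicated once per projection before the shuffle, the shuffle volume
grows multiplicatively with the number of grouping sets, which is why a ~3GB input can balloon
into hundreds of GB of shuffle write.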

Therefore, we propose another option that splits the heavyweight aggregate into a number of
lightweight aggregates, one per grouping set. Concretely, it implements the grouping analytics
as a Union and executes the aggregates one by one. Though this reads the data multiple times,
it can still achieve higher overall performance. With this implementation, the query completes
in about 20 minutes, with each aggregation taking 1~4 minutes.

!https://issues.apache.org/jira/secure/attachment/12886245/Union.png!
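The rewrite can be sketched in plain Python over toy data (hypothetical rows standing in for
p_day_advert, reduced to two dimensions for brevity): each grouping set becomes its own
lightweight GROUP BY, and the per-set results are unioned, matching the CASE WHEN
grouping(...) = 1 THEN -1 convention in the query above.

```python
from itertools import combinations

# Hypothetical toy rows standing in for p_day_advert.
rows = [
    {"iFrom": 1, "iSrcId": 10, "iclickcnt": 5},
    {"iFrom": 1, "iSrcId": 20, "iclickcnt": 3},
    {"iFrom": 2, "iSrcId": 10, "iclickcnt": 7},
]
dims = ["iFrom", "iSrcId"]

def aggregate(rows, group_cols):
    # One lightweight GROUP BY for a single grouping set; rolled-up
    # columns are emitted as -1, mirroring CASE WHEN grouping(c) = 1.
    out = {}
    for r in rows:
        key = tuple(r[c] if c in group_cols else -1 for c in dims)
        out[key] = out.get(key, 0) + r["iclickcnt"]
    return out

# The proposed plan: run each grouping set as its own aggregate and
# union the results, instead of one Aggregate over a 2^n-way Expand.
union_result = {}
for n in range(len(dims) + 1):
    for subset in combinations(dims, n):
        union_result.update(aggregate(rows, set(subset)))

print(union_result[(-1, -1)])  # grand total: 15
print(union_result[(1, -1)])   # rollup for iFrom=1: 8
```

Each per-set aggregate shuffles only the original (unexpanded) data, so the peak shuffle size
per stage stays close to the input size, at the cost of re-reading the table once per grouping
set.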


> Enable splitting the Aggregate (on Expand) into a number of Aggregates for grouping analytics
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21964
>                 URL: https://issues.apache.org/jira/browse/SPARK-21964
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.0.0, 2.0.2, 2.1.0, 2.1.1, 2.2.0
>            Reporter: Feng Zhu
>         Attachments: before.png, OOMRetry.png, Union.png
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

