hive-issues mailing list archives

From "Xuefu Zhang (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (HIVE-16840) Investigate the performance of order by limit in HoS
Date Thu, 08 Jun 2017 03:52:21 GMT

    [ https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041829#comment-16041829 ]

Xuefu Zhang edited comment on HIVE-16840 at 6/8/17 3:52 AM:
------------------------------------------------------------

[~kellyzly], I think you're right and I was confused: sortByKey does produce a global order.
The reason you gave in the description is also accurate.

Looking back at your proposals, #1 is similar to select in a subquery that outputs ordered
data. It depends on SELECT following FETCH FIRST semantics. I'm not sure if this is reliable.

Proposal #2 seems more plausible, except for the zipWithIndex() call, which could be expensive.

Maybe we can do something like this: first, we do a parallel sort (with N partitions) but keep
only the first M rows of each partition (M is the limit), followed by another sort (with 1 partition)
with a limit of M. This way, one task will sort at most MxN rows, which should be fast if
MxN is small. Basically we will do this:
{code}
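// N: sort parallelism, M: the limit; mapPartitions(_.take(M)) keeps the first M rows of each partition
val composite1 = sc.parallelize(1 to 200, 10).map(p => (1 - p, p))
  .sortByKey(true, N).mapPartitions(_.take(M)).sortByKey(true, 1).take(M)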
{code}
Could you please check if this is possible?
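
To make the two-stage idea concrete, here is a minimal sketch in plain Python that only simulates it; it is not Spark or Hive code. The helper names range_partition and order_by_limit are hypothetical, and the quantile-based split is an assumption standing in for whatever range partitioning the real parallel sort would use.

```python
import bisect


def range_partition(rows, n):
    """Split rows into n key ranges and sort each range.

    Hypothetical stand-in for a parallel sort with n partitions:
    partition i only holds rows that sort before those of partition i + 1.
    """
    if not rows:
        return [[] for _ in range(n)]
    ordered = sorted(rows)
    # Assumed boundary choice: evenly spaced quantiles of the input.
    bounds = [ordered[(i * len(rows)) // n] for i in range(1, n)]
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[bisect.bisect_right(bounds, row)].append(row)
    return [sorted(p) for p in parts]


def order_by_limit(rows, n, m):
    """Two-stage ORDER BY ... LIMIT m over n simulated partitions."""
    partitions = range_partition(rows, n)
    # Stage 1: keep only the first m rows of every sorted partition.
    survivors = [row for part in partitions for row in part[:m]]
    # Stage 2: a single "task" sorts at most m * n rows and applies the limit.
    return sorted(survivors)[:m]


# Same shape of data as the snippet above: (1 - p, p) for p in 1..200.
rows = [(1 - p, p) for p in range(1, 201)]
result = order_by_limit(rows, 10, 5)
```

The point of the sketch is that the final single-partition sort only ever sees at most MxN rows, regardless of how large the input is.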


was (Author: xuefuz):
[~kellyzly], I think you're right and I was confused: sortByKey does produce global order.
The reason for that you gave in the description is also accurate.

Looking back at your proposals, #1 is similar to select in a subquery that outputs ordered
data. It depends on SELECT following FETCH FIRST semantics. I'm not sure if this is reliable.

Proposal #2 seems more plausible, except for the zipWithIndex() call, which could be expensive.

Maybe we can do a combination of the two: First, we do parallel sort (with N partitions) but
filter out rows other than first M (M is the limit), followed by another sort (with 1 partition)
with a limit of M. This way, one task will sort at most MxN rows, which should be fast if
MxN is small. Basically we will do this:
{code}
val composite1 = sc.parallelize(1 to 200, 10).map(p=>(1-p,p)).sortByKey(N).filter(first M).sortByKey(1).take(M)
{code}
Could you please check if this is possible?

> Investigate the performance of order by limit in HoS
> ----------------------------------------------------
>
>                 Key: HIVE-16840
>                 URL: https://issues.apache.org/jira/browse/HIVE-16840
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>
> We found that q17 of TPC-DS hangs on a 1TB TPC-DS data set.
> {code}
>  select  i_item_id
>        ,i_item_desc
>        ,s_state
>        ,count(ss_quantity) as store_sales_quantitycount
>        ,avg(ss_quantity) as store_sales_quantityave
>        ,stddev_samp(ss_quantity) as store_sales_quantitystdev
>        ,stddev_samp(ss_quantity)/avg(ss_quantity) as store_sales_quantitycov
>        ,count(sr_return_quantity) as_store_returns_quantitycount
>        ,avg(sr_return_quantity) as_store_returns_quantityave
>        ,stddev_samp(sr_return_quantity) as_store_returns_quantitystdev
>        ,stddev_samp(sr_return_quantity)/avg(sr_return_quantity) as store_returns_quantitycov
>        ,count(cs_quantity) as catalog_sales_quantitycount ,avg(cs_quantity) as catalog_sales_quantityave
>        ,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitystdev
>        ,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitycov
>  from store_sales
>      ,store_returns
>      ,catalog_sales
>      ,date_dim d1
>      ,date_dim d2
>      ,date_dim d3
>      ,store
>      ,item
>  where d1.d_quarter_name = '2000Q1'
>    and d1.d_date_sk = store_sales.ss_sold_date_sk
>    and item.i_item_sk = store_sales.ss_item_sk
>    and store.s_store_sk = store_sales.ss_store_sk
>    and store_sales.ss_customer_sk = store_returns.sr_customer_sk
>    and store_sales.ss_item_sk = store_returns.sr_item_sk
>    and store_sales.ss_ticket_number = store_returns.sr_ticket_number
>    and store_returns.sr_returned_date_sk = d2.d_date_sk
>    and d2.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
>    and store_returns.sr_customer_sk = catalog_sales.cs_bill_customer_sk
>    and store_returns.sr_item_sk = catalog_sales.cs_item_sk
>    and catalog_sales.cs_sold_date_sk = d3.d_date_sk
>    and d3.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
>  group by i_item_id
>          ,i_item_desc
>          ,s_state
>  order by i_item_id
>          ,i_item_desc
>          ,s_state
> limit 100;
> {code}
> The script hangs because we use only 1 task to implement the sort.
> {code}
> STAGE PLANS:
>   Stage: Stage-1
>     Spark
>       Edges:
>         Reducer 10 <- Reducer 9 (SORT, 1)
>         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 889), Map 11 (PARTITION-LEVEL SORT, 889)
>         Reducer 3 <- Map 12 (PARTITION-LEVEL SORT, 1009), Reducer 2 (PARTITION-LEVEL SORT, 1009)
>         Reducer 4 <- Map 13 (PARTITION-LEVEL SORT, 683), Reducer 3 (PARTITION-LEVEL SORT, 683)
>         Reducer 5 <- Map 14 (PARTITION-LEVEL SORT, 751), Reducer 4 (PARTITION-LEVEL SORT, 751)
>         Reducer 6 <- Map 15 (PARTITION-LEVEL SORT, 826), Reducer 5 (PARTITION-LEVEL SORT, 826)
>         Reducer 7 <- Map 16 (PARTITION-LEVEL SORT, 909), Reducer 6 (PARTITION-LEVEL SORT, 909)
>         Reducer 8 <- Map 17 (PARTITION-LEVEL SORT, 1001), Reducer 7 (PARTITION-LEVEL SORT, 1001)
>         Reducer 9 <- Reducer 8 (GROUP, 2)
> {code}
> The parallelism of Reducer 10 (the SORT edge from Reducer 9) is 1. This is an order by limit case, so we use 1 task
> to ensure correctness, but the performance is poor.
> The reason we use 1 task to implement order by limit is [here|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207]


