hive-issues mailing list archives

From "liyunzhang_intel (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HIVE-16980) The partition of join is not divided evently in HOS
Date Fri, 30 Jun 2017 08:41:00 GMT

    [ https://issues.apache.org/jira/browse/HIVE-16980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069708#comment-16069708 ]

liyunzhang_intel commented on HIVE-16980:
-----------------------------------------

[~lirui] and [~xuefuz]: attached is a screenshot of TPC-DS/query17.sql on the 3TB dataset.
TPC-DS/query17.sql
{code}
select  i_item_id
       ,i_item_desc
       ,s_state
       ,count(ss_quantity) as store_sales_quantitycount
       ,avg(ss_quantity) as store_sales_quantityave
       ,stddev_samp(ss_quantity) as store_sales_quantitystdev
       ,stddev_samp(ss_quantity)/avg(ss_quantity) as store_sales_quantitycov
       ,count(sr_return_quantity) as store_returns_quantitycount
       ,avg(sr_return_quantity) as store_returns_quantityave
       ,stddev_samp(sr_return_quantity) as store_returns_quantitystdev
       ,stddev_samp(sr_return_quantity)/avg(sr_return_quantity) as store_returns_quantitycov
       ,count(cs_quantity) as catalog_sales_quantitycount ,avg(cs_quantity) as catalog_sales_quantityave
       ,stddev_samp(cs_quantity) as catalog_sales_quantitystdev
       ,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitycov
 from store_sales
     ,store_returns
     ,catalog_sales
     ,date_dim d1
     ,date_dim d2
     ,date_dim d3
     ,store
     ,item
 where d1.d_quarter_name = '2000Q1'
   and d1.d_date_sk = store_sales.ss_sold_date_sk
   and item.i_item_sk = store_sales.ss_item_sk
   and store.s_store_sk = store_sales.ss_store_sk
   and store_sales.ss_customer_sk = store_returns.sr_customer_sk
   and store_sales.ss_item_sk = store_returns.sr_item_sk
   and store_sales.ss_ticket_number = store_returns.sr_ticket_number
   and store_returns.sr_returned_date_sk = d2.d_date_sk
   and d2.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
   and store_returns.sr_customer_sk = catalog_sales.cs_bill_customer_sk
   and store_returns.sr_item_sk = catalog_sales.cs_item_sk
   and catalog_sales.cs_sold_date_sk = d3.d_date_sk
   and d3.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
 group by i_item_id
         ,i_item_desc
         ,s_state
 order by i_item_id
         ,i_item_desc
         ,s_state
limit 100;
{code}
The explain plan is also attached. Let's explain the explain:
   store, item, d2, d3, and d1 are small tables.
   store_sales, store_returns, and catalog_sales are big tables.
   There are 7 stages in the job.
   Stage-0: d2 union d3 union store union item (all these small tables will be converted
to map joins. The first strange thing here is that d1 is also small, so why isn't d1 in Stage-0?)
   Stage-1
        Reducer 2 <- Map 1 (store_sales), Map 7 (store_returns)
        Reducer 3 <- Map 8 (catalog_sales), Reducer 2 
        Reducer 4 <- Map 9 (d1), Reducer 3 
        Reducer 5 <- Reducer 4 (GROUP)
        Reducer 6 <- Reducer 5 (SORT)
The screenshot is about the stage "Reducer 3 <- Map 8 (catalog_sales), Reducer 2". In the
history server, it shows 2178 tasks finished; the median task duration is 4s, the 75th percentile
is 20 min, and the max is 32 min. For Shuffle Read size/Records, the median
is 0.0B/0, the 75th percentile is 274.9MB/8695090, and the max is 275.3MB/8709548. I
don't understand these metrics very well, but it seems the difference between tasks is
too big: some tasks need a lot of shuffle read while others need none. Can you help
to see where the problem is?
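This pattern (median shuffle read 0.0B/0 while the max task reads hundreds of MB) is what hash partitioning produces when the join key distribution is skewed: every row with a hot key lands in the same partition. A minimal, self-contained sketch of that effect in plain Java (not the actual Spark runtime; the 9000/1000 row counts and the key names are made up for illustration), using the same non-negative modulo logic Spark's HashPartitioner applies to `key.hashCode()`:

```java
import java.util.HashMap;
import java.util.Map;

public class HashSkewDemo {
    // Same modulo logic Spark's HashPartitioner uses to map a key to a partition.
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        Map<Integer, Integer> rowsPerPartition = new HashMap<>();

        // Skewed data: 9000 rows share one hot join key,
        // 1000 rows are spread over many distinct keys.
        for (int i = 0; i < 9000; i++) {
            rowsPerPartition.merge(partitionFor("hot_key", numPartitions), 1, Integer::sum);
        }
        for (int i = 0; i < 1000; i++) {
            rowsPerPartition.merge(partitionFor("key_" + i, numPartitions), 1, Integer::sum);
        }

        // One partition receives over 90% of the rows; the task that reads
        // it runs far longer than the rest, which may read almost nothing.
        for (int p = 0; p < numPartitions; p++) {
            System.out.println("partition " + p + ": "
                + rowsPerPartition.getOrDefault(p, 0) + " rows");
        }
    }
}
```

If the join keys in query17 (customer/item surrogate keys) are skewed, the same mechanism would explain why a few reducers do almost all the shuffle read.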


   

> The partition of join is not divided evently in HOS
> ---------------------------------------------------
>
>                 Key: HIVE-16980
>                 URL: https://issues.apache.org/jira/browse/HIVE-16980
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang_intel
>
> In HoS, the join is implemented as union + repartition sort. We use HashPartitioner to
partition the result of the union.
> SortByShuffler.java
> {code}
>   public JavaPairRDD<HiveKey, BytesWritable> shuffle(
>       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
>     JavaPairRDD<HiveKey, BytesWritable> rdd;
>     if (totalOrder) {
>       if (numPartitions > 0) {
>         if (numPartitions > 1 && input.getStorageLevel() == StorageLevel.NONE()) {
>           input.persist(StorageLevel.DISK_ONLY());
>           sparkPlan.addCachedRDDId(input.id());
>         }
>         rdd = input.sortByKey(true, numPartitions);
>       } else {
>         rdd = input.sortByKey(true);
>       }
>     } else {
>       Partitioner partitioner = new HashPartitioner(numPartitions);
>       rdd = input.repartitionAndSortWithinPartitions(partitioner);
>     }
>     return rdd;
>   }
> {code}
> In the Spark history server, I saw that there are 28 tasks in the repartition-sort stage;
21 tasks finished in less than 1s while the remaining 7 tasks took a long time to execute.
Is there any way to make the data evenly assigned to every partition?
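One common mitigation for this kind of hash-partition skew is key salting: append a small random suffix to the hot key before partitioning so its rows spread over several partitions, then undo the salt after the shuffle. This is not something HIVE-16980 or SortByShuffler implements; the sketch below is plain Java under that assumption (the `hot_key` name, seed, and bucket count are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class SaltedKeyDemo {
    // Same non-negative modulo logic Spark's HashPartitioner uses.
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        int saltBuckets = 4;          // hypothetical fan-out for the hot key
        Random rnd = new Random(42);  // fixed seed so the demo is repeatable
        Map<Integer, Integer> rowsPerPartition = new HashMap<>();

        // The same 9000 hot-key rows, but the key is salted before partitioning
        // ("hot_key#0" .. "hot_key#3"), so they no longer all collide on one partition.
        for (int i = 0; i < 9000; i++) {
            String salted = "hot_key#" + rnd.nextInt(saltBuckets);
            rowsPerPartition.merge(partitionFor(salted, numPartitions), 1, Integer::sum);
        }

        for (int p = 0; p < numPartitions; p++) {
            System.out.println("partition " + p + ": "
                + rowsPerPartition.getOrDefault(p, 0) + " rows");
        }
    }
}
```

How evenly the salted variants land still depends on each variant's hash, and for a join the other side's hot-key rows must be replicated to every salt bucket, so salting trades extra data volume for better balance.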



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
