tez-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Bikas Saha (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
Date Tue, 07 Jul 2015 20:47:04 GMT

    [ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617362#comment-14617362

Bikas Saha commented on TEZ-2496:

Summarizing an offline discussion on the pros and cons on this approach
1) lives in user land - easier to iterate
2) memory efficient - shuffle vertex manager can apply policy specific to partition stats
use case. deterministic sizes mean it can aggregate upon event receipt and discard the raw
3) cpu efficient - because its not calling getStatistics() repeatedly
4) works with pipelining and getting early stats from running tasks. getStatistics() would
get expensive for this.
5) allows for other use cases like sending partition stats to inputs for runtime optimizations.
Only the shuffle vertex manager can correctly do this since it merges partitions during auto
6) Once this has stabilized and optimized then we can transfer the logic to a partition stats
API that would be generally available as part of the system.

> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the
> ----------------------------------------------------------------------------------------------
>                 Key: TEZ-2496
>                 URL: https://issues.apache.org/jira/browse/TEZ-2496
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, TEZ-2496.4.patch,
TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, TEZ-2496.8.patch
> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the
source.  This would be helpful in scenarios, where there is limited resources (or concurrent
jobs running or multiple waves) with dataskew and the task which gets large amount of data
gets sceheduled much later.
> e.g Consider the following hive query running in a queue with limited capacity (42 slots
in total) @ 200 GB scale
> {noformat}
>            WHEN ss_sold_time_sk IS NULL THEN 70429
>            ELSE ss_sold_time_sk
>        END AS ss_sold_time_sk,
>        ss_item_sk,
>        ss_customer_sk,
>        ss_cdemo_sk,
>        ss_hdemo_sk,
>        ss_addr_sk,
>        ss_store_sk,
>        ss_promo_sk,
>        ss_ticket_number,
>        ss_quantity,
>        ss_wholesale_cost,
>        ss_list_price,
>        ss_sales_price,
>        ss_ext_discount_amt,
>        ss_ext_sales_price,
>        ss_ext_wholesale_cost,
>        ss_ext_list_price,
>        ss_ext_tax,
>        ss_coupon_amt,
>        ss_net_paid,
>        ss_net_paid_inc_tax,
>        ss_net_profit,
>        ss_sold_date_sk
>   FROM store_sales distribute by ss_sold_time_sk;
> {noformat}
> This generated 39 maps and 134 reduce slots (3 reduce waves). When lots of nulls are
there for ss_sold_time_sk, it would tend to have data skew towards 70429.  If the reducer
which gets this data gets scheduled much earlier (i.e in first wave itself), entire job would
finish fast.

This message was sent by Atlassian JIRA

View raw message