tez-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Bikas Saha (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
Date Mon, 06 Jul 2015 19:00:05 GMT

    [ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14615484#comment-14615484
] 

Bikas Saha commented on TEZ-2496:
---------------------------------

IMO, the solution is risky and I am wary of adding it into the API for everyone to use because
its not clear that we have a viable long term solution. A single job with 50Kx20K vertices
would end up with 1GB or more in the bitmap. Or multiple large vertices running in parallel
would hold memory. Secondly, clearing the partition stats makes this susceptible to even 1
task failure. Clearing some stats but keeping others may lead of inconsistencies down the
road or affect other stats improvements.
Approximating large histograms is not a new problem and we may be able to find some known
work (like https://metamarkets.com/2013/histograms/) to create a more viable algorithmic solution.
However, we do need to start experimenting with what we can do with stats and so making progress
is also important. So here is my suggestion.
How about we dont make this part of the API or the internal stats flow for now? That keeps
the main engine scalable and we dont expose a WIP API. We can send the same bitmap information
in VertexManagerEvents from the output to the ShuffleVertexManager. We already do that to
send the total output size per task. We can enhance the payload to send the partition info
instead. This way we can send the necessary info and solve the scenario this jira is trying
to address. Here are the pros I see with this
1) Uses the existing event mechanism by enhancing its payload. We know this works e2e and
less scope for bugs.
2) Does not change the internal flow or API's.
3) Is localized to the ShuffleVertexManager (which needs this) and the feature can be turned
on/off based on config for large jobs if needed, API's cannot be turned on/off based on config.

Essentially, this lives in "user" land instead of "tez" land. Once we have experimented with
this and refined this, we can consider moving this from user land into tez land by adding
it to the API infra. Thoughts?

> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the
source
> ----------------------------------------------------------------------------------------------
>
>                 Key: TEZ-2496
>                 URL: https://issues.apache.org/jira/browse/TEZ-2496
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, TEZ-2496.4.patch,
TEZ-2496.5.patch, TEZ-2496.6.patch
>
>
> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the
source.  This would be helpful in scenarios, where there is limited resources (or concurrent
jobs running or multiple waves) with dataskew and the task which gets large amount of data
gets sceheduled much later.
> e.g Consider the following hive query running in a queue with limited capacity (42 slots
in total) @ 200 GB scale
> {noformat}
> CREATE TEMPORARY TABLE sampleData AS
>   SELECT CASE
>            WHEN ss_sold_time_sk IS NULL THEN 70429
>            ELSE ss_sold_time_sk
>        END AS ss_sold_time_sk,
>        ss_item_sk,
>        ss_customer_sk,
>        ss_cdemo_sk,
>        ss_hdemo_sk,
>        ss_addr_sk,
>        ss_store_sk,
>        ss_promo_sk,
>        ss_ticket_number,
>        ss_quantity,
>        ss_wholesale_cost,
>        ss_list_price,
>        ss_sales_price,
>        ss_ext_discount_amt,
>        ss_ext_sales_price,
>        ss_ext_wholesale_cost,
>        ss_ext_list_price,
>        ss_ext_tax,
>        ss_coupon_amt,
>        ss_net_paid,
>        ss_net_paid_inc_tax,
>        ss_net_profit,
>        ss_sold_date_sk
>   FROM store_sales distribute by ss_sold_time_sk;
> {noformat}
> This generated 39 maps and 134 reduce slots (3 reduce waves). When lots of nulls are
there for ss_sold_time_sk, it would tend to have data skew towards 70429.  If the reducer
which gets this data gets scheduled much earlier (i.e in first wave itself), entire job would
finish fast.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message