tez-issues mailing list archives

From "Rajesh Balamohan (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
Date Mon, 06 Jul 2015 05:30:04 GMT

     [ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajesh Balamohan updated TEZ-2496:
----------------------------------
    Attachment: TEZ-2496.5.patch


Addressing review comments. Attaching the patch and the responses to the comments here, as it might be easier to look at all the comments/history in JIRA.

>> Shift operator is more efficient
- Won't the runtime take care of this automatically?
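(For what it's worth, my understanding of why the hand-written shift is unlikely to matter; this snippet is illustrative and not from the patch:)

{noformat}
// The JIT typically strength-reduces division by a constant power of two
// into a shift (plus a sign adjustment for signed values), so these two
// lines should compile to comparable machine code:
long mb1 = sizeInBytes / (1024 * 1024);  // candidate for strength reduction
long mb2 = sizeInBytes >>> 20;           // equivalent only for non-negative sizes
{noformat}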

>> Consider using a bitmap library like RoaringBitmap for storage within the AM. That depends on whether we store this at task level or not.
- Using RoaringBitmap for storing the partition stats. It reduced the memory pressure to
80-100 MB for a 10K x 10K job. However, it might be redundant to retain partition stats after
merging them into the vertex stats. Such transient stats can be cleared once they are merged
with the vertex stats (please note that the rest of the details in the task stats remain
intact). The advantage is that the memory footprint reduces to a negligible amount. The
drawback is that when a task gets rescheduled, the stats are cleared, and it might not be
possible to re-populate this transient data. That should be fine, as users can always call
hasPartitionStats() to check whether stats are present. For example, if stats are not present,
ShuffleVertexManager would schedule tasks as it was doing earlier. With the current patch
(which clears transient data like partition stats from taskStats), the memory usage is
negligible, as only vertex-level stats are maintained for partition stats. For example, used
memory (i.e. the memory after GC) remains the same with and without the patch. For
long-running vertices, the task stats would be maintained until the tasks complete, which
should be acceptable as the memory is further reduced by RoaringBitmap. A rough sketch of
this encoding follows the heap numbers below.

{noformat}
With Patch:
##### Heap utilization statistics [MB] for testIOStatisticsMemory
##### Used Memory:93
##### Free Memory:573
##### Total Memory:666
##### Max Memory:3641

Without Patch:
##### Heap utilization statistics [MB] for testIOStatisticsMemory
##### Used Memory:93
##### Free Memory:188
##### Total Memory:282
##### Max Memory:3641
{noformat}
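For reference, a minimal sketch of the kind of bitmap encoding described above (class/method names like PartitionStatsSketch, NUM_BUCKETS, and bucketFor are illustrative, not from the patch):

{noformat}
import org.roaringbitmap.RoaringBitmap;

// Sketch: per-task partition stats encoded as bits in a RoaringBitmap and
// OR-ed into a single vertex-level bitmap, so the transient task-level
// copy can be dropped once merged.
public class PartitionStatsSketch {

  // Illustrative bucket count: each partition's size maps to one of a few
  // coarse ranges rather than an exact byte count.
  static final int NUM_BUCKETS = 4;

  private final RoaringBitmap vertexStats = new RoaringBitmap();

  // Set bit (partition * NUM_BUCKETS + bucket) for each partition.
  static RoaringBitmap encodeTaskStats(long[] partitionSizes) {
    RoaringBitmap taskStats = new RoaringBitmap();
    for (int p = 0; p < partitionSizes.length; p++) {
      taskStats.add(p * NUM_BUCKETS + bucketFor(partitionSizes[p]));
    }
    taskStats.runOptimize(); // compress runs of consecutive bits
    return taskStats;
  }

  static int bucketFor(long bytes) {
    long mb = bytes / (1024 * 1024);
    if (mb < 1) return 0;
    if (mb < 10) return 1;
    if (mb < 100) return 2;
    return 3;
  }

  // Merge and forget: after this, the task-level bitmap can be cleared,
  // which is what keeps the AM footprint small.
  void mergeIntoVertex(RoaringBitmap taskStats) {
    vertexStats.or(taskStats);
  }
}
{noformat}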


>> Is this bit required (index)?
- No, removed it. Just storing the stats-related bits now.

>> Given that we're no longer gaining from individual bits representing ranges (earlier
via a simple bitwise operation) - packing the bits to represent ranges should be possible.
- Changed the logic in the new patch to use RoaringBitmap.

>> ShuffleVertexManager. Reading this from configuration implies the user will have
to set up this parameter in the configuration on the VM, and for all edges that this VM is
connected to.
- Removed the config from ShuffleVertexManager. OutputStatistics provides hasPartitionStats(),
which makes it possible to check whether the stats are present. Partition stats are taken
into account based on this information.
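A rough sketch of that fallback behaviour (hasPartitionStats() is from the patch discussion; the surrounding scheduler code is purely illustrative):

{noformat}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SchedulingOrderSketch {

  // Decide the task scheduling order based on whether partition stats
  // were reported. With stats, the heaviest consumers go first so a
  // skewed reducer lands in the first wave; without stats, fall back
  // to the existing natural order.
  static List<Integer> scheduleOrder(long[] aggregatedPartitionSizes,
      boolean hasPartitionStats) {
    List<Integer> tasks = new ArrayList<>();
    for (int i = 0; i < aggregatedPartitionSizes.length; i++) {
      tasks.add(i);
    }
    if (hasPartitionStats) {
      tasks.sort(Comparator
          .comparingLong((Integer i) -> aggregatedPartitionSizes[i])
          .reversed());
    }
    return tasks; // without stats this stays 0..n-1, i.e. the old behaviour
  }
}
{noformat}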

>> ExternalSorter. Nit: initialize only if per-partition stats reporting is enabled?
- Fixed.

>> ExternalSorter. Report this (total data size) anyway? Considering the other API is
lossy.
- Fixed.

>> OutputStatisticsReporter exposes public void reportDataSize(long[] sizes);. OutputStatistics
exposes public DATA_RANGE_IN_MB getDataSize(int destIndex);. I think it'll be cleaner to
use either a size on both APIs, or a range.
- Fixed. Reporting long in both cases.
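As I read it, the two APIs now line up like this (interface shapes reconstructed from the comment above, not copied verbatim from the patch):

{noformat}
// Producer and consumer now use the same unit: a plain long size in bytes.
public interface OutputStatisticsReporter {
  void reportDataSize(long[] sizes);  // one entry per destination partition
}

public interface OutputStatistics {
  boolean hasPartitionStats();        // stats may be absent or cleared
  long getDataSize(int destIndex);    // was DATA_RANGE_IN_MB before this fix
}
{noformat}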

> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the
source
> ----------------------------------------------------------------------------------------------
>
>                 Key: TEZ-2496
>                 URL: https://issues.apache.org/jira/browse/TEZ-2496
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, TEZ-2496.4.patch,
TEZ-2496.5.patch
>
>
> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the
source. This would be helpful in scenarios where there are limited resources (or concurrent
jobs running, or multiple waves) with data skew, and the task which gets a large amount of
data gets scheduled much later.
> e.g. Consider the following Hive query running in a queue with limited capacity (42 slots
in total) @ 200 GB scale:
> {noformat}
> CREATE TEMPORARY TABLE sampleData AS
>   SELECT CASE
>            WHEN ss_sold_time_sk IS NULL THEN 70429
>            ELSE ss_sold_time_sk
>        END AS ss_sold_time_sk,
>        ss_item_sk,
>        ss_customer_sk,
>        ss_cdemo_sk,
>        ss_hdemo_sk,
>        ss_addr_sk,
>        ss_store_sk,
>        ss_promo_sk,
>        ss_ticket_number,
>        ss_quantity,
>        ss_wholesale_cost,
>        ss_list_price,
>        ss_sales_price,
>        ss_ext_discount_amt,
>        ss_ext_sales_price,
>        ss_ext_wholesale_cost,
>        ss_ext_list_price,
>        ss_ext_tax,
>        ss_coupon_amt,
>        ss_net_paid,
>        ss_net_paid_inc_tax,
>        ss_net_profit,
>        ss_sold_date_sk
>   FROM store_sales distribute by ss_sold_time_sk;
> {noformat}
> This generated 39 maps and 134 reduce slots (3 reduce waves). When there are lots of nulls
for ss_sold_time_sk, the data would tend to skew towards 70429. If the reducer which gets
this data gets scheduled much earlier (i.e. in the first wave itself), the entire job would
finish faster.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
