tez-issues mailing list archives

From "Bikas Saha (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
Date Wed, 08 Jul 2015 00:21:04 GMT

    [ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617708#comment-14617708 ]

Bikas Saha commented on TEZ-2496:
---------------------------------

Looks good overall.

This does not have to be in the API package.
{code}diff --git tez-api/src/main/java/org/apache/tez/runtime/api/DATA_RANGE_IN_MB.java tez-api/src/main/java/org/apache/tez/runtime/api/DATA_RANGE_IN_MB.java
{code}

TEN? (TEZ(10) looks like a typo.) Also, it looks like the constructor and member var are dead code?
{code}+public enum DATA_RANGE_IN_MB {
+  THOUSAND(1000), HUNDRED(100), TEZ(10), ONE(1), ZERO(0);{code}
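If the stored value really is unused, the enum could presumably shrink to something like this (sketch only, not from the patch):
{code}
// Sketch: if the int value and constructor really are dead code, the enum can drop them
// (and fix the TEZ -> TEN typo); getRange() from the patch would stay as-is.
public enum DATA_RANGE_IN_MB {
  THOUSAND, HUNDRED, TEN, ONE, ZERO;
}
{code}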

Do we really need Math.ceil()? There is probably a bit-manipulation way to do this more
cheaply.
{code}+  public static final DATA_RANGE_IN_MB getRange(long sizeInBytes) {
+    int sizeInMB = (int) Math.ceil(sizeInBytes / (1024 * 1024 * 1.0));{code}
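For example, something along these lines should work without the floating-point round trip (sketch only; the helper name is mine):
{code}
// Sketch, not from the patch: ceiling division by 1 MB (1 << 20 bytes) using integer math only.
// Adding (1 << 20) - 1 before the shift rounds any partial megabyte up.
public static int ceilSizeInMB(long sizeInBytes) {
  return (int) ((sizeInBytes + (1 << 20) - 1) >>> 20);
}
{code}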

Does runtime-internals need roaring bitmaps?
{code}diff --git tez-runtime-internals/pom.xml tez-runtime-internals/pom.xml
... +      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
{code}

Unnecessary diff
{code}diff --git tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
{code}

Why do the +1 here instead of in getBucket()? Spreading the bucket logic across 3 places (here,
getBucket(), and DATA_RANGE_IN_MB) is error prone. Perhaps consolidate all 3 into getBucket()?
{code}+    for (int i = 0; i < sizes.length; i++) {
+      int bucket = getBucket(sizes[i]) + 1;
{code}
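i.e., roughly something like this (illustrative sketch; the mapping inside getBucket() is a guess, the point is that the offset lives in one place):
{code}
// Sketch: keep the entire size -> bucket mapping, including the +1 offset, inside getBucket(),
// so call sites can simply do: int bucket = getBucket(sizes[i]);
private int getBucket(long sizeInBytes) {
  return DATA_RANGE_IN_MB.getRange(sizeInBytes).ordinal() + 1;
}
{code}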

No point in having 2 vars when one can be derived from the other? reportPartitionStats could simply
be a method: reportPartitionStats() { return partitions != null; }, right?
{code}+  protected OutputStatisticsReporter statsReporter;
+  protected final long[] partitionStats;{code}
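i.e., presumably something like this (sketch; assumes the flag just mirrors whether partitionStats is set):
{code}
// Sketch of the suggestion: derive the boolean from the field instead of maintaining a
// separate flag that must be kept in sync with it.
protected boolean reportPartitionStats() {
  return partitionStats != null;
}
{code}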

Still needed?
{code}
     VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getVertexStatistics(any(String.class))).thenReturn(mock(VertexStatistics.class));{code}

Are there existing OrderedPartitionedOutput/PipelinedSorter/ExternalSorter tests that can
be enhanced to verify that partition stats are being recorded?

Assuming the ShuffleVertexManager code is the same as when I looked at it the last time. 

Not sure why the second part of each of these if checks is useful. Any issue with simply overwriting
with the new value?
{code}+        if ((totalStats > 0) && (taskInfo.outputStats != totalStats)) {
+          computedPartitionSizes = true;
+          taskInfo.outputStats = totalStats;
+        }
+      } else {
+        if ((stats[index] > 0) && (stats[index] != taskInfo.outputStats)) {
+          computedPartitionSizes = true;
+          taskInfo.outputStats = stats[index];{code}
If I understand this right, the code is using this check to avoid re-sorting. But could that be decided
simply by whether we have received a new stats update event? And could the code from
computePartitionSizes() + sortPendingTasksBasedOnDataSize() move into parsePartitionStats()? Nothing
should change unless we have received new stats, right? So all stats-dependent updates can be made
when we receive new stats.
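Roughly like this (sketch only; the method names are from the patch, signature and bodies elided):
{code}
// Sketch of the suggestion: do all stats-dependent work at the single point where a new
// stats update arrives, so no computedPartitionSizes flag is needed.
private void parsePartitionStats(/* stats payload from the VertexManagerEvent */) {
  // ... update taskInfo.outputStats for the reporting task ...
  computePartitionSizes();
  sortPendingTasksBasedOnDataSize();
}
{code}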

Spurious change?
{code}-  @Test(timeout = 5000)
+  @Test(timeout = 5000000)
   public void testShuffleVertexManagerAutoParallelism() throws Exception {{code}

Why did this change?
{code}
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
-    Assert.assertTrue(scheduledTasks.size() == 3);
+    Assert.assertTrue(scheduledTasks.size() == 1);{code}

Can the ShuffleVertexManager bucket calculation test be further enhanced to verify that
the first task to be scheduled is the largest task? That is the final intent of the jira, right?
:)

In a separate jira we need to track the bug that the vertex manager event is not resilient
to task retries because it does not carry that info, so a rerun of the same task would cause
double counting. It's an existing bug, not introduced in this patch.

> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the
source
> ----------------------------------------------------------------------------------------------
>
>                 Key: TEZ-2496
>                 URL: https://issues.apache.org/jira/browse/TEZ-2496
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, TEZ-2496.4.patch,
TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, TEZ-2496.8.patch
>
>
> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the
source. This would be helpful in scenarios where there are limited resources (or concurrent
jobs running, or multiple waves) with data skew, and the task which gets a large amount of data
gets scheduled much later.
> e.g. Consider the following hive query running in a queue with limited capacity (42 slots
in total) @ 200 GB scale:
> {noformat}
> CREATE TEMPORARY TABLE sampleData AS
>   SELECT CASE
>            WHEN ss_sold_time_sk IS NULL THEN 70429
>            ELSE ss_sold_time_sk
>        END AS ss_sold_time_sk,
>        ss_item_sk,
>        ss_customer_sk,
>        ss_cdemo_sk,
>        ss_hdemo_sk,
>        ss_addr_sk,
>        ss_store_sk,
>        ss_promo_sk,
>        ss_ticket_number,
>        ss_quantity,
>        ss_wholesale_cost,
>        ss_list_price,
>        ss_sales_price,
>        ss_ext_discount_amt,
>        ss_ext_sales_price,
>        ss_ext_wholesale_cost,
>        ss_ext_list_price,
>        ss_ext_tax,
>        ss_coupon_amt,
>        ss_net_paid,
>        ss_net_paid_inc_tax,
>        ss_net_profit,
>        ss_sold_date_sk
>   FROM store_sales distribute by ss_sold_time_sk;
> {noformat}
> This generated 39 maps and 134 reduce slots (3 reduce waves). When there are lots of nulls
for ss_sold_time_sk, the data tends to skew towards 70429. If the reducer which gets this data
is scheduled much earlier (i.e. in the first wave itself), the entire job would finish much
faster.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
