tez-issues mailing list archives

From "Siddharth Seth (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (TEZ-338) Determine reduce task parallelism
Date Fri, 16 Aug 2013 07:46:49 GMT

    [ https://issues.apache.org/jira/browse/TEZ-338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13742009#comment-13742009

Siddharth Seth commented on TEZ-338:

bq. I reused the min fraction because that is the maximum time we have to make up our mind. Anything
shorter gives us less accuracy, and anything longer is not possible since the task would have
already started running.
My concern with this was that the scheduling keys must be set correctly for re-partitioning
to work, which can get confusing for users. What I was suggesting is a separate config for
min-completed-tasks for re-partitioning. It could be used either to override the scheduler's
slow-start configuration, or to skip re-partitioning if the value is too low. It comes down to
whether the sample set is big enough to re-partition, for which there can be other heuristics as
well. This can be a separate jira if you don't want to add such a config right now.
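A minimal sketch of the min-completed-tasks idea, assuming the expected output size is extrapolated linearly from the tasks that have already finished. `RepartitionGate` and `minCompletedTasks` are hypothetical names, not existing Tez classes or config keys.

```java
import java.util.OptionalLong;

// Hypothetical sketch: RepartitionGate and minCompletedTasks are illustrative names,
// not Tez APIs or config keys.
final class RepartitionGate {
    private final int minCompletedTasks;

    RepartitionGate(int minCompletedTasks) {
        this.minCompletedTasks = Math.max(1, minCompletedTasks); // need at least one sample
    }

    /**
     * Extrapolates the total source output from the completed tasks, or returns
     * empty when the sample is too small to justify re-partitioning.
     */
    OptionalLong expectedTotalOutput(int totalSourceTasks, int completedSourceTasks,
                                     long completedOutputBytes) {
        if (completedSourceTasks < minCompletedTasks) {
            return OptionalLong.empty(); // keep the configured parallelism
        }
        // Assumes the remaining tasks produce the same average output per task.
        return OptionalLong.of(completedOutputBytes * totalSourceTasks / completedSourceTasks);
    }

    public static void main(String[] args) {
        RepartitionGate gate = new RepartitionGate(5);
        System.out.println(gate.expectedTotalOutput(100, 3, 30L << 20));  // too few samples
        System.out.println(gate.expectedTotalOutput(100, 10, 70L << 20)); // 700 MB, in bytes
    }
}
```

Gating on an explicit count of completed tasks keeps the re-partitioning decision independent of the slow-start fractions.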

bq. Over-partitioning in the ancestor vertex is something that has to be set by the user.
User code does partitioning and so Tez cannot understand or make changes to it.
Partitioning, in the MR case, is currently determined by the number of tasks in the receiving
vertex (taskOutDegree from the OutputSpecList). In the current form at least, this could be
achieved. It would be a reasonably big change, though, if we want to do this.
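A sketch of what over-partitioning could look like if the partition count were raised above taskOutDegree. The partition formula mirrors Hadoop MapReduce's `HashPartitioner`; `overPartitionFactor` is a hypothetical knob, and mapping contiguous partition ranges to tasks is an assumption.

```java
// Hypothetical sketch: the ancestor vertex writes taskOutDegree * overPartitionFactor
// partitions, and each downstream task reads a contiguous range of them.
final class OverPartitioning {
    // Same formula as Hadoop MapReduce's HashPartitioner.
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Task that reads partition p when each task owns `partitionsPerTask` contiguous partitions.
    static int taskFor(int partition, int partitionsPerTask) {
        return partition / partitionsPerTask;
    }

    public static void main(String[] args) {
        int taskOutDegree = 8;       // tasks in the receiving vertex
        int overPartitionFactor = 4; // hypothetical, not a Tez config
        int numPartitions = taskOutDegree * overPartitionFactor; // 32
        for (String key : new String[] {"apple", "banana", "cherry"}) {
            int p = partitionFor(key, numPartitions);
            // At full parallelism each task reads 4 partitions; if the AM later
            // halves parallelism to 4 tasks, each reads 8.
            System.out.println(key + ": partition " + p
                + ", task " + taskFor(p, overPartitionFactor) + " of 8"
                + ", task " + taskFor(p, 2 * overPartitionFactor) + " of 4");
        }
    }
}
```

Because a key always hashes to the same partition, coalescing whole partitions never splits a key across tasks.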

bq. Yes. The current impl re-distributes all partitions equally across the new set of tasks. It
tries not to introduce artificial skews by distributing them unequally. Using "multiple" based
logic ensures uniform distribution. We round off to a multiple, which should work in most cases.
Obviously it won't if the number of partitions is a prime number.
I think I'm missing something here. It looks like re-partitioning can end up skipping certain
partitions completely. If we require the original number of partitions to be a multiple of the new
number of tasks, we should enforce this, i.e. not re-partition when that isn't possible; otherwise
we end up losing data. As an example: with InitialPartitions = 25 and expectedOutputSize = 700MB
based on completed tasks, it looks like we end up reducing parallelism to 8, with each task getting
3 partitions. One partition is not handled by any task.
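The example above can be reproduced with a short sketch. The formula is an assumed reconstruction of the multiple-based logic, with a hypothetical desiredTaskInputDataSize of 100 MB chosen because it yields the 8 tasks described; the patch's exact arithmetic may differ. `safeTaskCount` illustrates the suggested guard of not re-partitioning when partitions would be dropped.

```java
// Hypothetical reconstruction of multiple-based re-partitioning; not the patch's code.
final class MultipleRepartition {
    static final long MB = 1L << 20;

    // Partitions per new task: initialPartitions / ceil(expectedOutput / desiredTaskInput).
    static int multiple(int initialPartitions, long expectedOutputBytes, long desiredTaskInputBytes) {
        long desiredTasks = (expectedOutputBytes + desiredTaskInputBytes - 1) / desiredTaskInputBytes;
        return (int) (initialPartitions / Math.max(1, desiredTasks));
    }

    // The suggested guard: keep the original parallelism unless every partition is covered.
    static int safeTaskCount(int initialPartitions, int multiple) {
        if (multiple <= 1 || initialPartitions % multiple != 0) {
            return initialPartitions;
        }
        return initialPartitions / multiple;
    }

    public static void main(String[] args) {
        int initial = 25;
        int m = multiple(initial, 700 * MB, 100 * MB); // 25 / 7 = 3
        int naiveTasks = initial / m;                  // 25 / 3 = 8
        System.out.println(naiveTasks + " tasks x " + m + " partitions = "
            + (naiveTasks * m) + " of " + initial);    // 8 tasks x 3 partitions = 24 of 25
        System.out.println("with guard: " + safeTaskCount(initial, m) + " tasks"); // 25 tasks
    }
}
```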

bq. TezEngineContext does not know that shuffle is being used.
OK, I got that completely wrong. TezEngineTaskContext can't be used. Configuration for the
engine components is a mess right now and needs looking into. It just seems wrong to modify the
user payload, which is the MR conf, to set configuration for the ShuffleHandler, which is a Tez
Engine component. This could probably be solved once per-edge configuration is in place, based
on whatever the configuration mechanism for Tez Engine components turns out to be.
However, I think TaskDependencyCompletionEvents would be a better way to configure this. User
code will eventually generate these events; for now, partition ranges can be part of the event
generated by the AM. That also allows dynamic ranges, instead of a single conf for all tasks in
the vertex. Thoughts?
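One way the AM could compute per-task ranges for such events, sketched under the assumption that each task gets an explicit [start, end) range. `PartitionRanges` and `Range` are hypothetical classes, not part of Tez, and giving the remainder to the first tasks is one assumed policy; the point is that explicit ranges can be uneven and still cover every partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: explicit [start, end) partition ranges per task, as could be
// carried in a per-task event. Not a Tez API.
final class PartitionRanges {
    static final class Range {
        final int task, start, endExclusive;
        Range(int task, int start, int endExclusive) {
            this.task = task;
            this.start = start;
            this.endExclusive = endExclusive;
        }
        @Override public String toString() {
            return "task " + task + ": [" + start + ", " + endExclusive + ")";
        }
    }

    static List<Range> assign(int numPartitions, int numTasks) {
        if (numTasks < 1 || numTasks > numPartitions) {
            throw new IllegalArgumentException("need 1 <= numTasks <= numPartitions");
        }
        List<Range> ranges = new ArrayList<>();
        int base = numPartitions / numTasks;
        int extra = numPartitions % numTasks; // the first `extra` tasks take one more
        int start = 0;
        for (int t = 0; t < numTasks; t++) {
            int size = base + (t < extra ? 1 : 0);
            ranges.add(new Range(t, start, start + size));
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 25 partitions over 8 tasks: task 0 reads 4, the rest read 3, nothing is dropped.
        assign(25, 8).forEach(System.out::println);
    }
}
```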

bq. Currently ShuffleHandler does not serve multiple partitions in a single call.
Looks like it serves multiple partitions per connection. If I understand the patch correctly, it
would end up using one connection per host + partitionId. We could continue with a single
connection per host with the current ShuffleHandler. This can be a separate jira.
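A sketch of per-host batching: fetch requests for the same host are merged so that one connection can serve several partitions. `FetchGrouping` and `FetchRequest` are hypothetical, not ShuffleHandler or fetcher classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: merge (host, partition) fetches so each host gets one connection.
final class FetchGrouping {
    static final class FetchRequest {
        final String host;
        final int partition;
        FetchRequest(String host, int partition) {
            this.host = host;
            this.partition = partition;
        }
    }

    // TreeMap keeps hosts in a stable order; each value lists the partitions to fetch.
    static Map<String, List<Integer>> groupByHost(List<FetchRequest> requests) {
        Map<String, List<Integer>> byHost = new TreeMap<>();
        for (FetchRequest r : requests) {
            byHost.computeIfAbsent(r.host, h -> new ArrayList<>()).add(r.partition);
        }
        return byHost;
    }

    public static void main(String[] args) {
        List<FetchRequest> requests = List.of(
            new FetchRequest("host1", 3), new FetchRequest("host2", 3),
            new FetchRequest("host1", 4), new FetchRequest("host1", 5),
            new FetchRequest("host2", 4));
        // One connection per host instead of one per (host, partition):
        System.out.println(groupByHost(requests)); // {host1=[3, 4, 5], host2=[3, 4]}
    }
}
```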

Comments on the patch
- TEZ_TASK_SHUFFLE_PARTITION_RANGE - if we continue to use this, it belongs in TezJobConfig,
which contains Engine specific parameters.
- {code}
+    if(multiple <= 1) {
+      // desired is fairly close to current. No point changing.
+      return;
+    }
{code}
With a reasonable number of tasks, this check may be too limiting, e.g. when reducing 19 tasks to 10,
or 600 to 301, the integer multiple is 1, so parallelism is never changed. Again, this can be fixed later.
- Does VertexImpl.setParallelism need to be synchronized on tasksSyncHandle, or does the entire
method need to be synchronized (and scheduleTasks as well)?
- It may be worth adding per-partition counters at some point.
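The limitation of the `multiple <= 1` check can be seen with a small sketch, assuming `multiple` is computed as currentTasks / desiredTasks with integer division (the quoted snippet does not show how it is derived). `MultipleCheck` is a hypothetical name.

```java
// Hypothetical sketch: assumes multiple = currentTasks / desiredTasks (integer division).
final class MultipleCheck {
    static boolean wouldRepartition(int currentTasks, int desiredTasks) {
        int multiple = currentTasks / desiredTasks;
        return multiple > 1; // mirrors the quoted `if(multiple <= 1) return;`
    }

    public static void main(String[] args) {
        System.out.println(wouldRepartition(19, 10));   // false: 19 / 10 == 1
        System.out.println(wouldRepartition(600, 301)); // false: 600 / 301 == 1
        System.out.println(wouldRepartition(600, 300)); // true:  600 / 300 == 2
    }
}
```

Any reduction by less than a factor of two is therefore skipped, even when it would save hundreds of tasks; assigning uneven, contiguous partition ranges per task would remove that restriction.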
Minor nits
- There's a TODO about renaming to ShuffleVertexManager, which has already been done.
- desiredTaskInputDataSize and minTaskParallelism are initialized with defaults that aren't
read from TezConfiguration. Since they are always overwritten at construction time, this can
lead to confusion while reading the code.
- ImmediateStartVertexScheduler is missing an initialize call (though it does nothing right now).

> Determine reduce task parallelism
> ---------------------------------
>                 Key: TEZ-338
>                 URL: https://issues.apache.org/jira/browse/TEZ-338
>             Project: Apache Tez
>          Issue Type: Sub-task
>            Reporter: Bikas Saha
>            Assignee: Bikas Saha
>              Labels: TEZ-0.2.0
>         Attachments: TEZ-338.1.patch, TEZ-338.2.patch, TEZ-338.3.patch
> Determine the parallelism of reduce tasks at runtime. This is important because it's difficult
to determine this accurately before the job actually runs due to unknown data reduction ratios
in the intermediate stages.

