Return-Path: X-Original-To: apmail-tez-issues-archive@minotaur.apache.org Delivered-To: apmail-tez-issues-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A721C10FD5 for ; Fri, 16 Aug 2013 07:47:22 +0000 (UTC) Received: (qmail 96671 invoked by uid 500); 16 Aug 2013 07:47:20 -0000 Delivered-To: apmail-tez-issues-archive@tez.apache.org Received: (qmail 96647 invoked by uid 500); 16 Aug 2013 07:47:18 -0000 Mailing-List: contact issues-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list issues@tez.incubator.apache.org Received: (qmail 96634 invoked by uid 99); 16 Aug 2013 07:47:17 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Aug 2013 07:47:17 +0000 X-ASF-Spam-Status: No, hits=-2002.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 16 Aug 2013 07:47:14 +0000 Received: (qmail 96474 invoked by uid 99); 16 Aug 2013 07:46:49 -0000 Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Aug 2013 07:46:49 +0000 Date: Fri, 16 Aug 2013 07:46:49 +0000 (UTC) From: "Siddharth Seth (JIRA)" To: issues@tez.incubator.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (TEZ-338) Determine reduce task parallelism MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 X-Virus-Checked: Checked by ClamAV on apache.org [ https://issues.apache.org/jira/browse/TEZ-338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13742009#comment-13742009 ] Siddharth Seth commented on TEZ-338: ------------------------------------ bq. I reused the min fraction because that the maximum time we have to make up our mind. Any shorter gives us less accuracy and anything longer is not possible since the task would have already started running. My concern with this was that scheduling keys are required to be set correctly for partitioning to work - which can get confusing for users. What I was suggesting is setting a config for min-completed-tasks for repartitioning. This can be used to either override the scheduler slow start configuration, or avoid re-partitioning if the value is too low. Comes down to - is the sample set big enough to re-partition, for which there can be other heuristics as well. Separate jira if you don't want to add such a config rightnow. bq. Over-partitioning in the ancestor vertex is something that has to be set by the user. User code does partitioning and so Tez cannot understand or make changes to it. Partitioning, in case of MR, is currently determined by the number of tasks in the receiving vertex (taskOutDegree from the OutputSpecList). In the current form atleast, this could be achieved. Reasonable big change if we want to do this. bq. Yes. current impl re-distributes all partitions equally across the new set of tasks. It tries to not introduce artificial skews by distributing them unequally. Using "multiple" based logic ensures uniform distribution. We round of to a multiple and should work in most cases. Obviously it wont if the number of partitions is a prime number. I think I'm missing something here. It looks like re-partitioning can end up skipping certain partitions completely. If we require the original partitions to be a multiple of new partitions - we should enforce this - i.e. don't re-partition if not possible; otherwise we end up loosing data. As an example. InitialPartitions = 25, expectedOutputSize=700MB based on completed tasks - looks like we end up reducing parallelism to 8, with each getting 3 partitions. 1 partition is not handled by any task. bq. TezEngineContext does not know that shuffle is being used. OK, I got that completely wrong. TezEngineTaskContext can't be used. Configuration for the engine components is in a mess rightnow, and needs looking into. It just seems wrong to modify user-payload, which is MR conf, to set configuration for the ShuffleHandler which is a Tez Engine component. This could probably be solved when per-edge configuration is used, based on whatever the configuration mechanism is for Tez Engine components. However, I think TaskDependencyCompletionEvents would be a better way to configure this. User code will eventually generate this; for now partition ranges can be part of the event generated by the AM. That allows dynamic ranges as well, instead of a single conf for all tasks in the vertex. Thoughts ? bq. Currently ShuffleHandler does not serve multiple partitions in a single call. Looks like it serves multiple partitions per connection. The patch would end up using one connection per connection + partitionId if I understand it correctly. We could continue with a single connection per host with the current ShuffleHandler. Separate jira. Comments on the patch - TEZ_TASK_SHUFFLE_PARTITION_RANGE - if we continue to use this, belongs in TezJobConfig, which contains Engine specific parameters. - {code}+ if(multiple <= 1) { + // desired is fairly close to current. No point changing. + return; + } {code} With a reasonable number of tasks, this check may be too limiting. e.g. Reducing 19 to 10, or 600 to 301. Again, can be fixed later. - Does VertexImpl.setParallelism need to be synchronized on tasksSyncHandle, does the entire method need to be synchronized (also scheduleTasks) ? - May be worth adding per-partition counters at some point. Minor nits - There's a TODO about renaming to ShuffleVertexManager which is already fixed. - desiredTaskInputDataSize, minTaskParallelism are initialized with defaults which aren't read from TezConfiguration. Will always be over-written at construction time. Can lead to confusion while reading the code. - ImmediateStartVertexScheduler is missing an initialize call (Does nothing rightnow though) > Determine reduce task parallelism > --------------------------------- > > Key: TEZ-338 > URL: https://issues.apache.org/jira/browse/TEZ-338 > Project: Apache Tez > Issue Type: Sub-task > Reporter: Bikas Saha > Assignee: Bikas Saha > Labels: TEZ-0.2.0 > Attachments: TEZ-338.1.patch, TEZ-338.2.patch, TEZ-338.3.patch > > > Determine the parallelism of reduce tasks at runtime. This is important because its difficult to determine this accurately before the job actually runs due to unknown data reduction ratios in the intermediate stages. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira