Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 83949 invoked from network); 27 Aug 2008 06:40:17 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 27 Aug 2008 06:40:17 -0000 Received: (qmail 15791 invoked by uid 500); 27 Aug 2008 06:40:15 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 15771 invoked by uid 500); 27 Aug 2008 06:40:15 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 15762 invoked by uid 99); 27 Aug 2008 06:40:15 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Aug 2008 23:40:15 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Aug 2008 06:39:23 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8A7EE23889C0; Tue, 26 Aug 2008 23:39:23 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r689380 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Date: Wed, 27 Aug 2008 06:39:23 -0000 To: core-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080827063923.8A7EE23889C0@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: omalley Date: Tue Aug 26 23:39:22 2008 New Revision: 689380 URL: http://svn.apache.org/viewvc?rev=689380&view=rev Log: HADOOP-3961. Fix task disk space requirement estimates for virtual input jobs. Delays limiting task placement until after 10% of the maps have finished. (Ari Rabkin via omalley) Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=689380&r1=689379&r2=689380&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Tue Aug 26 23:39:22 2008 @@ -388,6 +388,10 @@ HADOOP-4030. Remove lzop from the default list of codecs. (Arun Murthy via cdouglas) + HADOOP-3961. Fix task disk space requirement estimates for virtual + input jobs. Delays limiting task placement until after 10% of the maps + have finished. (Ari Rabkin via omalley) + Release 0.18.1 - Unreleased BUG FIXES Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java?rev=689380&r1=689379&r2=689380&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java Tue Aug 26 23:39:22 2008 @@ -37,23 +37,23 @@ /** - * Estimated ratio of output to input size for map tasks. + * Estimated ratio of output to (input size+1) for map tasks. */ private double mapBlowupRatio; + + /** + * How much relative weight to put on the current estimate. + * Each completed map has unit weight. + */ private double estimateWeight; - private JobInProgress job; - - //guess a factor of two blowup due to temp space for merge - public static final double INITIAL_BLOWUP_GUESS = 1; - - //initial estimate is weighted as much as this fraction of the real datapoints - static final double INITIAL_EST_WEIGHT_PERCENT = 0.05; - + final private JobInProgress job; + final private int threshholdToUse; public ResourceEstimator(JobInProgress job) { - mapBlowupRatio = INITIAL_BLOWUP_GUESS; this.job = job; - estimateWeight = INITIAL_EST_WEIGHT_PERCENT * job.desiredMaps(); + threshholdToUse = job.desiredMaps()/ 10; + mapBlowupRatio = 0; + estimateWeight = 1; } @@ -69,45 +69,71 @@ mapBlowupRatio = b; } - - - public void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) { - + void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) { + //-1 indicates error, which we don't average in. if(tip.isMapTask() && ts.getOutputSize() != -1) { double blowupOnThisTask = ts.getOutputSize() / - (double) tip.getMapInputSize(); + ((double) tip.getMapInputSize() + 1); LOG.info("measured blowup on " + tip.getTIPId() + " was " + - ts.getOutputSize() + "/" +tip.getMapInputSize() + " = " + ts.getOutputSize() + "/" +(tip.getMapInputSize()+1) + " = " + blowupOnThisTask); - double newEstimate = blowupOnThisTask / estimateWeight + - ((estimateWeight - 1) / estimateWeight) * getBlowupRatio(); - estimateWeight++; + double newEstimate; + synchronized(this) { + newEstimate = blowupOnThisTask / estimateWeight + + ((estimateWeight - 1) / estimateWeight) * getBlowupRatio(); + estimateWeight++; + } setBlowupRatio(newEstimate); + + LOG.info("new estimate is blowup = " + newEstimate); } } /** - * + * @return estimated length of this job's total map output + */ + protected long getEstimatedTotalMapOutputSize() { + double estWeight; + synchronized(this) { + estWeight = this.estimateWeight; + } + + if(estWeight < threshholdToUse) { + return 0; + } else { + double blowup =getBlowupRatio(); + long inputSize = job.getInputLength() + job.desiredMaps(); + //add desiredMaps() so that randomwriter case doesn't blow up + long estimate = Math.round(inputSize * blowup * 2.0); + + LOG.debug("estimate total map output will be " + estimate + + " bytes. (blowup = 2*" + blowup + ")"); + return estimate; + } + } + + /** * @return estimated length of this job's average map output - * @throws IOException if the split's getLength() does. */ - public long getEstimatedMapOutputSize() { - double blowup =getBlowupRatio(); - long estimate = - (long) (job.getInputLength() * blowup / job.desiredMaps() * 2.0); - LOG.info("estimate map will take " + estimate + - " bytes. (blowup = 2*" + blowup + ")"); + long getEstimatedMapOutputSize() { + long estimate = getEstimatedTotalMapOutputSize() / job.desiredMaps(); return estimate; } - - //estimate that each reduce gets an equal share of total map output - public long getEstimatedReduceInputSize() { - return - getEstimatedMapOutputSize() * job.desiredMaps() / job.desiredReduces(); + /** + * + * @return estimated length of this job's average reduce input + */ + long getEstimatedReduceInputSize() { + if(job.desiredReduces() == 0) {//no reduce output, so no size + return 0; + } else { + return getEstimatedTotalMapOutputSize() / job.desiredReduces(); + //estimate that each reduce gets an equal share of total map output + } } Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=689380&view=auto ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java (added) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Tue Aug 26 23:39:22 2008 @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import junit.framework.TestCase; +import org.apache.hadoop.mapred.JobClient.RawSplit; + +public class TestResourceEstimation extends TestCase { + + + public void testResourceEstimator() throws Exception { + final int maps = 100; + final int reduces = 2; + final int singleMapOutputSize = 1000; + JobConf jc = new JobConf(); + JobID jid = new JobID("testJT", 0); + jc.setNumMapTasks(maps); + jc.setNumReduceTasks(reduces); + + JobInProgress jip = new JobInProgress(jid, jc); + //unfortunately, we can't set job input size from here. + ResourceEstimator re = new ResourceEstimator(jip); + + for(int i = 0; i < maps / 10 -1; ++i) { + + long estOutSize = re.getEstimatedMapOutputSize(); + System.out.println(estOutSize); + assertEquals(0, estOutSize); + + TaskStatus ts = new MapTaskStatus(); + ts.setOutputSize(singleMapOutputSize); + RawSplit split = new RawSplit(); + split.setDataLength(0); + TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0); + re.updateWithCompletedTask(ts, tip); + } + + assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize()); + assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize()); + + } + +}