Subject: svn commit: r1077202 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/ReduceTask.java test/org/apache/hadoop/mapred/TestReduceFetch.java
Date: Fri, 04 Mar 2011 03:51:26 -0000
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
From: omalley@apache.org
Message-Id: <20110304035126.8C5062388C1F@eris.apache.org>

Author: omalley
Date: Fri Mar  4 03:51:26 2011
New Revision: 1077202

URL: http://svn.apache.org/viewvc?rev=1077202&view=rev
Log:
commit 2d65ac5b02f05d4a3cc00fcc488835d43cc3fed0
Author: Chris Douglas
Date:   Mon Feb 22 22:32:40 2010 -0800

    MAPREDUCE-433 from https://issues.apache.org/jira/secure/attachment/12436678/M433-1y20.patch

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077202&r1=1077201&r2=1077202&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar  4 03:51:26 2011
@@ -1011,9 +1011,10 @@ class ReduceTask extends Task {
         throw new IOException("mapred.job.shuffle.input.buffer.percent" +
                               maxInMemCopyUse);
       }
-      maxSize = (long)Math.min(
-          Runtime.getRuntime().maxMemory() * maxInMemCopyUse,
-          Integer.MAX_VALUE);
+      // Allow unit tests to fix Runtime memory
+      maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes",
+          (int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
+          * maxInMemCopyUse);
       maxSingleShuffleLimit = (long)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
       LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize +
                ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
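The ReduceTask change makes the shuffle memory ceiling configurable: ShuffleRamManager now reads mapred.job.reduce.total.mem.bytes before falling back to Runtime.getRuntime().maxMemory(), so unit tests can fix the apparent heap size instead of depending on whatever -Xmx the task JVM happens to get. A minimal standalone sketch of the new arithmetic (the 128 MB value and 0.70 fraction are illustrative, not defaults taken from the patch):

    import org.apache.hadoop.conf.Configuration;

    public class ShuffleLimitSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A test pins the notional reduce-side memory like this:
        conf.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20); // 128 MB
        // Stand-in for mapred.job.shuffle.input.buffer.percent, which
        // ReduceTask parses and validates before this point:
        float maxInMemCopyUse = 0.70f;
        // Same computation as the patched line: use the configured byte
        // count if present, else the real JVM limit clamped to an int.
        long maxSize = (int) (conf.getInt("mapred.job.reduce.total.mem.bytes",
            (int) Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
            * maxInMemCopyUse);
        System.out.println("MemoryLimit=" + maxSize); // ~90 MB here, regardless of -Xmx
      }
    }

One side effect of the int casts, before and after the patch alike, is that the computed limit can never exceed Integer.MAX_VALUE bytes; the new key only lets callers lower the figure deterministically.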
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java?rev=1077202&r1=1077201&r2=1077202&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Fri Mar  4 03:51:26 2011
@@ -101,48 +101,53 @@ public class TestReduceFetch extends Tes
   }
 
   public void testReduceFromDisk() throws Exception {
+    final int MAP_TASKS = 8;
     JobConf job = mrCluster.createJobConf();
     job.set("mapred.job.reduce.input.buffer.percent", "0.0");
-    job.setNumMapTasks(3);
+    job.setNumMapTasks(MAP_TASKS);
+    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
+    job.set("mapred.job.shuffle.input.buffer.percent", "0.05");
+    job.setInt("io.sort.factor", 2);
+    job.setInt("mapred.inmem.merge.threshold", 4);
     Counters c = runJob(job);
-    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
-        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
-    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
-        Task.getFileSystemCounterNames("file")[0]).getCounter();
-    assertTrue("Expected more bytes read from local (" +
-        localRead + ") than written to HDFS (" + hdfsWritten + ")",
-        hdfsWritten <= localRead);
+    final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
+    final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
+    assertTrue("Expected all records spilled during reduce (" + spill + ")",
+        spill >= 2 * out); // all records spill at map, reduce
+    assertTrue("Expected intermediate merges (" + spill + ")",
+        spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice
   }
 
   public void testReduceFromPartialMem() throws Exception {
+    final int MAP_TASKS = 7;
     JobConf job = mrCluster.createJobConf();
-    job.setNumMapTasks(5);
+    job.setNumMapTasks(MAP_TASKS);
     job.setInt("mapred.inmem.merge.threshold", 0);
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
     job.setInt("mapred.reduce.parallel.copies", 1);
     job.setInt("io.sort.mb", 10);
-    job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
+    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
    job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
     job.setNumTasksToExecutePerJvm(1);
     job.set("mapred.job.shuffle.merge.percent", "1.0");
     Counters c = runJob(job);
-    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
-        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
-    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
-        Task.getFileSystemCounterNames("file")[0]).getCounter();
-    assertTrue("Expected at least 1MB fewer bytes read from local (" +
-        localRead + ") than written to HDFS (" + hdfsWritten + ")",
-        hdfsWritten >= localRead + 1024 * 1024);
+    final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
+    final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
+    assertTrue("Expected some records not spilled during reduce (" + spill + ")",
+        spill < 2 * out); // spilled map records, some records at the reduce
   }
 
   public void testReduceFromMem() throws Exception {
+    final int MAP_TASKS = 3;
     JobConf job = mrCluster.createJobConf();
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
-    job.setNumMapTasks(3);
+    job.set("mapred.job.shuffle.input.buffer.percent", "1.0");
+    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
+    job.setNumMapTasks(MAP_TASKS);
     Counters c = runJob(job);
-    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
-        Task.getFileSystemCounterNames("file")[0]).getCounter();
-    assertTrue("Non-zero read from local: " + localRead, localRead == 0);
+    final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
+    final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
+    assertEquals("Spilled records: " + spill, out, spill); // no reduce spill
   }
 }
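Taken together, the test changes swap the brittle filesystem byte counters (bytes written to HDFS versus bytes read from local disk) for record-level spill accounting, and they pin the reducer's memory through the new key rather than through MAPRED_REDUCE_TASK_JAVA_OPTS. The reasoning behind the new assertions: every map output record is spilled once on the map side; a fully disk-bound reduce (testReduceFromDisk) spills each record at least once more, giving SPILLED_RECORDS >= 2 * MAP_OUTPUT_RECORDS; and the intermediate merges forced by io.sort.factor=2 touch a further slice of records, hence the extra out / MAP_TASKS term. A short sketch of driving the same knobs outside the test harness (the standalone JobConf is illustrative; the property names and values are the ones the patch uses):

    import org.apache.hadoop.mapred.JobConf;

    public class ReduceMemKnobSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Fix the memory the shuffle believes it has, independent of -Xmx:
        job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20); // 128 MB
        // Grant the shuffle only 5% of it, as testReduceFromDisk does,
        // to force the on-disk merge path:
        job.set("mapred.job.shuffle.input.buffer.percent", "0.05");
        long buffer = (long) ((128 << 20) * 0.05f);
        System.out.println("in-memory shuffle space = " + buffer
            + " bytes (about 6.4 MB)");
      }
    }

The old -Xmx128m approach only bounded the child JVM's heap, so the limit the shuffle actually computed still varied with the environment; the explicit byte count removes that variability.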