From mapreduce-commits-return-31-apmail-hadoop-mapreduce-commits-archive=hadoop.apache.org@hadoop.apache.org Tue Jun 30 04:50:58 2009 Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 98592 invoked from network); 30 Jun 2009 04:50:58 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 30 Jun 2009 04:50:58 -0000 Received: (qmail 49848 invoked by uid 500); 30 Jun 2009 04:51:08 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 49821 invoked by uid 500); 30 Jun 2009 04:51:08 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 49811 invoked by uid 99); 30 Jun 2009 04:51:08 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Jun 2009 04:51:08 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Jun 2009 04:50:59 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 973982388891; Tue, 30 Jun 2009 04:50:39 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r789558 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTask.java src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java Date: Tue, 30 Jun 2009 04:50:39 -0000 To: mapreduce-commits@hadoop.apache.org From: cdouglas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090630045039.973982388891@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cdouglas Date: Tue Jun 30 04:50:38 2009 New Revision: 789558 URL: http://svn.apache.org/viewvc?rev=789558&view=rev Log: MAPREDUCE-433. Use more reliable counters in TestReduceFetch. Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=789558&r1=789557&r2=789558&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jun 30 04:50:38 2009 @@ -67,3 +67,5 @@ MAPREDUCE-671. Update ignore list to include untracked, generated build artifacts and config files. (cdouglas) + + MAPREDUCE-433. Use more reliable counters in TestReduceFetch. (cdouglas) Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=789558&r1=789557&r2=789558&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Jun 30 04:50:38 2009 @@ -1004,9 +1004,10 @@ throw new IOException("mapred.job.shuffle.input.buffer.percent" + maxInMemCopyUse); } - maxSize = (int)Math.min( - Runtime.getRuntime().maxMemory() * maxInMemCopyUse, - Integer.MAX_VALUE); + // Allow unit tests to fix Runtime memory + maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes", + (int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) + * maxInMemCopyUse); maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION); LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize + ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit); Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java?rev=789558&r1=789557&r2=789558&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java Tue Jun 30 04:50:38 2009 @@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.TestMapCollection.FakeIF; import org.apache.hadoop.mapred.TestMapCollection.FakeSplit; import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.mapreduce.TaskCounter; public class TestReduceFetch extends TestCase { @@ -240,53 +241,56 @@ /** Verify that all segments are read from disk */ public void testReduceFromDisk() throws Exception { + final int MAP_TASKS = 8; JobConf job = mrCluster.createJobConf(); job.set("mapred.job.reduce.input.buffer.percent", "0.0"); - job.setNumMapTasks(8); + job.setNumMapTasks(MAP_TASKS); job.set("mapred.child.java.opts", "-Xmx128m"); + job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20); job.set("mapred.job.shuffle.input.buffer.percent", "0.14"); job.setInt("io.sort.factor", 2); job.setInt("mapred.inmem.merge.threshold", 4); Counters c = runJob(job); - final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, - Task.getFileSystemCounterNames("hdfs")[1]).getCounter(); - final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, - Task.getFileSystemCounterNames("file")[0]).getCounter(); - assertTrue("Expected more bytes read from local (" + - localRead + ") than written to HDFS (" + hdfsWritten + ")", - hdfsWritten <= localRead); + final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter(); + final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter(); + assertTrue("Expected all records spilled during reduce (" + spill + ")", + spill >= 2 * out); // all records spill at map, reduce + assertTrue("Expected intermediate merges (" + spill + ")", + spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice } /** Verify that at least one segment does not hit disk */ public void testReduceFromPartialMem() throws Exception { + final int MAP_TASKS = 5; JobConf job = mrCluster.createJobConf(); - job.setNumMapTasks(5); + job.setNumMapTasks(MAP_TASKS); job.setInt("mapred.inmem.merge.threshold", 0); job.set("mapred.job.reduce.input.buffer.percent", "1.0"); job.setInt("mapred.reduce.parallel.copies", 1); job.setInt("io.sort.mb", 10); job.set("mapred.child.java.opts", "-Xmx128m"); + job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20); job.set("mapred.job.shuffle.input.buffer.percent", "0.14"); job.set("mapred.job.shuffle.merge.percent", "1.0"); Counters c = runJob(job); - final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, - Task.getFileSystemCounterNames("hdfs")[1]).getCounter(); - final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, - Task.getFileSystemCounterNames("file")[0]).getCounter(); - assertTrue("Expected at least 1MB fewer bytes read from local (" + - localRead + ") than written to HDFS (" + hdfsWritten + ")", - hdfsWritten >= localRead + 1024 * 1024); + final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter(); + final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter(); + assertTrue("Expected some records not spilled during reduce" + spill + ")", + spill < 2 * out); // spilled map records, some records at the reduce } /** Verify that no segment hits disk. */ public void testReduceFromMem() throws Exception { + final int MAP_TASKS = 3; JobConf job = mrCluster.createJobConf(); job.set("mapred.job.reduce.input.buffer.percent", "1.0"); - job.setNumMapTasks(3); + job.set("mapred.job.shuffle.input.buffer.percent", "1.0"); + job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20); + job.setNumMapTasks(MAP_TASKS); Counters c = runJob(job); - final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, - Task.getFileSystemCounterNames("file")[0]).getCounter(); - assertTrue("Non-zero read from local: " + localRead, localRead == 0); + final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter(); + final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter(); + assertEquals("Spilled records: " + spill, out, spill); // no reduce spill } }