hadoop-mapreduce-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r789558 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTask.java src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
Date: Tue, 30 Jun 2009 04:50:39 GMT
Author: cdouglas
Date: Tue Jun 30 04:50:38 2009
New Revision: 789558

URL: http://svn.apache.org/viewvc?rev=789558&view=rev
Log:
MAPREDUCE-433. Use more reliable counters in TestReduceFetch.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=789558&r1=789557&r2=789558&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jun 30 04:50:38 2009
@@ -67,3 +67,5 @@
 
     MAPREDUCE-671. Update ignore list to include untracked, generated
     build artifacts and config files. (cdouglas)
+
+    MAPREDUCE-433. Use more reliable counters in TestReduceFetch. (cdouglas)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=789558&r1=789557&r2=789558&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Jun 30 04:50:38 2009
@@ -1004,9 +1004,10 @@
           throw new IOException("mapred.job.shuffle.input.buffer.percent" +
                                 maxInMemCopyUse);
         }
-        maxSize = (int)Math.min(
-            Runtime.getRuntime().maxMemory() * maxInMemCopyUse,
-            Integer.MAX_VALUE);
+        // Allow unit tests to fix Runtime memory
+        maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes",
+            (int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
+          * maxInMemCopyUse);
         maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
         LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize + 
                  ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java?rev=789558&r1=789557&r2=789558&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java Tue Jun 30 04:50:38 2009
@@ -38,6 +38,7 @@
 import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
 import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.TaskCounter;
 
 public class TestReduceFetch extends TestCase {
 
@@ -240,53 +241,56 @@
 
   /** Verify that all segments are read from disk */
   public void testReduceFromDisk() throws Exception {
+    final int MAP_TASKS = 8;
     JobConf job = mrCluster.createJobConf();
     job.set("mapred.job.reduce.input.buffer.percent", "0.0");
-    job.setNumMapTasks(8);
+    job.setNumMapTasks(MAP_TASKS);
     job.set("mapred.child.java.opts", "-Xmx128m");
+    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
     job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
     job.setInt("io.sort.factor", 2);
     job.setInt("mapred.inmem.merge.threshold", 4);
     Counters c = runJob(job);
-    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
-        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
-    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
-        Task.getFileSystemCounterNames("file")[0]).getCounter();
-    assertTrue("Expected more bytes read from local (" +
-        localRead + ") than written to HDFS (" + hdfsWritten + ")",
-        hdfsWritten <= localRead);
+    final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
+    final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
+    assertTrue("Expected all records spilled during reduce (" + spill + ")",
+        spill >= 2 * out); // all records spill at map, reduce
+    assertTrue("Expected intermediate merges (" + spill + ")",
+        spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice
   }
 
   /** Verify that at least one segment does not hit disk */
   public void testReduceFromPartialMem() throws Exception {
+    final int MAP_TASKS = 5;
     JobConf job = mrCluster.createJobConf();
-    job.setNumMapTasks(5);
+    job.setNumMapTasks(MAP_TASKS);
     job.setInt("mapred.inmem.merge.threshold", 0);
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
     job.setInt("mapred.reduce.parallel.copies", 1);
     job.setInt("io.sort.mb", 10);
     job.set("mapred.child.java.opts", "-Xmx128m");
+    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
     job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
     job.set("mapred.job.shuffle.merge.percent", "1.0");
     Counters c = runJob(job);
-    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
-        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
-    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
-        Task.getFileSystemCounterNames("file")[0]).getCounter();
-    assertTrue("Expected at least 1MB fewer bytes read from local (" +
-        localRead + ") than written to HDFS (" + hdfsWritten + ")",
-        hdfsWritten >= localRead + 1024 * 1024);
+    final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
+    final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
+    assertTrue("Expected some records not spilled during reduce" + spill + ")",
+        spill < 2 * out); // spilled map records, some records at the reduce
   }
 
   /** Verify that no segment hits disk. */
   public void testReduceFromMem() throws Exception {
+    final int MAP_TASKS = 3;
     JobConf job = mrCluster.createJobConf();
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
-    job.setNumMapTasks(3);
+    job.set("mapred.job.shuffle.input.buffer.percent", "1.0");
+    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
+    job.setNumMapTasks(MAP_TASKS);
     Counters c = runJob(job);
-    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
-        Task.getFileSystemCounterNames("file")[0]).getCounter();
-    assertTrue("Non-zero read from local: " + localRead, localRead == 0);
+    final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
+    final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
+    assertEquals("Spilled records: " + spill, out, spill); // no reduce spill
   }
 
 }
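
[Editor's note: the rewritten assertions rely on record counters (SPILLED_RECORDS, MAP_OUTPUT_RECORDS) rather than filesystem byte counters, which made the test flaky. A hedged sketch of the counter lookup the new assertions depend on is below; the class and method names are illustrative, and the Counters object is assumed to come from a finished job, as returned by TestReduceFetch.runJob().]

    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapreduce.TaskCounter;

    public class SpillCheckSketch {
      // Returns true when every map output record was spilled at least twice
      // (once on the map side and once on the reduce side), which is the
      // all-segments-on-disk condition testReduceFromDisk asserts.
      static boolean allRecordsSpilledTwice(Counters c) {
        long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
        long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
        return spill >= 2 * out;
      }
    }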


