hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r769643 - in /hadoop/core/trunk: CHANGES.txt src/test/org/apache/hadoop/mapred/TestReduceFetch.java
Date Wed, 29 Apr 2009 04:10:50 GMT
Author: cdouglas
Date: Wed Apr 29 04:10:49 2009
New Revision: 769643

URL: http://svn.apache.org/viewvc?rev=769643&view=rev
Log:
HADOOP-5657. Validate data in TestReduceFetch to improve merge test coverage.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=769643&r1=769642&r2=769643&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 29 04:10:49 2009
@@ -265,6 +265,9 @@
     HADOOP-5734. Correct block placement policy description in HDFS
     Design document. (Konstantin Boudnik via shv)
 
+    HADOOP-5657. Validate data in TestReduceFetch to improve merge test
+    coverage. (cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java?rev=769643&r1=769642&r2=769643&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Wed Apr 29 04:10:49 2009
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Formatter;
+import java.util.Iterator;
 
 import junit.framework.Test;
 import junit.framework.TestCase;
@@ -32,6 +34,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
 import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -56,34 +59,169 @@
     return setup;
   }
 
-  public static class MapMB
-      implements Mapper<NullWritable,NullWritable,Text,Text> {
+  private static final String tagfmt = "%04d";
+  private static final String keyfmt = "KEYKEYKEYKEYKEYKEYKE";
+  private static final int keylen = keyfmt.length();
 
+  private static int getValLen(int id, int nMaps) {
+    return 4096 / nMaps * (id + 1);
+  }
+
+  /**
+   * Emit 4096 small keys, 2 &quot;tagged&quot; keys. Emits a fixed amount of
+   * data so the in-memory fetch semantics can be tested.
+   */
+  public static class MapMB implements
+    Mapper<NullWritable,NullWritable,Text,Text> {
+
+    private int id;
+    private int nMaps;
+    private final Text key = new Text();
+    private final Text val = new Text();
+    private final byte[] b = new byte[4096];
+    private final Formatter fmt = new Formatter(new StringBuilder(25));
+
+    @Override
+    public void configure(JobConf conf) {
+      nMaps = conf.getNumMapTasks();
+      id = nMaps - conf.getInt("mapred.task.partition", -1) - 1;
+      Arrays.fill(b, 0, 4096, (byte)'V');
+      ((StringBuilder)fmt.out()).append(keyfmt);
+    }
+
+    @Override
     public void map(NullWritable nk, NullWritable nv,
         OutputCollector<Text, Text> output, Reporter reporter)
         throws IOException {
-      Text key = new Text();
-      Text val = new Text();
-      key.set("KEYKEYKEYKEYKEYKEYKEYKEY");
-      byte[] b = new byte[1000];
-      Arrays.fill(b, (byte)'V');
-      val.set(b);
-      b = null;
-      for (int i = 0; i < 4 * 1024; ++i) {
+      // Emit 4096 fixed-size records
+      val.set(b, 0, 1000);
+      val.getBytes()[0] = (byte) id;
+      for (int i = 0; i < 4096; ++i) {
+        key.set(fmt.format(tagfmt, i).toString());
         output.collect(key, val);
+        ((StringBuilder)fmt.out()).setLength(keylen);
       }
+
+      // Emit two "tagged" records from the map. To validate the merge, segments
+      // should have both a small and large record such that reading a large
+      // record from an on-disk segment into an in-memory segment will write
+      // over the beginning of a record in the in-memory segment, causing the
+      // merge and/or validation to fail.
+
+      // Add small, tagged record
+      val.set(b, 0, getValLen(id, nMaps) - 128);
+      val.getBytes()[0] = (byte) id;
+      ((StringBuilder)fmt.out()).setLength(keylen);
+      key.set("A" + fmt.format(tagfmt, id).toString());
+      output.collect(key, val);
+      // Add large, tagged record
+      val.set(b, 0, getValLen(id, nMaps));
+      val.getBytes()[0] = (byte) id;
+      ((StringBuilder)fmt.out()).setLength(keylen);
+      key.set("B" + fmt.format(tagfmt, id).toString());
+      output.collect(key, val);
     }
-    public void configure(JobConf conf) { }
+
+    @Override
     public void close() throws IOException { }
   }
 
+  /**
+   * Confirm that each small key is emitted once by all maps, each tagged key
+   * is emitted by only one map, all IDs are consistent with record data, and
+   * all non-ID record data is consistent.
+   */
+  public static class MBValidate
+      implements Reducer<Text,Text,Text,Text> {
+
+    private static int nMaps;
+    private static final Text vb = new Text();
+    static {
+      byte[] v = new byte[4096];
+      Arrays.fill(v, (byte)'V');
+      vb.set(v);
+    }
+
+    private int nRec = 0;
+    private int nKey = -1;
+    private int aKey = -1;
+    private int bKey = -1;
+    private final Text kb = new Text();
+    private final Formatter fmt = new Formatter(new StringBuilder(25));
+
+    @Override
+    public void configure(JobConf conf) {
+      nMaps = conf.getNumMapTasks();
+      ((StringBuilder)fmt.out()).append(keyfmt);
+    }
+
+    @Override
+    public void reduce(Text key, Iterator<Text> values,
+        OutputCollector<Text,Text> out, Reporter reporter)
+        throws IOException {
+      int vc = 0;
+      final int vlen;
+      final int preRec = nRec;
+      final int vcCheck, recCheck;
+      ((StringBuilder)fmt.out()).setLength(keylen);
+      if (25 == key.getLength()) {
+        // tagged record
+        recCheck = 1;   // expect only 1 record
+        switch ((char)key.getBytes()[0]) {
+          case 'A':
+            vlen = getValLen(++aKey, nMaps) - 128;
+            vcCheck = aKey; // expect eq id
+            break;
+          case 'B':
+            vlen = getValLen(++bKey, nMaps);
+            vcCheck = bKey; // expect eq id
+            break;
+          default:
+            vlen = vcCheck = -1;
+            fail("Unexpected tag on record: " + ((char)key.getBytes()[24]));
+        }
+        kb.set((char)key.getBytes()[0] + fmt.format(tagfmt,vcCheck).toString());
+      } else {
+        kb.set(fmt.format(tagfmt, ++nKey).toString());
+        vlen = 1000;
+        recCheck = nMaps;                      // expect 1 rec per map
+        vcCheck = (nMaps * (nMaps - 1)) >>> 1; // expect eq sum(id)
+      }
+      assertEquals(kb, key);
+      while (values.hasNext()) {
+        final Text val = values.next();
+        // increment vc by map ID assoc w/ val
+        vc += val.getBytes()[0];
+        // verify that all the fixed characters 'V' match
+        assertEquals(0, WritableComparator.compareBytes(
+              vb.getBytes(), 1, vlen - 1,
+              val.getBytes(), 1, val.getLength() - 1));
+        out.collect(key, val);
+        ++nRec;
+      }
+      assertEquals("Bad rec count for " + key, recCheck, nRec - preRec);
+      assertEquals("Bad rec group for " + key, vcCheck, vc);
+    }
+
+    @Override
+    public void close() throws IOException {
+      assertEquals(4095, nKey);
+      assertEquals(nMaps - 1, aKey);
+      assertEquals(nMaps - 1, bKey);
+      assertEquals("Bad record count", nMaps * (4096 + 2), nRec);
+    }
+  }
+
   public static Counters runJob(JobConf conf) throws Exception {
     conf.setMapperClass(MapMB.class);
-    conf.setReducerClass(IdentityReducer.class);
+    conf.setReducerClass(MBValidate.class);
     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(Text.class);
     conf.setNumReduceTasks(1);
     conf.setInputFormat(FakeIF.class);
+    conf.setNumTasksToExecutePerJvm(1);
+    conf.setInt("mapred.map.max.attempts", 0);
+    conf.setInt("mapred.reduce.max.attempts", 0);
     FileInputFormat.setInputPaths(conf, new Path("/in"));
     final Path outp = new Path("/out");
     FileOutputFormat.setOutputPath(conf, outp);
@@ -100,10 +238,15 @@
     return job.getCounters();
   }
 
+  /** Verify that all segments are read from disk */
   public void testReduceFromDisk() throws Exception {
     JobConf job = mrCluster.createJobConf();
     job.set("mapred.job.reduce.input.buffer.percent", "0.0");
-    job.setNumMapTasks(3);
+    job.setNumMapTasks(8);
+    job.set("mapred.child.java.opts", "-Xmx128m");
+    job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
+    job.setInt("io.sort.factor", 2);
+    job.setInt("mapred.inmem.merge.threshold", 4);
     Counters c = runJob(job);
     final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
         Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
@@ -114,6 +257,7 @@
         hdfsWritten <= localRead);
   }
 
+  /** Verify that at least one segment does not hit disk */
   public void testReduceFromPartialMem() throws Exception {
     JobConf job = mrCluster.createJobConf();
     job.setNumMapTasks(5);
@@ -123,7 +267,6 @@
     job.setInt("io.sort.mb", 10);
     job.set("mapred.child.java.opts", "-Xmx128m");
     job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
-    job.setNumTasksToExecutePerJvm(1);
     job.set("mapred.job.shuffle.merge.percent", "1.0");
     Counters c = runJob(job);
     final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
@@ -135,6 +278,7 @@
         hdfsWritten >= localRead + 1024 * 1024);
   }
 
+  /** Verify that no segment hits disk. */
   public void testReduceFromMem() throws Exception {
     JobConf job = mrCluster.createJobConf();
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");



Mime
View raw message