hadoop-mapreduce-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r796674 - in /hadoop/mapreduce/trunk: CHANGES.txt src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
Date: Wed, 22 Jul 2009 11:32:21 GMT
Author: cdouglas
Date: Wed Jul 22 11:32:21 2009
New Revision: 796674

URL: http://svn.apache.org/viewvc?rev=796674&view=rev
Log:
MAPREDUCE-785. Separate sub-test of TestReduceFetch to be included in
MR-670. Contributed by Jothi Padmanabhan

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
      - copied, changed from r796660, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=796674&r1=796673&r2=796674&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 22 11:32:21 2009
@@ -129,6 +129,9 @@
     MAPREDUCE-772. Merge HADOOP-4010 changes to LineRecordReader into mapreduce
     package. (Abdul Qadeer via cdouglas)
 
+    MAPREDUCE-785. Separate sub-test of TestReduceFetch to be included in
+    MR-670. (Jothi Padmanabhan via cdouglas)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java?rev=796674&r1=796673&r2=796674&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java Wed Jul 22 11:32:21 2009
@@ -18,228 +18,18 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Formatter;
-import java.util.Iterator;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-import junit.extensions.TestSetup;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
-import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.TaskCounter;
 
-public class TestReduceFetch extends TestCase {
-
-  private static MiniMRCluster mrCluster = null;
-  private static MiniDFSCluster dfsCluster = null;
-  public static Test suite() {
-    TestSetup setup = new TestSetup(new TestSuite(TestReduceFetch.class)) {
-      protected void setUp() throws Exception {
-        Configuration conf = new Configuration();
-        dfsCluster = new MiniDFSCluster(conf, 2, true, null);
-        mrCluster = new MiniMRCluster(2,
-            dfsCluster.getFileSystem().getUri().toString(), 1);
-      }
-      protected void tearDown() throws Exception {
-        if (dfsCluster != null) { dfsCluster.shutdown(); }
-        if (mrCluster != null) { mrCluster.shutdown(); }
-      }
-    };
-    return setup;
-  }
-
-  private static final String tagfmt = "%04d";
-  private static final String keyfmt = "KEYKEYKEYKEYKEYKEYKE";
-  private static final int keylen = keyfmt.length();
+public class TestReduceFetch extends TestReduceFetchFromPartialMem {
 
-  private static int getValLen(int id, int nMaps) {
-    return 4096 / nMaps * (id + 1);
+  static {
+    setSuite(TestReduceFetch.class);
   }
 
   /**
-   * Emit 4096 small keys, 2 "tagged" keys. Emits a fixed amount of
-   * data so the in-memory fetch semantics can be tested.
+   * Verify that all segments are read from disk
+   * @throws Exception might be thrown
    */
-  public static class MapMB implements
-    Mapper<NullWritable,NullWritable,Text,Text> {
-
-    private int id;
-    private int nMaps;
-    private final Text key = new Text();
-    private final Text val = new Text();
-    private final byte[] b = new byte[4096];
-    private final Formatter fmt = new Formatter(new StringBuilder(25));
-
-    @Override
-    public void configure(JobConf conf) {
-      nMaps = conf.getNumMapTasks();
-      id = nMaps - conf.getInt("mapred.task.partition", -1) - 1;
-      Arrays.fill(b, 0, 4096, (byte)'V');
-      ((StringBuilder)fmt.out()).append(keyfmt);
-    }
-
-    @Override
-    public void map(NullWritable nk, NullWritable nv,
-        OutputCollector<Text, Text> output, Reporter reporter)
-        throws IOException {
-      // Emit 4096 fixed-size records
-      val.set(b, 0, 1000);
-      val.getBytes()[0] = (byte) id;
-      for (int i = 0; i < 4096; ++i) {
-        key.set(fmt.format(tagfmt, i).toString());
-        output.collect(key, val);
-        ((StringBuilder)fmt.out()).setLength(keylen);
-      }
-
-      // Emit two "tagged" records from the map. To validate the merge, segments
-      // should have both a small and large record such that reading a large
-      // record from an on-disk segment into an in-memory segment will write
-      // over the beginning of a record in the in-memory segment, causing the
-      // merge and/or validation to fail.
-
-      // Add small, tagged record
-      val.set(b, 0, getValLen(id, nMaps) - 128);
-      val.getBytes()[0] = (byte) id;
-      ((StringBuilder)fmt.out()).setLength(keylen);
-      key.set("A" + fmt.format(tagfmt, id).toString());
-      output.collect(key, val);
-      // Add large, tagged record
-      val.set(b, 0, getValLen(id, nMaps));
-      val.getBytes()[0] = (byte) id;
-      ((StringBuilder)fmt.out()).setLength(keylen);
-      key.set("B" + fmt.format(tagfmt, id).toString());
-      output.collect(key, val);
-    }
-
-    @Override
-    public void close() throws IOException { }
-  }
-
-  /**
-   * Confirm that each small key is emitted once by all maps, each tagged key
-   * is emitted by only one map, all IDs are consistent with record data, and
-   * all non-ID record data is consistent.
-   */
-  public static class MBValidate
-      implements Reducer<Text,Text,Text,Text> {
-
-    private static int nMaps;
-    private static final Text vb = new Text();
-    static {
-      byte[] v = new byte[4096];
-      Arrays.fill(v, (byte)'V');
-      vb.set(v);
-    }
-
-    private int nRec = 0;
-    private int nKey = -1;
-    private int aKey = -1;
-    private int bKey = -1;
-    private final Text kb = new Text();
-    private final Formatter fmt = new Formatter(new StringBuilder(25));
-
-    @Override
-    public void configure(JobConf conf) {
-      nMaps = conf.getNumMapTasks();
-      ((StringBuilder)fmt.out()).append(keyfmt);
-    }
-
-    @Override
-    public void reduce(Text key, Iterator<Text> values,
-        OutputCollector<Text,Text> out, Reporter reporter)
-        throws IOException {
-      int vc = 0;
-      final int vlen;
-      final int preRec = nRec;
-      final int vcCheck, recCheck;
-      ((StringBuilder)fmt.out()).setLength(keylen);
-      if (25 == key.getLength()) {
-        // tagged record
-        recCheck = 1;   // expect only 1 record
-        switch ((char)key.getBytes()[0]) {
-          case 'A':
-            vlen = getValLen(++aKey, nMaps) - 128;
-            vcCheck = aKey; // expect eq id
-            break;
-          case 'B':
-            vlen = getValLen(++bKey, nMaps);
-            vcCheck = bKey; // expect eq id
-            break;
-          default:
-            vlen = vcCheck = -1;
-            fail("Unexpected tag on record: " + ((char)key.getBytes()[24]));
-        }
-        kb.set((char)key.getBytes()[0] + fmt.format(tagfmt,vcCheck).toString());
-      } else {
-        kb.set(fmt.format(tagfmt, ++nKey).toString());
-        vlen = 1000;
-        recCheck = nMaps;                      // expect 1 rec per map
-        vcCheck = (nMaps * (nMaps - 1)) >>> 1; // expect eq sum(id)
-      }
-      assertEquals(kb, key);
-      while (values.hasNext()) {
-        final Text val = values.next();
-        // increment vc by map ID assoc w/ val
-        vc += val.getBytes()[0];
-        // verify that all the fixed characters 'V' match
-        assertEquals(0, WritableComparator.compareBytes(
-              vb.getBytes(), 1, vlen - 1,
-              val.getBytes(), 1, val.getLength() - 1));
-        out.collect(key, val);
-        ++nRec;
-      }
-      assertEquals("Bad rec count for " + key, recCheck, nRec - preRec);
-      assertEquals("Bad rec group for " + key, vcCheck, vc);
-    }
-
-    @Override
-    public void close() throws IOException {
-      assertEquals(4095, nKey);
-      assertEquals(nMaps - 1, aKey);
-      assertEquals(nMaps - 1, bKey);
-      assertEquals("Bad record count", nMaps * (4096 + 2), nRec);
-    }
-  }
-
-  public static Counters runJob(JobConf conf) throws Exception {
-    conf.setMapperClass(MapMB.class);
-    conf.setReducerClass(MBValidate.class);
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(Text.class);
-    conf.setNumReduceTasks(1);
-    conf.setInputFormat(FakeIF.class);
-    conf.setNumTasksToExecutePerJvm(1);
-    conf.setInt("mapred.map.max.attempts", 0);
-    conf.setInt("mapred.reduce.max.attempts", 0);
-    FileInputFormat.setInputPaths(conf, new Path("/in"));
-    final Path outp = new Path("/out");
-    FileOutputFormat.setOutputPath(conf, outp);
-    RunningJob job = null;
-    try {
-      job = JobClient.runJob(conf);
-      assertTrue(job.isSuccessful());
-    } finally {
-      FileSystem fs = dfsCluster.getFileSystem();
-      if (fs.exists(outp)) {
-        fs.delete(outp, true);
-      }
-    }
-    return job.getCounters();
-  }
-
-  /** Verify that all segments are read from disk */
   public void testReduceFromDisk() throws Exception {
     final int MAP_TASKS = 8;
     JobConf job = mrCluster.createJobConf();
@@ -259,27 +49,10 @@
         spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice
   }
 
-  /** Verify that at least one segment does not hit disk */
-  public void testReduceFromPartialMem() throws Exception {
-    final int MAP_TASKS = 5;
-    JobConf job = mrCluster.createJobConf();
-    job.setNumMapTasks(MAP_TASKS);
-    job.setInt("mapred.inmem.merge.threshold", 0);
-    job.set("mapred.job.reduce.input.buffer.percent", "1.0");
-    job.setInt("mapred.reduce.parallel.copies", 1);
-    job.setInt("io.sort.mb", 10);
-    job.set("mapred.child.java.opts", "-Xmx128m");
-    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
-    job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
-    job.set("mapred.job.shuffle.merge.percent", "1.0");
-    Counters c = runJob(job);
-    final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
-    final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
-    assertTrue("Expected some records not spilled during reduce" + spill + ")",
-        spill < 2 * out); // spilled map records, some records at the reduce
-  }
-
-  /** Verify that no segment hits disk. */
+  /**
+   * Verify that no segment hits disk.
+   * @throws Exception might be thrown
+   */
   public void testReduceFromMem() throws Exception {
     final int MAP_TASKS = 3;
     JobConf job = mrCluster.createJobConf();
@@ -292,5 +65,4 @@
     final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
     assertEquals("Spilled records: " + spill, out, spill); // no reduce spill
   }
-
 }
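
The refactoring above relies on a JUnit 3 idiom: the superclass owns the shared MiniDFS/MiniMR fixture plus the inherited tests, and each class re-points a static suite at itself. A minimal sketch of the pattern, using hypothetical Base/Derived names standing in for TestReduceFetchFromPartialMem/TestReduceFetch (not the committed code):

    import junit.extensions.TestSetup;
    import junit.framework.Test;
    import junit.framework.TestCase;
    import junit.framework.TestSuite;

    public class Base extends TestCase {
      protected static TestSuite mySuite;

      protected static void setSuite(Class<? extends TestCase> klass) {
        mySuite = new TestSuite(klass);
      }

      // Base's initializer runs first; a subclass's static block runs
      // after it and overwrites mySuite with a suite built from the subclass.
      static { setSuite(Base.class); }

      public static Test suite() {
        // Wrap whichever suite the static initializers selected in a
        // one-shot fixture (the committed code starts MiniDFSCluster and
        // MiniMRCluster here).
        return new TestSetup(mySuite) {
          protected void setUp() throws Exception { /* start clusters */ }
          protected void tearDown() throws Exception { /* stop clusters */ }
        };
      }

      public void testShared() { /* collected for Base and Derived alike */ }
    }

    class Derived extends Base {
      static { setSuite(Derived.class); } // re-point the suite at the subclass
      public void testExtra() { /* runs only in Derived's suite */ }
    }

Asking Derived for suite() initializes Derived, which runs Base's static block and then Derived's, so mySuite ends up built from Derived; new TestSuite(Derived.class) then collects the inherited testShared alongside testExtra.
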

Copied: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java (from r796660, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java?p2=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java&p1=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java&r1=796660&r2=796674&rev=796674&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java Wed Jul 22 11:32:21 2009
@@ -18,16 +18,10 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Formatter;
-import java.util.Iterator;
-
+import junit.extensions.TestSetup;
 import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
-import junit.extensions.TestSetup;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -36,16 +30,29 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
-import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.TaskCounter;
 
-public class TestReduceFetch extends TestCase {
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Formatter;
+import java.util.Iterator;
+
+public class TestReduceFetchFromPartialMem extends TestCase {
 
-  private static MiniMRCluster mrCluster = null;
-  private static MiniDFSCluster dfsCluster = null;
+  protected static MiniMRCluster mrCluster = null;
+  protected static MiniDFSCluster dfsCluster = null;
+  protected static TestSuite mySuite;
+
+  protected static void setSuite(Class<? extends TestCase> klass) {
+    mySuite  = new TestSuite(klass);
+  }
+
+  static {
+    setSuite(TestReduceFetchFromPartialMem.class);
+  }
+  
   public static Test suite() {
-    TestSetup setup = new TestSetup(new TestSuite(TestReduceFetch.class)) {
+    TestSetup setup = new TestSetup(mySuite) {
       protected void setUp() throws Exception {
         Configuration conf = new Configuration();
         dfsCluster = new MiniDFSCluster(conf, 2, true, null);
@@ -68,6 +75,26 @@
     return 4096 / nMaps * (id + 1);
   }
 
+  /** Verify that at least one segment does not hit disk */
+  public void testReduceFromPartialMem() throws Exception {
+    final int MAP_TASKS = 5;
+    JobConf job = mrCluster.createJobConf();
+    job.setNumMapTasks(MAP_TASKS);
+    job.setInt("mapred.inmem.merge.threshold", 0);
+    job.set("mapred.job.reduce.input.buffer.percent", "1.0");
+    job.setInt("mapred.reduce.parallel.copies", 1);
+    job.setInt("io.sort.mb", 10);
+    job.set("mapred.child.java.opts", "-Xmx128m");
+    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
+    job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
+    job.set("mapred.job.shuffle.merge.percent", "1.0");
+    Counters c = runJob(job);
+    final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
+    final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
+    assertTrue("Expected some records not spilled during reduce" + spill + ")",
+        spill < 2 * out); // spilled map records, some records at the reduce
+  }
+
   /**
   * Emit 4096 small keys, 2 "tagged" keys. Emits a fixed amount of
    * data so the in-memory fetch semantics can be tested.
@@ -239,58 +266,4 @@
     return job.getCounters();
   }
 
-  /** Verify that all segments are read from disk */
-  public void testReduceFromDisk() throws Exception {
-    final int MAP_TASKS = 8;
-    JobConf job = mrCluster.createJobConf();
-    job.set("mapred.job.reduce.input.buffer.percent", "0.0");
-    job.setNumMapTasks(MAP_TASKS);
-    job.set("mapred.child.java.opts", "-Xmx128m");
-    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
-    job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
-    job.setInt("io.sort.factor", 2);
-    job.setInt("mapred.inmem.merge.threshold", 4);
-    Counters c = runJob(job);
-    final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
-    final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
-    assertTrue("Expected all records spilled during reduce (" + spill + ")",
-        spill >= 2 * out); // all records spill at map, reduce
-    assertTrue("Expected intermediate merges (" + spill + ")",
-        spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice
-  }
-
-  /** Verify that at least one segment does not hit disk */
-  public void testReduceFromPartialMem() throws Exception {
-    final int MAP_TASKS = 5;
-    JobConf job = mrCluster.createJobConf();
-    job.setNumMapTasks(MAP_TASKS);
-    job.setInt("mapred.inmem.merge.threshold", 0);
-    job.set("mapred.job.reduce.input.buffer.percent", "1.0");
-    job.setInt("mapred.reduce.parallel.copies", 1);
-    job.setInt("io.sort.mb", 10);
-    job.set("mapred.child.java.opts", "-Xmx128m");
-    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
-    job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
-    job.set("mapred.job.shuffle.merge.percent", "1.0");
-    Counters c = runJob(job);
-    final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
-    final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
-    assertTrue("Expected some records not spilled during reduce" + spill + ")",
-        spill < 2 * out); // spilled map records, some records at the reduce
-  }
-
-  /** Verify that no segment hits disk. */
-  public void testReduceFromMem() throws Exception {
-    final int MAP_TASKS = 3;
-    JobConf job = mrCluster.createJobConf();
-    job.set("mapred.job.reduce.input.buffer.percent", "1.0");
-    job.set("mapred.job.shuffle.input.buffer.percent", "1.0");
-    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
-    job.setNumMapTasks(MAP_TASKS);
-    Counters c = runJob(job);
-    final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
-    final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
-    assertEquals("Spilled records: " + spill, out, spill); // no reduce spill
-  }
-
 }
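
For reference, a worked sketch of the counter arithmetic behind the three assertions; the relationships come from the comments in the diffs above, while the sample values are illustrative only:

    final int MAP_TASKS = 5;
    // Each map emits 4096 small records plus 2 tagged records, and every
    // map-output record spills once at the map side.
    final long out = MAP_TASKS * (4096 + 2);   // MAP_OUTPUT_RECORDS

    long spill = out;                          // testReduceFromMem
    assert spill == out;                       // no reduce-side spill at all

    spill = out + out / 2;                     // testReduceFromPartialMem (sample)
    assert spill < 2 * out;                    // some segments stayed in memory

    spill = 2 * out + out / MAP_TASKS;         // testReduceFromDisk (sample)
    assert spill >= 2 * out;                   // every record spilled again at the reduce
    assert spill >= 2 * out + out / MAP_TASKS; // io.sort.factor=2 forces intermediate
                                               // merges that re-spill some records
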


