Subject: svn commit: r769643 - in /hadoop/core/trunk: CHANGES.txt src/test/org/apache/hadoop/mapred/TestReduceFetch.java
Date: Wed, 29 Apr 2009 04:10:50 -0000
To: core-commits@hadoop.apache.org
From: cdouglas@apache.org

Author: cdouglas
Date: Wed Apr 29 04:10:49 2009
New Revision: 769643

URL: http://svn.apache.org/viewvc?rev=769643&view=rev
Log:
HADOOP-5657. Validate data in TestReduceFetch to improve merge test coverage.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=769643&r1=769642&r2=769643&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 29 04:10:49 2009
@@ -265,6 +265,9 @@
     HADOOP-5734. Correct block placement policy description in HDFS Design
     document. (Konstantin Boudnik via shv)
 
+    HADOOP-5657. Validate data in TestReduceFetch to improve merge test
+    coverage. (cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a
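For context on the validation in the test diff below: each map derives an ID
in [0, nMaps) from its partition number and emits two "tagged" records whose
value length grows linearly with that ID, while the first byte of every value
carries the ID so the reducer can checksum each key group. A minimal
standalone sketch of that arithmetic, with hypothetical names (only getValLen
mirrors the helper added in this patch):

    public class ReduceFetchArithmeticSketch {
      // Mirrors the getValLen helper added below: value length grows
      // linearly with the map ID, so merge segments differ in size.
      static int getValLen(int id, int nMaps) {
        return 4096 / nMaps * (id + 1);
      }

      public static void main(String[] args) {
        final int nMaps = 8; // matches testReduceFromDisk below
        // Each small key is emitted once per map with the map ID in the
        // first value byte, so the per-key sum of those bytes should be
        // 0 + 1 + ... + (nMaps - 1) = nMaps * (nMaps - 1) / 2.
        System.out.println("expected per-key ID sum: "
            + (nMaps * (nMaps - 1)) / 2); // 28 for 8 maps
        for (int id = 0; id < nMaps; ++id) {
          // The small tagged record is 128 bytes shorter than the large one.
          System.out.println("map " + id + ": large=" + getValLen(id, nMaps)
              + " small=" + (getValLen(id, nMaps) - 128));
        }
      }
    }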
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java?rev=769643&r1=769642&r2=769643&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Wed Apr 29 04:10:49 2009
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Formatter;
+import java.util.Iterator;
 
 import junit.framework.Test;
 import junit.framework.TestCase;
@@ -32,6 +34,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
 import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -56,34 +59,169 @@
     return setup;
   }
 
-  public static class MapMB
-      implements Mapper<NullWritable,NullWritable,Text,Text> {
+  private static final String tagfmt = "%04d";
+  private static final String keyfmt = "KEYKEYKEYKEYKEYKEYKE";
+  private static final int keylen = keyfmt.length();
+
+  private static int getValLen(int id, int nMaps) {
+    return 4096 / nMaps * (id + 1);
+  }
+
+  /**
+   * Emit 4096 small keys, 2 "tagged" keys. Emits a fixed amount of
+   * data so the in-memory fetch semantics can be tested.
+   */
+  public static class MapMB implements
+      Mapper<NullWritable,NullWritable,Text,Text> {
+
+    private int id;
+    private int nMaps;
+    private final Text key = new Text();
+    private final Text val = new Text();
+    private final byte[] b = new byte[4096];
+    private final Formatter fmt = new Formatter(new StringBuilder(25));
+
+    @Override
+    public void configure(JobConf conf) {
+      nMaps = conf.getNumMapTasks();
+      id = nMaps - conf.getInt("mapred.task.partition", -1) - 1;
+      Arrays.fill(b, 0, 4096, (byte)'V');
+      ((StringBuilder)fmt.out()).append(keyfmt);
+    }
+
+    @Override
     public void map(NullWritable nk, NullWritable nv,
         OutputCollector<Text,Text> output, Reporter reporter)
         throws IOException {
-      Text key = new Text();
-      Text val = new Text();
-      key.set("KEYKEYKEYKEYKEYKEYKEYKEY");
-      byte[] b = new byte[1000];
-      Arrays.fill(b, (byte)'V');
-      val.set(b);
-      b = null;
-      for (int i = 0; i < 4 * 1024; ++i) {
+      // Emit 4096 fixed-size records
+      val.set(b, 0, 1000);
+      val.getBytes()[0] = (byte) id;
+      for (int i = 0; i < 4096; ++i) {
+        key.set(fmt.format(tagfmt, i).toString());
         output.collect(key, val);
+        ((StringBuilder)fmt.out()).setLength(keylen);
       }
+
+      // Emit two "tagged" records from the map. To validate the merge,
+      // segments should have both a small and large record such that
+      // reading a large record from an on-disk segment into an in-memory
+      // segment will write over the beginning of a record in the in-memory
+      // segment, causing the merge and/or validation to fail.
+
+      // Add small, tagged record
+      val.set(b, 0, getValLen(id, nMaps) - 128);
+      val.getBytes()[0] = (byte) id;
+      ((StringBuilder)fmt.out()).setLength(keylen);
+      key.set("A" + fmt.format(tagfmt, id).toString());
+      output.collect(key, val);
+      // Add large, tagged record
+      val.set(b, 0, getValLen(id, nMaps));
+      val.getBytes()[0] = (byte) id;
+      ((StringBuilder)fmt.out()).setLength(keylen);
+      key.set("B" + fmt.format(tagfmt, id).toString());
+      output.collect(key, val);
     }
-    public void configure(JobConf conf) { }
+
+    @Override
     public void close() throws IOException { }
   }
 
+  /**
+   * Confirm that each small key is emitted once by all maps, each tagged key
+   * is emitted by only one map, all IDs are consistent with record data, and
+   * all non-ID record data is consistent.
+   */
+  public static class MBValidate
+      implements Reducer<Text,Text,Text,Text> {
+
+    private static int nMaps;
+    private static final Text vb = new Text();
+    static {
+      byte[] v = new byte[4096];
+      Arrays.fill(v, (byte)'V');
+      vb.set(v);
+    }
+
+    private int nRec = 0;
+    private int nKey = -1;
+    private int aKey = -1;
+    private int bKey = -1;
+    private final Text kb = new Text();
+    private final Formatter fmt = new Formatter(new StringBuilder(25));
+
+    @Override
+    public void configure(JobConf conf) {
+      nMaps = conf.getNumMapTasks();
+      ((StringBuilder)fmt.out()).append(keyfmt);
+    }
+
+    @Override
+    public void reduce(Text key, Iterator<Text> values,
+        OutputCollector<Text,Text> out, Reporter reporter)
+        throws IOException {
+      int vc = 0;
+      final int vlen;
+      final int preRec = nRec;
+      final int vcCheck, recCheck;
+      ((StringBuilder)fmt.out()).setLength(keylen);
+      if (25 == key.getLength()) {
+        // tagged record
+        recCheck = 1;   // expect only 1 record
+        switch ((char)key.getBytes()[0]) {
+          case 'A':
+            vlen = getValLen(++aKey, nMaps) - 128;
+            vcCheck = aKey; // expect eq id
+            break;
+          case 'B':
+            vlen = getValLen(++bKey, nMaps);
+            vcCheck = bKey; // expect eq id
+            break;
+          default:
+            vlen = vcCheck = -1;
+            fail("Unexpected tag on record: " + ((char)key.getBytes()[24]));
+        }
+        kb.set((char)key.getBytes()[0] + fmt.format(tagfmt, vcCheck).toString());
+      } else {
+        kb.set(fmt.format(tagfmt, ++nKey).toString());
+        vlen = 1000;
+        recCheck = nMaps;                      // expect 1 rec per map
+        vcCheck = (nMaps * (nMaps - 1)) >>> 1; // expect eq sum(id)
+      }
+      assertEquals(kb, key);
+      while (values.hasNext()) {
+        final Text val = values.next();
+        // increment vc by map ID assoc w/ val
+        vc += val.getBytes()[0];
+        // verify that all the fixed characters 'V' match
+        assertEquals(0, WritableComparator.compareBytes(
+              vb.getBytes(), 1, vlen - 1,
+              val.getBytes(), 1, val.getLength() - 1));
+        out.collect(key, val);
+        ++nRec;
+      }
+      assertEquals("Bad rec count for " + key, recCheck, nRec - preRec);
+      assertEquals("Bad rec group for " + key, vcCheck, vc);
+    }
+
+    @Override
+    public void close() throws IOException {
+      assertEquals(4095, nKey);
+      assertEquals(nMaps - 1, aKey);
+      assertEquals(nMaps - 1, bKey);
+      assertEquals("Bad record count", nMaps * (4096 + 2), nRec);
+    }
+  }
+
   public static Counters runJob(JobConf conf) throws Exception {
     conf.setMapperClass(MapMB.class);
-    conf.setReducerClass(IdentityReducer.class);
+    conf.setReducerClass(MBValidate.class);
     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(Text.class);
     conf.setNumReduceTasks(1);
     conf.setInputFormat(FakeIF.class);
+    conf.setNumTasksToExecutePerJvm(1);
+    conf.setInt("mapred.map.max.attempts", 0);
+    conf.setInt("mapred.reduce.max.attempts", 0);
     FileInputFormat.setInputPaths(conf, new Path("/in"));
     final Path outp = new Path("/out");
     FileOutputFormat.setOutputPath(conf, outp);
@@ -100,10 +238,15 @@
     return job.getCounters();
   }
 
+  /** Verify that all segments are read from disk. */
   public void testReduceFromDisk() throws Exception {
     JobConf job = mrCluster.createJobConf();
     job.set("mapred.job.reduce.input.buffer.percent", "0.0");
-    job.setNumMapTasks(3);
+    job.setNumMapTasks(8);
+    job.set("mapred.child.java.opts", "-Xmx128m");
+    job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
+    job.setInt("io.sort.factor", 2);
+    job.setInt("mapred.inmem.merge.threshold", 4);
     Counters c = runJob(job);
     final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
         Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
@@ -114,6 +257,7 @@
         hdfsWritten <= localRead);
   }
 
+  /** Verify that at least one segment does not hit disk. */
   public void testReduceFromPartialMem() throws Exception {
     JobConf job = mrCluster.createJobConf();
     job.setNumMapTasks(5);
@@ -123,7 +267,6 @@
     job.setInt("io.sort.mb", 10);
     job.set("mapred.child.java.opts", "-Xmx128m");
     job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
-    job.setNumTasksToExecutePerJvm(1);
     job.set("mapred.job.shuffle.merge.percent", "1.0");
     Counters c = runJob(job);
     final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
@@ -135,6 +278,7 @@
         hdfsWritten >= localRead + 1024 * 1024);
   }
 
+  /** Verify that no segment hits disk. */
  public void testReduceFromMem() throws Exception {
    JobConf job = mrCluster.createJobConf();
    job.set("mapred.job.reduce.input.buffer.percent", "1.0");
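All three tests share one measurement: they compare the bytes the job wrote
to HDFS against the bytes the reduce read back from local disk during the
shuffle and merge. A sketch of that counter check as it would sit inside a
test method like those above, assuming the 0.18-era counter API visible in
the hunks; the "file" counter-name lookup for localRead is an assumption, as
only the "hdfs" lookup appears verbatim in this diff:

    Counters c = runJob(job);
    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
        Task.getFileSystemCounterNames("hdfs")[1]).getCounter(); // bytes written
    // Assumed lookup: scheme "file", index 0 = bytes read from local disk.
    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
        Task.getFileSystemCounterNames("file")[0]).getCounter();
    // testReduceFromDisk: every segment spills, so local reads during the
    // merge must at least cover what is finally written to HDFS.
    assertTrue("Expected all records spilled on fetch",
        hdfsWritten <= localRead);
    // testReduceFromPartialMem instead expects some segments to stay in
    // memory: hdfsWritten >= localRead + 1024 * 1024.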