From: cdouglas@apache.org
To: mapreduce-commits@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Subject: svn commit: r794101 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Date: Tue, 14 Jul 2009 22:41:30 -0000
Message-Id: <20090714224130.E6EDC23889E1@eris.apache.org>

Author: cdouglas
Date: Tue Jul 14 22:41:30 2009
New Revision: 794101

URL: http://svn.apache.org/viewvc?rev=794101&view=rev
Log:
MAPREDUCE-565. Fix partitioner to work with new API. Contributed by Owen O'Malley

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=794101&r1=794100&r2=794101&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jul 14 22:41:30 2009
@@ -194,3 +194,5 @@
     MAPREDUCE-742. Fix output messages and java comments in the Pi related
     examples. (szetszwo)
 
+    MAPREDUCE-565. Fix partitioner to work with new API. (Owen O'Malley via
+    cdouglas)
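For background on the log message: the "new API" is the org.apache.hadoop.mapreduce package, where Partitioner is an abstract class rather than the mapred.Partitioner interface. As the diff below shows, partitioning moves out of MapOutputBuffer (which only knew how to instantiate a mapred.Partitioner) and into per-API collector wrappers, so a partitioner configured through the new API is now actually consulted. A minimal new-API partitioner has this shape; the class name is illustrative and not part of this commit, and the hash-and-mask idiom is the one HashPartitioner uses:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Illustrative only: spread keys over reduces by hash code.
    public class ExamplePartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Mask the sign bit so the modulus result is non-negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }
    }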
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=794101&r1=794100&r2=794101&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Tue Jul 14 22:41:30 2009
@@ -360,7 +360,7 @@
         ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
     try {
-      runner.run(in, collector, reporter);
+      runner.run(in, new OldOutputCollector(collector, conf), reporter);
       mapPhase.complete();
       setPhase(TaskStatus.Phase.SORT);
       statusUpdate(umbilical);
@@ -437,20 +437,80 @@
     }
   }
 
+  /**
+   * Since the mapred and mapreduce Partitioners don't share a common interface
+   * (JobConfigurable is deprecated and a subtype of mapred.Partitioner), the
+   * partitioner lives in Old/NewOutputCollector. Note that, for map-only jobs,
+   * the configured partitioner should not be called. It's common for
+   * partitioners to compute a result mod numReduces, which causes a div0 error
+   */
+  private static class OldOutputCollector<K,V> implements OutputCollector<K,V> {
+    private final Partitioner<K,V> partitioner;
+    private final MapOutputCollector<K,V> collector;
+    private final int numPartitions;
+
+    @SuppressWarnings("unchecked")
+    OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
+      numPartitions = conf.getNumReduceTasks();
+      if (numPartitions > 0) {
+        partitioner = (Partitioner<K,V>)
+          ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
+      } else {
+        partitioner = new Partitioner<K,V>() {
+          @Override
+          public void configure(JobConf job) { }
+          @Override
+          public int getPartition(K key, V value, int numPartitions) {
+            return -1;
+          }
+        };
+      }
+      this.collector = collector;
+    }
+
+    @Override
+    public void collect(K key, V value) throws IOException {
+      try {
+        collector.collect(key, value,
+                          partitioner.getPartition(key, value, numPartitions));
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new IOException("interrupt exception", ie);
+      }
+    }
+  }
+
   private class NewOutputCollector<K,V>
     extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
-    private MapOutputCollector<K,V> collector;
+    private final MapOutputCollector<K,V> collector;
+    private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
+    private final int partitions;
 
-    NewOutputCollector(JobConf job,
+    @SuppressWarnings("unchecked")
+    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
+                       JobConf job,
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException {
       collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+      partitions = jobContext.getNumReduceTasks();
+      if (partitions > 0) {
+        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
+          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
+      } else {
+        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
+          @Override
+          public int getPartition(K key, V value, int numPartitions) {
+            return -1;
+          }
+        };
+      }
     }
 
     @Override
-    public void write(K key, V value) throws IOException {
-      collector.collect(key, value);
+    public void write(K key, V value) throws IOException, InterruptedException {
+      collector.collect(key, value,
+                        partitioner.getPartition(key, value, partitions));
     }
 
     @Override
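The javadoc on OldOutputCollector explains why both wrappers substitute a stub returning -1 when there are no reduces: real partitioners routinely take the key's hash modulo numPartitions, and with zero reduces that modulus divides by zero. A standalone illustration (a hypothetical class, not part of the commit):

    // Illustrative only: a HashPartitioner-style computation fails on a
    // map-only job, where the number of partitions (reduces) is zero.
    public class Div0Demo {
      static int getPartition(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }
      public static void main(String[] args) {
        System.out.println(getPartition("a", 2)); // fine with reduces present
        System.out.println(getPartition("a", 0)); // ArithmeticException: / by zero
      }
    }

The -1 from the stub is harmless on the map-only path: as the hunk at line 587/648 below shows, the direct collector's collect writes straight to the output format and never range-checks the partition argument.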
@@ -520,7 +580,7 @@
       if (job.getNumReduceTasks() == 0) {
         output = 
           outputFormat.getRecordWriter(taskContext);
       } else {
-        output = new NewOutputCollector(job, umbilical, reporter);
+        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
       }
       mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
@@ -545,9 +605,10 @@
     }
   }
 
-  interface MapOutputCollector<K, V>
-    extends OutputCollector<K, V> {
-
+  interface MapOutputCollector<K, V> {
+    public void collect(K key, V value, int partition
+                        ) throws IOException, InterruptedException;
     public void close() throws IOException, InterruptedException;
 
     public void flush() throws IOException, InterruptedException,
@@ -587,7 +648,7 @@
                        ClassNotFoundException {
     }
 
-    public void collect(K key, V value) throws IOException {
+    public void collect(K key, V value, int partition) throws IOException {
       reporter.progress();
       out.write(key, value);
       mapOutputRecordCounter.increment(1);
@@ -598,7 +659,6 @@
   class MapOutputBuffer<K, V>
       implements MapOutputCollector<K, V>, IndexedSortable {
     private final int partitions;
-    private final Partitioner<K, V> partitioner;
    private final JobConf job;
    private final TaskReporter reporter;
    private final Class<K> keyClass;
@@ -668,7 +728,6 @@
       this.reporter = reporter;
       localFs = FileSystem.getLocal(job);
       partitions = job.getNumReduceTasks();
-      partitioner = ReflectionUtils.newInstance(job.getPartitionerClass(), job);
 
       rfs = ((LocalFileSystem)localFs).getRaw();
@@ -756,8 +815,8 @@
       }
     }
 
-    public synchronized void collect(K key, V value)
-        throws IOException {
+    public synchronized void collect(K key, V value, int partition
+                                     ) throws IOException {
       reporter.progress();
       if (key.getClass() != keyClass) {
         throw new IOException("Type mismatch in key from map: expected "
@@ -832,7 +891,6 @@
         valSerializer.serialize(value);
         int valend = bb.markRecord();
 
-        final int partition = partitioner.getPartition(key, value, partitions);
         if (partition < 0 || partition >= partitions) {
           throw new IOException("Illegal partition for " + key + " (" +
               partition + ")");
@@ -852,7 +910,7 @@
         kvindex = kvnext;
       } catch (MapBufferTooSmallException e) {
         LOG.info("Record too large for in-memory buffer: " + e.getMessage());
-        spillSingleRecord(key, value);
+        spillSingleRecord(key, value, partition);
         mapOutputRecordCounter.increment(1);
         return;
       }
@@ -1250,11 +1308,10 @@
      * the in-memory buffer, so we must spill the record from collect
      * directly to a spill file. Consider this "losing".
      */
-    private void spillSingleRecord(final K key, final V value)
-        throws IOException {
+    private void spillSingleRecord(final K key, final V value,
+                                   int partition) throws IOException {
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
-      final int partition = partitioner.getPartition(key, value, partitions);
       try {
         // create spill file
         final SpillRecord spillRec = new SpillRecord(partitions);
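One design note on the MapTask.java changes above: MapOutputCollector.collect now takes the partition as an argument, computed once per record by Old/NewOutputCollector, instead of MapOutputBuffer invoking the partitioner itself. That is what lets spillSingleRecord reuse the already-computed partition and lets the buffer drop its Partitioner field entirely. Reduced to a self-contained sketch (all names here are illustrative stand-ins, not code from this commit):

    import java.io.IOException;

    // Illustrative only: the calling pattern the wrappers above use.
    // The wrapper owns the partitioner; the buffer just receives the number.
    public class EmitSketch<K, V> {
      interface Buffer<K, V> {              // stand-in for MapOutputCollector
        void collect(K key, V value, int partition)
            throws IOException, InterruptedException;
      }
      interface Part<K, V> {                // stand-in for either Partitioner API
        int getPartition(K key, V value, int numPartitions);
      }

      private final Buffer<K, V> buffer;
      private final Part<K, V> partitioner;
      private final int numPartitions;

      EmitSketch(Buffer<K, V> b, Part<K, V> p, int n) {
        buffer = b; partitioner = p; numPartitions = n;
      }

      // Compute the partition once, next to the record, and pass it down;
      // the buffer only has to validate that it is in range.
      void emit(K key, V value) throws IOException, InterruptedException {
        buffer.collect(key, value,
                       partitioner.getPartition(key, value, numPartitions));
      }
    }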
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=794101&r1=794100&r2=794101&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Tue Jul 14 22:41:30 2009
@@ -27,6 +27,12 @@
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SecondarySort;
+import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
+import org.apache.hadoop.examples.SecondarySort.FirstPartitioner;
+import org.apache.hadoop.examples.SecondarySort.IntPair;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -35,7 +41,11 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.MRCaching.TestResult;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TestMapReduceLocal;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -82,6 +92,7 @@
       assertEquals("number of reduce outputs", 9,
                    counters.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS));
       runCustomFormats(mr);
+      runSecondarySort(mr.createJobConf());
     } finally {
       if (mr != null) { mr.shutdown(); }
     }
@@ -286,4 +297,47 @@
                                     JobConf job) throws IOException {
     }
   }
+
+  private void runSecondarySort(Configuration conf) throws IOException,
+                                                        InterruptedException,
+                                                        ClassNotFoundException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+    TestMapReduceLocal.writeFile
+             ("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
+              "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");
+    Job job = new Job(conf, "word count");
+    job.setJarByClass(WordCount.class);
+    job.setNumReduceTasks(2);
+    job.setMapperClass(SecondarySort.MapClass.class);
+    job.setReducerClass(SecondarySort.Reduce.class);
+    // group and partition by the first int in the pair
+    job.setPartitionerClass(FirstPartitioner.class);
+    job.setGroupingComparatorClass(FirstGroupingComparator.class);
+
+    // the map output is IntPair, IntWritable
+    job.setMapOutputKeyClass(IntPair.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    // the reduce output is Text, IntWritable
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
+    assertTrue(job.waitForCompletion(true));
+    String out = TestMapReduceLocal.readFile("out/part-r-00000");
+    assertEquals("------------------------------------------------\n" +
+                 "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
+                 "------------------------------------------------\n" +
+                 "10\t20\n10\t25\n10\t30\n", out);
+    out = TestMapReduceLocal.readFile("out/part-r-00001");
+    assertEquals("------------------------------------------------\n" +
+                 "-3\t23\n" +
+                 "------------------------------------------------\n" +
+                 "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
+                 "------------------------------------------------\n" +
+                 "5\t10\n", out);
+  }
 }
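The expected output above depends on how FirstPartitioner splits keys between the two reduces: part-r-00000 receives the pairs whose first int is 4 or 10, and part-r-00001 receives those starting with -3, -1, or 5. From memory (treat this as a sketch rather than a quotation of the example source), the partitioner nested in org.apache.hadoop.examples.SecondarySort partitions on the first int of the pair alone:

    // Sketch of SecondarySort.FirstPartitioner, from memory. IntPair is the
    // example's composite key; partitioning on its first int only ensures all
    // pairs sharing a first int reach the same reduce.
    public static class FirstPartitioner
        extends org.apache.hadoop.mapreduce.Partitioner<IntPair, IntWritable> {
      @Override
      public int getPartition(IntPair key, IntWritable value, int numPartitions) {
        return Math.abs(key.getFirst() * 127) % numPartitions;
      }
    }

With numPartitions == 2, abs(first * 127) is even for 4 and 10 and odd for -3, -1, and 5, which matches the part-r-00000/part-r-00001 split the assertions check.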
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=794101&r1=794100&r2=794101&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Jul 14 22:41:30 2009
@@ -28,11 +28,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.MultiFileWordCount;
-import org.apache.hadoop.examples.SecondarySort;
 import org.apache.hadoop.examples.WordCount;
-import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
-import org.apache.hadoop.examples.SecondarySort.FirstPartitioner;
-import org.apache.hadoop.examples.SecondarySort.IntPair;
 import org.apache.hadoop.examples.WordCount.IntSumReducer;
 import org.apache.hadoop.examples.WordCount.TokenizerMapper;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,7 +59,7 @@
     }
   }
 
-  public Path writeFile(String name, String data) throws IOException {
+  public static Path writeFile(String name, String data) throws IOException {
     Path file = new Path(TEST_ROOT_DIR + "/" + name);
     localFs.delete(file, false);
     DataOutputStream f = localFs.create(file);
@@ -72,7 +68,7 @@
     return file;
   }
 
-  public String readFile(String name) throws IOException {
+  public static String readFile(String name) throws IOException {
     DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
     BufferedReader b = new BufferedReader(new InputStreamReader(f));
     StringBuilder result = new StringBuilder();
@@ -92,7 +88,6 @@
       mr = new MiniMRCluster(2, "file:///", 3);
       Configuration conf = mr.createJobConf();
       runWordCount(conf);
-      runSecondarySort(conf);
       runMultiFileWordCount(conf);
     } finally {
       if (mr != null) { mr.shutdown(); }
@@ -168,45 +163,6 @@
     assertTrue("combine in > combine out", combineIn > combineOut);
   }
 
-  private void runSecondarySort(Configuration conf) throws IOException,
-                                                        InterruptedException,
-                                                        ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-    writeFile("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
-              "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");
-    Job job = new Job(conf, "word count");
-    job.setJarByClass(WordCount.class);
-    job.setMapperClass(SecondarySort.MapClass.class);
-    job.setReducerClass(SecondarySort.Reduce.class);
-    // group and partition by the first int in the pair
-    job.setPartitionerClass(FirstPartitioner.class);
-    job.setGroupingComparatorClass(FirstGroupingComparator.class);
-
-    // the map output is IntPair, IntWritable
-    job.setMapOutputKeyClass(IntPair.class);
-    job.setMapOutputValueClass(IntWritable.class);
-
-    // the reduce output is Text, IntWritable
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
-    assertTrue(job.waitForCompletion(true));
-    String out = readFile("out/part-r-00000");
-    assertEquals("------------------------------------------------\n" +
-                 "-3\t23\n" +
-                 "------------------------------------------------\n" +
-                 "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
-                 "------------------------------------------------\n" +
-                 "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
-                 "------------------------------------------------\n" +
-                 "5\t10\n" +
-                 "------------------------------------------------\n" +
-                 "10\t20\n10\t25\n10\t30\n", out);
-  }
-
   public void runMultiFileWordCount(Configuration conf) throws Exception {
     localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
     localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);