Author: ddas Date: Wed Mar 12 21:15:29 2008 New Revision: 636623 URL: http://svn.apache.org/viewvc?rev=636623&view=rev Log: HADOOP-2399. Input key and value to combiner and reducer is reused. Contributed by Owen O'Malley. Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=636623&r1=636622&r2=636623&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Wed Mar 12 21:15:29 2008 @@ -91,6 +91,9 @@ HADOOP-2758. Reduce buffer copies in DataNode when data is read from HDFS, without negatively affecting read throughput. (rangadi) + HADOOP-2399. Input key and value to combiner and reducer is reused. + (Owen O'Malley via ddas). + BUG FIXES HADOOP-2195. 
'-mkdir' behaviour is now closer to Linux shell in case of Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=636623&r1=636622&r2=636623&view=diff ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Mar 12 21:15:29 2008 @@ -681,16 +681,19 @@ //empty for now } - private class CombineValuesIterator extends ValuesIterator { + private class CombineValuesIterator<KEY,VALUE> + extends ValuesIterator<KEY,VALUE> { public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, - RawComparator comparator, Class keyClass, - Class valClass, Configuration conf, Reporter reporter) + RawComparator<KEY> comparator, + Class<KEY> keyClass, + Class<VALUE> valClass, Configuration conf, + Reporter reporter) throws IOException { super(in, comparator, keyClass, valClass, conf, reporter); } - public Object next() { + public VALUE next() { combineInputCounter.increment(1); return super.next(); } Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=636623&r1=636622&r2=636623&view=diff ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Mar 12 21:15:29 2008 @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Random; import java.util.Set; import java.util.SortedSet; @@ -51,7 +52,6 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.ChecksumFileSystem; 
-import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.InputBuffer; import org.apache.hadoop.io.IntWritable; @@ -67,6 +67,7 @@ import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.metrics.Updater; import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -184,50 +185,60 @@ } /** Iterates values while keys match in sorted input. */ - static class ValuesIterator implements Iterator { + static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> { private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator - private Object key; // current key - private Object value; // current value + private KEY key; // current key + private KEY nextKey; + private VALUE value; // current value private boolean hasNext; // more w/ this key private boolean more; // more in file - private RawComparator comparator; - private DataOutputBuffer valOut = new DataOutputBuffer(); + private RawComparator<KEY> comparator; + private DataOutputBuffer nextValue = new DataOutputBuffer(); private InputBuffer valIn = new InputBuffer(); private InputBuffer keyIn = new InputBuffer(); - protected Reporter reporter; - private Deserializer keyDeserializer; - private Deserializer valDeserializer; + protected Progressable reporter; + private Deserializer<KEY> keyDeserializer; + private Deserializer<VALUE> valDeserializer; @SuppressWarnings("unchecked") public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in, - RawComparator comparator, Class keyClass, - Class valClass, Configuration conf, - Reporter reporter) + RawComparator<KEY> comparator, + Class<KEY> keyClass, + Class<VALUE> valClass, Configuration conf, + Progressable reporter) throws IOException { this.in = in; this.comparator = comparator; this.reporter = reporter; + nextKey = (KEY) ReflectionUtils.newInstance(keyClass, conf); + value = (VALUE) 
ReflectionUtils.newInstance(valClass, conf); SerializationFactory serializationFactory = new SerializationFactory(conf); this.keyDeserializer = serializationFactory.getDeserializer(keyClass); this.keyDeserializer.open(keyIn); this.valDeserializer = serializationFactory.getDeserializer(valClass); this.valDeserializer.open(valIn); - getNext(); + readNextKey(); + key = nextKey; + nextKey = (KEY) ReflectionUtils.newInstance(keyClass, conf); + hasNext = more; } /// Iterator methods public boolean hasNext() { return hasNext; } - public Object next() { - Object result = value; // save value + public VALUE next() { + if (!hasNext) { + throw new NoSuchElementException("iterate past last value"); + } try { - getNext(); // move to next - } catch (IOException e) { - throw new RuntimeException(e); + readNextValue(); + readNextKey(); + } catch (IOException ie) { + throw new RuntimeException("problem advancing", ie); } reporter.progress(); - return result; // return saved value + return value; } public void remove() { throw new RuntimeException("not implemented"); } @@ -235,45 +246,62 @@ /// Auxiliary methods /** Start processing next unique key. */ - public void nextKey() { - while (hasNext) { next(); } // skip any unread + public void nextKey() throws IOException { + // read until we find a new key + while (hasNext) { + readNextKey(); + } + // move the next key to the current one + KEY tmpKey = key; + key = nextKey; + nextKey = tmpKey; hasNext = more; } /** True iff more keys remain. */ - public boolean more() { return more; } + public boolean more() { + return more; + } /** The current key. 
*/ - public Object getKey() { return key; } + public Object getKey() { + return key; + } - @SuppressWarnings("unchecked") - private void getNext() throws IOException { - Object lastKey = key; // save previous key + /** + * read the next key + */ + private void readNextKey() throws IOException { more = in.next(); if (more) { - //de-serialize the raw key/value - keyIn.reset(in.getKey().getData(), in.getKey().getLength()); - key = keyDeserializer.deserialize(null); // force new object - valOut.reset(); - (in.getValue()).writeUncompressedBytes(valOut); - valIn.reset(valOut.getData(), valOut.getLength()); - value = valDeserializer.deserialize(null); // force new object - - if (lastKey == null) { - hasNext = true; - } else { - hasNext = (comparator.compare(key, lastKey) == 0); - } + DataOutputBuffer nextKeyBytes = in.getKey(); + keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getLength()); + keyDeserializer.deserialize(nextKey); + hasNext = key != null && (comparator.compare(key, nextKey) == 0); } else { hasNext = false; } } + + /** + * Read the next value + * @throws IOException + */ + private void readNextValue() throws IOException { + nextValue.reset(); + in.getValue().writeUncompressedBytes(nextValue); + valIn.reset(nextValue.getData(), nextValue.getLength()); + valDeserializer.deserialize(value); + } } - private class ReduceValuesIterator extends ValuesIterator { + + private class ReduceValuesIterator<KEY,VALUE> + extends ValuesIterator<KEY,VALUE> { public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in, - RawComparator comparator, Class keyClass, - Class valClass, - Configuration conf, Reporter reporter) + RawComparator<KEY> comparator, + Class<KEY> keyClass, + Class<VALUE> valClass, + Configuration conf, Progressable reporter) throws IOException { super(in, comparator, keyClass, valClass, conf, reporter); } @@ -281,7 +309,7 @@ reducePhase.set(super.in.getProgress().get()); // update progress reporter.progress(); } - public Object next() { + public VALUE next() { 
reduceInputValueCounter.increment(1); return super.next(); } @@ -809,7 +837,7 @@ return CopyResult.OBSOLETE; } - bytes = fs.getLength(tmpFilename); + bytes = fs.getFileStatus(tmpFilename).getLen(); //resolve the final filename against the directory where the tmpFile //got created filename = new Path(tmpFilename.getParent(), filename.getName()); @@ -1065,7 +1093,7 @@ // all reduce-tasks swamping the same tasktracker Collections.shuffle(knownOutputs, this.random); - Iterator locIt = knownOutputs.iterator(); + Iterator locIt = knownOutputs.iterator(); currentTime = System.currentTimeMillis(); while (locIt.hasNext()) { @@ -1255,7 +1283,7 @@ // the failure is due to a lost tasktracker (causes many // unnecessary backoffs). If not, we only take a small hit // polling the tasktracker a few more times - Iterator locIt = knownOutputs.iterator(); + Iterator locIt = knownOutputs.iterator(); while (locIt.hasNext()) { MapOutputLocation loc = (MapOutputLocation)locIt.next(); if (cr.getHost().equals(loc.getHost())) { Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java?rev=636623&r1=636622&r2=636623&view=diff ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java (original) +++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java Wed Mar 12 21:15:29 2008 @@ -80,7 +80,7 @@ */ public void addNextValue(Object val) { if (this.numItems <= this.maxNumItems) { - uniqItems.put(val, "1"); + uniqItems.put(val.toString(), "1"); this.numItems = this.uniqItems.size(); } } @@ -122,4 +122,4 @@ } return retv; } -} \ No newline at end of file +} Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java?rev=636623&r1=636622&r2=636623&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java Wed Mar 12 21:15:29 2008 @@ -218,8 +218,8 @@ // match the real string. check if there are 3 instances or not. Path result = new Path(TEST_ROOT_DIR + "/test.txt"); { - BufferedReader file = new BufferedReader(new InputStreamReader( - FileSystem.getLocal(conf).open(result))); + BufferedReader file = new BufferedReader + (new InputStreamReader(FileSystem.getLocal(conf).open(result))); String line = file.readLine(); while (line != null) { if (!testStr.equals(line)) Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java?rev=636623&view=auto ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java (added) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java Wed Mar 12 21:15:29 2008 @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.util.Progressable; + +/** + * This test exercises the ValueIterator. + */ +public class TestReduceTask extends TestCase { + + static class NullProgress implements Progressable { + public void progress() { } + } + + private static class Pair { + String key; + String value; + Pair(String k, String v) { + key = k; + value = v; + } + } + private static Pair[][] testCases = + new Pair[][]{ + new Pair[]{ + new Pair("k1", "v1"), + new Pair("k2", "v2"), + new Pair("k3", "v3"), + new Pair("k3", "v4"), + new Pair("k4", "v5"), + new Pair("k5", "v6"), + }, + new Pair[]{ + new Pair("", "v1"), + new Pair("k1", "v2"), + new Pair("k2", "v3"), + new Pair("k2", "v4"), + }, + new Pair[] {}, + new Pair[]{ + new Pair("k1", "v1"), + new Pair("k1", "v2"), + new Pair("k1", "v3"), + new Pair("k1", "v4"), + } + }; + + public void runValueIterator(Path tmpDir, Pair[] vals, + Configuration conf) throws IOException { + FileSystem fs = tmpDir.getFileSystem(conf); + Path path = new Path(tmpDir, "data.in"); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, + Text.class, + Text.class); + for(Pair p: vals) { + writer.append(new Text(p.key), 
new Text(p.value)); + } + writer.close(); + SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, + Text.class, conf); + SequenceFile.Sorter.RawKeyValueIterator rawItr = + sorter.merge(new Path[]{path}, false, tmpDir); + ReduceTask.ValuesIterator valItr = + new ReduceTask.ValuesIterator(rawItr, WritableComparator.get(Text.class), + Text.class, Text.class, + conf, new NullProgress()); + int i = 0; + while (valItr.more()) { + Object key = valItr.getKey(); + String keyString = key.toString(); + // make sure it matches! + assertEquals(vals[i].key, keyString); + // must have at least 1 value! + assertTrue(valItr.hasNext()); + while (valItr.hasNext()) { + String valueString = valItr.next().toString(); + // make sure the values match + assertEquals(vals[i].value, valueString); + // make sure the keys match + assertEquals(vals[i].key, valItr.getKey().toString()); + i += 1; + } + // make sure the key hasn't changed under the hood + assertEquals(keyString, valItr.getKey().toString()); + valItr.nextKey(); + } + assertEquals(vals.length, i); + } + + public void testValueIterator() throws Exception { + Path tmpDir = new Path("build/test/test.reduce.task"); + Configuration conf = new Configuration(); + for (Pair[] testCase: testCases) { + runValueIterator(tmpDir, testCase, conf); + } + } +}