From: ddas@apache.org
To: mapreduce-commits@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Subject: svn commit: r801959 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/lib/ src/java/org/apache/hadoop/mapreduce/lib/input/ src/java/org/apache/hadoop/mapreduce/lib/output/ src/test/mapred/org/apache/h...
Date: Fri, 07 Aug 2009 11:54:25 -0000
Message-Id: <20090807115425.F1ECF23888D0@eris.apache.org>

Author: ddas
Date: Fri Aug 7 11:54:25 2009
New Revision: 801959

URL: http://svn.apache.org/viewvc?rev=801959&view=rev
Log:
MAPREDUCE-375. Change org.apache.hadoop.mapred.lib.NLineInputFormat and
org.apache.hadoop.mapred.MapFileOutputFormat to use new api.
Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=801959&r1=801958&r2=801959&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Aug 7 11:54:25 2009
@@ -171,6 +171,10 @@
     MAPREDUCE-670. Creates ant target for 10 mins patch test build.
    (Jothi Padmanabhan via gkesavan)

+    MAPREDUCE-375. Change org.apache.hadoop.mapred.lib.NLineInputFormat
+    and org.apache.hadoop.mapred.MapFileOutputFormat to use new api.
+ (Amareshwari Sriramadasu via ddas) + BUG FIXES MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy. (Aaron Kimball via matei) Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java?rev=801959&r1=801958&r2=801959&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java Fri Aug 7 11:54:25 2009 @@ -22,7 +22,6 @@ import java.io.DataInput; import java.io.DataOutput; -import org.apache.hadoop.io.UTF8; import org.apache.hadoop.fs.Path; /** A section of an input file. Returned by {@link @@ -34,12 +33,10 @@ @Deprecated public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit implements InputSplit { - private Path file; - private long start; - private long length; - private String[] hosts; - - FileSplit() {} + org.apache.hadoop.mapreduce.lib.input.FileSplit fs; + FileSplit() { + fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(); + } /** Constructs a split. * @deprecated @@ -60,45 +57,38 @@ * @param hosts the list of hosts containing the block, possibly null */ public FileSplit(Path file, long start, long length, String[] hosts) { - this.file = file; - this.start = start; - this.length = length; - this.hosts = hosts; + fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start, + length, hosts); + } + + public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) { + this.fs = fs; } /** The file containing this split's data. */ - public Path getPath() { return file; } + public Path getPath() { return fs.getPath(); } /** The position of the first byte in the file to process. */ - public long getStart() { return start; } + public long getStart() { return fs.getStart(); } /** The number of bytes in the file to process. 
*/ - public long getLength() { return length; } + public long getLength() { return fs.getLength(); } - public String toString() { return file + ":" + start + "+" + length; } + public String toString() { return fs.toString(); } //////////////////////////////////////////// // Writable methods //////////////////////////////////////////// public void write(DataOutput out) throws IOException { - UTF8.writeString(out, file.toString()); - out.writeLong(start); - out.writeLong(length); + fs.write(out); } public void readFields(DataInput in) throws IOException { - file = new Path(UTF8.readString(in)); - start = in.readLong(); - length = in.readLong(); - hosts = null; + fs.readFields(in); } public String[] getLocations() throws IOException { - if (this.hosts == null) { - return new String[]{}; - } else { - return this.hosts; - } + return fs.getLocations(); } } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=801959&r1=801958&r2=801959&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Fri Aug 7 11:54:25 2009 @@ -19,11 +19,9 @@ package org.apache.hadoop.mapred; import java.io.IOException; -import java.util.Arrays; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.WritableComparable; @@ -35,7 +33,11 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; -/** An {@link OutputFormat} that writes {@link MapFile}s. */ +/** An {@link OutputFormat} that writes {@link MapFile}s. + * @deprecated Use + * {@link org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat} instead + */ +@Deprecated public class MapFileOutputFormat extends FileOutputFormat { @@ -81,18 +83,9 @@ /** Open the output generated by this format. */ public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir, Configuration conf) - throws IOException { - FileSystem fs = dir.getFileSystem(conf); - Path[] names = FileUtil.stat2Paths(fs.listStatus(dir)); - - // sort names, so that hash partitioning works - Arrays.sort(names); - - MapFile.Reader[] parts = new MapFile.Reader[names.length]; - for (int i = 0; i < names.length; i++) { - parts[i] = new MapFile.Reader(fs, names[i].toString(), conf); - } - return parts; + throws IOException { + return org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat. + getReaders(dir, conf); } /** Get an entry from output generated by this class. 
*/ Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java?rev=801959&r1=801958&r2=801959&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java Fri Aug 7 11:54:25 2009 @@ -21,10 +21,7 @@ import java.io.IOException; import java.util.ArrayList; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; @@ -35,7 +32,6 @@ import org.apache.hadoop.mapred.LineRecordReader; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.LineReader; /** * NLineInputFormat which splits N lines of input as one split. @@ -54,8 +50,10 @@ * a value to one map task, and key is the offset. * i.e. (k,v) is (LongWritable, Text). * The location hints will span the whole mapred cluster. + * @deprecated Use + * {@link org.apache.hadoop.mapreduce.lib.input.NLineInputFormat} instead */ - +@Deprecated public class NLineInputFormat extends FileInputFormat implements JobConfigurable { private int N = 1; @@ -79,46 +77,10 @@ throws IOException { ArrayList splits = new ArrayList(); for (FileStatus status : listStatus(job)) { - Path fileName = status.getPath(); - if (status.isDir()) { - throw new IOException("Not a file: " + fileName); - } - FileSystem fs = fileName.getFileSystem(job); - LineReader lr = null; - try { - FSDataInputStream in = fs.open(fileName); - lr = new LineReader(in, job); - Text line = new Text(); - int numLines = 0; - long begin = 0; - long length = 0; - int num = -1; - while ((num = lr.readLine(line)) > 0) { - numLines++; - length += num; - if (numLines == N) { - // NLineInputFormat uses LineRecordReader, which always reads (and consumes) - //at least one character out of its upper split boundary. So to make sure that - //each mapper gets N lines, we move back the upper split limits of each split - //by one character here. - if (begin == 0) { - splits.add(new FileSplit(fileName, begin, length - 1, new String[] {})); - } else { - splits.add(new FileSplit(fileName, begin - 1, length, new String[] {})); - } - begin += length; - length = 0; - numLines = 0; - } - } - if (numLines != 0) { - splits.add(new FileSplit(fileName, begin, length, new String[]{})); - } - - } finally { - if (lr != null) { - lr.close(); - } + for (org.apache.hadoop.mapreduce.lib.input.FileSplit split : + org.apache.hadoop.mapreduce.lib.input. 
+ NLineInputFormat.getSplitsForFile(status, job, N)) { + splits.add(new FileSplit(split)); } } return splits.toArray(new FileSplit[splits.size()]); Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=801959&r1=801958&r2=801959&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java Fri Aug 7 11:54:25 2009 @@ -38,7 +38,7 @@ private long length; private String[] hosts; - FileSplit() {} + public FileSplit() {} /** Constructs a split with host information * Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java?rev=801959&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java Fri Aug 7 11:54:25 2009 @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.LineReader; + +/** + * NLineInputFormat which splits N lines of input as one split. + * + * In many "pleasantly" parallel applications, each process/mapper + * processes the same input file (s), but with computations are + * controlled by different parameters.(Referred to as "parameter sweeps"). + * One way to achieve this, is to specify a set of parameters + * (one set per line) as input in a control file + * (which is the input path to the map-reduce application, + * where as the input dataset is specified + * via a config variable in JobConf.). 
+ * + * The NLineInputFormat can be used in such applications, that splits + * the input file such that by default, one line is fed as + * a value to one map task, and key is the offset. + * i.e. (k,v) is (LongWritable, Text). + * The location hints will span the whole mapred cluster. + */ + +public class NLineInputFormat extends FileInputFormat { + + public RecordReader createRecordReader( + InputSplit genericSplit, TaskAttemptContext context) + throws IOException { + context.setStatus(genericSplit.toString()); + return new LineRecordReader(); + } + + /** + * Logically splits the set of input files for the job, splits N lines + * of the input as one split. + * + * @see FileInputFormat#getSplits(JobContext) + */ + public List getSplits(JobContext job) + throws IOException { + List splits = new ArrayList(); + int numLinesPerSplit = getNumLinesPerSplit(job); + for (FileStatus status : listStatus(job)) { + splits.addAll(getSplitsForFile(status, + job.getConfiguration(), numLinesPerSplit)); + } + return splits; + } + + public static List getSplitsForFile(FileStatus status, + Configuration conf, int numLinesPerSplit) throws IOException { + List splits = new ArrayList (); + Path fileName = status.getPath(); + if (status.isDir()) { + throw new IOException("Not a file: " + fileName); + } + FileSystem fs = fileName.getFileSystem(conf); + LineReader lr = null; + try { + FSDataInputStream in = fs.open(fileName); + lr = new LineReader(in, conf); + Text line = new Text(); + int numLines = 0; + long begin = 0; + long length = 0; + int num = -1; + while ((num = lr.readLine(line)) > 0) { + numLines++; + length += num; + if (numLines == numLinesPerSplit) { + // NLineInputFormat uses LineRecordReader, which always reads + // (and consumes) at least one character out of its upper split + // boundary. So to make sure that each mapper gets N lines, we + // move back the upper split limits of each split + // by one character here. 
+ if (begin == 0) { + splits.add(new FileSplit(fileName, begin, length - 1, + new String[] {})); + } else { + splits.add(new FileSplit(fileName, begin - 1, length, + new String[] {})); + } + begin += length; + length = 0; + numLines = 0; + } + } + if (numLines != 0) { + splits.add(new FileSplit(fileName, begin, length, new String[]{})); + } + } finally { + if (lr != null) { + lr.close(); + } + } + return splits; + } + + /** + * Set the number of lines per split + * @param job the job to modify + * @param numLines the number of lines per split + */ + public static void setNumLinesPerSplit(Job job, int numLines) { + job.getConfiguration().setInt( + "mapred.line.input.format.linespermap", numLines); + } + + /** + * Get the number of lines per split + * @param job the job + * @return the number of lines per split + */ + public static int getNumLinesPerSplit(JobContext job) { + return job.getConfiguration().getInt( + "mapred.line.input.format.linespermap", 1); + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java?rev=801959&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java Fri Aug 7 11:54:25 2009 @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileUtil; + +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.conf.Configuration; + +/** + * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes + * {@link MapFile}s. 
+ */ +public class MapFileOutputFormat + extends FileOutputFormat, Writable> { + + public RecordWriter, Writable> getRecordWriter( + TaskAttemptContext context) throws IOException { + Configuration conf = context.getConfiguration(); + CompressionCodec codec = null; + CompressionType compressionType = CompressionType.NONE; + if (getCompressOutput(context)) { + // find the kind of compression to do + compressionType = SequenceFileOutputFormat.getOutputCompressionType(context); + + // find the right codec + Class codecClass = getOutputCompressorClass(context, + DefaultCodec.class); + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); + } + + Path file = getDefaultWorkFile(context, ""); + FileSystem fs = file.getFileSystem(conf); + // ignore the progress parameter, since MapFile is local + final MapFile.Writer out = + new MapFile.Writer(conf, fs, file.toString(), + context.getOutputKeyClass().asSubclass(WritableComparable.class), + context.getOutputValueClass().asSubclass(Writable.class), + compressionType, codec, context); + + return new RecordWriter, Writable>() { + public void write(WritableComparable key, Writable value) + throws IOException { + out.append(key, value); + } + + public void close(TaskAttemptContext context) throws IOException { + out.close(); + } + }; + } + + /** Open the output generated by this format. */ + public static MapFile.Reader[] getReaders(Path dir, + Configuration conf) throws IOException { + FileSystem fs = dir.getFileSystem(conf); + Path[] names = FileUtil.stat2Paths(fs.listStatus(dir)); + + // sort names, so that hash partitioning works + Arrays.sort(names); + + MapFile.Reader[] parts = new MapFile.Reader[names.length]; + for (int i = 0; i < names.length; i++) { + parts[i] = new MapFile.Reader(fs, names[i].toString(), conf); + } + return parts; + } + + /** Get an entry from output generated by this class. */ + public static , V extends Writable> + Writable getEntry(MapFile.Reader[] readers, + Partitioner partitioner, K key, V value) throws IOException { + int part = partitioner.getPartition(key, value, readers.length); + return readers[part].get(key, value); + } +} + Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java?rev=801959&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java (added) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java Fri Aug 7 11:54:25 2009 @@ -0,0 +1,500 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.util.Iterator; +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; + +/********************************************************** + * MapredLoadTest generates a bunch of work that exercises + * a Hadoop Map-Reduce system (and DFS, too). It goes through + * the following steps: + * + * 1) Take inputs 'range' and 'counts'. + * 2) Generate 'counts' random integers between 0 and range-1. + * 3) Create a file that lists each integer between 0 and range-1, + * and lists the number of times that integer was generated. + * 4) Emit a (very large) file that contains all the integers + * in the order generated. + * 5) After the file has been generated, read it back and count + * how many times each int was generated. + * 6) Compare this big count-map against the original one. If + * they match, then SUCCESS! Otherwise, FAILURE! + * + * OK, that's how we can think about it. What are the map-reduce + * steps that get the job done? + * + * 1) In a non-mapred thread, take the inputs 'range' and 'counts'. + * 2) In a non-mapread thread, generate the answer-key and write to disk. + * 3) In a mapred job, divide the answer key into K jobs. + * 4) A mapred 'generator' task consists of K map jobs. Each reads + * an individual "sub-key", and generates integers according to + * to it (though with a random ordering). + * 5) The generator's reduce task agglomerates all of those files + * into a single one. + * 6) A mapred 'reader' task consists of M map jobs. The output + * file is cut into M pieces. Each of the M jobs counts the + * individual ints in its chunk and creates a map of all seen ints. + * 7) A mapred job integrates all the count files into a single one. + * + **********************************************************/ +public class TestMapReduce extends TestCase { + + private static FileSystem fs; + + static { + try { + fs = FileSystem.getLocal(new Configuration()); + } catch (IOException ioe) { + fs = null; + } + } + + /** + * Modified to make it a junit test. + * The RandomGen Job does the actual work of creating + * a huge file of assorted numbers. It receives instructions + * as to how many times each number should be counted. Then + * it emits those numbers in a crazy order. + * + * The map() function takes a key/val pair that describes + * a value-to-be-emitted (the key) and how many times it + * should be emitted (the value), aka "numtimes". map() then + * emits a series of intermediate key/val pairs. It emits + * 'numtimes' of these. The key is a random number and the + * value is the 'value-to-be-emitted'. 
+ * + * The system collates and merges these pairs according to + * the random number. reduce() function takes in a key/value + * pair that consists of a crazy random number and a series + * of values that should be emitted. The random number key + * is now dropped, and reduce() emits a pair for every intermediate value. + * The emitted key is an intermediate value. The emitted value + * is just a blank string. Thus, we've created a huge file + * of numbers in random order, but where each number appears + * as many times as we were instructed. + */ + static class RandomGenMapper + extends Mapper { + + public void map(IntWritable key, IntWritable val, + Context context) throws IOException, InterruptedException { + int randomVal = key.get(); + int randomCount = val.get(); + + for (int i = 0; i < randomCount; i++) { + context.write(new IntWritable(Math.abs(r.nextInt())), + new IntWritable(randomVal)); + } + } + } + /** + */ + static class RandomGenReducer + extends Reducer { + + public void reduce(IntWritable key, Iterable it, + Context context) throws IOException, InterruptedException { + for (IntWritable iw : it) { + context.write(iw, null); + } + } + } + + /** + * The RandomCheck Job does a lot of our work. It takes + * in a num/string keyspace, and transforms it into a + * key/count(int) keyspace. + * + * The map() function just emits a num/1 pair for every + * num/string input pair. + * + * The reduce() function sums up all the 1s that were + * emitted for a single key. It then emits the key/total + * pair. + * + * This is used to regenerate the random number "answer key". + * Each key here is a random number, and the count is the + * number of times the number was emitted. + */ + static class RandomCheckMapper + extends Mapper, Text, IntWritable, IntWritable> { + + public void map(WritableComparable key, Text val, + Context context) throws IOException, InterruptedException { + context.write(new IntWritable( + Integer.parseInt(val.toString().trim())), new IntWritable(1)); + } + } + + /** + */ + static class RandomCheckReducer + extends Reducer { + public void reduce(IntWritable key, Iterable it, + Context context) throws IOException, InterruptedException { + int keyint = key.get(); + int count = 0; + for (IntWritable iw : it) { + count++; + } + context.write(new IntWritable(keyint), new IntWritable(count)); + } + } + + /** + * The Merge Job is a really simple one. It takes in + * an int/int key-value set, and emits the same set. + * But it merges identical keys by adding their values. + * + * Thus, the map() function is just the identity function + * and reduce() just sums. Nothing to see here! + */ + static class MergeMapper + extends Mapper { + + public void map(IntWritable key, IntWritable val, + Context context) throws IOException, InterruptedException { + int keyint = key.get(); + int valint = val.get(); + context.write(new IntWritable(keyint), new IntWritable(valint)); + } + } + + static class MergeReducer + extends Reducer { + public void reduce(IntWritable key, Iterator it, + Context context) throws IOException, InterruptedException { + int keyint = key.get(); + int total = 0; + while (it.hasNext()) { + total += it.next().get(); + } + context.write(new IntWritable(keyint), new IntWritable(total)); + } + } + + private static int range = 10; + private static int counts = 100; + private static Random r = new Random(); + + public void testMapred() throws Exception { + launch(); + } + + private static void launch() throws Exception { + // + // Generate distribution of ints. 
This is the answer key. + // + Configuration conf = new Configuration(); + int countsToGo = counts; + int dist[] = new int[range]; + for (int i = 0; i < range; i++) { + double avgInts = (1.0 * countsToGo) / (range - i); + dist[i] = (int) Math.max(0, Math.round(avgInts + + (Math.sqrt(avgInts) * r.nextGaussian()))); + countsToGo -= dist[i]; + } + if (countsToGo > 0) { + dist[dist.length-1] += countsToGo; + } + + // + // Write the answer key to a file. + // + Path testdir = new Path("mapred.loadtest"); + if (!fs.mkdirs(testdir)) { + throw new IOException("Mkdirs failed to create " + testdir.toString()); + } + + Path randomIns = new Path(testdir, "genins"); + if (!fs.mkdirs(randomIns)) { + throw new IOException("Mkdirs failed to create " + randomIns.toString()); + } + + Path answerkey = new Path(randomIns, "answer.key"); + SequenceFile.Writer out = + SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class, + IntWritable.class, + SequenceFile.CompressionType.NONE); + try { + for (int i = 0; i < range; i++) { + out.append(new IntWritable(i), new IntWritable(dist[i])); + } + } finally { + out.close(); + } + + printFiles(randomIns, conf); + + // + // Now we need to generate the random numbers according to + // the above distribution. + // + // We create a lot of map tasks, each of which takes at least + // one "line" of the distribution. (That is, a certain number + // X is to be generated Y number of times.) + // + // A map task emits Y key/val pairs. The val is X. The key + // is a randomly-generated number. + // + // The reduce task gets its input sorted by key. That is, sorted + // in random order. It then emits a single line of text that + // for the given values. It does not emit the key. + // + // Because there's just one reduce task, we emit a single big + // file of random numbers. + // + Path randomOuts = new Path(testdir, "genouts"); + fs.delete(randomOuts, true); + + + Job genJob = new Job(conf); + FileInputFormat.setInputPaths(genJob, randomIns); + genJob.setInputFormatClass(SequenceFileInputFormat.class); + genJob.setMapperClass(RandomGenMapper.class); + + FileOutputFormat.setOutputPath(genJob, randomOuts); + genJob.setOutputKeyClass(IntWritable.class); + genJob.setOutputValueClass(IntWritable.class); + genJob.setReducerClass(RandomGenReducer.class); + genJob.setNumReduceTasks(1); + + genJob.waitForCompletion(true); + printFiles(randomOuts, conf); + + // + // Next, we read the big file in and regenerate the + // original map. It's split into a number of parts. + // (That number is 'intermediateReduces'.) + // + // We have many map tasks, each of which read at least one + // of the output numbers. For each number read in, the + // map task emits a key/value pair where the key is the + // number and the value is "1". + // + // We have a single reduce task, which receives its input + // sorted by the key emitted above. For each key, there will + // be a certain number of "1" values. The reduce task sums + // these values to compute how many times the given key was + // emitted. + // + // The reduce task then emits a key/val pair where the key + // is the number in question, and the value is the number of + // times the key was emitted. This is the same format as the + // original answer key (except that numbers emitted zero times + // will not appear in the regenerated key.) The answer set + // is split into a number of pieces. A final MapReduce job + // will merge them. + // + // There's not really a need to go to 10 reduces here + // instead of 1. 
But we want to test what happens when + // you have multiple reduces at once. + // + int intermediateReduces = 10; + Path intermediateOuts = new Path(testdir, "intermediateouts"); + fs.delete(intermediateOuts, true); + Job checkJob = new Job(conf); + FileInputFormat.setInputPaths(checkJob, randomOuts); + checkJob.setMapperClass(RandomCheckMapper.class); + + FileOutputFormat.setOutputPath(checkJob, intermediateOuts); + checkJob.setOutputKeyClass(IntWritable.class); + checkJob.setOutputValueClass(IntWritable.class); + checkJob.setOutputFormatClass(MapFileOutputFormat.class); + checkJob.setReducerClass(RandomCheckReducer.class); + checkJob.setNumReduceTasks(intermediateReduces); + checkJob.waitForCompletion(true); + printFiles(intermediateOuts, conf); + + // + // OK, now we take the output from the last job and + // merge it down to a single file. The map() and reduce() + // functions don't really do anything except reemit tuples. + // But by having a single reduce task here, we end up merging + // all the files. + // + Path finalOuts = new Path(testdir, "finalouts"); + fs.delete(finalOuts, true); + Job mergeJob = new Job(conf); + FileInputFormat.setInputPaths(mergeJob, intermediateOuts); + mergeJob.setInputFormatClass(SequenceFileInputFormat.class); + mergeJob.setMapperClass(MergeMapper.class); + + FileOutputFormat.setOutputPath(mergeJob, finalOuts); + mergeJob.setOutputKeyClass(IntWritable.class); + mergeJob.setOutputValueClass(IntWritable.class); + mergeJob.setOutputFormatClass(SequenceFileOutputFormat.class); + mergeJob.setReducerClass(MergeReducer.class); + mergeJob.setNumReduceTasks(1); + + mergeJob.waitForCompletion(true); + printFiles(finalOuts, conf); + + // + // Finally, we compare the reconstructed answer key with the + // original one. Remember, we need to ignore zero-count items + // in the original key. + // + boolean success = true; + Path recomputedkey = new Path(finalOuts, "part-r-00000"); + SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf); + int totalseen = 0; + try { + IntWritable key = new IntWritable(); + IntWritable val = new IntWritable(); + for (int i = 0; i < range; i++) { + if (dist[i] == 0) { + continue; + } + if (!in.next(key, val)) { + System.err.println("Cannot read entry " + i); + success = false; + break; + } else { + if (!((key.get() == i) && (val.get() == dist[i]))) { + System.err.println("Mismatch! Pos=" + key.get() + ", i=" + i + + ", val=" + val.get() + ", dist[i]=" + dist[i]); + success = false; + } + totalseen += val.get(); + } + } + if (success) { + if (in.next(key, val)) { + System.err.println("Unnecessary lines in recomputed key!"); + success = false; + } + } + } finally { + in.close(); + } + int originalTotal = 0; + for (int i = 0; i < dist.length; i++) { + originalTotal += dist[i]; + } + System.out.println("Original sum: " + originalTotal); + System.out.println("Recomputed sum: " + totalseen); + + // + // Write to "results" whether the test succeeded or not. 
+ // + Path resultFile = new Path(testdir, "results"); + BufferedWriter bw = new BufferedWriter( + new OutputStreamWriter(fs.create(resultFile))); + try { + bw.write("Success=" + success + "\n"); + System.out.println("Success=" + success); + } finally { + bw.close(); + } + assertTrue("testMapRed failed", success); + fs.delete(testdir, true); + } + + private static void printTextFile(FileSystem fs, Path p) throws IOException { + BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(p))); + String line; + while ((line = in.readLine()) != null) { + System.out.println(" Row: " + line); + } + in.close(); + } + + private static void printSequenceFile(FileSystem fs, Path p, + Configuration conf) throws IOException { + SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf); + Object key = null; + Object value = null; + while ((key = r.next(key)) != null) { + value = r.getCurrentValue(value); + System.out.println(" Row: " + key + ", " + value); + } + r.close(); + } + + private static boolean isSequenceFile(FileSystem fs, + Path f) throws IOException { + DataInputStream in = fs.open(f); + byte[] seq = "SEQ".getBytes(); + for(int i=0; i < seq.length; ++i) { + if (seq[i] != in.read()) { + return false; + } + } + return true; + } + + private static void printFiles(Path dir, + Configuration conf) throws IOException { + FileSystem fs = dir.getFileSystem(conf); + for(FileStatus f: fs.listStatus(dir)) { + System.out.println("Reading " + f.getPath() + ": "); + if (f.isDir()) { + System.out.println(" it is a map file."); + printSequenceFile(fs, new Path(f.getPath(), "data"), conf); + } else if (isSequenceFile(fs, f.getPath())) { + System.out.println(" it is a sequence file."); + printSequenceFile(fs, f.getPath(), conf); + } else { + System.out.println(" it is a text file."); + printTextFile(fs, f.getPath()); + } + } + } + + /** + * Launches all the tasks in order. + */ + public static void main(String[] argv) throws Exception { + if (argv.length < 2) { + System.err.println("Usage: TestMapReduce "); + System.err.println(); + System.err.println("Note: a good test will have a value" + + " that is substantially larger than the "); + return; + } + + int i = 0; + range = Integer.parseInt(argv[i++]); + counts = Integer.parseInt(argv[i++]); + launch(); + } +} Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=801959&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (added) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Fri Aug 7 11:54:25 2009 @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+
+public class TestNLineInputFormat extends TestCase {
+  private static int MAX_LENGTH = 200;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs = null;
+
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestNLineInputFormat");
+
+  public void testFormat() throws Exception {
+    Job job = new Job(conf);
+    Path file = new Path(workDir, "test.txt");
+
+    int seed = new Random().nextInt();
+    Random random = new Random(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+    int numLinesPerMap = 5;
+    NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      // create a file with length entries
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+      checkFormat(job, numLinesPerMap);
+    }
+  }
+
+  void checkFormat(Job job, int expectedN)
+      throws IOException, InterruptedException {
+    NLineInputFormat format = new NLineInputFormat();
+    List<InputSplit> splits = format.getSplits(job);
+    // check all splits except last one
+    int count = 0;
+    for (int i = 0; i < splits.size() - 1; i++) {
+      assertEquals("There are no split locations", 0,
+                   splits.get(i).getLocations().length);
+      TaskAttemptContext context = MapReduceTestUtil.
+        createDummyMapTaskAttemptContext(job.getConfiguration());
+      RecordReader<LongWritable, Text> reader = format.createRecordReader(
+        splits.get(i), context);
+      Class clazz = reader.getClass();
+      assertEquals("reader class is LineRecordReader.",
+                   LineRecordReader.class, clazz);
+      MapContext<LongWritable, Text, LongWritable, Text> mcontext =
+        new MapContext<LongWritable, Text, LongWritable, Text>(
+          job.getConfiguration(), context.getTaskAttemptID(), reader, null,
+          null, MapReduceTestUtil.createDummyReporter(), splits.get(i));
+      reader.initialize(splits.get(i), mcontext);
+
+      try {
+        count = 0;
+        while (reader.nextKeyValue()) {
+          count++;
+        }
+      } finally {
+        reader.close();
+      }
+      assertEquals("number of lines in split is " + expectedN,
+                   expectedN, count);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestNLineInputFormat().testFormat();
+  }
+}
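
For readers following this change, here is a minimal usage sketch (not part of the commit) showing how the two new-api classes added above fit together: org.apache.hadoop.mapreduce.lib.input.NLineInputFormat feeds a fixed number of control-file lines to each map task, and org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat writes the reduce output as MapFiles that can later be probed with the static getReaders/getEntry helpers added in this revision. The driver, mapper, reducer, and path names below (NLineToMapFileExample, LineMapper, CountReducer, params.txt, out) are hypothetical, and the lookup assumes the job's default HashPartitioner; only the NLineInputFormat and MapFileOutputFormat calls come from the files in this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class NLineToMapFileExample {

  // Emits (parameter line, 1) for every line handed to this map task by
  // NLineInputFormat; the key is the byte offset, the value is the line.
  static class LineMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      context.write(line, ONE);
    }
  }

  // Sums the counts per line; reduce input arrives key-sorted, which is what
  // MapFile.Writer (and hence MapFileOutputFormat) requires.
  static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) {
        sum += v.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setJarByClass(NLineToMapFileExample.class);

    // Feed 10 lines of the control file to each map task.
    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setNumLinesPerSplit(job, 10);
    FileInputFormat.setInputPaths(job, new Path("params.txt"));

    // Write the reduce output as MapFiles, one per reducer.
    job.setOutputFormatClass(MapFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path("out"));

    job.setMapperClass(LineMapper.class);
    job.setReducerClass(CountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    if (!job.waitForCompletion(true)) {
      System.exit(1);
    }

    // Look up a single key across the MapFile parts using the static helpers;
    // the partitioner passed here must match the one the job used.
    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(new Path("out"), conf);
    IntWritable count = new IntWritable();
    Writable found = MapFileOutputFormat.getEntry(readers,
        new HashPartitioner<Text, IntWritable>(),
        new Text("some-parameter-line"), count);
    System.out.println(found == null ? "key not found" : "count = " + count.get());
    for (MapFile.Reader reader : readers) {
      reader.close();
    }
  }
}

The same delegation pattern shows up in the modified old-api classes earlier in this commit: org.apache.hadoop.mapred.lib.NLineInputFormat and org.apache.hadoop.mapred.MapFileOutputFormat now forward to these new-api implementations, so both entry points share one split-computation and reader code path.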