Subject: svn commit: r441653 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/io/ src/test/org/apache/hadoop/io/compress/ src/test/org/apache...
Date: Fri, 08 Sep 2006 21:19:43 -0000
To: hadoop-commits@lucene.apache.org
From: cutting@apache.org
Message-Id: <20060908211944.68A501A981A@eris.apache.org>

Author: cutting
Date: Fri Sep 8 14:19:41 2006
New Revision: 441653

URL: http://svn.apache.org/viewvc?view=rev&rev=441653
Log:
HADOOP-474. Add CompressionCodecFactory and use it in TextInputFormat and TextOutputFormat. Also add gzip codec and fix some problems with UTF8 text inputs. Contributed by Owen.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Sep 8 14:19:41 2006
@@ -161,6 +161,13 @@
 40. HADOOP-517. Fix a contrib/streaming bug in end-of-line detection.
     (Hairong Kuang via cutting)
+41. HADOOP-474. Add CompressionCodecFactory, and use it in
+    TextInputFormat and TextOutputFormat. Compressed input files are
+    automatically decompressed when they have the correct extension.
+    Output files will, when output compression is specified, be
+    generated with an approprate extension. Also add a gzip codec and
+    fix problems with UTF8 text inputs. (omalley via cutting)
+
 Release 0.5.0 - 2006-08-04

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri Sep 8 14:19:41 2006
@@ -87,6 +87,13 @@
   facilitate opening large map files using less memory.
+
+  io.compression.codecs
+  org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec
+  A list of the compression codec classes that can be used
+  for compression/decompression.
+
+
@@ -441,6 +448,21 @@
 -->
+  mapred.output.compress
+  false
+  Should the outputs of the reduces be compressed?
+
+
+
+
+  mapred.output.compression.codec
+  org.apache.hadoop.io.compress.DefaultCodec
+  If the reduce outputs are compressed, how should they be
+  compressed?
+
+
+
+
   mapred.compress.map.output
   false
   Should the outputs of the maps be compressed before being
@@ -449,7 +471,7 @@
-  mapred.seqfile.compress.blocksize
+  io.seqfile.compress.blocksize
   1000000
   The minimum block size for compression in block compressed SequenceFiles.
@@ -457,7 +479,7 @@
-  mapred.seqfile.lazydecompress
+  io.seqfile.lazydecompress
   true
   Should values of block-compressed SequenceFiles be decompressed only when necessary.
@@ -465,7 +487,7 @@
-  mapred.seqfile.sorter.recordlimit
+  io.seqfile.sorter.recordlimit
   1000000
   The limit on number of records to be kept in memory in a spill in SequenceFiles.Sorter
@@ -473,8 +495,8 @@
-  mapred.seqfile.compression.type
-  NONE
+  io.seqfile.compression.type
+  RECORD
   The default compression type for SequenceFile.Writer.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Sep 8 14:19:41 2006
@@ -28,6 +28,7 @@
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -63,6 +64,27 @@
     /** Compress sequences of records together in blocks. */
     BLOCK
   }
+
+  /**
+   * Get the compression type for the reduce outputs
+   * @param job the job config to look in
+   * @return the kind of compression to use
+   */
+  static public CompressionType getCompressionType(Configuration job) {
+    String name = job.get("io.seqfile.compression.type");
+    return name == null ? CompressionType.RECORD :
+                          CompressionType.valueOf(name);
+  }
+
+  /**
+   * Set the compression type for sequence files.
+   * @param job the configuration to modify
+   * @param val the new compression type (none, block, record)
+   */
+  static public void setCompressionType(Configuration job,
+                                        CompressionType val) {
+    job.set("io.seqfile.compression.type", val.toString());
+  }
   /**
    * Construct the preferred type of SequenceFile Writer.
@@ -685,7 +707,7 @@
                  Class keyClass, Class valClass, CompressionCodec codec)
       throws IOException {
       super.init(name, fs.create(name), keyClass, valClass, true, codec);
-      init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
+      init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       initializeFileHeader();
       writeFileHeader();
@@ -699,7 +721,7 @@
       throws IOException {
       super.init(name, fs.create(name, progress),
                  keyClass, valClass, true, codec);
-      init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
+      init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       initializeFileHeader();
       writeFileHeader();
@@ -998,7 +1020,7 @@
       }
-      lazyDecompress = conf.getBoolean("mapred.seqfile.lazydecompress", true);
+      lazyDecompress = conf.getBoolean("io.seqfile.lazydecompress", true);
     }
     /** Close the file. */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java Fri Sep 8 14:19:41 2006
@@ -173,18 +173,29 @@
    * @exception CharacterCodingException if the array contains invalid UTF8 code
    */
   public void set(byte[] utf8) throws CharacterCodingException {
-    validateUTF8(utf8);
-    set(utf8, utf8.length);
+    set(utf8, 0, utf8.length);
   }
   /** copy a text. */
   public void set(Text other) {
-    set(other.bytes, other.length);
+    try {
+      set(other.bytes, 0, other.length);
+    } catch (CharacterCodingException e) {
+      throw new RuntimeException("bad Text UTF8 encoding", e);
+    }
   }
-  private void set(byte[] utf8, int len ) {
+  /**
+   * Set the Text to range of bytes
+   * @param utf8 the data to copy from
+   * @param start the first position of the new string
+   * @param len the number of bytes of the new string
+   */
+  public void set(byte[] utf8, int start, int len
+                  ) throws CharacterCodingException{
+    validateUTF8(utf8, start, len);
     setCapacity(len);
-    System.arraycopy(utf8, 0, bytes, 0, len);
+    System.arraycopy(utf8, start, bytes, 0, len);
     this.length = len;
   }
@@ -416,10 +427,17 @@
    * @exception MalformedInputException if the byte array contains invalid utf-8
    */
   public static void validateUTF8(byte[] utf8) throws MalformedInputException {
-    validateUTF(utf8, 0, utf8.length);
+    validateUTF8(utf8, 0, utf8.length);
   }
-  public static void validateUTF(byte[] utf8, int start, int len)
+  /**
+   * Check to see if a byte array is valid utf-8
+   * @param utf8 the array of bytes
+   * @param start the offset of the first byte in the array
+   * @param len the length of the byte sequence
+   * @throws MalformedInputException if the byte array contains invalid bytes
+   */
+  public static void validateUTF8(byte[] utf8, int start, int len)
     throws MalformedInputException {
     int count = start;
     int leadByte = 0;

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java?view=auto&rev=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java Fri Sep 8 14:19:41 2006
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A factory that will find the correct codec for a given filename.
+ * @author Owen O'Malley
+ */
+public class CompressionCodecFactory {
+
+  public static final Log LOG =
+    LogFactory.getLog(CompressionCodecFactory.class.getName());
+
+  /**
+   * A map from the reversed filename suffixes to the codecs.
+   * This is probably overkill, because the maps should be small, but it
+   * automatically supports finding the longest matching suffix.
+   */
+  private SortedMap<String, CompressionCodec> codecs = null;
+
+  private void addCodec(CompressionCodec codec) {
+    String suffix = codec.getDefaultExtension();
+    codecs.put(new StringBuffer(suffix).reverse().toString(), codec);
+  }
+
+  /**
+   * Print the extension map out as a string.
+   */
+  public String toString() {
+    StringBuffer buf = new StringBuffer();
+    Iterator<Map.Entry<String, CompressionCodec>> itr =
+      codecs.entrySet().iterator();
+    buf.append("{ ");
+    if (itr.hasNext()) {
+      Map.Entry<String, CompressionCodec> entry = itr.next();
+      buf.append(entry.getKey());
+      buf.append(": ");
+      buf.append(entry.getValue().getClass().getName());
+      while (itr.hasNext()) {
+        entry = itr.next();
+        buf.append(", ");
+        buf.append(entry.getKey());
+        buf.append(": ");
+        buf.append(entry.getValue().getClass().getName());
+      }
+    }
+    buf.append(" }");
+    return buf.toString();
+  }
+
+  /**
+   * Get the list of codecs listed in the configuration
+   * @param conf the configuration to look in
+   * @return a list of the Configuration classes or null if the attribute
+   *         was not set
+   */
+  public static List<Class> getCodecClasses(Configuration conf) {
+    String codecsString = conf.get("io.compression.codecs");
+    if (codecsString != null) {
+      List<Class> result = new ArrayList<Class>();
+      StringTokenizer codecSplit = new StringTokenizer(codecsString, ",");
+      while (codecSplit.hasMoreElements()) {
+        String codecSubstring = codecSplit.nextToken();
+        if (codecSubstring.length() != 0) {
+          try {
+            Class cls = conf.getClassByName(codecSubstring);
+            if (!CompressionCodec.class.isAssignableFrom(cls)) {
+              throw new IllegalArgumentException("Class " + codecSubstring +
+                                                 " is not a CompressionCodec");
+            }
+            result.add(cls);
+          } catch (ClassNotFoundException ex) {
+            throw new IllegalArgumentException("Compression codec " +
+                                               codecSubstring + " not found.",
+                                               ex);
+          }
+        }
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Sets a list of codec classes in the configuration.
+   * @param conf the configuration to modify
+   * @param classes the list of classes to set
+   */
+  public static void setCodecClasses(Configuration conf,
+                                     List<Class> classes) {
+    StringBuffer buf = new StringBuffer();
+    Iterator<Class> itr = classes.iterator();
+    if (itr.hasNext()) {
+      Class cls = itr.next();
+      buf.append(cls.getName());
+      while(itr.hasNext()) {
+        buf.append(',');
+        buf.append(itr.next().getName());
+      }
+    }
+    conf.set("io.compression.codecs",buf.toString());
+  }
+
+  /**
+   * Find the codecs specified in the config value io.compression.codecs
+   * and register them. Defaults to gzip and zip.
+   */
+  public CompressionCodecFactory(Configuration conf) {
+    codecs = new TreeMap<String, CompressionCodec>();
+    List<Class> codecClasses = getCodecClasses(conf);
+    if (codecClasses == null) {
+      addCodec(new GzipCodec());
+      addCodec(new DefaultCodec());
+    } else {
+      Iterator<Class> itr = codecClasses.iterator();
+      while (itr.hasNext()) {
+        CompressionCodec codec =
+          (CompressionCodec) ReflectionUtils.newInstance(itr.next(), conf);
+        addCodec(codec);
+      }
+    }
+  }
+
+  /**
+   * Find the relevant compression codec for the given file based on its
+   * filename suffix.
+   * @param file the filename to check
+   * @return the codec object
+   */
+  public CompressionCodec getCodec(Path file) {
+    CompressionCodec result = null;
+    if (codecs != null) {
+      String filename = file.getName();
+      String reversedFilename = new StringBuffer(filename).reverse().toString();
+      SortedMap<String, CompressionCodec> subMap =
+        codecs.headMap(reversedFilename);
+      if (!subMap.isEmpty()) {
+        String potentialSuffix = subMap.lastKey();
+        if (reversedFilename.startsWith(potentialSuffix)) {
+          result = codecs.get(potentialSuffix);
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Removes a suffix from a filename, if it has it.
+   * @param filename the filename to strip
+   * @param suffix the suffix to remove
+   * @return the shortened filename
+   */
+  public static String removeSuffix(String filename, String suffix) {
+    if (filename.endsWith(suffix)) {
+      return filename.substring(0, filename.length() - suffix.length());
+    }
+    return filename;
+  }
+
+  /**
+   * A little test program.
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new org.apache.hadoop.mapred.JobConf();
+    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+    boolean encode = false;
+    for(int i=0; i < args.length; ++i) {
+      if ("-in".equals(args[i])) {
+        encode = true;
+      } else if ("-out".equals(args[i])) {
+        encode = false;
+      } else {
+        CompressionCodec codec = factory.getCodec(new Path(args[i]));
+        if (codec == null) {
+          System.out.println("Codec for " + args[i] + " not found.");
+        } else {
+          if (encode) {
+            CompressionOutputStream out =
+              codec.createOutputStream(new java.io.FileOutputStream(args[i]));
+            byte[] buffer = new byte[100];
+            String inFilename = removeSuffix(args[i],
+                                             codec.getDefaultExtension());
+            java.io.InputStream in = new java.io.FileInputStream(inFilename);
+            int len = in.read(buffer);
+            while (len > 0) {
+              out.write(buffer, 0, len);
+              len = in.read(buffer);
+            }
+            in.close();
+            out.close();
+          } else {
+            CompressionInputStream in =
+              codec.createInputStream(new java.io.FileInputStream(args[i]));
+            byte[] buffer = new byte[100];
+            int len = in.read(buffer);
+            while (len > 0) {
+              System.out.write(buffer, 0, len);
+              len = in.read(buffer);
+            }
+            in.close();
+          }
+        }
+      }
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java?view=auto&rev=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java Fri Sep 8 14:19:41 2006
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.*;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+/**
+ * This class creates gzip compressors/decompressors.
+ * @author Owen O'Malley
+ */
+public class GzipCodec extends DefaultCodec {
+
+  /**
+   * A bridge that wraps around a DeflaterOutputStream to make it
+   * a CompressionOutputStream.
+   * @author Owen O'Malley
+   */
+  protected static class GzipOutputStream extends DefaultCompressionOutputStream {
+    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
+      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
+        super(out);
+      }
+
+      public void resetState() throws IOException {
+        def.reset();
+      }
+    }
+
+    public GzipOutputStream(OutputStream out) throws IOException {
+      super(new ResetableGZIPOutputStream(out));
+    }
+
+    /**
+     * Allow children types to put a different type in here.
+     * @param out the Deflater stream to use
+     */
+    protected GzipOutputStream(DefaultCompressionOutputStream out) {
+      super(out);
+    }
+
+
+    public void resetState() throws IOException {
+      ((ResetableGZIPOutputStream) out).resetState();
+    }
+
+  }
+
+  protected static class GzipInputStream extends DefaultCompressionInputStream {
+
+    private static class ResetableGZIPInputStream extends GZIPInputStream {
+      public ResetableGZIPInputStream(InputStream in) throws IOException {
+        super(in);
+      }
+
+      public void resetState() throws IOException {
+        inf.reset();
+      }
+    }
+
+    public GzipInputStream(InputStream in) throws IOException {
+      super(new ResetableGZIPInputStream(in));
+    }
+
+    /**
+     * Allow subclasses to directly set the inflater stream.
+     */
+    protected GzipInputStream(DefaultCompressionInputStream in) {
+      super(in);
+    }
+  }
+
+  /**
+   * Create a stream compressor that will write to the given output stream.
+   * @param out the location for the final output stream
+   * @return a stream the user can write uncompressed data to
+   */
+  public CompressionOutputStream createOutputStream(OutputStream out)
+  throws IOException {
+    return new GzipOutputStream(out);
+  }
+
+  /**
+   * Create a stream decompressor that will read from the given input stream.
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   */
+  public CompressionInputStream createInputStream(InputStream in)
+  throws IOException {
+    return new GzipInputStream(in);
+  }
+
+  /**
+   * Get the default filename extension for this kind of compression.
+   * @return the extension including the '.'
+   */
+  public String getDefaultExtension() {
+    return ".gz";
+  }
+
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Fri Sep 8 14:19:41 2006
@@ -40,6 +40,17 @@
     this.minSplitSize = minSplitSize;
   }
+  /**
+   * Is the given filename splitable? Usually, true, but if the file is
+   * stream compressed, it will not be.
+   * @param fs the file system that the file is on
+   * @param filename the file name to check
+   * @return is this file splitable?
+   */
+  protected boolean isSplitable(FileSystem fs, Path filename) {
+    return true;
+  }
+
   public abstract RecordReader getRecordReader(FileSystem fs,
                                                FileSplit split,
                                                JobConf job,
@@ -117,15 +128,12 @@
     Path[] files = listPaths(fs, job);
+    long totalSize = 0;                           // compute total size
     for (int i = 0; i < files.length; i++) {      // check we have valid files
       Path file = files[i];
       if (fs.isDirectory(file) || !fs.exists(file)) {
         throw new IOException("Not a file: "+files[i]);
       }
-    }
-
-    long totalSize = 0;                           // compute total size
-    for (int i = 0; i < files.length; i++) {
       totalSize += fs.getLength(files[i]);
     }
@@ -138,19 +146,24 @@
     for (int i = 0; i < files.length; i++) {
       Path file = files[i];
       long length = fs.getLength(file);
-      long blockSize = fs.getBlockSize(file);
-      long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-
-      long bytesRemaining = length;
-      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-        splits.add(new FileSplit(file, length-bytesRemaining, splitSize));
-        bytesRemaining -= splitSize;
-      }
-
-      if (bytesRemaining != 0) {
-        splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining));
+      if (isSplitable(fs, file)) {
+        long blockSize = fs.getBlockSize(file);
+        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+
+        long bytesRemaining = length;
+        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+          splits.add(new FileSplit(file, length-bytesRemaining, splitSize));
+          bytesRemaining -= splitSize;
+        }
+
+        if (bytesRemaining != 0) {
+          splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining));
+        }
+      } else {
+        if (length != 0) {
+          splits.add(new FileSplit(file, 0, length));
+        }
+      }
       }
-      //LOG.info( "Generating splits for " + i + "th file: " + file.getName() );
     }
     //LOG.info( "Total # of splits: " + splits.size() );
     return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Fri Sep 8 14:19:41 2006
@@ -37,6 +37,7 @@
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -332,6 +333,38 @@
    */
   public boolean getCompressMapOutput() {
     return getBoolean("mapred.compress.map.output", false);
+  }
+
+  /**
+   * Set the given class as the compression codec for the map outputs.
+   * @param codecClass the CompressionCodec class that will compress the
+   *                   map outputs
+   */
+  public void setMapOutputCompressorClass(Class codecClass) {
+    setCompressMapOutput(true);
+    setClass("mapred.output.compression.codec", codecClass,
+             CompressionCodec.class);
+  }
+
+  /**
+   * Get the codec for compressing the map outputs
+   * @param defaultValue the value to return if it is not set
+   * @return the CompressionCodec class that should be used to compress the
+   *   map outputs
+   * @throws IllegalArgumentException if the class was specified, but not found
+   */
+  public Class getMapOutputCompressorClass(Class defaultValue) {
+    String name = get("mapred.output.compression.codec");
+    if (name == null) {
+      return defaultValue;
+    } else {
+      try {
+        return getClassByName(name);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name +
+                                           " was not found.", e);
+      }
+    }
   }
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Sep 8 14:19:41 2006
@@ -20,8 +20,11 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.commons.logging.*;
@@ -123,21 +126,27 @@
     final int partitions = job.getNumReduceTasks();
     final SequenceFile.Writer[] outs = new SequenceFile.Writer[partitions];
     try {
+      Reporter reporter = getReporter(umbilical, getProgress());
       FileSystem localFs = FileSystem.getNamed("local", job);
-      /** TODO: Figure out a way to deprecate 'mapred.compress.map.output' */
-      boolean compressTemps = job.getBoolean("mapred.compress.map.output",
-                                             false);
+      CompressionCodec codec = null;
+      CompressionType compressionType = CompressionType.NONE;
+      if (job.getCompressMapOutput()) {
+        // find the kind of compression to do, defaulting to record
+        compressionType = SequenceFile.getCompressionType(job);
+
+        // find the right codec
+        Class codecClass =
+          job.getMapOutputCompressorClass(DefaultCodec.class);
+        codec = (CompressionCodec)
+                   ReflectionUtils.newInstance(codecClass, job);
+      }
       for (int i = 0; i < partitions; i++) {
+        Path filename = mapOutputFile.getOutputFile(getTaskId(), i);
         outs[i] =
-          SequenceFile.createWriter(localFs, job,
-                                    this.mapOutputFile.getOutputFile(getTaskId(), i),
-                                    job.getMapOutputKeyClass(),
-                                    job.getMapOutputValueClass(),
-                                    compressTemps ? CompressionType.RECORD :
-                                      CompressionType.valueOf(
-                                        job.get("mapred.seqfile.compression.type",
-                                                "NONE"))
-                                    );
+          SequenceFile.createWriter(localFs, job, filename,
+                                    job.getMapOutputKeyClass(),
+                                    job.getMapOutputValueClass(),
+                                    compressionType, codec, reporter);
         LOG.info("opened "+this.mapOutputFile.getOutputFile(getTaskId(), i).getName());
       }
@@ -157,7 +166,6 @@
       };
       OutputCollector collector = partCollector;
-      Reporter reporter = getReporter(umbilical, getProgress());
       boolean combining = job.getCombinerClass() != null;
       if (combining) {                            // add combining collector

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java Fri Sep 8 14:19:41 2006
@@ -20,10 +20,63 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.Progressable;
 /** A base class for {@link OutputFormat}. */
 public abstract class OutputFormatBase implements OutputFormat {
+
+  /**
+   * Set whether the output of the reduce is compressed
+   * @param val the new setting
+   */
+  public static void setCompressOutput(JobConf conf, boolean val) {
+    conf.setBoolean("mapred.output.compress", val);
+  }
+
+  /**
+   * Is the reduce output compressed?
+   * @return true, if the output should be compressed
+   */
+  public static boolean getCompressOutput(JobConf conf) {
+    return conf.getBoolean("mapred.output.compress", false);
+  }
+
+  /**
+   * Set the given class as the output compression codec.
+   * @param conf the JobConf to modify
+   * @param codecClass the CompressionCodec class that will compress the
+   *                   reduce outputs
+   */
+  public static void setOutputCompressorClass(JobConf conf, Class codecClass) {
+    setCompressOutput(conf, true);
+    conf.setClass("mapred.output.compression.codec", codecClass,
+                  CompressionCodec.class);
+  }
+
+  /**
+   * Get the codec for compressing the reduce outputs
+   * @param conf the Configuration to look in
+   * @param defaultValue the value to return if it is not set
+   * @return the CompressionCodec class that should be used to compress the
+   *   reduce outputs
+   * @throws IllegalArgumentException if the class was specified, but not found
+   */
+  public static Class getOutputCompressorClass(JobConf conf,
+                                               Class defaultValue) {
+    String name = conf.get("mapred.output.compression.codec");
+    if (name == null) {
+      return defaultValue;
+    } else {
+      try {
+        return conf.getClassByName(name);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name +
+                                           " was not found.", e);
+      }
+    }
+  }
+
+
   public abstract RecordWriter getRecordWriter(FileSystem fs,
                                                JobConf job,
                                                String name,
                                                Progressable progress)

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Fri Sep 8 14:19:41 2006
@@ -27,8 +27,10 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.*;
 /** An {@link OutputFormat} that writes {@link SequenceFile}s. */
 public class SequenceFileOutputFormat extends OutputFormatBase {
@@ -38,18 +40,23 @@
     throws IOException {
     Path file = new Path(job.getOutputPath(), name);
+    CompressionCodec codec = null;
+    CompressionType compressionType = CompressionType.NONE;
+    if (getCompressOutput(job)) {
+      // find the kind of compression to do
+      compressionType = SequenceFile.getCompressionType(job);
-    /** TODO: Figure out a way to deprecate 'mapred.output.compress' */
+      // find the right codec
+      Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
+      codec = (CompressionCodec)
+                 ReflectionUtils.newInstance(codecClass, job);
+    }
     final SequenceFile.Writer out =
       SequenceFile.createWriter(fs, job, file,
                                 job.getOutputKeyClass(),
                                 job.getOutputValueClass(),
-                                job.getBoolean("mapred.output.compress", false) ?
- CompressionType.RECORD : - CompressionType.valueOf( - job.get("mapred.seqfile.compression.type", - "NONE") - ), + compressionType, + codec, progress); return new RecordWriter() { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java?view=diff&rev=441653&r1=441652&r2=441653 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java Fri Sep 8 14:19:41 2006 @@ -16,106 +16,148 @@ package org.apache.hadoop.mapred; -import java.io.IOException; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataInputStream; +import java.io.*; +import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; +import org.apache.hadoop.io.compress.*; /** An {@link InputFormat} for plain text files. Files are broken into lines. * Either linefeed or carriage-return are used to signal end of line. Keys are * the position in the file, and values are the line of text.. */ -public class TextInputFormat extends InputFormatBase { +public class TextInputFormat extends InputFormatBase implements JobConfigurable { + + private CompressionCodecFactory compressionCodecs = null; + + public void configure(JobConf conf) { + compressionCodecs = new CompressionCodecFactory(conf); + } + + protected boolean isSplitable(FileSystem fs, Path file) { + return compressionCodecs.getCodec(file) == null; + } + + protected static class LineRecordReader implements RecordReader { + private long pos; + private long end; + private BufferedInputStream in; + private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256); + /** + * Provide a bridge to get the bytes from the ByteArrayOutputStream + * without creating a new byte array. 
+ */ + private static class TextStuffer extends OutputStream { + public Text target; + public void write(int b) { + throw new UnsupportedOperationException("write(byte) not supported"); + } + public void write(byte[] data, int offset, int len) throws IOException { + target.set(data, offset, len); + } + } + private TextStuffer bridge = new TextStuffer(); + + public LineRecordReader(InputStream in, long offset, long endOffset) { + this.in = new BufferedInputStream(in); + this.pos = offset; + this.end = endOffset; + } + + public WritableComparable createKey() { + return new LongWritable(); + } + + public Writable createValue() { + return new Text(); + } + + /** Read a line. */ + public synchronized boolean next(Writable key, Writable value) + throws IOException { + if (pos >= end) + return false; + + ((LongWritable)key).set(pos); // key is position + buffer.reset(); + long bytesRead = readLine(in, buffer); + if (bytesRead == 0) { + return false; + } + pos += bytesRead; + bridge.target = (Text) value; + buffer.writeTo(bridge); + return true; + } + + public synchronized long getPos() throws IOException { + return pos; + } + + public synchronized void close() throws IOException { + in.close(); + } + } + public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(split.toString()); - final long start = split.getStart(); - final long end = start + split.getLength(); + long start = split.getStart(); + long end = start + split.getLength(); + final Path file = split.getPath(); + final CompressionCodec codec = compressionCodecs.getCodec(file); // open the file and seek to the start of the split - final FSDataInputStream in = fs.open(split.getPath()); + FSDataInputStream fileIn = fs.open(split.getPath()); + InputStream in = fileIn; - if (start != 0) { - in.seek(start-1); - while (in.getPos() < end) { // scan to the next newline in the file - char c = (char)in.read(); - if (c == '\n') - break; - - if 
(c == '\r') { - long curPos = in.getPos(); - char nextC = (char)in.read(); - if (nextC != '\n') { - in.seek(curPos); - } - - break; - } - } + if (codec != null) { + in = codec.createInputStream(fileIn); + end = Long.MAX_VALUE; + } else if (start != 0) { + fileIn.seek(start-1); + readLine(fileIn, null); + start = fileIn.getPos(); } - - return new RecordReader() { - - public WritableComparable createKey() { - return new LongWritable(); - } - - public Writable createValue() { - return new Text(); - } - - /** Read a line. */ - public synchronized boolean next(Writable key, Writable value) - throws IOException { - long pos = in.getPos(); - if (pos >= end) - return false; - - ((LongWritable)key).set(pos); // key is position - ((Text)value).set(readLine(in)); // value is line - return true; - } - - public synchronized long getPos() throws IOException { - return in.getPos(); - } - - public synchronized void close() throws IOException { in.close(); } - - }; + + return new LineRecordReader(in, start, end); } - private static String readLine(FSDataInputStream in) throws IOException { - StringBuffer buffer = new StringBuffer(); + public static long readLine(InputStream in, + OutputStream out) throws IOException { + long bytes = 0; while (true) { int b = in.read(); - if (b == -1) + if (b == -1) { break; - - char c = (char)b; // bug: this assumes eight-bit characters. 
- if (c == '\n') + } + bytes += 1; + + byte c = (byte)b; + if (c == '\n') { break; - - if (c == '\r') { - long curPos = in.getPos(); - char nextC = (char)in.read(); + } + + if (c == '\r') { + in.mark(1); + byte nextC = (byte)in.read(); if (nextC != '\n') { - in.seek(curPos); + in.reset(); + } else { + bytes += 1; } - break; } - buffer.append(c); + if (out != null) { + out.write(c); + } } - - return buffer.toString(); + return bytes; } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?view=diff&rev=441653&r1=441652&r2=441653 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Fri Sep 8 14:19:41 2006 @@ -16,6 +16,7 @@ package org.apache.hadoop.mapred; +import java.io.DataOutputStream; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; @@ -24,30 +25,51 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.util.*; /** An {@link OutputFormat} that writes plain text files. 
*/ public class TextOutputFormat extends OutputFormatBase { + protected static class LineRecordWriter implements RecordWriter { + private DataOutputStream out; + + public LineRecordWriter(DataOutputStream out) { + this.out = out; + } + + public synchronized void write(WritableComparable key, Writable value) + throws IOException { + out.write(key.toString().getBytes("UTF-8")); + out.writeByte('\t'); + out.write(value.toString().getBytes("UTF-8")); + out.writeByte('\n'); + } + public synchronized void close(Reporter reporter) throws IOException { + out.close(); + } + } + public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress) throws IOException { - Path file = new Path(job.getOutputPath(), name); - - final FSDataOutputStream out = fs.create(file, progress); - - return new RecordWriter() { - public synchronized void write(WritableComparable key, Writable value) - throws IOException { - out.write(key.toString().getBytes("UTF-8")); - out.writeByte('\t'); - out.write(value.toString().getBytes("UTF-8")); - out.writeByte('\n'); - } - public synchronized void close(Reporter reporter) throws IOException { - out.close(); - } - }; + Path dir = job.getOutputPath(); + boolean isCompressed = getCompressOutput(job); + if (!isCompressed) { + FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress); + return new LineRecordWriter(fileOut); + } else { + Class codecClass = getOutputCompressorClass(job, GzipCodec.class); + // create the named codec + CompressionCodec codec = (CompressionCodec) + ReflectionUtils.newInstance(codecClass, job); + // build the filename including the extension + Path filename = new Path(dir, name + codec.getDefaultExtension()); + FSDataOutputStream fileOut = fs.create(filename, progress); + return new LineRecordWriter(new DataOutputStream + (codec.createOutputStream(fileOut))); + } } } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java?view=diff&rev=441653&r1=441652&r2=441653 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java Fri Sep 8 14:19:41 2006 @@ -198,7 +198,7 @@ Text text = new Text("abcd\u20acbdcd\u20ac"); byte [] utf8 = text.getBytes(); int length = text.getLength(); - Text.validateUTF(utf8, 0, length); + Text.validateUTF8(utf8, 0, length); } public void testTextText() throws CharacterCodingException { Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java?view=auto&rev=441653 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java Fri Sep 8 14:19:41 2006 @@ -0,0 +1,102 @@ +/* + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.io.compress; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.*; + +import junit.framework.TestCase; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.conf.Configuration; + +public class TestCodecFactory extends TestCase { + + private static class BaseCodec implements CompressionCodec { + public CompressionOutputStream createOutputStream(OutputStream out) { + return null; + } + + public CompressionInputStream createInputStream(InputStream in) { + return null; + } + + public String getDefaultExtension() { + return ".base"; + } + } + + private static class BarCodec extends BaseCodec { + public String getDefaultExtension() { + return "bar"; + } + } + + private static class FooBarCodec extends BaseCodec { + public String getDefaultExtension() { + return ".foo.bar"; + } + } + + private static class FooCodec extends BaseCodec { + public String getDefaultExtension() { + return ".foo"; + } + } + + /** + * Returns a factory for a given set of codecs + * @param classes the codec classes to include + * @return a new factory + */ + private static CompressionCodecFactory setClasses(Class[] classes) { + Configuration conf = new Configuration(); + CompressionCodecFactory.setCodecClasses(conf, Arrays.asList(classes)); + return new CompressionCodecFactory(conf); + } + + private static void checkCodec(String msg, + Class expected, CompressionCodec actual) { + assertEquals(msg + " unexpected codec found", + expected.getName(), + actual.getClass().getName()); + } + + public static void testFinding() { + CompressionCodecFactory factory = + new CompressionCodecFactory(new Configuration()); + CompressionCodec codec = factory.getCodec(new Path("/tmp/foo.bar")); + assertEquals("default factory foo codec", null, codec); + codec = factory.getCodec(new Path("/tmp/foo.gz")); + checkCodec("default factory for .gz", GzipCodec.class, codec); + factory = setClasses(new Class[0]); + codec = factory.getCodec(new 
Path("/tmp/foo.bar")); + assertEquals("empty codec bar codec", null, codec); + codec = factory.getCodec(new Path("/tmp/foo.gz")); + assertEquals("empty codec gz codec", null, codec); + factory = setClasses(new Class[]{BarCodec.class, FooCodec.class, + FooBarCodec.class}); + codec = factory.getCodec(new Path("/tmp/.foo.bar.gz")); + assertEquals("full factory gz codec", null, codec); + codec = factory.getCodec(new Path("/tmp/foo.bar")); + checkCodec("full factory bar codec", BarCodec.class, codec); + codec = factory.getCodec(new Path("/tmp/foo/baz.foo.bar")); + checkCodec("full factory foo bar codec", FooBarCodec.class, codec); + codec = factory.getCodec(new Path("/tmp/foo.foo")); + checkCodec("full factory foo codec", FooCodec.class, codec); + } +} Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=441653&r1=441652&r2=441653 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Fri Sep 8 14:19:41 2006 @@ -17,7 +17,6 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; -import org.apache.hadoop.conf.*; import org.apache.hadoop.mapred.lib.*; import junit.framework.TestCase; import java.io.*; @@ -83,7 +82,6 @@ * as many times as we were instructed. 
*/ static class RandomGenMapper implements Mapper { - Random r = new Random(); public void configure(JobConf job) { } @@ -105,7 +103,6 @@ } public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException { - int keyint = ((IntWritable) key).get(); while (it.hasNext()) { int val = ((IntWritable) it.next()).get(); out.collect(new Text("" + val), new Text("")); @@ -136,7 +133,6 @@ } public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException { - long pos = ((LongWritable) key).get(); Text str = (Text) val; out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1)); @@ -203,7 +199,6 @@ private static int range = 10; private static int counts = 100; private static Random r = new Random(); - private static Configuration conf = new Configuration(); /** public TestMapRed(int range, int counts, Configuration conf) throws IOException { @@ -252,19 +247,14 @@ private static class MyReduce extends IdentityReducer { private JobConf conf; private boolean compressInput; - private boolean compressOutput; private String taskId; - private int partition; private boolean first = true; public void configure(JobConf conf) { this.conf = conf; compressInput = conf.getBoolean("mapred.compress.map.output", false); - compressOutput = conf.getBoolean("mapred.compress.output", - false); taskId = conf.get("mapred.task.id"); - partition = conf.getInt("mapred.task.partition", -1); } public void reduce(WritableComparable key, Iterator values, @@ -295,6 +285,7 @@ Path inDir = new Path(testdir, "in"); Path outDir = new Path(testdir, "out"); FileSystem fs = FileSystem.get(conf); + fs.delete(testdir); conf.setInputPath(inDir); conf.setOutputPath(outDir); conf.setMapperClass(MyMap.class); @@ -306,10 +297,10 @@ conf.setCombinerClass(IdentityReducer.class); } if (compressMapOutput) { - conf.setBoolean("mapred.compress.map.output", true); + conf.setCompressMapOutput(true); 
} if (compressReduceOutput) { - conf.setBoolean("mapred.output.compress", true); + SequenceFileOutputFormat.setCompressOutput(conf, true); } try { fs.mkdirs(testdir); @@ -354,6 +345,7 @@ // // Generate distribution of ints. This is the answer key. // + JobConf conf = new JobConf(); int countsToGo = counts; int dist[] = new int[range]; for (int i = 0; i < range; i++) { @@ -376,7 +368,10 @@ fs.mkdirs(randomIns); Path answerkey = new Path(randomIns, "answer.key"); - SequenceFile.Writer out = new SequenceFile.Writer(fs, answerkey, IntWritable.class, IntWritable.class); + SequenceFile.Writer out = + SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class, + IntWritable.class, + SequenceFile.CompressionType.NONE); try { for (int i = 0; i < range; i++) { out.append(new IntWritable(i), new IntWritable(dist[i])); @@ -409,8 +404,6 @@ JobConf genJob = new JobConf(conf); genJob.setInputPath(randomIns); - genJob.setInputKeyClass(IntWritable.class); - genJob.setInputValueClass(IntWritable.class); genJob.setInputFormat(SequenceFileInputFormat.class); genJob.setMapperClass(RandomGenMapper.class); @@ -479,8 +472,6 @@ fs.delete(finalOuts); JobConf mergeJob = new JobConf(conf); mergeJob.setInputPath(intermediateOuts); - mergeJob.setInputKeyClass(IntWritable.class); - mergeJob.setInputValueClass(IntWritable.class); mergeJob.setInputFormat(SequenceFileInputFormat.class); mergeJob.setMapperClass(MergeMapper.class); @@ -564,8 +555,8 @@ } int i = 0; - int range = Integer.parseInt(argv[i++]); - int counts = Integer.parseInt(argv[i++]); - launch(); + range = Integer.parseInt(argv[i++]); + counts = Integer.parseInt(argv[i++]); + launch(); } } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?view=diff&rev=441653&r1=441652&r2=441653 ============================================================================== --- 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Fri Sep 8 14:19:41 2006 @@ -20,20 +20,33 @@ import java.util.*; import junit.framework.TestCase; +import org.apache.commons.logging.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; -import org.apache.hadoop.conf.*; +import org.apache.hadoop.io.compress.*; public class TestTextInputFormat extends TestCase { + private static final Log LOG = + LogFactory.getLog(TestTextInputFormat.class.getName()); private static int MAX_LENGTH = 10000; - private static Configuration conf = new Configuration(); + + private static JobConf defaultConf = new JobConf(); + private static FileSystem localFs = null; + static { + try { + localFs = FileSystem.getNamed("local", defaultConf); + } catch (IOException e) { + throw new RuntimeException("init failure", e); + } + } + private static Path workDir = + new Path(new Path(System.getProperty("test.build.data", "."), "data"), + "TestTextInputFormat"); public void testFormat() throws Exception { - JobConf job = new JobConf(conf); - FileSystem fs = FileSystem.getNamed("local", conf); - Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred"); - Path file = new Path(dir, "test.txt"); + JobConf job = new JobConf(); + Path file = new Path(workDir, "test.txt"); Reporter reporter = new Reporter() { public void setStatus(String status) throws IOException {} @@ -41,20 +54,20 @@ }; int seed = new Random().nextInt(); - //LOG.info("seed = "+seed); + LOG.info("seed = "+seed); Random random = new Random(seed); - fs.delete(dir); - job.setInputPath(dir); + localFs.delete(workDir); + job.setInputPath(workDir); // for a variety of lengths for (int length = 0; length < MAX_LENGTH; length+= random.nextInt(MAX_LENGTH/10)+1) { - //LOG.info("creating; entries = " + length); + LOG.debug("creating; entries = " + length); // create a file with length entries - 
Writer writer = new OutputStreamWriter(fs.create(file)); + Writer writer = new OutputStreamWriter(localFs.create(file)); try { for (int i = 0; i < length; i++) { writer.write(Integer.toString(i)); @@ -65,33 +78,38 @@ } // try splitting the file in a variety of sizes - InputFormat format = new TextInputFormat(); + TextInputFormat format = new TextInputFormat(); + format.configure(job); LongWritable key = new LongWritable(); Text value = new Text(); for (int i = 0; i < 3; i++) { int numSplits = random.nextInt(MAX_LENGTH/20)+1; - //LOG.info("splitting: requesting = " + numSplits); - FileSplit[] splits = format.getSplits(fs, job, numSplits); - //LOG.info("splitting: got = " + splits.length); + LOG.debug("splitting: requesting = " + numSplits); + FileSplit[] splits = format.getSplits(localFs, job, numSplits); + LOG.debug("splitting: got = " + splits.length); // check each split BitSet bits = new BitSet(length); for (int j = 0; j < splits.length; j++) { + LOG.debug("split["+j+"]= " + splits[j].getStart() + "+" + + splits[j].getLength()); RecordReader reader = - format.getRecordReader(fs, splits[j], job, reporter); + format.getRecordReader(localFs, splits[j], job, reporter); try { int count = 0; while (reader.next(key, value)) { int v = Integer.parseInt(value.toString()); - // if (bits.get(v)) { - // LOG.info("splits["+j+"]="+splits[j]+" : " + v); - // LOG.info("@"+reader.getPos()); - // } + LOG.debug("read " + v); + if (bits.get(v)) { + LOG.warn("conflict with " + v + + " in split " + j + + " at position "+reader.getPos()); + } assertFalse("Key in multiple partitions.", bits.get(v)); bits.set(v); count++; } - //LOG.info("splits["+j+"]="+splits[j]+" count=" + count); + LOG.debug("splits["+j+"]="+splits[j]+" count=" + count); } finally { reader.close(); } @@ -102,6 +120,110 @@ } } + private InputStream makeStream(String str) throws IOException { + Text text = new Text(str); + return new ByteArrayInputStream(text.getBytes(), 0, text.getLength()); + } + + public void 
testUTF8() throws Exception { + InputStream in = makeStream("abcd\u20acbdcd\u20ac"); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + TextInputFormat.readLine(in, out); + Text line = new Text(); + line.set(out.toByteArray()); + assertEquals("readLine changed utf8 characters", + "abcd\u20acbdcd\u20ac", line.toString()); + in = makeStream("abc\u200axyz"); + out.reset(); + TextInputFormat.readLine(in, out); + line.set(out.toByteArray()); + assertEquals("split on fake newline", "abc\u200axyz", line.toString()); + } + + public void testNewLines() throws Exception { + InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee"); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + TextInputFormat.readLine(in, out); + assertEquals("line1 length", 1, out.size()); + out.reset(); + TextInputFormat.readLine(in, out); + assertEquals("line2 length", 2, out.size()); + out.reset(); + TextInputFormat.readLine(in, out); + assertEquals("line3 length", 0, out.size()); + out.reset(); + TextInputFormat.readLine(in, out); + assertEquals("line4 length", 3, out.size()); + out.reset(); + TextInputFormat.readLine(in, out); + assertEquals("line5 length", 4, out.size()); + out.reset(); + TextInputFormat.readLine(in, out); + assertEquals("line5 length", 5, out.size()); + assertEquals("end of file", 0, TextInputFormat.readLine(in, out)); + } + + private static void writeFile(FileSystem fs, Path name, + CompressionCodec codec, + String contents) throws IOException { + OutputStream stm; + if (codec == null) { + stm = fs.create(name); + } else { + stm = codec.createOutputStream(fs.create(name)); + } + stm.write(contents.getBytes()); + stm.close(); + } + + private static class VoidReporter implements Reporter { + public void progress() {} + public void setStatus(String msg) {} + } + private static final Reporter voidReporter = new VoidReporter(); + + private static List readSplit(InputFormat format, + FileSplit split, + JobConf job) throws IOException { + List result = new 
ArrayList(); + RecordReader reader = format.getRecordReader(localFs, split, job, + voidReporter); + LongWritable key = (LongWritable) reader.createKey(); + Text value = (Text) reader.createValue(); + while (reader.next(key, value)) { + result.add(value); + value = (Text) reader.createValue(); + } + return result; + } + + /** + * Test using the gzip codec for reading + */ + public static void testGzip() throws IOException { + CompressionCodec gzip = new GzipCodec(); + localFs.delete(workDir); + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n"); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "this is a test\nof gzip\n"); + JobConf job = new JobConf(); + job.setInputPath(workDir); + TextInputFormat format = new TextInputFormat(); + format.configure(job); + FileSplit[] splits = format.getSplits(localFs, job, 100); + assertEquals("compressed splits == 2", 2, splits.length); + List results = readSplit(format, splits[0], job); + assertEquals("splits[0] length", 6, results.size()); + assertEquals("splits[0][5]", " dog", results.get(5).toString()); + results = readSplit(format, splits[1], job); + assertEquals("splits[1] length", 2, results.size()); + assertEquals("splits[1][0]", "this is a test", + results.get(0).toString()); + assertEquals("splits[1][1]", "of gzip", + results.get(1).toString()); + } + public static void main(String[] args) throws Exception { new TestTextInputFormat().testFormat(); }
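The rewritten TextInputFormat.readLine reads raw bytes instead of casting each byte to a char, which is why multi-byte UTF-8 text now survives. It returns the number of bytes consumed, terminator included, for '\n', a lone '\r', or '\r\n'. The class below copies that loop into a standalone sketch so the behaviour can be checked without Hadoop on the classpath. ReadLineDemo and its lines() helper are hypothetical names, not part of the commit. The sketch assumes the stream supports mark/reset, as ByteArrayInputStream and BufferedInputStream do.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ReadLineDemo {
    // Same loop as TextInputFormat.readLine in this commit: copy one line's
    // bytes (without the terminator) to out and return the bytes consumed.
    public static long readLine(InputStream in, OutputStream out) throws IOException {
        long bytes = 0;
        while (true) {
            int b = in.read();
            if (b == -1) {
                break;                    // end of stream
            }
            bytes += 1;
            byte c = (byte) b;
            if (c == '\n') {
                break;
            }
            if (c == '\r') {
                in.mark(1);               // peek one byte for a "\r\n" pair
                byte nextC = (byte) in.read();
                if (nextC != '\n') {
                    in.reset();           // lone '\r': push the peeked byte back
                } else {
                    bytes += 1;           // count the '\n' of "\r\n"
                }
                break;
            }
            if (out != null) {
                out.write(c);
            }
        }
        return bytes;
    }

    /** Hypothetical helper: splits UTF-8 text into lines, recording each call's byte count. */
    public static List<String> lines(String text, List<Long> consumed) throws IOException {
        InputStream in = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        List<String> result = new ArrayList<>();
        long n;
        while ((n = readLine(in, out)) > 0) {   // 0 means end of stream
            consumed.add(n);
            result.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
            out.reset();
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<Long> consumed = new ArrayList<>();
        System.out.println(lines("a\nbb\n\nccc\rdddd\r\neeeee", consumed));
        System.out.println(consumed);
    }
}
```

Run on the input from testNewLines, main prints the lines a, bb, (empty), ccc, dddd and eeeee, with per-call byte counts 2, 3, 1, 4, 6 and 5. The "\r\n" pair is counted as two bytes but produces one line break.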
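For uncompressed input, getRecordReader seeks to start-1 and discards everything through the next line terminator. LineRecordReader.next then keeps returning lines only while the position where a line begins is below end. Together these two rules assign each line to exactly one split: the split in which the line starts. The sketch below replays both rules over an in-memory byte array and checks that every line is read exactly once for many split sizes. SplitLineDemo, readSplit and collectAll are hypothetical names. To keep the sketch short, only '\n' is treated as a terminator, whereas the committed readLine also accepts '\r' and '\r\n'. Compressed files never reach this logic: isSplitable returns false when a codec matches the file name, and end is set to Long.MAX_VALUE.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SplitLineDemo {
    /** Returns the '\n'-terminated lines owned by the split [start, end). */
    static List<String> readSplit(byte[] data, int start, int end) {
        List<String> lines = new ArrayList<>();
        int pos = start;
        if (start != 0) {
            // Like fileIn.seek(start-1); readLine(fileIn, null):
            // skip the partial line that belongs to the previous split.
            pos = start - 1;
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            pos++; // step over the '\n' itself
        }
        // Like LineRecordReader.next: only begin a line while pos < end,
        // but read that line to its terminator even if it crosses end.
        while (pos < end && pos < data.length) {
            int lineStart = pos;
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            lines.add(new String(data, lineStart, pos - lineStart, StandardCharsets.UTF_8));
            pos++; // consume the terminator
        }
        return lines;
    }

    /** Cuts data into fixed-size splits and concatenates what each split reads. */
    static List<String> collectAll(byte[] data, int splitSize) {
        List<String> all = new ArrayList<>();
        for (int start = 0; start < data.length; start += splitSize) {
            int end = Math.min(start + splitSize, data.length);
            all.addAll(readSplit(data, start, end));
        }
        return all;
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100; i++) {
            sb.append(i).append('\n');
        }
        byte[] data = sb.toString().getBytes(StandardCharsets.UTF_8);
        for (int splitSize = 1; splitSize <= 40; splitSize++) {
            List<String> all = collectAll(data, splitSize);
            if (all.size() != 100) {
                throw new AssertionError("split size " + splitSize + " read " + all.size() + " lines");
            }
            for (int i = 0; i < 100; i++) {
                if (!all.get(i).equals(Integer.toString(i))) {
                    throw new AssertionError("split size " + splitSize + " broke line " + i);
                }
            }
        }
        System.out.println("all split sizes OK");
    }
}
```

This is the same property that testFormat checks with its BitSet of keys: no key appears in two splits and none is lost.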
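TestCodecFactory pins down how CompressionCodecFactory maps a path to a codec. With BarCodec ("bar"), FooCodec (".foo") and FooBarCodec (".foo.bar") registered, baz.foo.bar resolves to FooBarCodec even though "bar" also matches, and .foo.bar.gz matches nothing. The factory's own source is not part of this section. The class below is therefore only a hypothetical stand-in that reproduces those expectations with a longest-suffix scan over the file name. ExtensionMatcher, register and the string codec names are illustrative, and the real factory may organize its lookup differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of extension-based codec lookup (not the Hadoop class). */
public class ExtensionMatcher {
    private final Map<String, String> codecByExtension = new LinkedHashMap<>();

    /** Registers a codec under its default extension, e.g. ".gz". */
    public void register(String extension, String codecName) {
        codecByExtension.put(extension, codecName);
    }

    /** Returns the codec whose extension is the longest suffix of the file
     *  name, or null when no registered extension matches. */
    public String getCodec(String path) {
        // assumption: only the last path component is compared
        String fileName = path.substring(path.lastIndexOf('/') + 1);
        String best = null;
        int bestLength = -1;
        for (Map.Entry<String, String> entry : codecByExtension.entrySet()) {
            String ext = entry.getKey();
            if (fileName.endsWith(ext) && ext.length() > bestLength) {
                best = entry.getValue();
                bestLength = ext.length();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        ExtensionMatcher m = new ExtensionMatcher();
        m.register("bar", "BarCodec");
        m.register(".foo", "FooCodec");
        m.register(".foo.bar", "FooBarCodec");
        System.out.println(m.getCodec("/tmp/foo.bar"));
        System.out.println(m.getCodec("/tmp/foo/baz.foo.bar"));
        System.out.println(m.getCodec("/tmp/foo.foo"));
        System.out.println(m.getCodec("/tmp/.foo.bar.gz"));
    }
}
```

A linear scan is adequate for the handful of codecs a job configures. The point the test encodes is the tie-break: when several extensions match, the longest one wins.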
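The new OutputFormatBase helpers store their settings under two keys, mapred.output.compress and mapred.output.compression.codec. Calling setOutputCompressorClass sets both, so naming a codec also turns compression on. TextOutputFormat then writes a plain file when compression is off. Otherwise it falls back to GzipCodec and appends the codec's default extension to the file name. The sketch below mirrors that decision flow with a Map standing in for JobConf. The extensionOf map is a hypothetical replacement for codec.getDefaultExtension(), and "part-00000" is only an illustrative output name.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the new OutputFormatBase helpers; a Map replaces JobConf. */
public class OutputCompressionSketch {
    static final String COMPRESS_KEY = "mapred.output.compress";
    static final String CODEC_KEY = "mapred.output.compression.codec";
    static final String GZIP = "org.apache.hadoop.io.compress.GzipCodec";

    // Like setOutputCompressorClass: naming a codec also turns compression on.
    static void setOutputCompressorClass(Map<String, String> conf, String codecClass) {
        conf.put(COMPRESS_KEY, "true");
        conf.put(CODEC_KEY, codecClass);
    }

    // Like getCompressOutput: defaults to false when the key is unset.
    static boolean getCompressOutput(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(COMPRESS_KEY, "false"));
    }

    // Like the naming in TextOutputFormat.getRecordWriter: the plain name when
    // uncompressed, otherwise name + the codec's default extension.
    static String outputFileName(Map<String, String> conf, String name,
                                 Map<String, String> extensionOf) {
        if (!getCompressOutput(conf)) {
            return name;
        }
        String codec = conf.getOrDefault(CODEC_KEY, GZIP); // TextOutputFormat's default
        return name + extensionOf.get(codec);
    }

    public static void main(String[] args) {
        Map<String, String> extensionOf = new HashMap<>();
        extensionOf.put(GZIP, ".gz"); // hypothetical lookup table

        Map<String, String> conf = new HashMap<>();
        System.out.println(outputFileName(conf, "part-00000", extensionOf));
        conf.put(COMPRESS_KEY, "true");
        System.out.println(outputFileName(conf, "part-00000", extensionOf));
    }
}
```

SequenceFileOutputFormat follows the same flag but defaults to DefaultCodec and keeps the file name unchanged, since the codec is recorded inside the SequenceFile rather than in the name.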