Subject: svn commit: r529378 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Mon, 16 Apr 2007 19:35:18 -0000
To: hadoop-commits@lucene.apache.org
From: tomwhite@apache.org

Author: tomwhite
Date: Mon Apr 16 12:35:15 2007
New Revision: 529378

URL: http://svn.apache.org/viewvc?view=rev&rev=529378
Log: HADOOP-1214. Replace streaming classes with new counterparts from Hadoop core. Contributed by Runping Qi.
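For context, the key/value line-splitting behaviour that used to live only in the streaming contrib is now available to any job through the new core classes added below. A minimal sketch of a job driver that uses them directly, written against the old-style mapred API shown in this commit (the class name, input/output paths, and the ':' separator are illustrative assumptions, not part of the patch):

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.KeyValueTextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;

    public class KeyValueJobExample {
      public static void main(String[] args) throws IOException {
        JobConf job = new JobConf(KeyValueJobExample.class);

        // The new core input format splits each input line into key and value
        // at the first separator byte (tab by default).
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        // Optional: use ':' instead of the default tab separator.
        job.set("key.value.separator.in.input.line", ":");

        // Keys and values are delivered as Text; with no mapper or reducer set,
        // the identity defaults simply copy them through.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputPath(new Path(args[0]));   // hypothetical input directory
        job.setOutputPath(new Path(args[1]));  // hypothetical output directory

        JobClient.runJob(job);
      }
    }
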
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=529378&r1=529377&r2=529378 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Apr 16 12:35:15 2007 @@ -201,6 +201,9 @@ 60. HADOOP-1256. Fix NameNode so that multiple DataNodeDescriptors can no longer be created on startup. (Hairong Kuang via cutting) +61. HADOOP-1214. Replace streaming classes with new counterparts + from Hadoop core. (Runping Qi via tomwhite) + Release 0.12.3 - 2007-04-06 Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=529378&r1=529377&r2=529378 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Mon Apr 16 12:35:15 2007 @@ -20,85 +20,39 @@ import java.io.*; import java.lang.reflect.*; -import java.util.ArrayList; -import org.apache.commons.logging.*; - -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.*; -/** An input format that performs globbing on DFS paths and - * selects a RecordReader based on a JobConf property. - * @author Michel Tourn +/** An input format that selects a RecordReader based on a JobConf property. + * This should be used only for non-standard record reader such as + * StreamXmlRecordReader. 
For all other standard + * record readers, the appropriate input format classes should be used. */ -public class StreamInputFormat extends TextInputFormat { - - // an InputFormat should be public with the synthetic public default constructor - // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader) - - protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName()); - - static boolean isGzippedInput(JobConf job) { - String val = job.get(StreamBaseRecordReader.CONF_NS + "compression"); - return "gzip".equals(val); - } +public class StreamInputFormat extends KeyValueTextInputFormat { - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - - if (isGzippedInput(job)) { - return getFullFileSplits(job); - } else { - return super.getSplits(job, numSplits); - } - } - /** For the compressed-files case: override InputFormatBase to produce one split. */ - FileSplit[] getFullFileSplits(JobConf job) throws IOException { - Path[] files = listPaths(job); - int numSplits = files.length; - ArrayList splits = new ArrayList(numSplits); - for (int i = 0; i < files.length; i++) { - Path file = files[i]; - long splitSize = file.getFileSystem(job).getLength(file); - splits.add(new FileSplit(file, 0, splitSize, job)); + public RecordReader getRecordReader(final InputSplit genericSplit, + JobConf job, Reporter reporter) throws IOException { + String c = job.get("stream.recordreader.class"); + if (c == null || c.indexOf("LineRecordReader") >= 0) { + return super.getRecordReader(genericSplit, job, reporter); } - return (FileSplit[]) splits.toArray(new FileSplit[splits.size()]); - } - public RecordReader getRecordReader(final InputSplit genericSplit, - JobConf job, - Reporter reporter) throws IOException { + // handling non-standard record reader (likely StreamXmlRecordReader) FileSplit split = (FileSplit) genericSplit; LOG.info("getRecordReader start.....split=" + split); reporter.setStatus(split.toString()); - long start = split.getStart(); - long length = split.getLength(); - // Open the file and seek to the start of the split FileSystem fs = split.getPath().getFileSystem(job); FSDataInputStream in = fs.open(split.getPath()); - if (isGzippedInput(job)) { - length = Long.MAX_VALUE; - } else if (start != 0) { - in.seek(start-1); - LineRecordReader.readLine(in, null); - long oldStart = start; - start = in.getPos(); - length -= (start - oldStart); - } - // Ugly hack! - split = new FileSplit(split.getPath(), start, length, job); // Factory dispatch based on available params.. 
Class readerClass; - String c = job.get("stream.recordreader.class"); - if (c == null) { - readerClass = StreamLineRecordReader.class; - } else { + + { readerClass = StreamUtil.goodClassOrNull(c, null); if (readerClass == null) { throw new RuntimeException("Class not found: " + c); @@ -107,27 +61,19 @@ Constructor ctor; try { - ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class, FileSplit.class, - Reporter.class, JobConf.class, FileSystem.class }); + ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class, + FileSplit.class, Reporter.class, JobConf.class, FileSystem.class }); } catch (NoSuchMethodException nsm) { throw new RuntimeException(nsm); } RecordReader reader; try { - reader = (RecordReader) ctor.newInstance(new Object[] { in, split, reporter, job, - fs }); + reader = (RecordReader) ctor.newInstance(new Object[] { in, split, + reporter, job, fs }); } catch (Exception nsm) { throw new RuntimeException(nsm); } - - if (reader instanceof StreamSequenceRecordReader) { - // override k/v class types with types stored in SequenceFile - StreamSequenceRecordReader ss = (StreamSequenceRecordReader) reader; - job.setInputKeyClass(ss.rin_.getKeyClass()); - job.setInputValueClass(ss.rin_.getValueClass()); - } - return reader; } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=529378&r1=529377&r2=529378 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Apr 16 12:35:15 2007 @@ -59,7 +59,11 @@ import org.apache.hadoop.mapred.InvalidJobConfException; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.KeyValueTextInputFormat; import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.filecache.*; import org.apache.hadoop.util.*; import org.apache.log4j.helpers.OptionConverter; @@ -235,6 +239,10 @@ userJobConfProps_.put("fs.default.name", jt); } + additionalConfSpec_ = (String)cmdLine.getValue("-additionalconfspec"); + inputFormatSpec_ = (String)cmdLine.getValue("-inputformat"); + outputFormatSpec_ = (String)cmdLine.getValue("-outputformat"); + partitionerSpec_ = (String)cmdLine.getValue("-partitioner"); inReaderSpec_ = (String)cmdLine.getValue("-inputreader"); List car = cmdLine.getValues("-cacheArchive"); @@ -381,6 +389,14 @@ "Optional. Override DFS configuration", "|local", 1, false); Option jt = createOption("jt", "Optional. 
Override JobTracker configuration", "|local",1, false); + Option additionalconfspec = createOption("additionalconfspec", + "Optional.", "spec",1, false ); + Option inputformat = createOption("inputformat", + "Optional.", "spec",1, false ); + Option outputformat = createOption("outputformat", + "Optional.", "spec",1, false ); + Option partitioner = createOption("partitioner", + "Optional.", "spec",1, false ); Option inputreader = createOption("inputreader", "Optional.", "spec",1, false ); Option cacheFile = createOption("cacheFile", @@ -405,6 +421,10 @@ withOption(file). withOption(dfs). withOption(jt). + withOption(additionalconfspec). + withOption(inputformat). + withOption(outputformat). + withOption(partitioner). withOption(inputreader). withOption(jobconf). withOption(cmdenv). @@ -438,6 +458,10 @@ //System.out.println(" -config Optional. One or more paths to xml config files"); System.out.println(" -dfs |local Optional. Override DFS configuration"); System.out.println(" -jt |local Optional. Override JobTracker configuration"); + System.out.println(" -additionalconfspec specfile Optional."); + System.out.println(" -inputformat KeyValueTextInputFormat(default)|SequenceFileInputFormat|XmlTextInputFormat Optional."); + System.out.println(" -outputformat specfile Optional."); + System.out.println(" -partitioner specfile Optional."); System.out.println(" -inputreader Optional."); System.out.println(" -jobconf = Optional. Add or override a JobConf property"); System.out.println(" -cmdenv = Optional. Pass env.var to streaming commands"); @@ -645,6 +669,10 @@ } else { // use only defaults: hadoop-default.xml and hadoop-site.xml } + System.out.println("additionalConfSpec_:" + additionalConfSpec_); + if (additionalConfSpec_ != null) { + config_.addDefaultResource(new Path(additionalConfSpec_)); + } Iterator it = configPath_.iterator(); while (it.hasNext()) { String pathName = (String) it.next(); @@ -670,29 +698,53 @@ jobConf_.setBoolean("stream.inputtagged", inputTagged_); jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size()); - Class fmt; - if (testMerge_ && false == hasSimpleInputSpecs_) { - // this ignores -inputreader - fmt = MergerInputFormat.class; - } else { - // need to keep this case to support custom -inputreader - // and their parameters ,n=v,n=v - fmt = StreamInputFormat.class; + String defaultPackage = this.getClass().getPackage().getName(); + Class c; + Class fmt = null; + if (inReaderSpec_ == null && inputFormatSpec_ == null) { + fmt = KeyValueTextInputFormat.class; + } else if (inputFormatSpec_ != null) { + if ((inputFormatSpec_.compareToIgnoreCase("KeyValueTextInputFormat") == 0) + || (inputFormatSpec_ + .compareToIgnoreCase("org.apache.hadoop.mapred.KeyValueTextInputFormat") == 0)) { + fmt = KeyValueTextInputFormat.class; + } else if ((inputFormatSpec_ + .compareToIgnoreCase("SequenceFileInputFormat") == 0) + || (inputFormatSpec_ + .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileInputFormat") == 0)) { + fmt = SequenceFileInputFormat.class; + } else if ((inputFormatSpec_ + .compareToIgnoreCase("SequenceFileToLineInputFormat") == 0) + || (inputFormatSpec_ + .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileToLineInputFormat") == 0)) { + fmt = SequenceFileAsTextInputFormat.class; + } else { + c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage); + if (c != null) { + fmt = c; + } else { + + } + } + } + if (fmt == null) { + if (testMerge_ && false == hasSimpleInputSpecs_) { + // this ignores -inputreader + fmt = MergerInputFormat.class; + } 
else { + // need to keep this case to support custom -inputreader + // and their parameters ,n=v,n=v + fmt = StreamInputFormat.class; + } } - jobConf_.setInputFormat(fmt); - // for SequenceFile, input classes may be overriden in getRecordReader - jobConf_.setInputKeyClass(Text.class); - jobConf_.setInputValueClass(Text.class); + jobConf_.setInputFormat(fmt); jobConf_.setOutputKeyClass(Text.class); jobConf_.setOutputValueClass(Text.class); jobConf_.set("stream.addenvironment", addTaskEnvironment_); - String defaultPackage = this.getClass().getPackage().getName(); - - Class c; if (mapCmd_ != null) { c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage); if (c != null) { @@ -748,13 +800,29 @@ // output setup is done late so we can customize for reducerNone_ //jobConf_.setOutputDir(new File(output_)); setOutputSpec(); - if (testMerge_) { - fmt = MuxOutputFormat.class; - } else { - fmt = StreamOutputFormat.class; + fmt = null; + if (outputFormatSpec_!= null) { + c = StreamUtil.goodClassOrNull(outputFormatSpec_, defaultPackage); + if (c != null) { + fmt = c; + } + } + if (fmt == null) { + if (testMerge_) { + fmt = MuxOutputFormat.class; + } else { + fmt = TextOutputFormat.class; + } } jobConf_.setOutputFormat(fmt); + if (partitionerSpec_!= null) { + c = StreamUtil.goodClassOrNull(partitionerSpec_, defaultPackage); + if (c != null) { + jobConf_.setPartitionerClass(c); + } + } + // last, allow user to override anything // (although typically used with properties we didn't touch) @@ -1042,6 +1110,10 @@ protected ArrayList configPath_ = new ArrayList(); // protected String hadoopAliasConf_; protected String inReaderSpec_; + protected String inputFormatSpec_; + protected String outputFormatSpec_; + protected String partitionerSpec_; + protected String additionalConfSpec_; protected boolean testMerge_; Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=529378&r1=529377&r2=529378 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Mon Apr 16 12:35:15 2007 @@ -18,128 +18,21 @@ package org.apache.hadoop.streaming; -import java.io.*; +import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.mapred.LineRecordReader; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.KeyValueLineRecordReader; import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.JobConf; /** - * Similar to org.apache.hadoop.mapred.TextRecordReader, - * but delimits key and value with a TAB. 
- * @author Michel Tourn + * same as org.apache.hadoop.mapred.KeyValueLineRecordReader + * + * @deprecated */ -public class StreamLineRecordReader extends LineRecordReader { - - private String splitName; - private Reporter reporter; - private FileSplit split; - private int numRec = 0; - private int nextStatusRec = 1; - private int statusMaxRecordChars; - protected static final Log LOG = LogFactory.getLog(StreamLineRecordReader.class); - // base class uses LongWritable as key, use this. - private WritableComparable dummyKey = super.createKey(); - private Text innerValue = (Text)super.createValue(); +public class StreamLineRecordReader extends KeyValueLineRecordReader { - public StreamLineRecordReader(FSDataInputStream in, FileSplit split, - Reporter reporter, - JobConf job, FileSystem fs) throws IOException { - super(createStream(in, job), split.getStart(), - (split.getStart() + split.getLength())); - this.split = split ; - this.reporter = reporter ; - } - - private static InputStream createStream(FSDataInputStream in, JobConf job) - throws IOException{ - InputStream finalStream = in ; - boolean gzipped = StreamInputFormat.isGzippedInput(job); - if ( gzipped ) { - GzipCodec codec = new GzipCodec(); - codec.setConf(job); - finalStream = codec.createInputStream(in); - } - return finalStream; - } - - public WritableComparable createKey() { - return new Text(); - } - - public Writable createValue() { - return new Text(); - } - - public synchronized boolean next(Writable key, Writable value) throws IOException { - if (!(key instanceof Text)) { - throw new IllegalArgumentException("Key should be of type Text but: " - + key.getClass().getName()); - } - if (!(value instanceof Text)) { - throw new IllegalArgumentException("Value should be of type Text but: " - + value.getClass().getName()); - } - - Text tKey = (Text) key; - Text tValue = (Text) value; - byte[] line = null ; - int lineLen = -1; - if( super.next(dummyKey, innerValue) ){ - line = innerValue.getBytes(); - lineLen = innerValue.getLength(); - }else{ - return false; - } - if (line == null) return false; - int tab = UTF8ByteArrayUtils.findTab(line, 0, lineLen); - if (tab == -1) { - tKey.set(line, 0, lineLen); - tValue.set(""); - } else { - UTF8ByteArrayUtils.splitKeyVal(line, 0, lineLen, tKey, tValue, tab); - } - numRecStats(line, 0, lineLen); - return true; - } - - private void numRecStats(byte[] record, int start, int len) throws IOException { - numRec++; - if (numRec == nextStatusRec) { - String recordStr = new String(record, start, Math.min(len, statusMaxRecordChars), "UTF-8"); - nextStatusRec += 100;//*= 10; - String status = getStatus(recordStr); - LOG.info(status); - reporter.setStatus(status); - } - } - - private String getStatus(CharSequence record) { - long pos = -1; - try { - pos = getPos(); - } catch (IOException io) { - } - String recStr; - if (record.length() > statusMaxRecordChars) { - recStr = record.subSequence(0, statusMaxRecordChars) + "..."; - } else { - recStr = record.toString(); - } - String unqualSplit = split.getFile().getName() + ":" + split.getStart() + "+" - + split.getLength(); - String status = "HSTR " + StreamUtil.HOST + " " + numRec + ". 
pos=" + pos + " " + unqualSplit - + " Processing record=" + recStr; - status += " " + splitName; - return status; + public StreamLineRecordReader(Configuration job, FileSplit split) + throws IOException { + super(job, split); } } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java?view=diff&rev=529378&r1=529377&r2=529378 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java Mon Apr 16 12:35:15 2007 @@ -18,56 +18,11 @@ package org.apache.hadoop.streaming; -import java.io.IOException; +import org.apache.hadoop.mapred.TextOutputFormat; -import org.apache.hadoop.mapred.*; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; - -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.Writable; - -import org.apache.hadoop.util.Progressable; - -/** Similar to org.apache.hadoop.mapred.TextOutputFormat, - * but delimits key and value with a TAB. - * @author Michel Tourn +/** Same as org.apache.hadoop.mapred.TextOutputFormat, + * @deprecated */ -public class StreamOutputFormat implements OutputFormat { - - public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progr) throws IOException { - - Path file = new Path(job.getOutputPath(), name); - - final FSDataOutputStream out = fs.create(file); - - return new RecordWriter() { - - public synchronized void write(WritableComparable key, Writable value) throws IOException { - out.write(key.toString().getBytes("UTF-8")); - out.writeByte('\t'); - out.write(value.toString().getBytes("UTF-8")); - out.writeByte('\n'); - } - - public synchronized void close(Reporter reporter) throws IOException { - out.close(); - } - }; - } - - /** Check whether the output specification for a job is appropriate. Called - * when a job is submitted. Typically checks that it does not already exist, - * throwing an exception when it already exists, so that output is not - * overwritten. 
- * - * @param job the job whose output will be written - * @throws IOException when output should not be attempted - */ - public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException { - // allow existing data (for app-level restartability) - } +public class StreamOutputFormat extends TextOutputFormat { } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java?view=diff&rev=529378&r1=529377&r2=529378 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java Mon Apr 16 12:35:15 2007 @@ -20,80 +20,19 @@ import java.io.*; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.SequenceFileRecordReader; -import org.apache.hadoop.util.ReflectionUtils; - -public class StreamSequenceRecordReader extends StreamBaseRecordReader { - - public StreamSequenceRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter, - JobConf job, FileSystem fs) throws IOException { - super(in, split, reporter, job, fs); - numFailed_ = 0; - seekNextRecordBoundary(); - // super.in_ ignored, using rin_ instead - } - - public synchronized boolean next(Writable key, Writable value) throws IOException { - boolean success; - do { - if (!more_) return false; - success = false; - try { - long pos = rin_.getPosition(); - boolean eof = rin_.next(key, value); - if (pos >= end_ && rin_.syncSeen()) { - more_ = false; - } else { - more_ = eof; - } - success = true; - } catch (IOException io) { - numFailed_++; - if (numFailed_ < 100 || numFailed_ % 100 == 0) { - err_.println("StreamSequenceRecordReader: numFailed_/numRec_=" + numFailed_ + "/" - + numRec_); - } - io.printStackTrace(err_); - success = false; - } - } while (!success); - - numRecStats(new byte[0], 0, 0); - return more_; +/** + * same as org.apache.hadoop.mapred.SequenceFileRecordReader + * + * @deprecated + */ +public class StreamSequenceRecordReader extends SequenceFileRecordReader { + + public StreamSequenceRecordReader(Configuration conf, FileSplit split) + throws IOException { + super(conf, split); } - - public void seekNextRecordBoundary() throws IOException { - rin_ = new SequenceFile.Reader(fs_, split_.getPath(), job_); - end_ = split_.getStart() + split_.getLength(); - - if (split_.getStart() > rin_.getPosition()) rin_.sync(split_.getStart()); // sync to start - - more_ = rin_.getPosition() < end_; - - reporter_.setStatus(split_.toString()); - - //return new SequenceFileRecordReader(job_, split_); - } - - public WritableComparable createKey() { - return (WritableComparable) ReflectionUtils.newInstance(rin_.getKeyClass(), null); - } - - public Writable createValue() { - return (Writable) ReflectionUtils.newInstance(rin_.getValueClass(), null); - } - - boolean more_; - SequenceFile.Reader rin_; - int 
numFailed_; - PrintStream err_ = System.err; - } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java?view=diff&rev=529378&r1=529377&r2=529378 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java Mon Apr 16 12:35:15 2007 @@ -18,6 +18,7 @@ package org.apache.hadoop.streaming; +import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.zip.GZIPOutputStream; @@ -29,6 +30,7 @@ { public TestGzipInput() throws IOException { + INPUT_FILE = new File("input.txt.gz"); } protected void createInput() throws IOException @@ -38,6 +40,7 @@ out.write(input.getBytes("UTF-8")); out.close(); } + protected String[] genArgs() { return new String[] { Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?view=diff&rev=529378&r1=529377&r2=529378 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Mon Apr 16 12:35:15 2007 @@ -77,6 +77,11 @@ public void testCommandLine() { try { + try { + OUTPUT_DIR.getAbsoluteFile().delete(); + } catch (Exception e) { + } + createInput(); boolean mayExit = false; @@ -93,8 +98,10 @@ } catch(Exception e) { failTrace(e); } finally { + File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile(); INPUT_FILE.delete(); - OUTPUT_DIR.delete(); + outFileCRC.delete(); + OUTPUT_DIR.getAbsoluteFile().delete(); } } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?view=diff&rev=529378&r1=529377&r2=529378 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Mon Apr 16 12:35:15 2007 @@ -79,7 +79,6 @@ }; fileSys.delete(new Path(OUTPUT_DIR)); - fileSys.mkdirs(new Path(OUTPUT_DIR)); DataOutputStream file = fileSys.create(new Path(INPUT_FILE)); file.writeBytes(mapString); Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?view=diff&rev=529378&r1=529377&r2=529378 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Mon Apr 16 12:35:15 2007 @@ -41,13 +41,11 @@ // test that some JobConf properties are 
exposed as expected // Note the dots translated to underscore: // property names have been escaped in PipeMapRed.safeEnvVarName() - expect("mapred_input_format_class", "org.apache.hadoop.streaming.StreamInputFormat"); + expect("mapred_input_format_class", "org.apache.hadoop.mapred.KeyValueTextInputFormat"); expect("mapred_job_tracker", "local"); - expect("mapred_input_key_class", "org.apache.hadoop.io.Text"); - expect("mapred_input_value_class", "org.apache.hadoop.io.Text"); //expect("mapred_local_dir", "build/test/mapred/local"); expectDefined("mapred_local_dir"); - expect("mapred_output_format_class", "org.apache.hadoop.streaming.StreamOutputFormat"); + expect("mapred_output_format_class", "org.apache.hadoop.mapred.TextOutputFormat"); expect("mapred_output_key_class", "org.apache.hadoop.io.Text"); expect("mapred_output_value_class", "org.apache.hadoop.io.Text"); Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?view=auto&rev=529378 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Mon Apr 16 12:35:15 2007 @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * This class treats a line in the input as a key/value pair separated by a + * separator character. The separator can be specified in config file + * under the attribute name key.value.separator.in.input.line. The default + * separator is the tab character ('\t'). + * + */ +public class KeyValueLineRecordReader extends LineRecordReader { + + private byte separator = (byte) '\t'; + + private WritableComparable dummyKey = super.createKey(); + + private Text innerValue = (Text) super.createValue(); + + public Class getKeyClass() { return Text.class; } + + public Text createKey() { + return new Text(); + } + + public KeyValueLineRecordReader(Configuration job, FileSplit split) + throws IOException { + super(job, split); + String sepStr = job.get("key.value.separator.in.input.line", "\t"); + this.separator = (byte) sepStr.charAt(0); + } + + public static int findSeparator(byte[] utf, int start, int length, byte sep) { + for (int i = start; i < (start + length); i++) { + if (utf[i] == sep) { + return i; + } + } + return -1; + } + + /** Read key/value pair in a line. 
*/ + public synchronized boolean next(Writable key, Writable value) + throws IOException { + Text tKey = (Text) key; + Text tValue = (Text) value; + byte[] line = null; + int lineLen = -1; + if (super.next(dummyKey, innerValue)) { + line = innerValue.getBytes(); + lineLen = innerValue.getLength(); + } else { + return false; + } + if (line == null) + return false; + int pos = findSeparator(line, 0, lineLen, this.separator); + if (pos == -1) { + tKey.set(line, 0, lineLen); + tValue.set(""); + } else { + int keyLen = pos; + byte[] keyBytes = new byte[keyLen]; + System.arraycopy(line, 0, keyBytes, 0, keyLen); + int valLen = lineLen - keyLen - 1; + byte[] valBytes = new byte[valLen]; + System.arraycopy(line, pos + 1, valBytes, 0, valLen); + tKey.set(keyBytes); + tValue.set(valBytes); + } + return true; + } +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java?view=auto&rev=529378 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java Mon Apr 16 12:35:15 2007 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +/** + * An {@link InputFormat} for plain text files. Files are broken into lines. + * Either linefeed or carriage-return are used to signal end of line. Each line + * is divided into key and value parts by a separator byte. If no such a byte + * exists, the key will be the entire line and value will be empty. + */ +public class KeyValueTextInputFormat extends TextInputFormat { + + public RecordReader getRecordReader(InputSplit genericSplit, JobConf job, + Reporter reporter) throws IOException { + reporter.setStatus(genericSplit.toString()); + return new KeyValueLineRecordReader(job, (FileSplit) genericSplit); + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java?view=auto&rev=529378 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java Mon Apr 16 12:35:15 2007 @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +/** + * This class is similar to SequenceFileInputFormat, except it generates SequenceFileAsTextRecordReader + * which converts the input keys and values to their String forms by calling toString() method. + * + */ +public class SequenceFileAsTextInputFormat extends SequenceFileInputFormat { + + public SequenceFileAsTextInputFormat() { + super(); + } + + public RecordReader getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + + reporter.setStatus(split.toString()); + + return new SequenceFileAsTextRecordReader(job, (FileSplit) split); + } +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java?view=auto&rev=529378 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java Mon Apr 16 12:35:15 2007 @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * This class converts the input keys and values to their String forms by calling toString() + * method. This class to SequenceFileAsTextInputFormat class is as LineRecordReader + * class to TextInputFormat class. 
+ * + */ +public class SequenceFileAsTextRecordReader extends SequenceFileRecordReader { + + private Writable innerKey = super.createKey(); + private Writable innerValue = super.createValue(); + + public SequenceFileAsTextRecordReader(Configuration conf, FileSplit split) + throws IOException { + super(conf, split); + } + + public WritableComparable createKey() { + return new Text(); + } + + public Writable createValue() { + return new Text(); + } + + /** Read key/value pair in a line. */ + public synchronized boolean next(Writable key, Writable value) + throws IOException { + Text tKey = (Text) key; + Text tValue = (Text) value; + if (!super.next(innerKey, innerValue)) { + return false; + } + tKey.set(innerKey.toString()); + tValue.set(innerValue.toString()); + return true; + } +} Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java?view=diff&rev=529378&r1=529377&r2=529378 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Mon Apr 16 12:35:15 2007 @@ -32,7 +32,7 @@ private long start; private long end; private boolean more = true; - private Configuration conf; + protected Configuration conf; public SequenceFileRecordReader(Configuration conf, FileSplit split) throws IOException { Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java?view=auto&rev=529378 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Mon Apr 16 12:35:15 2007 @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import java.io.*; +import java.util.*; +import junit.framework.TestCase; + +import org.apache.commons.logging.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.util.ReflectionUtils; + +public class TestKeyValueTextInputFormat extends TestCase { + private static final Log LOG = + LogFactory.getLog(TestKeyValueTextInputFormat.class.getName()); + + private static int MAX_LENGTH = 10000; + + private static JobConf defaultConf = new JobConf(); + private static FileSystem localFs = null; + static { + try { + localFs = FileSystem.getLocal(defaultConf); + } catch (IOException e) { + throw new RuntimeException("init failure", e); + } + } + private static Path workDir = + new Path(new Path(System.getProperty("test.build.data", "."), "data"), + "TestKeyValueTextInputFormat"); + + public void testFormat() throws Exception { + JobConf job = new JobConf(); + Path file = new Path(workDir, "test.txt"); + + // A reporter that does nothing + Reporter reporter = Reporter.NULL; + + int seed = new Random().nextInt(); + LOG.info("seed = "+seed); + Random random = new Random(seed); + + localFs.delete(workDir); + job.setInputPath(workDir); + + // for a variety of lengths + for (int length = 0; length < MAX_LENGTH; + length+= random.nextInt(MAX_LENGTH/10)+1) { + + LOG.debug("creating; entries = " + length); + + // create a file with length entries + Writer writer = new OutputStreamWriter(localFs.create(file)); + try { + for (int i = 0; i < length; i++) { + writer.write(Integer.toString(i*2)); + writer.write("\t"); + writer.write(Integer.toString(i)); + writer.write("\n"); + } + } finally { + writer.close(); + } + + // try splitting the file in a variety of sizes + TextInputFormat format = new KeyValueTextInputFormat(); + format.configure(job); + for (int i = 0; i < 3; i++) { + int numSplits = random.nextInt(MAX_LENGTH/20)+1; + LOG.debug("splitting: requesting = " + numSplits); + InputSplit[] splits = format.getSplits(job, numSplits); + LOG.debug("splitting: got = " + splits.length); + + // check each split + BitSet bits = new BitSet(length); + for (int j = 0; j < splits.length; j++) { + LOG.debug("split["+j+"]= " + splits[j]); + RecordReader reader = + format.getRecordReader(splits[j], job, reporter); + Class readerClass = reader.getClass(); + assertEquals("reader class is KeyValueLineRecordReader.", KeyValueLineRecordReader.class, readerClass); + + Writable key = reader.createKey(); + Class keyClass = key.getClass(); + Writable value = reader.createValue(); + Class valueClass = value.getClass(); + assertEquals("Key class is Text.", Text.class, keyClass); + assertEquals("Value class is Text.", Text.class, valueClass); + try { + int count = 0; + while (reader.next(key, value)) { + int v = Integer.parseInt(value.toString()); + LOG.debug("read " + v); + if (bits.get(v)) { + LOG.warn("conflict with " + v + + " in split " + j + + " at position "+reader.getPos()); + } + assertFalse("Key in multiple partitions.", bits.get(v)); + bits.set(v); + count++; + } + LOG.debug("splits["+j+"]="+splits[j]+" count=" + count); + } finally { + reader.close(); + } + } + assertEquals("Some keys in no partition.", length, bits.cardinality()); + } + + } + } + + private InputStream makeStream(String str) throws IOException { + Text text = new Text(str); + return new ByteArrayInputStream(text.getBytes(), 0, text.getLength()); + } + + public void testUTF8() throws Exception { + InputStream in = 
makeStream("abcd\u20acbdcd\u20ac"); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + LineRecordReader.readLine(in, out); + Text line = new Text(); + line.set(out.toByteArray()); + assertEquals("readLine changed utf8 characters", + "abcd\u20acbdcd\u20ac", line.toString()); + in = makeStream("abc\u200axyz"); + out.reset(); + LineRecordReader.readLine(in, out); + line.set(out.toByteArray()); + assertEquals("split on fake newline", "abc\u200axyz", line.toString()); + } + + public void testNewLines() throws Exception { + InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee"); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + LineRecordReader.readLine(in, out); + assertEquals("line1 length", 1, out.size()); + out.reset(); + LineRecordReader.readLine(in, out); + assertEquals("line2 length", 2, out.size()); + out.reset(); + LineRecordReader.readLine(in, out); + assertEquals("line3 length", 0, out.size()); + out.reset(); + LineRecordReader.readLine(in, out); + assertEquals("line4 length", 3, out.size()); + out.reset(); + LineRecordReader.readLine(in, out); + assertEquals("line5 length", 4, out.size()); + out.reset(); + LineRecordReader.readLine(in, out); + assertEquals("line5 length", 5, out.size()); + assertEquals("end of file", 0, LineRecordReader.readLine(in, out)); + } + + private static void writeFile(FileSystem fs, Path name, + CompressionCodec codec, + String contents) throws IOException { + OutputStream stm; + if (codec == null) { + stm = fs.create(name); + } else { + stm = codec.createOutputStream(fs.create(name)); + } + stm.write(contents.getBytes()); + stm.close(); + } + + private static final Reporter voidReporter = Reporter.NULL; + + private static List readSplit(InputFormat format, + InputSplit split, + JobConf job) throws IOException { + List result = new ArrayList(); + RecordReader reader = format.getRecordReader(split, job, + voidReporter); + Text key = (Text) reader.createKey(); + Text value = (Text) reader.createValue(); + while (reader.next(key, value)) { + result.add(value); + value = (Text) reader.createValue(); + } + return result; + } + + /** + * Test using the gzip codec for reading + */ + public static void testGzip() throws IOException { + JobConf job = new JobConf(); + CompressionCodec gzip = new GzipCodec(); + ReflectionUtils.setConf(gzip, job); + localFs.delete(workDir); + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "line-1\tthe quick\nline-2\tbrown\nline-3\tfox jumped\nline-4\tover\nline-5\t the lazy\nline-6\t dog\n"); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "line-1\tthis is a test\nline-1\tof gzip\n"); + job.setInputPath(workDir); + KeyValueTextInputFormat format = new KeyValueTextInputFormat(); + format.configure(job); + InputSplit[] splits = format.getSplits(job, 100); + assertEquals("compressed splits == 2", 2, splits.length); + FileSplit tmp = (FileSplit) splits[0]; + if (tmp.getPath().getName().equals("part2.txt.gz")) { + splits[0] = splits[1]; + splits[1] = tmp; + } + List results = readSplit(format, splits[0], job); + assertEquals("splits[0] length", 6, results.size()); + assertEquals("splits[0][5]", " dog", results.get(5).toString()); + results = readSplit(format, splits[1], job); + assertEquals("splits[1] length", 2, results.size()); + assertEquals("splits[1][0]", "this is a test", + results.get(0).toString()); + assertEquals("splits[1][1]", "of gzip", + results.get(1).toString()); + } + + public static void main(String[] args) throws Exception { + new 
TestKeyValueTextInputFormat().testFormat(); + } +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java?view=auto&rev=529378 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java Mon Apr 16 12:35:15 2007 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.*; +import java.util.*; +import junit.framework.TestCase; + +import org.apache.commons.logging.*; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.conf.*; + +public class TestSequenceFileAsTextInputFormat extends TestCase { + private static final Log LOG = InputFormatBase.LOG; + + private static int MAX_LENGTH = 10000; + private static Configuration conf = new Configuration(); + + public void testFormat() throws Exception { + JobConf job = new JobConf(conf); + FileSystem fs = FileSystem.getLocal(conf); + Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred"); + Path file = new Path(dir, "test.seq"); + + Reporter reporter = Reporter.NULL; + + int seed = new Random().nextInt(); + //LOG.info("seed = "+seed); + Random random = new Random(seed); + + fs.delete(dir); + + job.setInputPath(dir); + + // for a variety of lengths + for (int length = 0; length < MAX_LENGTH; + length+= random.nextInt(MAX_LENGTH/10)+1) { + + //LOG.info("creating; entries = " + length); + + // create a file with length entries + SequenceFile.Writer writer = + SequenceFile.createWriter(fs, conf, file, + IntWritable.class, LongWritable.class); + try { + for (int i = 0; i < length; i++) { + IntWritable key = new IntWritable(i); + LongWritable value = new LongWritable(10 * i); + writer.append(key, value); + } + } finally { + writer.close(); + } + + // try splitting the file in a variety of sizes + InputFormat format = new SequenceFileAsTextInputFormat(); + + for (int i = 0; i < 3; i++) { + int numSplits = + random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1; + //LOG.info("splitting: requesting = " + numSplits); + InputSplit[] splits = format.getSplits(job, numSplits); + //LOG.info("splitting: got = " + splits.length); + + // check each split + BitSet bits = new BitSet(length); + for (int j = 0; j < splits.length; j++) { + RecordReader reader = + format.getRecordReader(splits[j], job, reporter); + Class readerClass = reader.getClass(); + assertEquals("reader class is 
SequenceFileAsTextRecordReader.", SequenceFileAsTextRecordReader.class, readerClass); + Text value = (Text)reader.createValue(); + Text key = (Text)reader.createKey(); + try { + int count = 0; + while (reader.next(key, value)) { + // if (bits.get(key.get())) { + // LOG.info("splits["+j+"]="+splits[j]+" : " + key.get()); + // LOG.info("@"+reader.getPos()); + // } + int keyInt = Integer.parseInt(key.toString()); + assertFalse("Key in multiple partitions.", bits.get(keyInt)); + bits.set(keyInt); + count++; + } + //LOG.info("splits["+j+"]="+splits[j]+" count=" + count); + } finally { + reader.close(); + } + } + assertEquals("Some keys in no partition.", length, bits.cardinality()); + } + + } + } + + public static void main(String[] args) throws Exception { + new TestSequenceFileAsTextInputFormat().testFormat(); + } +}
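
As an aside, the new SequenceFileAsTextRecordReader can also be driven outside a full MapReduce job, which is handy for inspecting SequenceFile data the way a streaming job would see it. A minimal sketch under stated assumptions: the class name and command-line argument are illustrative, and the file is read as a single split, much like the removed gzip code path used to do:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileAsTextRecordReader;

    public class DumpSequenceFileAsText {
      public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();
        Path file = new Path(args[0]);          // path to an existing SequenceFile
        FileSystem fs = file.getFileSystem(conf);

        // One split covering the whole file.
        FileSplit split = new FileSplit(file, 0, fs.getLength(file), conf);

        SequenceFileAsTextRecordReader reader =
            new SequenceFileAsTextRecordReader(conf, split);
        Text key = (Text) reader.createKey();
        Text value = (Text) reader.createValue();
        try {
          // Keys and values arrive already converted to their toString() forms.
          while (reader.next(key, value)) {
            System.out.println(key + "\t" + value);
          }
        } finally {
          reader.close();
        }
      }
    }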