Subject: svn commit: r693455 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/fs/ src/core/org/apache/hadoop/util/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Tue, 09 Sep 2008 13:11:07 -0000
From: ddas@apache.org
To: core-commits@hadoop.apache.org

Author: ddas
Date: Tue Sep  9 06:11:05 2008
New Revision: 693455

URL: http://svn.apache.org/viewvc?rev=693455&view=rev
Log:
HADOOP-3514. Inline the CRCs in intermediate files, as opposed to reading them from separate .crc files. Contributed by Jothi Padmanabhan.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java
    hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep  9 06:11:05 2008
@@ -316,6 +316,9 @@
     GenericMRLoadGenerator public, so they can be used in other contexts.
     (Lingyun Yang via omalley)

+    HADOOP-3514. Inline the CRCs in intermediate files as opposed to reading
+    it from a different .crc file. (Jothi Padmanabhan via ddas)
+
  BUG FIXES

    HADOOP-3563.  Refactor the distributed upgrade code so that it is

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java Tue Sep  9 06:11:05 2008
@@ -29,13 +29,19 @@
 public class LocalFileSystem extends ChecksumFileSystem {
   static final URI NAME = URI.create("file:///");
   static private Random rand = new Random();
-
+  FileSystem rfs;
+
   public LocalFileSystem() {
-    super(new RawLocalFileSystem());
+    this(new RawLocalFileSystem());
+  }
+
+  public FileSystem getRaw() {
+    return rfs;
   }

   public LocalFileSystem(FileSystem rawLocalFileSystem) {
     super(rawLocalFileSystem);
+    rfs = rawLocalFileSystem;
   }

   /** Convert a path to a File. */

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java Tue Sep  9 06:11:05 2008
@@ -223,9 +223,6 @@
       summer.update( b, off, len );
       inSum += len;
     }
-    // Can be removed.
-    assert inSum <= bytesPerChecksum : "DataChecksum.update() : inSum " +
-      inSum + " > " + " bytesPerChecksum " + bytesPerChecksum ;
   }
   public void update( int b ) {
     summer.update( b );

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Sep  9 06:11:05 2008
@@ -66,6 +66,8 @@
     long decompressedBytesWritten = 0;
     long compressedBytesWritten = 0;

+    IFileOutputStream checksumOut;
+
     Class keyClass;
     Class valueClass;
     Serializer keySerializer;
@@ -83,17 +85,18 @@
     public Writer(Configuration conf, FSDataOutputStream out,
        Class keyClass, Class valueClass,
        CompressionCodec codec) throws IOException {
+      this.checksumOut = new IFileOutputStream(out);
       this.rawOut = out;
       this.start = this.rawOut.getPos();

       if (codec != null) {
         this.compressor = CodecPool.getCompressor(codec);
         this.compressor.reset();
-        this.compressedOut = codec.createOutputStream(out, compressor);
+        this.compressedOut = codec.createOutputStream(checksumOut, compressor);
         this.out = new FSDataOutputStream(this.compressedOut, null);
         this.compressOutput = true;
       } else {
-        this.out = out;
+        this.out = new FSDataOutputStream(checksumOut,null);
       }

       this.keyClass = keyClass;
@@ -106,6 +109,7 @@
     }

     public void close() throws IOException {

+      // Close the serializers
       keySerializer.close();
       valueSerializer.close();

@@ -115,24 +119,25 @@
       WritableUtils.writeVInt(out, EOF_MARKER);
       decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);

+      //Flush the stream
+      out.flush();
+
       if (compressOutput) {
-        // Flush data from buffers into the compressor
-        out.flush();
-        // Flush & return the compressor
         compressedOut.finish();
         compressedOut.resetState();
         CodecPool.returnCompressor(compressor);
         compressor = null;
       }
-
+
       // Close the stream
-      rawOut.flush();
+      checksumOut.close();
+
       compressedBytesWritten = rawOut.getPos() - start;
-
+
       // Close the underlying stream iff we own it...
       if (ownOutputStream) {
-        out.close();
+        rawOut.close();
       }
       out = null;
     }
@@ -216,43 +221,71 @@
     private static final int DEFAULT_BUFFER_SIZE = 128*1024;
     private static final int MAX_VINT_SIZE = 9;

-    FSDataInputStream rawIn;   // Raw InputStream from file
     InputStream in;            // Possibly decompressed stream that we read
     Decompressor decompressor;
     long bytesRead = 0;
     long fileLength = 0;
     boolean eof = false;
+    IFileInputStream checksumIn;

     byte[] buffer = null;
     int bufferSize = DEFAULT_BUFFER_SIZE;
     DataInputBuffer dataIn = new DataInputBuffer();

     int recNo = 1;
-
+
+    /**
+     * Construct an IFile Reader.
+     *
+     * @param conf Configuration File
+     * @param fs  FileSystem
+     * @param file Path of the file to be opened. This file should have
+     *             checksum bytes for the data at the end of the file.
+     * @param codec codec
+     * @throws IOException
+     */
+
     public Reader(Configuration conf, FileSystem fs, Path file,
                   CompressionCodec codec) throws IOException {
-      this(conf, fs.open(file), fs.getFileStatus(file).getLen(), codec);
+      this(conf, fs.open(file),
+           fs.getFileStatus(file).getLen(),
+           codec);
    }

    protected Reader() {}
+
+    /**
+     * Construct an IFile Reader.
+     *
+     * @param conf Configuration File
+     * @param in   The input stream
+     * @param length Length of the data in the stream, including the checksum
+     *               bytes.
+     * @param codec codec
+     * @throws IOException
+     */
    public Reader(Configuration conf, FSDataInputStream in, long length,
                  CompressionCodec codec) throws IOException {
-      this.rawIn = in;
+      checksumIn = new IFileInputStream(in,length);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
-        this.in = codec.createInputStream(in, decompressor);
+        this.in = codec.createInputStream(checksumIn, decompressor);
       } else {
-        this.in = in;
+        this.in = checksumIn;
       }
       this.fileLength = length;

       this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
    }

-    public long getLength() { return fileLength; }
+    public long getLength() {
+      return fileLength - checksumIn.getSize();
+    }

-    public long getPosition() throws IOException { return rawIn.getPos(); }
+    public long getPosition() throws IOException {
+      return checksumIn.getPosition();
+    }

    /**
     * Read upto len bytes into buf starting at offset off.
@@ -414,6 +447,11 @@
       return bytesRead;
    }

+    @Override
+    public long getLength() {
+      return fileLength;
+    }
+
    private void dumpOnError() {
      File dumpFile = new File("../output/" + taskAttemptId + ".dump");
      System.err.println("Dumping corrupt map-output of " + taskAttemptId +

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java?rev=693455&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java Tue Sep  9 06:11:05 2008
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
+/**
+ * A checksum input stream, used for IFiles.
+ * Used to validate the checksum of files created by {@link IFileOutputStream}.
+ */
+
+class IFileInputStream extends InputStream {
+
+  private final InputStream in; //The input stream to be verified for checksum.
+  private final long length; //The total length of the input file
+  private final long dataLength;
+  private DataChecksum sum;
+  private long currentOffset = 0;
+  private byte b[];
+  private byte csum[] = null;
+  private int checksumSize;
+
+  /**
+   * Create a checksum input stream that reads
+   * @param in The input stream to be verified for checksum.
+   * @param len The length of the input stream including checksum bytes.
+   */
+  public IFileInputStream(InputStream in, long len) {
+    this.in = in;
+    sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
+        Integer.MAX_VALUE);
+    checksumSize = sum.getChecksumSize();
+    length = len;
+    dataLength = length - checksumSize;
+    b = new byte[1];
+  }
+
+  /**
+   * Close the input stream.
+   */
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    throw new IOException("Skip not supported for IFileInputStream");
+  }
+
+  public long getPosition() {
+    return (currentOffset >= dataLength) ? dataLength : currentOffset;
+  }
+
+  public long getSize() {
+    return checksumSize;
+  }
+
+  /**
+   * Read bytes from the stream.
+   * At EOF, checksum is validated, but the checksum
+   * bytes are not passed back in the buffer.
+   */
+  public int read(byte[] b, int off, int len) throws IOException {
+
+    if (currentOffset >= dataLength) {
+      return -1;
+    }
+
+    return doRead(b,off,len);
+  }
+
+  /**
+   * Read bytes from the stream.
+   * At EOF, checksum is validated and sent back
+   * as the last four bytes of the buffer. The caller should handle
+   * these bytes appropriately
+   */
+  public int readWithChecksum(byte[] b, int off, int len) throws IOException {
+
+    if (currentOffset == length) {
+      return -1;
+    }
+    else if (currentOffset >= dataLength) {
+      // If the previous read drained off all the data, then just return
+      // the checksum now. Note that checksum validation would have
+      // happened in the earlier read
+      int lenToCopy = (int) (checksumSize - (currentOffset - dataLength));
+      if (len < lenToCopy) {
+        lenToCopy = len;
+      }
+      System.arraycopy(csum, (int) (currentOffset - dataLength), b, off,
+          lenToCopy);
+      currentOffset += lenToCopy;
+      return lenToCopy;
+    }
+
+    int bytesRead = doRead(b,off,len);
+
+    if (currentOffset == dataLength) {
+      if (len >= bytesRead + checksumSize) {
+        System.arraycopy(csum, 0, b, off + bytesRead, checksumSize);
+        bytesRead += checksumSize;
+        currentOffset += checksumSize;
+      }
+    }
+    return bytesRead;
+  }
+
+  private int doRead(byte[]b, int off, int len) throws IOException {
+
+    // If we are trying to read past the end of data, just read
+    // the left over data
+    if (currentOffset + len > dataLength) {
+      len = (int) dataLength - (int)currentOffset;
+    }
+
+    int bytesRead = in.read(b, off, len);
+
+    if (bytesRead < 0) {
+      throw new ChecksumException("Checksum Error", 0);
+    }
+
+    sum.update(b,off,bytesRead);
+
+    currentOffset += bytesRead;
+
+    if (currentOffset == dataLength) {
+      // The last four bytes are checksum. Strip them and verify
+      csum = new byte[checksumSize];
+      IOUtils.readFully(in, csum, 0, checksumSize);
+      if (!sum.compare(csum, 0)) {
+        throw new ChecksumException("Checksum Error", 0);
+      }
+    }
+    return bytesRead;
+  }
+
+
+  @Override
+  public int read() throws IOException {
+    b[0] = 0;
+    int l = read(b,0,1);
+    if (l < 0)  return l;
+
+    // Upgrade the b[0] to an int so as not to misinterpret the
+    // first bit of the byte as a sign bit
+    int result = 0xFF & b[0];
+    return result;
+  }
+
+  public byte[] getChecksum() {
+    return csum;
+  }
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java?rev=693455&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java Tue Sep  9 06:11:05 2008
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.FilterOutputStream;
+
+import org.apache.hadoop.util.DataChecksum;
+/**
+ * A Checksum output stream.
+ * Checksum for the contents of the file is calculated and
+ * appended to the end of the file on close of the stream.
+ * Used for IFiles
+ */
+class IFileOutputStream extends FilterOutputStream {
+  /**
+   * The output stream to be checksummed.
+   */
+  private final DataChecksum sum;
+  private byte[] barray;
+  private boolean closed = false;
+
+  /**
+   * Create a checksum output stream that writes
+   * the bytes to the given stream.
+   * @param out
+   */
+  public IFileOutputStream(OutputStream out) {
+    super(out);
+    sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
+        Integer.MAX_VALUE);
+    barray = new byte[sum.getChecksumSize()];
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    sum.writeValue(barray, 0, false);
+    out.write (barray, 0, sum.getChecksumSize());
+    out.flush();
+  }
+
+  /**
+   * Write bytes to the stream.
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    sum.update(b, off,len);
+    out.write(b,off,len);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    barray[0] = (byte) (b & 0xFF);
+    write(barray,0,1);
+  }
+
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Sep  9 06:11:05 2008
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -425,7 +426,8 @@
     private final SpillThread spillThread = new SpillThread();

     private final FileSystem localFs;
-
+    private final FileSystem rfs;
+
     private final Counters.Counter mapOutputByteCounter;
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter combineInputCounter;
@@ -439,7 +441,10 @@
       localFs = FileSystem.getLocal(job);
       partitions = job.getNumReduceTasks();
       partitioner = ReflectionUtils.newInstance(job.getPartitionerClass(), job);
-      // sanity checks
+
+      rfs = ((LocalFileSystem)localFs).getRaw();
+
+      //sanity checks
       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
       final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
       final int sortmb = job.getInt("io.sort.mb", 100);
@@ -891,7 +896,7 @@
       // create spill file
       Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(), numSpills,
                                                          size);
-      out = localFs.create(filename);
+      out = rfs.create(filename);
       // create spill index
       Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
                            getTaskID(), numSpills,
@@ -972,7 +977,7 @@
       // create spill file
       Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(), numSpills,
                                                          size);
-      out = localFs.create(filename);
+      out = rfs.create(filename);
       // create spill index
       Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
                            getTaskID(), numSpills,
@@ -1107,15 +1112,15 @@
       for(int i = 0; i < numSpills; i++) {
         filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
         indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskID(), i);
-        finalOutFileSize += localFs.getFileStatus(filename[i]).getLen();
+        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }

       if (numSpills == 1) { //the spill is the final output
-        localFs.rename(filename[0],
-            new Path(filename[0].getParent(), "file.out"));
-        localFs.rename(indexFileName[0],
-            new Path(indexFileName[0].getParent(),"file.out.index"));
-        return;
+        rfs.rename(filename[0],
+            new Path(filename[0].getParent(), "file.out"));
+        localFs.rename(indexFileName[0],
+            new Path(indexFileName[0].getParent(),"file.out.index"));
+        return;
       }
       //make correction in the length to include the sequence file header
       //lengths for each partition
@@ -1129,9 +1134,10 @@
           getTaskID(), finalIndexFileSize);

       //The output stream for the final single output file
-      FSDataOutputStream finalOut = localFs.create(finalOutputFile, true,
-                                                   4096);
-
+
+      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true,
+                                               4096);
+
       //The final index file output stream
       FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
                                                         4096);
@@ -1160,8 +1166,9 @@
           long rawSegmentLength = indexIn.readLong();
           long segmentLength = indexIn.readLong();
           indexIn.close();
-          FSDataInputStream in = localFs.open(filename[i]);
+          FSDataInputStream in = rfs.open(filename[i]);
           in.seek(segmentOffset);
+
           Segment s =
             new Segment(new Reader(job, in, segmentLength, codec), true);
@@ -1176,7 +1183,7 @@
       //merge
       @SuppressWarnings("unchecked")
       RawKeyValueIterator kvIter =
-        Merger.merge(job, localFs,
+        Merger.merge(job, rfs,
                      keyClass, valClass,
                      segmentList, job.getInt("io.sort.factor", 100),
                      new Path(getTaskID().toString()),
@@ -1203,7 +1210,7 @@
       finalIndexOut.close();
       //cleanup
       for(int i = 0; i < numSpills; i++) {
-        localFs.delete(filename[i], true);
+        rfs.delete(filename[i],true);
        localFs.delete(indexFileName[i], true);
      }
    }
@@ -1223,7 +1230,7 @@
       //StringBuffer sb = new StringBuffer();
       indexOut.writeLong(start);
       indexOut.writeLong(writer.getRawLength());
-      long segmentLength = out.getPos() - start;
+      long segmentLength = writer.getCompressedLength();
       indexOut.writeLong(segmentLength);
       LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " +
                segmentLength + ")");

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Sep  9 06:11:05 2008
@@ -128,7 +128,10 @@
     DataInputBuffer getKey() { return key; }
     DataInputBuffer getValue() { return value; }

-    long getLength() { return segmentLength; }
+    long getLength() {
+      return (reader == null) ?
+        segmentLength : reader.getLength();
+    }

     boolean next() throws IOException {
       return reader.next(key, value);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Sep  9 06:11:05 2008
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.Math;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -335,7 +336,8 @@
     }

     FileSystem lfs = FileSystem.getLocal(job);
-
+    FileSystem rfs = ((LocalFileSystem)lfs).getRaw();
+
     // Initialize the codec
     codec = initCodec();

@@ -362,7 +364,7 @@
       LOG.info("Initiating final on-disk merge with " + mapFiles.length +
           " files");
       RawKeyValueIterator rIter =
-        Merger.merge(job, lfs,
+        Merger.merge(job,rfs,
                      job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
                      codec, mapFiles, !conf.getKeepFailedTaskFiles(),
                      job.getInt("io.sort.factor", 100), tempDir,
@@ -509,6 +511,7 @@
      */
     private FileSystem localFileSys;

+    private FileSystem rfs;
     /**
      * Number of files to merge at a time
      */
@@ -1215,13 +1218,16 @@
                  compressedLength + " raw bytes) " +
                  "into RAM from " + mapOutputLoc.getTaskAttemptId());

-        mapOutput = shuffleInMemory(mapOutputLoc, connection, input, (int)decompressedLength);
+        mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
+                                    (int)decompressedLength,
+                                    (int)compressedLength);
       } else {
         LOG.info("Shuffling " + decompressedLength + " bytes (" +
                  compressedLength + " raw bytes) " +
                  "into Local-FS from " + mapOutputLoc.getTaskAttemptId());

-        mapOutput = shuffleToDisk(mapOutputLoc, input, filename, compressedLength);
+        mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
+                                  compressedLength);
       }

       return mapOutput;
@@ -1266,7 +1272,8 @@
     private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
                                       URLConnection connection,
                                       InputStream input,
-                                      int mapOutputLength)
+                                      int mapOutputLength,
+                                      int compressedLength)
     throws IOException, InterruptedException {
       // Reserve ram for the map-output
       boolean createdNow = ramManager.reserve(mapOutputLength, input);
@@ -1289,6 +1296,11 @@
           throw ioe;
         }
       }
+
+      IFileInputStream checksumIn =
+        new IFileInputStream(input,compressedLength);
+
+      input = checksumIn;

       // Are map-outputs compressed?
       if (codec != null) {
@@ -1402,7 +1414,7 @@
       OutputStream output = null;
       long bytesRead = 0;
       try {
-        output = localFileSys.create(localFilename);
+        output = rfs.create(localFilename);

         byte[] buf = new byte[64 * 1024];
         int n = input.read(buf, 0, buf.length);
@@ -1541,7 +1553,9 @@
           (long)(MAX_INMEM_FILESYS_USE * ramManager.getMemoryLimit());

       localFileSys = FileSystem.getLocal(conf);
-
+
+      rfs = ((LocalFileSystem)localFileSys).getRaw();
+
       // hosts -> next contact time
       this.penaltyBox = new LinkedHashMap();

@@ -2187,7 +2201,7 @@
                                        approxOutputSize, conf)
                      .suffix(".merged");
         Writer writer =
-          new Writer(conf, localFileSys, outputPath,
+          new Writer(conf,rfs, outputPath,
                      conf.getMapOutputKeyClass(),
                      conf.getMapOutputValueClass(),
                      codec);
@@ -2195,7 +2209,7 @@
         Path tmpDir = new Path(reduceTask.getTaskID().toString());
         final Reporter reporter = getReporter(umbilical);
         try {
-          iter = Merger.merge(conf, localFileSys,
+          iter = Merger.merge(conf, rfs,
                               conf.getMapOutputKeyClass(),
                               conf.getMapOutputValueClass(),
                               codec, mapFiles.toArray(new Path[mapFiles.size()]),
@@ -2275,7 +2289,7 @@
                                       reduceTask.getTaskID(), ramfsMergeOutputSize);

         Writer writer =
-          new Writer(conf, localFileSys, outputPath,
+          new Writer(conf, rfs, outputPath,
                      conf.getMapOutputKeyClass(),
                      conf.getMapOutputValueClass(),
                      codec);
@@ -2289,7 +2303,7 @@
           LOG.info("Initiating in-memory merge with " + noInMemorySegments +
                    " segments...");

-          rIter = Merger.merge(conf, localFileSys,
+          rIter = Merger.merge(conf, rfs,
                                (Class)conf.getMapOutputKeyClass(),
                                (Class)conf.getMapOutputValueClass(),
                                inMemorySegments, inMemorySegments.size(),

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Sep  9 06:11:05 2008
@@ -59,6 +59,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.WritableUtils;
@@ -2538,6 +2539,8 @@
       OutputStream outStream = null;
       FSDataInputStream indexIn = null;
       FSDataInputStream mapOutputIn = null;
+
+      IFileInputStream checksumInputStream = null;

       long totalRead = 0;
       ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
@@ -2598,8 +2601,9 @@
          * send it to the reducer.
          */
         //open the map-output file
-        mapOutputIn = fileSys.open(mapOutputFileName);
-
+        FileSystem rfs = ((LocalFileSystem)fileSys).getRaw();
+
+        mapOutputIn = rfs.open(mapOutputFileName);
         // TODO: Remove this after a 'fix' for HADOOP-3647
         // The clever trick here to reduce the impact of the extra seek for
         // logging the first key/value lengths is to read the lengths before
@@ -2618,8 +2622,9 @@

         //seek to the correct offset for the reduce
         mapOutputIn.seek(startOffset);
+        checksumInputStream = new IFileInputStream(mapOutputIn,partLength);

-        int len = mapOutputIn.read(buffer, 0,
+        int len = checksumInputStream.readWithChecksum(buffer, 0,
                                    partLength < MAX_BYTES_TO_READ
                                    ? (int)partLength : MAX_BYTES_TO_READ);
         while (len > 0) {
@@ -2633,9 +2638,9 @@
           }
           totalRead += len;
           if (totalRead == partLength) break;
-          len = mapOutputIn.read(buffer, 0,
-                                 (partLength - totalRead) < MAX_BYTES_TO_READ
-                                 ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
+          len = checksumInputStream.readWithChecksum(buffer, 0,
+                                 (partLength - totalRead) < MAX_BYTES_TO_READ
+                                 ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
         }

         LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce +
@@ -2660,8 +2665,9 @@
         if (indexIn != null) {
           indexIn.close();
         }
-        if (mapOutputIn != null) {
-          mapOutputIn.close();
+
+        if (checksumInputStream != null) {
+          checksumInputStream.close();
         }
         shuffleMetrics.serverHandlerFree();
         if (ClientTraceLog.isInfoEnabled()) {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java Tue Sep  9 06:11:05 2008
@@ -23,6 +23,7 @@

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
@@ -75,10 +76,11 @@
   public void runValueIterator(Path tmpDir, Pair[] vals,
                                Configuration conf,
                                CompressionCodec codec) throws IOException {
-    FileSystem fs = tmpDir.getFileSystem(conf);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
     Path path = new Path(tmpDir, "data.in");
     IFile.Writer writer =
-      new IFile.Writer(conf, fs, path, Text.class, Text.class, codec);
+      new IFile.Writer(conf, rfs, path, Text.class, Text.class, codec);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));
     }
@@ -86,7 +88,7 @@

     @SuppressWarnings("unchecked")
     RawKeyValueIterator rawItr =
-      Merger.merge(conf, fs, Text.class, Text.class, codec, new Path[]{path},
+      Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path},
                    false, conf.getInt("io.sort.factor", 100), tmpDir,
                    new Text.Comparator(), new NullProgress());
     @SuppressWarnings("unchecked") // WritableComparators are not generic
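
For readers skimming the patch, here is a small, self-contained usage sketch (not part of the commit) of how the two new classes fit together: IFileOutputStream checksums every byte written and appends the 4-byte CRC-32 when close() is called, and IFileInputStream is handed the total length (data plus checksum), returns only the data bytes from read(), and verifies the trailing CRC at end-of-data, throwing ChecksumException on a mismatch. The example class name InlineCrcExample and the sample data are invented for illustration; the sketch must live in the org.apache.hadoop.mapred package because both classes are package-private.

package org.apache.hadoop.mapred;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Hypothetical illustration only, not part of r693455.
public class InlineCrcExample {
  public static void main(String[] args) throws IOException {
    byte[] data = "map output bytes".getBytes("UTF-8");

    // Write side: the CRC-32 of everything written is appended on close(),
    // so the resulting stream is data length + checksum size bytes long.
    ByteArrayOutputStream rawOut = new ByteArrayOutputStream();
    IFileOutputStream checksumOut = new IFileOutputStream(rawOut);
    checksumOut.write(data, 0, data.length);
    checksumOut.close();
    byte[] fileBytes = rawOut.toByteArray();  // data followed by the inline CRC

    // Read side: pass the total length including the checksum; read() hands
    // back only the data and validates the trailing CRC at end-of-data.
    IFileInputStream checksumIn =
        new IFileInputStream(new ByteArrayInputStream(fileBytes), fileBytes.length);
    byte[] buf = new byte[data.length];
    int n = checksumIn.read(buf, 0, buf.length);
    System.out.println("Read " + n + " data bytes; checksum verified");
    checksumIn.close();
  }
}

This is the same pattern the patch wires into the framework: IFile.Writer wraps its output in an IFileOutputStream, while TaskTracker and ReduceTask read the spill back through IFileInputStream, using readWithChecksum() when the checksum bytes themselves must be forwarded to the reducer.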