hadoop-common-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r663440 [1/3] - in /hadoop/core/trunk: ./ conf/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/io/compress/zlib/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test...
Date: Thu, 05 Jun 2008 04:06:14 GMT
Author: acmurthy
Date: Wed Jun  4 21:06:13 2008
New Revision: 663440

URL: http://svn.apache.org/viewvc?rev=663440&view=rev
Log:
HADOOP-2095. Improve the Map-Reduce shuffle/merge by cutting down buffer-copies: the intermediate sort/merge now uses the new IFile format rather than SequenceFiles, and map-output compression is implemented by compressing the entire file rather than using SequenceFile compression. The shuffle has also been changed to use a simple byte-buffer manager rather than the InMemoryFileSystem.
Configuration changes to hadoop-default.xml:
  deprecated mapred.map.output.compression.type

Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CodecPool.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IFile.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Merger.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/java/org/apache/hadoop/io/DataInputBuffer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/DataOutputBuffer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java
    hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jun  4 21:06:13 2008
@@ -221,6 +221,16 @@
     layout versions are consistent in the data node. (Steve Loughran
     via omalley)
 
+    HADOOP-2095. Improve the Map-Reduce shuffle/merge by cutting down
+    buffer-copies; changed intermediate sort/merge to use the new IFile format
+    rather than SequenceFiles and compression of map-outputs is now
+    implemented by compressing the entire file rather than SequenceFile
+    compression. Shuffle also has been changed to use a simple byte-buffer
+    manager rather than the InMemoryFileSystem. 
+    Configuration changes to hadoop-default.xml:
+      deprecated mapred.map.output.compression.type 
+    (acmurthy)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Wed Jun  4 21:06:13 2008
@@ -921,14 +921,6 @@
 </property>
 
 <property>
-  <name>mapred.map.output.compression.type</name>
-  <value>RECORD</value>
-  <description>If the map outputs are to compressed, how should they
-               be compressed? Should be one of NONE, RECORD or BLOCK.
-  </description>
-</property>
-
-<property>
   <name>mapred.map.output.compression.codec</name>
   <value>org.apache.hadoop.io.compress.DefaultCodec</value>
   <description>If the map outputs are compressed, how should they be 

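With mapred.map.output.compression.type removed, map-output compression is controlled only by the on/off flag and the codec property; the RECORD/BLOCK distinction no longer applies because the whole file is compressed. As an illustrative sketch (property names as in hadoop-default.xml, class name made up), the job-side equivalent is:

    import org.apache.hadoop.mapred.JobConf;

    public class MapOutputCompressionConf {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Turn on map-output compression; the whole IFile is compressed,
        // so no SequenceFile compression "type" is involved any more.
        job.setBoolean("mapred.compress.map.output", true);
        job.set("mapred.map.output.compression.codec",
                "org.apache.hadoop.io.compress.DefaultCodec");
      }
    }
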
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/DataInputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/DataInputBuffer.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/DataInputBuffer.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/DataInputBuffer.java Wed Jun  4 21:06:13 2008
@@ -20,7 +20,6 @@
 
 import java.io.*;
 
-
 /** A reusable {@link DataInput} implementation that reads from an in-memory
  * buffer.
  *
@@ -40,7 +39,6 @@
  *  
  */
 public class DataInputBuffer extends DataInputStream {
-
   private static class Buffer extends ByteArrayInputStream {
     public Buffer() {
       super(new byte[] {});
@@ -53,6 +51,7 @@
       this.pos = start;
     }
 
+    public byte[] getData() { return buf; }
     public int getPosition() { return pos; }
     public int getLength() { return count; }
   }
@@ -78,6 +77,10 @@
   public void reset(byte[] input, int start, int length) {
     buffer.reset(input, start, length);
   }
+  
+  public byte[] getData() {
+    return buffer.getData();
+  }
 
   /** Returns the current position in the input. */
   public int getPosition() { return buffer.getPosition(); }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/DataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/DataOutputBuffer.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/DataOutputBuffer.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/DataOutputBuffer.java Wed Jun  4 21:06:13 2008
@@ -46,6 +46,14 @@
     public int getLength() { return count; }
     public void reset() { count = 0; }
 
+    public Buffer() {
+      super();
+    }
+    
+    public Buffer(int size) {
+      super(size);
+    }
+    
     public void write(DataInput in, int len) throws IOException {
       int newcount = count + len;
       if (newcount > buf.length) {
@@ -65,6 +73,10 @@
     this(new Buffer());
   }
   
+  public DataOutputBuffer(int size) {
+    this(new Buffer(size));
+  }
+  
   private DataOutputBuffer(Buffer buffer) {
     super(buffer);
     this.buffer = buffer;

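The new DataOutputBuffer(int) constructor and DataInputBuffer.getData() accessor exist to support the copy-avoiding pattern IFile uses below: serialize into a pre-sized output buffer, then hand out views over the same backing array. An illustrative, self-contained sketch (class name made up):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class BufferReuseSketch {
      public static void main(String[] args) throws Exception {
        // Pre-size the buffer to avoid repeated re-allocation while serializing.
        DataOutputBuffer out = new DataOutputBuffer(64 * 1024);
        out.writeBytes("key");
        int keyLength = out.getLength();
        out.writeBytes("value");
        int valueLength = out.getLength() - keyLength;

        // Wrap the same bytes for reading; getData() exposes the backing array,
        // so the key/value views below involve no copying.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), 0, out.getLength());
        byte[] backing = in.getData();
        System.out.println(new String(backing, 0, keyLength));           // key
        System.out.println(new String(backing, keyLength, valueLength)); // value
      }
    }
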
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Wed Jun  4 21:06:13 2008
@@ -24,6 +24,7 @@
 import java.security.MessageDigest;
 import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
@@ -778,13 +779,6 @@
   
   /** Write key/value pairs to a sequence-format file. */
   public static class Writer implements java.io.Closeable {
-    /**
-     * A global compressor pool used to save the expensive 
-     * construction/destruction of (possibly native) compression codecs.
-     */
-    private static final CodecPool<Compressor> compressorPool = 
-      new CodecPool<Compressor>();
-    
     Configuration conf;
     FSDataOutputStream out;
     boolean ownOutputStream = true;
@@ -919,10 +913,7 @@
       this.uncompressedValSerializer.open(buffer);
       if (this.codec != null) {
         ReflectionUtils.setConf(this.codec, this.conf);
-        compressor = compressorPool.getCodec(this.codec.getCompressorType());
-        if (compressor == null) {
-          compressor = this.codec.createCompressor();
-        }
+        this.compressor = CodecPool.getCompressor(this.codec);
         this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
         this.deflateOut = 
           new DataOutputStream(new BufferedOutputStream(deflateFilter));
@@ -954,7 +945,7 @@
     
     /** Close the file. */
     public synchronized void close() throws IOException {
-      compressorPool.returnCodec(compressor);
+      CodecPool.returnCompressor(compressor);
       
       keySerializer.close();
       uncompressedValSerializer.close();
@@ -1361,13 +1352,6 @@
   
   /** Reads key/value pairs from a sequence-format file. */
   public static class Reader implements java.io.Closeable {
-    /**
-     * A global decompressor pool used to save the expensive 
-     * construction/destruction of (possibly native) decompression codecs.
-     */
-    private static final CodecPool<Decompressor> decompressorPool = 
-      new CodecPool<Decompressor>();
-    
     private Path file;
     private FSDataInputStream in;
     private DataOutputBuffer outBuf = new DataOutputBuffer();
@@ -1451,16 +1435,6 @@
       return fs.open(file, bufferSize);
     }
     
-    private Decompressor getPooledOrNewDecompressor() {
-      Decompressor decompressor = null;
-      decompressor = decompressorPool.getCodec(codec.getDecompressorType());
-      if (decompressor == null) {
-        decompressor = codec.createDecompressor();
-      }
-      return decompressor;
-    }
-    
-
     /**
      * Initialize the {@link Reader}
      * @param tmpReader <code>true</code> if we are constructing a temporary
@@ -1540,7 +1514,7 @@
       if (!tempReader) {
         valBuffer = new DataInputBuffer();
         if (decompress) {
-          valDecompressor = getPooledOrNewDecompressor();
+          valDecompressor = CodecPool.getDecompressor(codec);
           valInFilter = codec.createInputStream(valBuffer, valDecompressor);
           valIn = new DataInputStream(valInFilter);
         } else {
@@ -1552,16 +1526,16 @@
           keyBuffer = new DataInputBuffer();
           valLenBuffer = new DataInputBuffer();
 
-          keyLenDecompressor = getPooledOrNewDecompressor();
+          keyLenDecompressor = CodecPool.getDecompressor(codec);
           keyLenInFilter = codec.createInputStream(keyLenBuffer, 
                                                    keyLenDecompressor);
           keyLenIn = new DataInputStream(keyLenInFilter);
 
-          keyDecompressor = getPooledOrNewDecompressor();
+          keyDecompressor = CodecPool.getDecompressor(codec);
           keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
           keyIn = new DataInputStream(keyInFilter);
 
-          valLenDecompressor = getPooledOrNewDecompressor();
+          valLenDecompressor = CodecPool.getDecompressor(codec);
           valLenInFilter = codec.createInputStream(valLenBuffer, 
                                                    valLenDecompressor);
           valLenIn = new DataInputStream(valLenInFilter);
@@ -1572,10 +1546,10 @@
     /** Close the file. */
     public synchronized void close() throws IOException {
       // Return the decompressors to the pool
-      decompressorPool.returnCodec(keyLenDecompressor);
-      decompressorPool.returnCodec(keyDecompressor);
-      decompressorPool.returnCodec(valLenDecompressor);
-      decompressorPool.returnCodec(valDecompressor);
+      CodecPool.returnDecompressor(keyLenDecompressor);
+      CodecPool.returnDecompressor(keyDecompressor);
+      CodecPool.returnDecompressor(valLenDecompressor);
+      CodecPool.returnDecompressor(valDecompressor);
       
       // Close the input-stream
       in.close();
@@ -2100,49 +2074,6 @@
 
   }
 
-  private static class CodecPool<T> {
-
-    private Map<Class, List<T>> pool = new HashMap<Class, List<T>>();
-    
-    public T getCodec(Class codecClass) {
-      T codec = null;
-      
-      // Check if an appropriate codec is available
-      synchronized (pool) {
-        if (pool.containsKey(codecClass)) {
-          List<T> codecList = pool.get(codecClass);
-          
-          if (codecList != null) {
-            synchronized (codecList) {
-              if (!codecList.isEmpty()) {
-                codec = codecList.remove(0);
-              }
-            }
-          }
-        }
-      }
-      
-      return codec;
-    }
-
-    public void returnCodec(T codec) {
-      if (codec != null) {
-        Class codecClass = codec.getClass();
-        synchronized (pool) {
-          if (!pool.containsKey(codecClass)) {
-            pool.put(codecClass, new ArrayList<T>());
-          }
-
-          List<T> codecList = pool.get(codecClass);
-          synchronized (codecList) {
-            codecList.add(codec);
-          }
-        }
-      }
-    }
-
-  }
-  
   /** Sorts key/value pairs in a sequence-format file.
    *
    * <p>For best performance, applications should make sure that the {@link

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CodecPool.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CodecPool.java?rev=663440&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CodecPool.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CodecPool.java Wed Jun  4 21:06:13 2008
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A global compressor/decompressor pool used to save and reuse 
+ * (possibly native) compression/decompression codecs.
+ */
+public class CodecPool {
+  private static final Log LOG = LogFactory.getLog(CodecPool.class);
+  
+  /**
+   * A global compressor pool used to save the expensive 
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Compressor>, List<Compressor>> compressorPool = 
+    new HashMap<Class<Compressor>, List<Compressor>>();
+  
+  /**
+   * A global decompressor pool used to save the expensive 
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = 
+    new HashMap<Class<Decompressor>, List<Decompressor>>();
+
+  private static <T> T borrow(Map<Class<T>, List<T>> pool,
+                             Class<? extends T> codecClass) {
+    T codec = null;
+    
+    // Check if an appropriate codec is available
+    synchronized (pool) {
+      if (pool.containsKey(codecClass)) {
+        List<T> codecList = pool.get(codecClass);
+        
+        if (codecList != null) {
+          synchronized (codecList) {
+            if (!codecList.isEmpty()) {
+              codec = codecList.remove(0);
+            }
+          }
+        }
+      }
+    }
+    
+    return codec;
+  }
+
+  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
+    if (codec != null) {
+      Class<T> codecClass = ReflectionUtils.getClass(codec);
+      synchronized (pool) {
+        if (!pool.containsKey(codecClass)) {
+          pool.put(codecClass, new ArrayList<T>());
+        }
+
+        List<T> codecList = pool.get(codecClass);
+        synchronized (codecList) {
+          codecList.add(codec);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Get a {@link Compressor} for the given {@link CompressionCodec} from the 
+   * pool or a new one.
+   *
+   * @param codec the <code>CompressionCodec</code> for which to get the 
+   *              <code>Compressor</code>
+   * @return <code>Compressor</code> for the given 
+   *         <code>CompressionCodec</code> from the pool or a new one
+   */
+  public static Compressor getCompressor(CompressionCodec codec) {
+    Compressor compressor = borrow(compressorPool, codec.getCompressorType());
+    if (compressor == null) {
+      compressor = codec.createCompressor();
+      LOG.debug("Got brand-new compressor");
+    } else {
+      LOG.debug("Got recycled compressor");
+    }
+    return compressor;
+  }
+  
+  /**
+   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
+   * pool or a new one.
+   *  
+   * @param codec the <code>CompressionCodec</code> for which to get the 
+   *              <code>Decompressor</code>
+   * @return <code>Decompressor</code> for the given 
+   *         <code>CompressionCodec</code> the pool or a new one
+   */
+  public static Decompressor getDecompressor(CompressionCodec codec) {
+    Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType());
+    if (decompressor == null) {
+      decompressor = codec.createDecompressor();
+      LOG.debug("Got brand-new decompressor");
+    } else {
+      LOG.debug("Got recycled decompressor");
+    }
+    return decompressor;
+  }
+  
+  /**
+   * Return the {@link Compressor} to the pool.
+   * 
+   * @param compressor the <code>Compressor</code> to be returned to the pool
+   */
+  public static void returnCompressor(Compressor compressor) {
+    if (compressor == null) {
+      return;
+    }
+    compressor.reset();
+    payback(compressorPool, compressor);
+  }
+  
+  /**
+   * Return the {@link Decompressor} to the pool.
+   * 
+   * @param decompressor the <code>Decompressor</code> to be returned to the 
+   *                     pool
+   */
+  public static void returnDecompressor(Decompressor decompressor) {
+    if (decompressor == null) {
+      return;
+    }
+    decompressor.reset();
+    payback(decompressorPool, decompressor);
+  }
+}

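For illustration, a minimal sketch of the borrow/return discipline the new pool expects, the same pattern SequenceFile.Writer above and IFile below follow (class name made up; returnCompressor() tolerates null, so the finally block is always safe):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.Compressor;
    import org.apache.hadoop.io.compress.DefaultCodec;

    public class CodecPoolSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        DefaultCodec codec = new DefaultCodec();
        codec.setConf(conf);

        // Borrow a (possibly native) compressor; a new one is constructed
        // only if the pool has none of this compressor type.
        Compressor compressor = CodecPool.getCompressor(codec);
        try {
          // ... write through codec.createOutputStream(out, compressor) ...
        } finally {
          // Hand the instance back so later writers can reuse it.
          CodecPool.returnCompressor(compressor);
        }
      }
    }
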
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java Wed Jun  4 21:06:13 2008
@@ -56,7 +56,7 @@
    * 
    * @return the type of compressor needed by this codec.
    */
-  Class getCompressorType();
+  Class<? extends Compressor> getCompressorType();
   
   /**
    * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
@@ -93,7 +93,7 @@
    * 
    * @return the type of decompressor needed by this codec.
    */
-  Class getDecompressorType();
+  Class<? extends Decompressor> getDecompressorType();
   
   /**
    * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java Wed Jun  4 21:06:13 2008
@@ -51,7 +51,7 @@
                                 conf.getInt("io.file.buffer.size", 4*1024));
   }
 
-  public Class getCompressorType() {
+  public Class<? extends Compressor> getCompressorType() {
     return ZlibFactory.getZlibCompressorType(conf);
   }
 
@@ -72,7 +72,7 @@
                                   conf.getInt("io.file.buffer.size", 4*1024));
   }
 
-  public Class getDecompressorType() {
+  public Class<? extends Decompressor> getDecompressorType() {
     return ZlibFactory.getZlibDecompressorType(conf);
   }
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java Wed Jun  4 21:06:13 2008
@@ -161,7 +161,7 @@
                null;
   }
 
-  public Class getCompressorType() {
+  public Class<? extends Compressor> getCompressorType() {
     return ZlibFactory.getZlibCompressorType(conf);
   }
 
@@ -191,7 +191,7 @@
                null;                               
   }
 
-  public Class getDecompressorType() {
+  public Class<? extends Decompressor> getDecompressorType() {
     return ZlibFactory.getZlibDecompressorType(conf);
   }
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java Wed Jun  4 21:06:13 2008
@@ -124,7 +124,7 @@
                                      compressionOverhead);
   }
 
-  public Class getCompressorType() {
+  public Class<? extends Compressor> getCompressorType() {
     // Ensure native-lzo library is loaded & initialized
     if (!isNativeLzoLoaded(conf)) {
       throw new RuntimeException("native-lzo library not available");
@@ -164,7 +164,7 @@
         conf.getInt("io.compression.codec.lzo.buffersize", 64*1024));
   }
 
-  public Class getDecompressorType() {
+  public Class<? extends Decompressor> getDecompressorType() {
     // Ensure native-lzo library is loaded & initialized
     if (!isNativeLzoLoaded(conf)) {
       throw new RuntimeException("native-lzo library not available");

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java Wed Jun  4 21:06:13 2008
@@ -67,7 +67,8 @@
    * @param conf configuration
    * @return the appropriate type of the zlib compressor.
    */
-  public static Class getZlibCompressorType(Configuration conf) {
+  public static Class<? extends Compressor> 
+  getZlibCompressorType(Configuration conf) {
     return (isNativeZlibLoaded(conf)) ? 
             ZlibCompressor.class : BuiltInZlibDeflater.class;
   }
@@ -89,7 +90,8 @@
    * @param conf configuration
    * @return the appropriate type of the zlib decompressor.
    */
-  public static Class getZlibDecompressorType(Configuration conf) {
+  public static Class<? extends Decompressor> 
+  getZlibDecompressorType(Configuration conf) {
     return (isNativeZlibLoaded(conf)) ? 
             ZlibDecompressor.class : BuiltInZlibInflater.class;
   }

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IFile.java?rev=663440&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IFile.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IFile.java Wed Jun  4 21:06:13 2008
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+
+/**
+ * <code>IFile</code> is the simple <key-len, key, value-len, value> format
+ * for the intermediate map-outputs in Map-Reduce.
+ * 
+ * There is a <code>Writer</code> to write out map-outputs in this format and 
+ * a <code>Reader</code> to read files of this format.
+ */
+class IFile {
+
+  private static int EOF_MARKER = -1;
+  
+  /**
+   * <code>IFile.Writer</code> to write out intermediate map-outputs. 
+   */
+  public static class Writer<K extends Object, V extends Object> {
+    FSDataOutputStream out;
+    boolean ownOutputStream = false;
+    long start = 0;
+    
+    CompressionOutputStream compressedOut;
+    Compressor compressor;
+    boolean compressOutput = false;
+    
+    long decompressedBytesWritten = 0;
+    long compressedBytesWritten = 0;
+    
+    Class<K> keyClass;
+    Class<V> valueClass;
+    Serializer<K> keySerializer;
+    Serializer<V> valueSerializer;
+    
+    DataOutputBuffer buffer = new DataOutputBuffer();
+
+    public Writer(Configuration conf, FileSystem fs, Path file, 
+                  Class<K> keyClass, Class<V> valueClass,
+                  CompressionCodec codec) throws IOException {
+      this(conf, fs.create(file), keyClass, valueClass, codec);
+      ownOutputStream = true;
+    }
+    
+    public Writer(Configuration conf, FSDataOutputStream out, 
+        Class<K> keyClass, Class<V> valueClass,
+        CompressionCodec codec) throws IOException {
+      if (codec != null) {
+        this.compressor = CodecPool.getCompressor(codec);
+        this.compressor.reset();
+        this.compressedOut = codec.createOutputStream(out, compressor);
+        this.out = new FSDataOutputStream(this.compressedOut,  null);
+        this.compressOutput = true;
+      } else {
+        this.out = out;
+      }
+      this.start = this.out.getPos();
+      
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      this.keySerializer = serializationFactory.getSerializer(keyClass);
+      this.keySerializer.open(buffer);
+      this.valueSerializer = serializationFactory.getSerializer(valueClass);
+      this.valueSerializer.open(buffer);
+    }
+    
+    public void close() throws IOException {
+      // Write EOF_MARKER for key/value length
+      WritableUtils.writeVInt(out, EOF_MARKER);
+      WritableUtils.writeVInt(out, EOF_MARKER);
+      decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
+      
+      if (compressOutput) {
+        // Return the compressor
+        compressedOut.finish();
+        compressedOut.resetState();
+        CodecPool.returnCompressor(compressor);
+      }
+      
+      // Close the serializers
+      keySerializer.close();
+      valueSerializer.close();
+
+      // Close the stream
+      if (out != null) {
+        out.flush();
+        compressedBytesWritten = out.getPos() - start;
+        
+        // Close the underlying stream iff we own it...
+        if (ownOutputStream) {
+          out.close();
+        }
+        
+        out = null;
+      }
+    }
+
+    public void append(K key, V value) throws IOException {
+      if (key.getClass() != keyClass)
+        throw new IOException("wrong key class: "+ key.getClass()
+                              +" is not "+ keyClass);
+      if (value.getClass() != valueClass)
+        throw new IOException("wrong value class: "+ value.getClass()
+                              +" is not "+ valueClass);
+
+      // Append the 'key'
+      keySerializer.serialize(key);
+      int keyLength = buffer.getLength();
+      if (keyLength == 0)
+        throw new IOException("zero length keys not allowed: " + key);
+
+      // Append the 'value'
+      valueSerializer.serialize(value);
+      int valueLength = buffer.getLength() - keyLength;
+      
+      // Write the record out
+      WritableUtils.writeVInt(out, keyLength);                  // key length
+      WritableUtils.writeVInt(out, valueLength);                // value length
+      out.write(buffer.getData(), 0, buffer.getLength());       // data
+
+      // Reset
+      buffer.reset();
+      
+      // Update bytes written
+      decompressedBytesWritten += keyLength + valueLength + 
+                                  WritableUtils.getVIntSize(keyLength) + 
+                                  WritableUtils.getVIntSize(valueLength);
+    }
+    
+    public void append(DataInputBuffer key, DataInputBuffer value)
+    throws IOException {
+      int keyLength = key.getLength() - key.getPosition();
+      int valueLength = value.getLength() - value.getPosition();
+      
+      WritableUtils.writeVInt(out, keyLength);
+      WritableUtils.writeVInt(out, valueLength);
+      out.write(key.getData(), key.getPosition(), keyLength); 
+      out.write(value.getData(), value.getPosition(), valueLength); 
+
+      // Update bytes written
+      decompressedBytesWritten += keyLength + valueLength + 
+                      WritableUtils.getVIntSize(keyLength) + 
+                      WritableUtils.getVIntSize(valueLength);
+}
+    
+    public long getRawLength() {
+      return decompressedBytesWritten;
+    }
+    
+    public long getCompressedLength() {
+      return compressedBytesWritten;
+    }
+  }
+
+  /**
+   * <code>IFile.Reader</code> to read intermediate map-outputs. 
+   */
+  public static class Reader<K extends Object, V extends Object> {
+    private static final int DEFAULT_BUFFER_SIZE = 128*1024;
+    private static final int MAX_VINT_SIZE = 5;
+
+    InputStream in;
+    Decompressor decompressor;
+    long bytesRead = 0;
+    long fileLength = 0;
+    boolean eof = false;
+    
+    byte[] buffer = null;
+    int bufferSize = DEFAULT_BUFFER_SIZE;
+    DataInputBuffer dataIn = new DataInputBuffer();
+
+    public Reader(Configuration conf, FileSystem fs, Path file,
+                  CompressionCodec codec) throws IOException {
+      this(conf, fs.open(file), fs.getFileStatus(file).getLen(), codec);
+    }
+    
+    protected Reader() {}
+    
+    public Reader(Configuration conf, InputStream in, long length, 
+                  CompressionCodec codec) throws IOException {
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+        this.in = codec.createInputStream(in, decompressor);
+      } else {
+        this.in = in;
+      }
+      this.fileLength = length;
+      
+      this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+    }
+    
+    public long getLength() { return fileLength; }
+    
+    private int readData(byte[] buf, int off, int len) throws IOException {
+      int bytesRead = 0;
+      while (bytesRead < len) {
+        int n = in.read(buf, off+bytesRead, len-bytesRead);
+        if (n < 0) {
+          return bytesRead;
+        }
+        bytesRead += n;
+      }
+      return len;
+    }
+    
+    void readNextBlock(int minSize) throws IOException {
+      if (buffer == null) {
+        buffer = new byte[bufferSize];
+        dataIn.reset(buffer, 0, 0);
+      }
+      buffer = 
+        rejigData(buffer, 
+                  (bufferSize < minSize) ? new byte[minSize << 1] : buffer);
+      bufferSize = buffer.length;
+    }
+    
+    private byte[] rejigData(byte[] source, byte[] destination) 
+    throws IOException{
+      // Copy remaining data into the destination array
+      int bytesRemaining = dataIn.getLength()-dataIn.getPosition();
+      if (bytesRemaining > 0) {
+        System.arraycopy(source, dataIn.getPosition(), 
+            destination, 0, bytesRemaining);
+      }
+      
+      // Read as much data as will fit from the underlying stream 
+      int n = readData(destination, bytesRemaining, 
+                       (destination.length - bytesRemaining));
+      dataIn.reset(destination, 0, (bytesRemaining + n));
+      
+      return destination;
+    }
+    
+    public boolean next(DataInputBuffer key, DataInputBuffer value) 
+    throws IOException {
+      // Sanity check
+      if (eof) {
+        throw new EOFException("Completed reading " + bytesRead);
+      }
+      
+      // Check if we have enough data to read lengths
+      if ((dataIn.getLength() - dataIn.getPosition()) < 2*MAX_VINT_SIZE) {
+        readNextBlock(2*MAX_VINT_SIZE);
+      }
+      
+      // Read key and value lengths
+      int oldPos = dataIn.getPosition();
+      int keyLength = WritableUtils.readVInt(dataIn);
+      int valueLength = WritableUtils.readVInt(dataIn);
+      int pos = dataIn.getPosition();
+      bytesRead += pos - oldPos;
+      
+      // Check for EOF
+      if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
+        eof = true;
+        return false;
+      }
+      
+      final int recordLength = keyLength + valueLength;
+      
+      // Check if we have the raw key/value in the buffer
+      if ((dataIn.getLength()-pos) < recordLength) {
+        readNextBlock(recordLength);
+        
+        // Sanity check
+        if ((dataIn.getLength() - dataIn.getPosition()) < recordLength) {
+          throw new EOFException("Could read the next record");
+        }
+      }
+
+      // Setup the key and value
+      pos = dataIn.getPosition();
+      byte[] data = dataIn.getData();
+      key.reset(data, pos, keyLength);
+      value.reset(data, (pos + keyLength), valueLength);
+      
+      // Position for the next record
+      dataIn.skip(recordLength);
+      bytesRead += recordLength;
+
+      return true;
+    }
+
+    public void close() throws IOException {
+      // Return the decompressor
+      if (decompressor != null) {
+        decompressor.reset();
+        CodecPool.returnDecompressor(decompressor);
+      }
+      
+      // Close the underlying stream
+      if (in != null) {
+        in.close();
+      }
+      
+      // Release the buffer
+      dataIn = null;
+      buffer = null;
+    }
+  }    
+  
+  /**
+   * <code>IFile.InMemoryReader</code> to read map-outputs present in-memory.
+   */
+  public static class InMemoryReader<K, V> extends Reader<K, V> {
+    RamManager ramManager;
+    
+    public InMemoryReader(RamManager ramManager, 
+                          byte[] data, int start, int length) {
+      this.ramManager = ramManager;
+      
+      buffer = data;
+      fileLength = bufferSize = (length - start);
+      dataIn.reset(buffer, start, length);
+    }
+    
+    public boolean next(DataInputBuffer key, DataInputBuffer value) 
+    throws IOException {
+      // Sanity check
+      if (eof) {
+        throw new EOFException("Completed reading " + bytesRead);
+      }
+      
+      // Read key and value lengths
+      int oldPos = dataIn.getPosition();
+      int keyLength = WritableUtils.readVInt(dataIn);
+      int valueLength = WritableUtils.readVInt(dataIn);
+      int pos = dataIn.getPosition();
+      bytesRead += pos - oldPos;
+      
+      // Check for EOF
+      if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
+        eof = true;
+        return false;
+      }
+      
+      final int recordLength = keyLength + valueLength;
+      
+      // Setup the key and value
+      pos = dataIn.getPosition();
+      byte[] data = dataIn.getData();
+      key.reset(data, pos, keyLength);
+      value.reset(data, (pos + keyLength), valueLength);
+      
+      // Position for the next record
+      long skipped = dataIn.skip(recordLength);
+      if (skipped != recordLength) {
+        throw new IOException("Failed to skip past record of length: " + 
+                              recordLength);
+      }
+      
+      // Record the byte
+      bytesRead += recordLength;
+
+      return true;
+    }
+      
+    public void close() {
+      // Release
+      dataIn = null;
+      buffer = null;
+      
+      // Inform the RamManager
+      ramManager.unreserve(bufferSize);
+    }
+  }
+}

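An illustrative round-trip through the new Writer/Reader pair (IFile is package-private, so this sketch assumes it lives alongside it in org.apache.hadoop.mapred; the path and class name are made up). Note that Reader.next() hands back raw slices over its internal buffer rather than deserialized objects:

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.Text;

    public class IFileRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path file = new Path("/tmp/ifile-sketch.out");

        // Write <key-len, key, value-len, value> records; a null codec
        // leaves the file uncompressed.
        IFile.Writer<Text, Text> writer =
          new IFile.Writer<Text, Text>(conf, fs, file, Text.class, Text.class, null);
        writer.append(new Text("apple"), new Text("1"));
        writer.close();                       // appends the EOF marker

        // Read the records back as raw key/value slices.
        IFile.Reader<Text, Text> reader =
          new IFile.Reader<Text, Text>(conf, fs, file, null);
        DataInputBuffer key = new DataInputBuffer();
        DataInputBuffer value = new DataInputBuffer();
        while (reader.next(key, value)) {
          // key/value point into the reader's buffer; deserialize or copy here.
        }
        reader.close();
      }
    }
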
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Jun  4 21:06:13 2008
@@ -919,11 +919,11 @@
 
   private static void downloadProfile(TaskCompletionEvent e
                                       ) throws IOException  {
-    URLConnection connection = new URL(e.getTaskTrackerHttp() + 
-                                       "&plaintext=true&filter=profile"
-                                       ).openConnection();
+    URLConnection connection = 
+      new URL(getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) + 
+              "&filter=profile").openConnection();
     InputStream in = connection.getInputStream();
-    OutputStream out = new FileOutputStream(e.getTaskID() + ".profile");
+    OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
     IOUtils.copyBytes(in, out, 64 * 1024, true);
   }
 
@@ -1005,7 +1005,7 @@
               if (event.getTaskStatus() == 
                 TaskCompletionEvent.Status.SUCCEEDED){
                 LOG.info(event.toString());
-                displayTaskLogs(event.getTaskID(), event.getTaskTrackerHttp());
+                displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
               }
               break; 
             case FAILED:
@@ -1013,7 +1013,7 @@
                 TaskCompletionEvent.Status.FAILED){
                 LOG.info(event.toString());
                 // Displaying the task diagnostic information
-                TaskAttemptID taskId = event.getTaskID();
+                TaskAttemptID taskId = event.getTaskAttemptId();
                 String[] taskDiagnostics = 
                   jc.jobSubmitClient.getTaskDiagnostics(taskId); 
                 if (taskDiagnostics != null) {
@@ -1022,7 +1022,7 @@
                   }
                 }
                 // Displaying the task logs
-                displayTaskLogs(event.getTaskID(), event.getTaskTrackerHttp());
+                displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
               }
               break; 
             case KILLED:
@@ -1032,7 +1032,7 @@
               break; 
             case ALL:
               LOG.info(event.toString());
-              displayTaskLogs(event.getTaskID(), event.getTaskTrackerHttp());
+              displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
               break;
             }
           }
@@ -1061,15 +1061,22 @@
     return running;
   }
 
+  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
+    return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
+  }
+  
   private static void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
     throws IOException {
     // The tasktracker for a 'failed/killed' job might not be around...
     if (baseUrl != null) {
+      // Construct the url for the tasklogs
+      String taskLogUrl = getTaskLogURL(taskId, baseUrl);
+      
       // Copy tasks's stdout of the JobClient
-      getTaskLogs(taskId, new URL(baseUrl+"&filter=stdout"), System.out);
+      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
         
       // Copy task's stderr to stderr of the JobClient 
-      getTaskLogs(taskId, new URL(baseUrl+"&filter=stderr"), System.err);
+      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
     }
   }
     
@@ -1370,8 +1377,9 @@
     System.out.println("Number of events (from " + fromEventId + 
                        ") are: " + events.length);
     for(TaskCompletionEvent event: events) {
-      System.out.println(event.getTaskStatus() + " " + event.getTaskID() + 
-                         " " + event.getTaskTrackerHttp());
+      System.out.println(event.getTaskStatus() + " " + event.getTaskAttemptId() + " " + 
+                         getTaskLogURL(event.getTaskAttemptId(), 
+                                       event.getTaskTrackerHttp()));
     }
   }
 

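Since TaskCompletionEvent.getTaskTrackerHttp() now carries only the tracker's base http address (see the JobInProgress change below), the new getTaskLogURL() helper rebuilds the full task-log URL. A sketch of the resulting URL shape, with made-up host, port and attempt id:

    import java.net.URL;

    public class TaskLogUrlSketch {
      public static void main(String[] args) throws Exception {
        String baseUrl = "http://tracker-host:50060";                // from the event
        String taskId  = "attempt_200806042106_0001_m_000000_0";     // attempt id
        String taskLogUrl = baseUrl + "/tasklog?plaintext=true&taskid=" + taskId;
        System.out.println(new URL(taskLogUrl + "&filter=stdout"));
        System.out.println(new URL(taskLogUrl + "&filter=stderr"));
      }
    }
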
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Wed Jun  4 21:06:13 2008
@@ -461,21 +461,31 @@
    * 
    * @param style the {@link CompressionType} to control how the map outputs  
    *              are compressed.
+   * @deprecated {@link CompressionType} is no longer valid for intermediate
+   *             map-outputs. 
    */
+  @Deprecated
   public void setMapOutputCompressionType(CompressionType style) {
     setCompressMapOutput(true);
     set("mapred.map.output.compression.type", style.toString());
+    LOG.warn("SequenceFile compression is no longer valid for intermediate " +
+    		     "map-outputs!");
   }
   
   /**
    * Get the {@link CompressionType} for the map outputs.
    * 
    * @return the {@link CompressionType} for map outputs, defaulting to 
-   *         {@link CompressionType#RECORD}. 
+   *         {@link CompressionType#RECORD}.
+   * @deprecated {@link CompressionType} is no longer valid for intermediate
+   *             map-outputs. 
    */
+  @Deprecated
   public CompressionType getMapOutputCompressionType() {
     String val = get("mapred.map.output.compression.type", 
                      CompressionType.RECORD.toString());
+    LOG.warn("SequenceFile compression is no longer valid for intermediate " +
+    "map-outputs!");
     return CompressionType.valueOf(val);
   }
 

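With the CompressionType setter pair deprecated, jobs fall back to the existing on/off and codec-class setters on JobConf. A hedged sketch of the replacement calls (setCompressMapOutput and setMapOutputCompressorClass are pre-existing JobConf methods; the class name is made up):

    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.mapred.JobConf;

    public class MapOutputCompressionSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Instead of setMapOutputCompressionType(CompressionType.BLOCK):
        job.setCompressMapOutput(true);
        job.setMapOutputCompressorClass(DefaultCodec.class);
      }
    }
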
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Jun  4 21:06:13 2008
@@ -497,9 +497,8 @@
         } else {
           host = ttStatus.getHost();
         }
-        httpTaskLogLocation = "http://" + host + ":" + 
-          ttStatus.getHttpPort() + "/tasklog?plaintext=true&taskid=" +
-          status.getTaskID();
+        httpTaskLogLocation = "http://" + host + ":" + ttStatus.getHttpPort(); 
+           //+ "/tasklog?plaintext=true&taskid=" + status.getTaskID();
       }
 
       TaskCompletionEvent taskEvent = null;
@@ -530,7 +529,7 @@
         if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
           TaskCompletionEvent t = 
             this.taskCompletionEvents.get(eventNumber);
-          if (t.getTaskID().equals(status.getTaskID()))
+          if (t.getTaskAttemptId().equals(status.getTaskID()))
             t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
         }
         

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Jun  4 21:06:13 2008
@@ -166,7 +166,7 @@
             for (int i = 0; i < mapIds.size(); i++) {
               TaskAttemptID mapId = mapIds.get(i);
               Path mapOut = this.mapoutputFile.getOutputFile(mapId);
-              Path reduceIn = this.mapoutputFile.getInputFileForWrite(i,reduceId,
+              Path reduceIn = this.mapoutputFile.getInputFileForWrite(mapId.getTaskID(),reduceId,
                   localFs.getLength(mapOut));
               if (!localFs.mkdirs(reduceIn.getParent())) {
                 throw new IOException("Mkdirs failed to create "

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java Wed Jun  4 21:06:13 2008
@@ -36,6 +36,7 @@
    * Constant denoting when a merge of in memory files will be triggered 
    */
   public static final float MAX_INMEM_FILESYS_USE = 0.5f;
+  
   /**
    * Constant denoting the max size (in terms of the fraction of the total 
    * size of the filesys) of a map output file that we will try
@@ -56,6 +57,11 @@
   public static final String MAP_OUTPUT_LENGTH = "Map-Output-Length";
 
   /**
+   * The custom http header used for the "raw" map output length.
+   */
+  public static final String RAW_MAP_OUTPUT_LENGTH = "Raw-Map-Output-Length";
+
+  /**
    * Temporary directory name 
    */
   public static final String TEMP_DIR_NAME = "_temporary";

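The new Raw-Map-Output-Length header reports the decompressed size of a map output alongside the existing Map-Output-Length header, which reports the size actually sent over HTTP. A loosely hedged sketch of how a shuffle client could read both headers (the URL is made up, the tasktracker's map-output servlet is assumed to set both headers, and the literal header names mirror the MRConstants fields):

    import java.net.URL;
    import java.net.URLConnection;

    public class MapOutputHeadersSketch {
      public static void main(String[] args) throws Exception {
        URLConnection connection =
            new URL("http://tracker-host:50060/mapOutput?map=attempt_x&reduce=0")
                .openConnection();
        // "Raw-Map-Output-Length" is the decompressed size, e.g. for deciding
        // whether the output fits in the reducer's in-memory buffer;
        // "Map-Output-Length" is the (possibly compressed) on-the-wire size.
        long rawLength =
            Long.parseLong(connection.getHeaderField("Raw-Map-Output-Length"));
        long compressedLength =
            Long.parseLong(connection.getHeaderField("Map-Output-Length"));
        System.out.println(rawLength + " / " + compressedLength);
      }
    }
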
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Wed Jun  4 21:06:13 2008
@@ -153,12 +153,14 @@
    * @param reduceTaskId a reduce task id
    * @param size the size of the file
    */
-  public Path getInputFileForWrite(int mapId, TaskAttemptID reduceTaskId, long size)
+  public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId, 
+                                   long size)
     throws IOException {
     // TODO *oom* should use a format here
     return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
                                           reduceTaskId + Path.SEPARATOR +
-                                          "output" + "/map_" + mapId + ".out",
+                                          ("output" + "/map_" + mapId.getId() + 
+                                           ".out"), 
                                           size, conf);
   }
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Wed Jun  4 21:06:13 2008
@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.net.URLConnection;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.InMemoryFileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.mapred.ReduceTask.ReduceCopier.ShuffleClientMetrics;
-import org.apache.hadoop.util.Progressable;
-
-/** The location of a map output file, as passed to a reduce task via the
- * {@link InterTrackerProtocol}. */ 
-class MapOutputLocation implements Writable, MRConstants {
-
-  static {                                      // register a ctor
-    WritableFactories.setFactory
-      (MapOutputLocation.class,
-       new WritableFactory() {
-         public Writable newInstance() { return new MapOutputLocation(); }
-       });
-  }
-
-  private TaskAttemptID mapTaskId;
-  private int mapId;
-  private String host;
-  private int port;
-  
-  // basic/unit connection timeout (in milliseconds)
-  private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
-  // default read timeout (in milliseconds)
-  private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
-
-  /** RPC constructor **/
-  public MapOutputLocation() {
-  }
-
-  /** Construct a location. */
-  public MapOutputLocation(TaskAttemptID mapTaskId, int mapId, 
-                           String host, int port) {
-    this.mapTaskId = mapTaskId;
-    this.mapId = mapId;
-    this.host = host;
-    this.port = port;
-  }
-
-  /** The map task id. */
-  public TaskAttemptID getMapTaskID() { return mapTaskId; }
-  
-  /**
-   * Get the map's id number.
-   * @return The numeric id for this map
-   */
-  public int getMapId() {
-    return mapId;
-  }
-
-  /** The host the task completed on. */
-  public String getHost() { return host; }
-
-  /** The port listening for {@link MapOutputProtocol} connections. */
-  public int getPort() { return port; }
-
-  public void write(DataOutput out) throws IOException {
-    mapTaskId.write(out);
-    out.writeInt(mapId);
-    Text.writeString(out, host);
-    out.writeInt(port);
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    this.mapTaskId = TaskAttemptID.read(in);
-    this.mapId = in.readInt();
-    this.host = Text.readString(in);
-    this.port = in.readInt();
-  }
-
-  @Override
-  public String toString() {
-    return "http://" + host + ":" + port + "/mapOutput?job=" + mapTaskId.getJobID() +
-           "&map=" + mapTaskId;
-  }
-  
-  /** 
-   * The connection establishment is attempted multiple times and is given up 
-   * only on the last failure. Instead of connecting with a timeout of 
-   * X, we try connecting with a timeout of x < X but multiple times. 
-   */
-  private InputStream getInputStream(URLConnection connection, 
-                                     int connectionTimeout, 
-                                     int readTimeout) 
-  throws IOException {
-    int unit = 0;
-    if (connectionTimeout < 0) {
-      throw new IOException("Invalid timeout "
-                            + "[timeout = " + connectionTimeout + " ms]");
-    } else if (connectionTimeout > 0) {
-      unit = (UNIT_CONNECT_TIMEOUT > connectionTimeout)
-             ? connectionTimeout
-             : UNIT_CONNECT_TIMEOUT;
-    }
-    // set the read timeout to the total timeout
-    connection.setReadTimeout(readTimeout);
-    // set the connect timeout to the unit-connect-timeout
-    connection.setConnectTimeout(unit);
-    while (true) {
-      try {
-        return connection.getInputStream();
-      } catch (IOException ioe) {
-        // update the total remaining connect-timeout
-        connectionTimeout -= unit;
-
-        // throw an exception if we have waited for timeout amount of time
-        // note that the updated value if timeout is used here
-        if (connectionTimeout == 0) {
-          throw ioe;
-        }
-
-        // reset the connect timeout for the last try
-        if (connectionTimeout < unit) {
-          unit = connectionTimeout;
-          // reset the connect time out for the final connect
-          connection.setConnectTimeout(unit);
-        }
-      }
-    }
-  }
-
-  /**
-   * Get the map output into a local file (either in the inmemory fs or on the 
-   * local fs) from the remote server.
-   * We use the file system so that we generate checksum files on the data.
-   * @param inMemFileSys the inmemory filesystem to write the file to
-   * @param localFileSys the local filesystem to write the file to
-   * @param shuffleMetrics the metrics context
-   * @param localFilename the filename to write the data into
-   * @param lDirAlloc the LocalDirAllocator object
-   * @param conf the Configuration object
-   * @param reduce the reduce id to get for
-   * @param timeout number of milliseconds for connection timeout
-   * @return the path of the file that got created
-   * @throws IOException when something goes wrong
-   */
-  public Path getFile(InMemoryFileSystem inMemFileSys,
-                      FileSystem localFileSys,
-                      ShuffleClientMetrics shuffleMetrics,
-                      Path localFilename, 
-                      LocalDirAllocator lDirAlloc,
-                      Configuration conf, int reduce,
-                      int timeout, Progressable progressable) 
-  throws IOException, InterruptedException {
-    return getFile(inMemFileSys, localFileSys, shuffleMetrics, localFilename, 
-                   lDirAlloc, conf, reduce, timeout, DEFAULT_READ_TIMEOUT, 
-                   progressable);
-  }
-
-  /**
-   * Get the map output into a local file (either in the inmemory fs or on the 
-   * local fs) from the remote server.
-   * We use the file system so that we generate checksum files on the data.
-   * @param inMemFileSys the inmemory filesystem to write the file to
-   * @param localFileSys the local filesystem to write the file to
-   * @param shuffleMetrics the metrics context
-   * @param localFilename the filename to write the data into
-   * @param lDirAlloc the LocalDirAllocator object
-   * @param conf the Configuration object
-   * @param reduce the reduce id to get for
-   * @param connectionTimeout number of milliseconds for connection timeout
-   * @param readTimeout number of milliseconds for read timeout
-   * @return the path of the file that got created
-   * @throws IOException when something goes wrong
-   */
-  public Path getFile(InMemoryFileSystem inMemFileSys,
-                      FileSystem localFileSys,
-                      ShuffleClientMetrics shuffleMetrics,
-                      Path localFilename, 
-                      LocalDirAllocator lDirAlloc,
-                      Configuration conf, int reduce,
-                      int connectionTimeout, int readTimeout, 
-                      Progressable progressable) 
-  throws IOException, InterruptedException {
-    boolean good = false;
-    long totalBytes = 0;
-    FileSystem fileSys = localFileSys;
-    Thread currentThread = Thread.currentThread();
-    URL path = new URL(toString() + "&reduce=" + reduce);
-    try {
-      URLConnection connection = path.openConnection();
-      InputStream input = getInputStream(connection, connectionTimeout, 
-                                         readTimeout); 
-      OutputStream output = null;
-      
-      //We will put a file in memory if it meets certain criteria:
-      //1. The size of the file should be less than 25% of the total inmem fs
-      //2. There is space available in the inmem fs
-      
-      long length = Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
-      long inMemFSSize = inMemFileSys.getFSSize();
-      long checksumLength = (int)inMemFileSys.getChecksumFileLength(
-                                                  localFilename, length);
-      
-      boolean createInMem = false; 
-      if (inMemFSSize > 0)  
-        createInMem = (((float)(length + checksumLength) / inMemFSSize <= 
-                        MAX_INMEM_FILESIZE_FRACTION) && 
-                       inMemFileSys.reserveSpaceWithCheckSum(localFilename, length));
-      if (createInMem) {
-        fileSys = inMemFileSys;
-      }
-      else {
-        //now hit the localFS to find out a suitable location for the output
-        localFilename = lDirAlloc.getLocalPathForWrite(
-            localFilename.toUri().getPath(), length + checksumLength, conf);
-      }
-      
-      output = fileSys.create(localFilename);
-      try {  
-        try {
-          byte[] buffer = new byte[64 * 1024];
-          if (currentThread.isInterrupted()) {
-            throw new InterruptedException();
-          }
-          int len = input.read(buffer);
-          while (len > 0) {
-            totalBytes += len;
-            shuffleMetrics.inputBytes(len);
-            output.write(buffer, 0 , len);
-            if (currentThread.isInterrupted()) {
-              throw new InterruptedException();
-            }
-            // indicate we're making progress
-            progressable.progress();
-            len = input.read(buffer);
-          }
-        } finally {
-          output.close();
-        }
-      } finally {
-        input.close();
-      }
-      good = (totalBytes == length);
-      if (!good) {
-        throw new IOException("Incomplete map output received for " + path +
-                              " (" + totalBytes + " instead of " + length + ")"
-                              );
-      }
-    } finally {
-      if (!good) {
-        try {
-          fileSys.delete(localFilename, true);
-          totalBytes = 0;
-        } catch (Throwable th) {
-          // IGNORED because we are cleaning up
-        }
-      }
-    }
-    return fileSys.makeQualified(localFilename);
-  }
-
-}
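
The retry logic in the removed getInputStream() above deserves a plain statement: rather than making a single connect attempt with the full timeout X, the fetcher makes repeated attempts with a smaller per-attempt ("unit") timeout until the total budget X is spent, shrinking the last attempt to whatever remains. A minimal standalone sketch of that pattern, assuming nothing beyond java.net.URLConnection (the class and method names below are illustrative, not part of the patch):

import java.io.IOException;
import java.io.InputStream;
import java.net.URLConnection;

class ConnectRetrySketch {
  /**
   * Open the connection's input stream using a small per-attempt connect
   * timeout, retrying until the total connect budget is exhausted.
   */
  static InputStream openWithRetries(URLConnection connection,
                                     int connectTimeout,   // total budget, ms
                                     int readTimeout,      // per-read timeout, ms
                                     int unitTimeout)      // per-attempt timeout, ms
      throws IOException {
    if (connectTimeout < 0) {
      throw new IOException("Invalid timeout [timeout = " + connectTimeout + " ms]");
    }
    int unit = (connectTimeout > 0) ? Math.min(unitTimeout, connectTimeout) : 0;
    connection.setReadTimeout(readTimeout);
    connection.setConnectTimeout(unit);
    while (true) {
      try {
        return connection.getInputStream();
      } catch (IOException ioe) {
        connectTimeout -= unit;          // one attempt's worth of the budget spent
        if (connectTimeout <= 0) {
          throw ioe;                     // budget exhausted: give up
        }
        if (connectTimeout < unit) {     // shrink the final attempt to fit
          unit = connectTimeout;
          connection.setConnectTimeout(unit);
        }
      }
    }
  }
}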

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Jun  4 21:06:13 2008
@@ -41,22 +41,15 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Sorter;
-import org.apache.hadoop.io.SequenceFile.ValueBytes;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
-import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.QuickSort;
@@ -64,6 +57,11 @@
 
 /** A Map task. */
 class MapTask extends Task {
+  /**
+   * The size of each record in the index file for the map-outputs.
+   */
+  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+  
 
   private BytesWritable split = new BytesWritable();
   private String splitClass;
@@ -281,29 +279,23 @@
     
   }
 
-  class MapOutputBuffer implements MapOutputCollector, IndexedSortable {
+  class MapOutputBuffer<K extends Object, V extends Object> 
+  implements MapOutputCollector<K, V>, IndexedSortable {
     private final int partitions;
-    private final Partitioner partitioner;
+    private final Partitioner<K, V> partitioner;
     private final JobConf job;
     private final Reporter reporter;
-    private final Class keyClass;
-    private final Class valClass;
-    private final RawComparator comparator;
+    private final Class<K> keyClass;
+    private final Class<V> valClass;
+    private final RawComparator<K> comparator;
     private final SerializationFactory serializationFactory;
-    private final Serializer keySerializer;
-    private final Serializer valSerializer;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valSerializer;
     private final Class<? extends Reducer> combinerClass;
-    private final CombineOutputCollector combineCollector;
-    private final boolean compressMapOutput;
-    private final CompressionCodec codec;
-    private final CompressionType compressionType;
-
-    // used if compressMapOutput && compressionType == RECORD
-    // DataOutputBuffer req b/c compression codecs req continguous buffer
-    private final DataOutputBuffer rawBuffer;
-    private final CompressionOutputStream deflateFilter;
-    private final DataOutputStream deflateStream;
-    private final Compressor compressor;
+    private final CombineOutputCollector<K, V> combineCollector;
+    
+    // Compression for map-outputs
+    private CompressionCodec codec = null;
 
     // k/v accounting
     private volatile int kvstart = 0;  // marks beginning of spill
@@ -380,8 +372,8 @@
       LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
       // k/v serialization
       comparator = job.getOutputKeyComparator();
-      keyClass = job.getMapOutputKeyClass();
-      valClass = job.getMapOutputValueClass();
+      keyClass = (Class<K>)job.getMapOutputKeyClass();
+      valClass = (Class<V>)job.getMapOutputValueClass();
       serializationFactory = new SerializationFactory(job);
       keySerializer = serializationFactory.getSerializer(keyClass);
       keySerializer.open(bb);
@@ -393,45 +385,23 @@
       mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
       combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
-      // combiner and compression
-      compressMapOutput = job.getCompressMapOutput();
-      combinerClass = job.getCombinerClass();
-      minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
-      if (compressMapOutput) {
-        compressionType = job.getMapOutputCompressionType();
+      // compression
+      if (job.getCompressMapOutput()) {
         Class<? extends CompressionCodec> codecClass =
           job.getMapOutputCompressorClass(DefaultCodec.class);
         codec = (CompressionCodec)
           ReflectionUtils.newInstance(codecClass, job);
-        if (CompressionType.RECORD == compressionType
-            && null == combinerClass) {
-          compressor = codec.createCompressor();
-          rawBuffer = new DataOutputBuffer();
-          deflateFilter = codec.createOutputStream(rawBuffer, compressor);
-          deflateStream = new DataOutputStream(deflateFilter);
-          valSerializer.close();
-          valSerializer.open(deflateStream);
-        } else {
-          rawBuffer = null;
-          compressor = null;
-          deflateStream = null;
-          deflateFilter = null;
-        }
-      } else {
-        compressionType = CompressionType.NONE;
-        codec = null;
-        rawBuffer = null;
-        compressor = null;
-        deflateStream = null;
-        deflateFilter = null;
       }
+      // combiner
+      combinerClass = job.getCombinerClass();
       combineCollector = (null != combinerClass)
         ? new CombineOutputCollector(combineOutputCounter)
         : null;
+      minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
     }
 
     @SuppressWarnings("unchecked")
-    public synchronized void collect(Object key, Object value)
+    public synchronized void collect(K key, V value)
         throws IOException {
       reporter.progress();
       if (key.getClass() != keyClass) {
@@ -449,6 +419,7 @@
             ).initCause(sortSpillException);
       }
       try {
+        // serialize key bytes into buffer
         int keystart = bufindex;
         keySerializer.serialize(key);
         if (bufindex < keystart) {
@@ -456,27 +427,14 @@
           bb.reset();
           keystart = 0;
         }
+        // serialize value bytes into buffer
         int valstart = bufindex;
-        if (compressMapOutput && CompressionType.RECORD == compressionType
-            && null == combinerClass) {
-          // compress serialized value bytes
-          rawBuffer.reset();
-          deflateFilter.resetState();
-          valSerializer.serialize(value);
-          deflateStream.flush();
-          deflateFilter.finish();
-          bb.write(rawBuffer.getData(), 0, rawBuffer.getLength());
-          bb.markRecord();
-          mapOutputByteCounter.increment((valstart - keystart) +
-              compressor.getBytesRead());
-        } else {
-          // serialize value bytes into buffer
-          valSerializer.serialize(value);
-          int valend = bb.markRecord();
-          mapOutputByteCounter.increment(valend > keystart
-              ? valend - keystart
-              : (bufvoid - keystart) + valend);
-        }
+        valSerializer.serialize(value);
+        int valend = bb.markRecord();
+        mapOutputByteCounter.increment(valend > keystart
+            ? valend - keystart
+            : (bufvoid - keystart) + valend);
+
         int partition = partitioner.getPartition(key, value, partitions);
         if (partition < 0 || partition >= partitions) {
           throw new IOException("Illegal partition for " + key + " (" +
@@ -766,31 +724,34 @@
         out = localFs.create(filename);
         // create spill index
         Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                             getTaskID(), numSpills, partitions * 16);
+                             getTaskID(), numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
         indexOut = localFs.create(indexFilename);
         final int endPosition = (kvend > kvstart)
           ? kvend
           : kvoffsets.length + kvend;
         sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
         int spindex = kvstart;
-        InMemValBytes vbytes = new InMemValBytes();
+        InMemValBytes value = new InMemValBytes();
         for (int i = 0; i < partitions; ++i) {
-          SequenceFile.Writer writer = null;
+          IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
-            writer = SequenceFile.createWriter(job, out,
-                keyClass, valClass, compressionType, codec);
+            writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
             if (null == combinerClass) {
               // spill directly
+              DataInputBuffer key = new DataInputBuffer();
+              long recordNo = 0;
               while (spindex < endPosition &&
                   kvindices[kvoffsets[spindex % kvoffsets.length]
                             + PARTITION] == i) {
                 final int kvoff = kvoffsets[spindex % kvoffsets.length];
-                getVBytesForOffset(kvoff, vbytes);
-                writer.appendRaw(kvbuffer, kvindices[kvoff + KEYSTART],
-                    kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART],
-                    vbytes);
+                getVBytesForOffset(kvoff, value);
+                key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
+                          (kvindices[kvoff + VALSTART] - 
+                           kvindices[kvoff + KEYSTART]));
+                writer.append(key, value);
                 ++spindex;
+                ++recordNo;
               }
             } else {
               int spstart = spindex;
@@ -812,12 +773,13 @@
                 combineAndSpill(kvIter, combineInputCounter);
               }
             }
-            // we need to close the writer to flush buffered data, obtaining
-            // the correct offset
+
+            // close the writer
             writer.close();
+            
+            // write the index as <offset, raw-length, compressed-length> 
+            writeIndexRecord(indexOut, out, segmentStart, writer);
             writer = null;
-            indexOut.writeLong(segmentStart);
-            indexOut.writeLong(out.getPos() - segmentStart);
           } finally {
             if (null != writer) writer.close();
           }
@@ -836,7 +798,7 @@
      * directly to a spill file. Consider this "losing".
      */
     @SuppressWarnings("unchecked")
-    private void spillSingleRecord(final Object key, final Object value) 
+    private void spillSingleRecord(final K key, final V value) 
         throws IOException {
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
@@ -849,15 +811,16 @@
         out = localFs.create(filename);
         // create spill index
         Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                             getTaskID(), numSpills, partitions * 16);
+                             getTaskID(), numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
         indexOut = localFs.create(indexFilename);
         // we don't run the combiner for a single record
         for (int i = 0; i < partitions; ++i) {
-          SequenceFile.Writer writer = null;
+          IFile.Writer writer = null;
           try {
             long segmentStart = out.getPos();
-            writer = SequenceFile.createWriter(job, out,
-                keyClass, valClass, compressionType, codec);
+            // create an IFile.Writer for this partition, reusing the map-output codec
+            writer = new IFile.Writer(job, out, keyClass, valClass, codec);
+
             if (i == partition) {
               final long recordStart = out.getPos();
               writer.append(key, value);
@@ -866,8 +829,9 @@
               mapOutputByteCounter.increment(out.getPos() - recordStart);
             }
             writer.close();
-            indexOut.writeLong(segmentStart);
-            indexOut.writeLong(out.getPos() - segmentStart);
+
+            // index record
+            writeIndexRecord(indexOut, out, segmentStart, writer);
           } catch (IOException e) {
             if (null != writer) writer.close();
             throw e;
@@ -891,7 +855,7 @@
       int vallen = (nextindex > kvindices[kvoff + VALSTART])
         ? nextindex - kvindices[kvoff + VALSTART]
         : (bufvoid - kvindices[kvoff + VALSTART]) + nextindex;
-      vbytes.reset(kvindices[kvoff + VALSTART], vallen);
+      vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
     }
 
     @SuppressWarnings("unchecked")
@@ -917,36 +881,30 @@
     /**
      * Inner class wrapping valuebytes, used for appendRaw.
      */
-    protected class InMemValBytes implements ValueBytes {
+    protected class InMemValBytes extends DataInputBuffer {
+      private byte[] buffer;
       private int start;
-      private int len;
-      public void reset(int start, int len) {
+      private int length;
+            
+      public void reset(byte[] buffer, int start, int length) {
+        this.buffer = buffer;
         this.start = start;
-        this.len = len;
-      }
-      public int getSize() {
-        return len;
-      }
-      public void writeUncompressedBytes(DataOutputStream outStream)
-          throws IOException {
-        if (start + len > bufvoid) {
+        this.length = length;
+        
+        if (start + length > bufvoid) {
+          this.buffer = new byte[this.length];
           final int taillen = bufvoid - start;
-          outStream.write(kvbuffer, start, taillen);
-          outStream.write(kvbuffer, 0, len - taillen);
-          return;
+          System.arraycopy(buffer, start, this.buffer, 0, taillen);
+          System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
+          this.start = 0;
         }
-        outStream.write(kvbuffer, start, len);
-      }
-      public void writeCompressedBytes(DataOutputStream outStream)
-          throws IOException {
-        // If writing record-compressed data, kvbuffer vals rec-compressed
-        // and may be written directly. Note: not contiguous
-        writeUncompressedBytes(outStream);
+        
+        super.reset(this.buffer, this.start, this.length);
       }
     }
 
     protected class MRResultIterator implements RawKeyValueIterator {
-      private final DataOutputBuffer keybuf = new DataOutputBuffer();
+      private final DataInputBuffer keybuf = new DataInputBuffer();
       private final InMemValBytes vbytes = new InMemValBytes();
       private final int end;
       private int current;
@@ -957,14 +915,13 @@
       public boolean next() throws IOException {
         return ++current < end;
       }
-      public DataOutputBuffer getKey() throws IOException {
+      public DataInputBuffer getKey() throws IOException {
         final int kvoff = kvoffsets[current % kvoffsets.length];
-        keybuf.reset();
-        keybuf.write(kvbuffer, kvindices[kvoff + KEYSTART],
-            kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]);
+        keybuf.reset(kvbuffer, kvindices[kvoff + KEYSTART],
+                     kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]);
         return keybuf;
       }
-      public ValueBytes getValue() throws IOException {
+      public DataInputBuffer getValue() throws IOException {
         getVBytesForOffset(kvoffsets[current % kvoffsets.length], vbytes);
         return vbytes;
       }
@@ -999,7 +956,7 @@
       //lengths for each partition
       finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
       
-      finalIndexFileSize = partitions * 16;
+      finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
       
       Path finalOutputFile = mapOutputFile.getOutputFileForWrite(getTaskID(), 
                              finalOutFileSize);
@@ -1009,6 +966,7 @@
       //The output stream for the final single output file
       FSDataOutputStream finalOut = localFs.create(finalOutputFile, true, 
                                                    4096);
+      
       //The final index file output stream
       FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
                                                         4096);
@@ -1018,12 +976,11 @@
         //create dummy files
         for (int i = 0; i < partitions; i++) {
           segmentStart = finalOut.getPos();
-          Writer writer = SequenceFile.createWriter(job, finalOut, 
-                                                    job.getMapOutputKeyClass(), 
-                                                    job.getMapOutputValueClass(), 
-                                                    compressionType, codec);
+          Writer<K, V> writer = new Writer<K, V>(job, finalOut, 
+                                                 keyClass, valClass, null);
           finalIndexOut.writeLong(segmentStart);
           finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
+          finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
           writer.close();
         }
         finalOut.close();
@@ -1031,48 +988,55 @@
         return;
       }
       {
-        //create a sorter object as we need access to the SegmentDescriptor
-        //class and merge methods
-        Sorter sorter = new Sorter(localFs, job.getOutputKeyComparator(),
-                                   keyClass, valClass, job);
-        sorter.setProgressable(reporter);
-        
         for (int parts = 0; parts < partitions; parts++){
-          List<SegmentDescriptor> segmentList =
-            new ArrayList<SegmentDescriptor>(numSpills);
+          //create the segments to be merged
+          List<Segment<K, V>> segmentList =
+            new ArrayList<Segment<K, V>>(numSpills);
           for(int i = 0; i < numSpills; i++) {
             FSDataInputStream indexIn = localFs.open(indexFileName[i]);
-            indexIn.seek(parts * 16);
+            indexIn.seek(parts * MAP_OUTPUT_INDEX_RECORD_LENGTH);
             long segmentOffset = indexIn.readLong();
+            long rawSegmentLength = indexIn.readLong();
             long segmentLength = indexIn.readLong();
             indexIn.close();
-            SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
-                                                               segmentLength, filename[i]);
-            s.preserveInput(true);
-            s.doSync();
+            FSDataInputStream in = localFs.open(filename[i]);
+            in.seek(segmentOffset);
+            Segment<K, V> s = 
+              new Segment<K, V>(new Reader<K, V>(job, in, segmentLength, codec),
+                                true);
             segmentList.add(i, s);
+            
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Index: (" + indexFileName[i] + ", " + segmentOffset +
+                        ", " + rawSegmentLength + ", " + segmentLength + ")");
+            }
           }
+          
+          //merge
+          @SuppressWarnings("unchecked")
+          RawKeyValueIterator kvIter = 
+            Merger.merge(job, localFs, 
+                         keyClass, valClass,
+                         segmentList, job.getInt("io.sort.factor", 100), 
+                         new Path(getTaskID().toString()), 
+                         job.getOutputKeyComparator(), reporter);
+
+          //write merged output to disk
           segmentStart = finalOut.getPos();
-          RawKeyValueIterator kvIter = sorter.merge(segmentList, new Path(getTaskID().toString())); 
-          SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
-                                                                 job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
-                                                                 compressionType, codec);
+          Writer<K, V> writer = 
+              new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
           if (null == combinerClass || numSpills < minSpillsForCombine) {
-            sorter.writeFile(kvIter, writer);
+            Merger.writeFile(kvIter, writer, reporter);
           } else {
             combineCollector.setWriter(writer);
             combineAndSpill(kvIter, combineInputCounter);
           }
-          //close the file - required esp. for block compression to ensure
-          //partition data don't span partition boundaries
+
+          //close
           writer.close();
-          //when we write the offset/length to the final index file, we write
-          //longs for both. This helps us to reliably seek directly to the
-          //offset/length for a partition when we start serving the byte-ranges
-          //to the reduces. We probably waste some space in the file by doing
-          //this as opposed to writing VLong but it helps us later on.
-          finalIndexOut.writeLong(segmentStart);
-          finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
+          
+          //write index record
+          writeIndexRecord(finalIndexOut, finalOut, segmentStart, writer);
         }
         finalOut.close();
         finalIndexOut.close();
@@ -1084,8 +1048,26 @@
       }
     }
 
-  }
-
+    private void writeIndexRecord(FSDataOutputStream indexOut, 
+                                  FSDataOutputStream out, long start, 
+                                  Writer<K, V> writer) 
+    throws IOException {
+      //when we write the offset/decompressed-length/compressed-length to  
+      //the final index file, we write longs for both compressed and 
+      //decompressed lengths. This helps us to reliably seek directly to 
+      //the offset/length for a partition when we start serving the 
+      //byte-ranges to the reduces. We probably waste some space in the 
+      //file by doing this as opposed to writing VLong but it helps us later on.
+      // index record: <offset, raw-length, compressed-length> 
+      indexOut.writeLong(start);
+      indexOut.writeLong(writer.getRawLength());
+      long segmentLength = out.getPos() - start;
+      indexOut.writeLong(segmentLength);
+    }
+    
+  } // MapOutputBuffer
+  
   /**
    * Exception indicating that the allocated sort buffer is insufficient
    * to hold the current record.



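The index-file layout introduced by writeIndexRecord() above is a fixed-size record per partition: three 8-byte longs, <offset, raw-length, compressed-length>, which is why MAP_OUTPUT_INDEX_RECORD_LENGTH is 24. A minimal sketch of how a reader could pull out the record for one partition, assuming only plain java.io streams (IndexRecordSketch and readRecord are illustrative names, not classes in this patch):

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

class IndexRecordSketch {
  // 3 longs of 8 bytes each: <offset, raw-length, compressed-length>
  static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;

  static long[] readRecord(String indexFile, int partition) throws IOException {
    DataInputStream in = new DataInputStream(new FileInputStream(indexFile));
    try {
      // seek to this partition's record
      in.skipBytes(partition * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      long offset = in.readLong();            // where the segment starts in the map-output file
      long rawLength = in.readLong();         // decompressed length of the segment
      long compressedLength = in.readLong();  // on-disk (possibly compressed) length
      return new long[] { offset, rawLength, compressedLength };
    } finally {
      in.close();
    }
  }
}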